2026/4/17 2:51:41
# CSP (Communicating Sequential Processes) Model Explained
## Actor vs CSP

```
Actor model:
┌─────────┐         ┌─────────┐
│ Actor A │ ──msg──►│ Actor B │   Each Actor has its own mailbox
│[mailbox]│         │[mailbox]│   The sender does not wait
└─────────┘         └─────────┘

CSP model:
┌─────────┐         ┌─────────┐
│ Process │◄───────►│ Process │   Processes communicate through a Channel
│    A    │ Channel │    B    │   The Channel is an independent entity
└─────────┘         └─────────┘
     │                   │
     └─────────┬─────────┘
               ▼
        ┌─────────────┐
        │   Channel   │   The Channel is a first-class citizen
        │  [buffer]   │   It can be buffered or unbuffered
        └─────────────┘
```

## CSP Core Concepts

```
┌───────────┐    ┌─────────────────┐    ┌───────────┐
│  Sender   │───►│     Channel     │───►│ Receiver  │
│           │    │  ┌───┬───┬───┐  │    │           │
│  send()   │    │  │ 1 │ 2 │ 3 │  │    │  recv()   │
│ may block │    │  └───┴───┴───┘  │    │ may block │
└───────────┘    │ bounded buffer  │    └───────────┘
                 └─────────────────┘
```

Key properties:

1. A Channel exists independently of the processes that use it.
2. The same Channel can be shared by multiple processes.
3. Sending and receiving can block, which makes each operation a synchronization point.

## Complete Implementation

```cpp
#include <chrono>
#include <condition_variable>
#include <cstddef>
#include <functional>
#include <iostream>
#include <mutex>
#include <optional>
#include <queue>
#include <string>
#include <thread>
#include <utility>
#include <vector>

// Channel: the core of the CSP model.
// A typed, thread-safe, bounded channel, similar to Go's channels.
template <typename T>
class Channel {
public:
    // capacity == 0 means unbuffered (synchronous)
    // capacity  > 0 means buffered (asynchronous up to capacity)
    explicit Channel(std::size_t capacity = 0)
        : capacity_(capacity), closed_(false) {}

    // Send a value into the channel.
    // Blocks if the buffer is full.
    // Returns false if the channel is closed.
    bool send(T value) {
        std::unique_lock<std::mutex> lock(mu_);

        // Wait until:
        //   1. there is room in the buffer, OR
        //   2. the channel is closed.
        // For an unbuffered channel (capacity_ == 0): wait until a receiver
        // is waiting and no other sender's value is still in flight.
        cv_send_.wait(lock, [this]() {
            return closed_ || queue_.size() < capacity_ ||
                   (capacity_ == 0 && receivers_waiting_ > 0 && queue_.empty());
        });

        // Cannot send to a closed channel
        if (closed_) {
            return false;
        }

        // Put the value in the queue and wake up one waiting receiver
        queue_.push(std::move(value));
        cv_recv_.notify_one();

        // For an unbuffered channel: wait until the value has been taken
        if (capacity_ == 0) {
            cv_send_.wait(lock, [this]() { return queue_.empty() || closed_; });
        }
        return true;
    }

    // Receive a value from the channel.
    // Blocks if the buffer is empty.
    // Returns std::nullopt if the channel is closed and empty.
    std::optional<T> recv() {
        std::unique_lock<std::mutex> lock(mu_);

        // Register as a waiting receiver (needed for the unbuffered case).
        // notify_all is used on cv_send_ because senders may be waiting on
        // different conditions (room to send vs. value taken).
        ++receivers_waiting_;
        cv_send_.notify_all();

        // Wait until there is data in the buffer OR the channel is closed
        cv_recv_.wait(lock, [this]() { return !queue_.empty() || closed_; });
        --receivers_waiting_;

        // Channel closed and empty
        if (queue_.empty()) {
            return std::nullopt;
        }

        // Take the value from the queue and wake up waiting senders
        T value = std::move(queue_.front());
        queue_.pop();
        cv_send_.notify_all();
        return value;
    }

    // Close the channel.
    // No more sends are allowed, but remaining data can still be received.
    void close() {
        std::lock_guard<std::mutex> lock(mu_);
        closed_ = true;
        cv_send_.notify_all();  // Wake all senders
        cv_recv_.notify_all();  // Wake all receivers
    }

    // Check whether the channel is closed
    bool is_closed() const {
        std::lock_guard<std::mutex> lock(mu_);
        return closed_;
    }

private:
    mutable std::mutex mu_;
    std::condition_variable cv_send_;    // Senders wait on this
    std::condition_variable cv_recv_;    // Receivers wait on this
    std::queue<T> queue_;                // The buffer
    std::size_t capacity_;               // Max buffer size (0 = unbuffered)
    bool closed_;                        // Is the channel closed?
    std::size_t receivers_waiting_ = 0;  // Count of waiting receivers
};

// Convenience operator for sending (like Go's `ch <- value`)
template <typename T>
Channel<T>& operator<<(Channel<T>& ch, T value) {
    ch.send(std::move(value));
    return ch;
}

// Convenience operator for receiving (like Go's `value <- ch`).
// Leaves `value` unchanged if the channel is closed and empty.
template <typename T>
Channel<T>& operator>>(Channel<T>& ch, T& value) {
    auto result = ch.recv();
    if (result) {
        value = std::move(*result);
    }
    return ch;
}
```

## Channel Types Compared

```
Unbuffered Channel (capacity = 0):
───────────────────────────────────────────────────────────────
Sender                  Channel                  Receiver
  │                        │                        │
  │── send(x) ────────────►│                        │
  │   [blocks, waiting]    │                        │
  │                        │◄────────── recv() ─────│
  │                        │── value handed over ──►│
  │   [unblocked]          │                        │

→ Synchronous communication: the sender must wait for a receiver

Buffered Channel (capacity = 3):
───────────────────────────────────────────────────────────────
Sender                  Channel                  Receiver
  │                   ┌───────────┐                 │
  │── send(1) ──────► │ [1]       │                 │
  │── send(2) ──────► │ [1][2]    │                 │
  │── send(3) ──────► │ [1][2][3] │                 │
  │── send(4) ──────► │ [blocked] │ ◄──── recv() ───│
  │   [waits for room]│ [2][3][4] │   gets 1        │
  │   [unblocked]     └───────────┘                 │

→ Asynchronous communication: the sender blocks only when the buffer is full
```

## Usage Examples

### Example 1: Basic usage

```cpp
int main() {
    // Create a buffered channel with capacity 2
    Channel<int> ch(2);

    // Producer thread
    std::thread producer([&ch]() {
        for (int i = 1; i <= 5; ++i) {
            std::cout << "Sending: " << i << std::endl;
            ch.send(i);
            std::cout << "Sent: " << i << std::endl;
        }
        ch.close();  // No more data
    });

    // Consumer thread
    std::thread consumer([&ch]() {
        while (true) {
            auto value = ch.recv();
            if (!value) {
                std::cout << "Channel closed" << std::endl;
                break;
            }
            std::cout << "Received: " << *value << std::endl;
            std::this_thread::sleep_for(std::chrono::milliseconds(100));
        }
    });

    producer.join();
    consumer.join();
    return 0;
}
```

One possible output (the exact interleaving depends on thread scheduling):

```
Sending: 1
Sent: 1
Sending: 2
Sent: 2
Sending: 3      ← buffer full, sender blocks
Received: 1
Sent: 3
Sending: 4
Received: 2
Sent: 4
...
Channel closed
```

### Example 2: Multiple producers, multiple consumers

```cpp
int main() {
    Channel<std::string> ch(5);

    // Multiple producers
    std::vector<std::thread> producers;
    for (int i = 0; i < 3; ++i) {
        producers.emplace_back([&ch, i]() {
            for (int j = 0; j < 3; ++j) {
                std::string msg = "Producer" + std::to_string(i) +
                                  "-Msg" + std::to_string(j);
                ch.send(msg);
            }
        });
    }

    // Multiple consumers
    std::vector<std::thread> consumers;
    for (int i = 0; i < 2; ++i) {
        consumers.emplace_back([&ch, i]() {
            while (true) {
                auto msg = ch.recv();
                if (!msg) break;
                std::cout << "Consumer " << i << " got: " << *msg << std::endl;
            }
        });
    }

    // Wait for producers
    for (auto& t : producers) t.join();

    // Close the channel after all producers are done
    ch.close();

    // Wait for consumers
    for (auto& t : consumers) t.join();
    return 0;
}
```

```
Multiple producers, multiple consumers:

┌────────────┐
│ Producer 0 │──┐
└────────────┘  │
                │     ┌─────────────┐     ┌────────────┐
┌────────────┐  ├────►│   Channel   │──┬─►│ Consumer 0 │
│ Producer 1 │──┤     │  [buffer]   │  │  └────────────┘
└────────────┘  │     └─────────────┘  │
                │                      │  ┌────────────┐
┌────────────┐  │                      └─►│ Consumer 1 │
│ Producer 2 │──┘                         └────────────┘
└────────────┘
```

### Example 3: Pipeline pattern

```cpp
// Pipeline: numbers → square → print
int main() {
    Channel<int> numbers(3);
    Channel<int> squares(3);

    // Stage 1: Generate numbers
    std::thread generator([&numbers]() {
        for (int i = 1; i <= 5; ++i) {
            numbers.send(i);
        }
        numbers.close();
    });

    // Stage 2: Square numbers
    std::thread squarer([&numbers, &squares]() {
        while (auto n = numbers.recv()) {
            squares.send((*n) * (*n));
        }
        squares.close();
    });

    // Stage 3: Print results
    std::thread printer([&squares]() {
        while (auto n = squares.recv()) {
            std::cout << "Result: " << *n << std::endl;
        }
    });

    generator.join();
    squarer.join();
    printer.join();
    return 0;
}
```

```
Pipeline pattern:

┌───────────┐    numbers    ┌───────────┐    squares    ┌───────────┐
│ Generator │──────────────►│  Squarer  │──────────────►│  Printer  │
│   1,2,3   │    Channel    │  n → n²   │    Channel    │  prints   │
└───────────┘               └───────────┘               └───────────┘
      │                           │                           │
      ▼                           ▼                           ▼
  1,2,3,4,5                  1,4,9,16,25                  Result: 1
                                                          Result: 4
                                                          Result: 9
                                                          ...
```

### Example 4: Fan-out / Fan-in pattern

```cpp
int main() {
    Channel<int> jobs(10);
    Channel<int> results(10);

    // Fan-out: multiple workers process jobs
    std::vector<std::thread> workers;
    for (int w = 0; w < 3; ++w) {
        workers.emplace_back([&jobs, &results, w]() {
            while (auto job = jobs.recv()) {
                // Simulate work
                std::this_thread::sleep_for(std::chrono::milliseconds(50));
                int result = (*job) * 2;
                std::cout << "Worker " << w << " processed " << *job << std::endl;
                results.send(result);
            }
        });
    }

    // Send jobs
    std::thread sender([&jobs]() {
        for (int i = 1; i <= 9; ++i) {
            jobs.send(i);
        }
        jobs.close();
    });

    // Fan-in: collect all results
    std::thread collector([&results]() {
        int count = 0;
        while (auto r = results.recv()) {
            std::cout << "Result: " << *r << std::endl;
            if (++count >= 9) break;  // We know there are 9 jobs
        }
    });

    sender.join();
    for (auto& w : workers) w.join();
    results.close();
    collector.join();
    return 0;
}
```

```
Fan-out / Fan-in:

                      ┌──────────┐
                 ┌───►│ Worker 0 │───┐
                 │    └──────────┘   │
┌────────┐ jobs  │    ┌──────────┐   │ results  ┌───────────┐
│ Sender │───────┼───►│ Worker 1 │───┼─────────►│ Collector │
└────────┘Channel│    └──────────┘   │ Channel  └───────────┘
                 │    ┌──────────┐   │
                 └───►│ Worker 2 │───┘
                      └──────────┘

Work is distributed automatically to whichever worker is idle
```

## Actor vs CSP: Code Comparison

```cpp
// ---------- Actor model ----------
// `Actor` is not defined in this article; it is assumed to be a
// mailbox-backed type whose send() accepts a callable to run later.
class Counter {
    int count_ = 0;
    Actor actor_;

public:
    void increment() {
        actor_.send([this]() { ++count_; });
    }
    void get(std::function<void(int)> callback) {
        actor_.send([this, callback]() { callback(count_); });
    }
};

// Usage
Counter counter;
counter.increment();
counter.increment();
counter.get([](int v) { std::cout << v << std::endl; });

// ---------- CSP model ----------
void counter_process(Channel<std::string>& cmd, Channel<int>& result) {
    int count = 0;
    while (auto c = cmd.recv()) {
        if (*c == "inc") {
            ++count;
        } else if (*c == "get") {
            result.send(count);
        }
    }
}

// Usage
Channel<std::string> cmd(1);
Channel<int> result(1);
std::thread t(counter_process, std::ref(cmd), std::ref(result));
cmd.send("inc");
cmd.send("inc");
cmd.send("get");
int value = 0;
result >> value;
std::cout << value << std::endl;
cmd.close();  // Lets counter_process leave its loop
t.join();     // A joinable std::thread must be joined before destruction
```

## Summary

| Concept | Description |
|---|---|
| Channel | An independent communication pipe that connects multiple processes |
| Buffered Channel | Asynchronous; the sender blocks only when the buffer is full |
| Unbuffered Channel | Synchronous; the sender waits for a receiver |
| `send()` | Sends data; may block |
| `recv()` | Receives data; may block |
| `close()` | Closes the Channel; no further sends are allowed |
| Pipeline | Several stages chained together in sequence |
| Fan-out/Fan-in | Several workers processing in parallel |

### Actor vs CSP

| | Actor | CSP |
|---|---|---|
| Communication | Sent directly to an Actor | Through a Channel |
| Mailbox ownership | Belongs to the Actor | Exists independently |
| Coupling | Higher | Lower |
| Typical languages / libraries | Erlang, Akka | Go, Clojure (core.async) |
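
The `close()` behaviour summarized above (no further sends, but buffered data can still be drained) can be checked with a small self-contained sketch. `MiniChannel`, `sum_through_channel`, and `close_rejects_send_but_keeps_data` are hypothetical names introduced only for this illustration. `MiniChannel` is a buffered-only reduction of the article's `Channel`, not a replacement for it.

```cpp
#include <condition_variable>
#include <cstddef>
#include <mutex>
#include <optional>
#include <queue>
#include <thread>
#include <utility>

// MiniChannel: hypothetical buffered-only channel (capacity must be > 0).
template <typename T>
class MiniChannel {
public:
    explicit MiniChannel(std::size_t capacity) : capacity_(capacity) {}

    // Blocks while the buffer is full; returns false once the channel is closed.
    bool send(T value) {
        std::unique_lock<std::mutex> lock(mu_);
        cv_.wait(lock, [this] { return closed_ || queue_.size() < capacity_; });
        if (closed_) return false;
        queue_.push(std::move(value));
        cv_.notify_all();
        return true;
    }

    // Blocks while the buffer is empty; returns nullopt once closed and drained.
    std::optional<T> recv() {
        std::unique_lock<std::mutex> lock(mu_);
        cv_.wait(lock, [this] { return closed_ || !queue_.empty(); });
        if (queue_.empty()) return std::nullopt;
        T value = std::move(queue_.front());
        queue_.pop();
        cv_.notify_all();
        return value;
    }

    void close() {
        std::lock_guard<std::mutex> lock(mu_);
        closed_ = true;
        cv_.notify_all();
    }

private:
    std::mutex mu_;
    std::condition_variable cv_;  // One condition variable, always notify_all
    std::queue<T> queue_;
    std::size_t capacity_;
    bool closed_ = false;
};

// Sends 1..n through a channel of the given capacity and returns the sum the
// consumer saw before recv() reported "closed and drained".
int sum_through_channel(int n, std::size_t capacity) {
    MiniChannel<int> ch(capacity);
    int sum = 0;
    std::thread producer([&ch, n] {
        for (int i = 1; i <= n; ++i) ch.send(i);
        ch.close();
    });
    std::thread consumer([&ch, &sum] {
        while (auto v = ch.recv()) sum += *v;
    });
    producer.join();
    consumer.join();
    return sum;
}

// Returns true if send() is rejected after close() while the value that was
// already buffered can still be received exactly once.
bool close_rejects_send_but_keeps_data() {
    MiniChannel<int> ch(2);
    ch.send(7);
    ch.close();
    bool rejected = !ch.send(8);
    auto first = ch.recv();
    auto second = ch.recv();
    return rejected && first && *first == 7 && !second;
}
```

Under these assumptions, `sum_through_channel(100, 3)` returns 5050 regardless of how the two threads interleave, because closing the channel never discards values that were already buffered.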