2026/4/18 12:23:38
网站建设
项目流程
响应式企业网站开发所用的平台,好业宝微商城,深圳网站推广公司,网站qq一键登录MPSC Queue#xff1a;多生产者单消费者无锁队列
SPSC无锁队列#xff0c;参考https://blog.csdn.net/qq_46105170/article/details/157458924。
概述
MPSC#xff08;Multi-Producer Single-Consumer#xff09;队列是一种允许多个生产者线程同时写入、但只有一个消费者…MPSC Queue多生产者单消费者无锁队列SPSC无锁队列参考https://blog.csdn.net/qq_46105170/article/details/157458924。概述MPSCMulti-Producer Single-Consumer队列是一种允许多个生产者线程同时写入、但只有一个消费者线程读取的无锁数据结构。典型应用场景包括多个业务线程向单一日志线程发送消息、多个工作线程向单一聚合线程汇报结果等。与 SPSC 的核心区别在 SPSC 队列中生产者可以直接写入数据然后用store推进tail因为只有一个生产者不存在竞争。但在 MPSC 中多个生产者可能同时尝试写入如果都读到相同的tail值就会写入同一个槽位导致数据丢失。MPSC 的核心挑战有两个槽位竞争问题和写入可见性问题。槽位竞争是指多个生产者必须以某种方式抢占槽位确保每个槽位只被一个生产者使用。写入可见性是指消费者需要知道某个槽位的数据是否已经写入完成。解决方案针对槽位竞争我们使用 CASCompare-And-Swap操作。生产者先读取当前tail然后尝试原子地将其推进。如果 CAS 成功该生产者就拥有了这个槽位如果失败说明其他生产者抢先了需要重试。针对写入可见性问题在于生产者必须先通过 CAS 推进tail来抢占槽位然后才能写入数据。这意味着tail的推进发生在数据写入之前。消费者如果只看tail可能会读到尚未写入的槽位。解决方案是为每个槽位增加一个ready标志生产者写入数据后将其置为true消费者在读取前检查这个标志。实现#includearray#includeatomic#includecstddef#includeemmintrin.htemplatetypenameT,size_t CapclassMPSCQueue{// 缓存行对齐避免 false sharingalignas(64)std::atomicsize_thead{0};// 消费者读取位置alignas(64)std::atomicsize_ttail{0};// 生产者写入位置alignas(64)std::arrayT,Cap1buffer;// 环形缓冲区1 用于区分空和满alignas(64)std::arraystd::atomicbool,Cap1ready;// 槽位就绪标志// 计算下一个位置环形缓冲区回绕staticsize_tnext_pos(size_t pos){return(pos1)%(Cap1);}public:// 构造函数初始化所有 ready 标志为 falseMPSCQueue(){for(autor:ready)r.store(false,std::memory_order_relaxed);}// 生产者调用多线程安全boolpush(constTval){size_t pos,next;// 第一步用 CAS 抢占槽位do{postail.load(std::memory_order_relaxed);nextnext_pos(pos);// 检查队列是否已满if(nexthead.load(std::memory_order_relaxed))returnfalse;}while(!tail.compare_exchange_weak(pos,next,std::memory_order_relaxed,std::memory_order_relaxed));// 第二步写入数据此时我们独占这个槽位buffer[pos]val;// 第三步标记槽位就绪使用 release 确保数据写入对消费者可见ready[pos].store(true,std::memory_order_release);returntrue;}// 消费者调用单线程boolpop(Tval){size_t poshead.load(std::memory_order_relaxed);// 检查队列是否为空if(postail.load(std::memory_order_acquire))returnfalse;// 自旋等待数据就绪// 槽位已被生产者抢占但数据可能还未写入完成while(!ready[pos].load(std::memory_order_acquire))_mm_pause();// CPU 提示正在自旋降低功耗和总线竞争// 读取数据valstd::move(buffer[pos]);// 清除就绪标志ready[pos].store(false,std::memory_order_relaxed);// 推进 headrelease 确保上述操作对生产者可见head.store(next_pos(pos),std::memory_order_release);returntrue;}};内存序分析在 CAS 操作中我们使用relaxed是因为 CAS 本身只是抢占槽位此时还没有数据需要同步。真正的同步点在ready标志上。ready[pos].store(true, release)与ready[pos].load(acquire)构成同步关系确保生产者对buffer[pos]的写入在消费者读取之前完成。head.store(next, release)确保消费者对数据的读取在生产者看到head推进之前完成防止生产者过早覆盖数据。检查队列满时的head.load可以用relaxed因为看到旧值最多导致误判队列已满而放弃写入不会导致正确性问题。我们使用compare_exchange_weak而不是strong是因为 weak 版本允许伪失败spurious failure即使比较成功也可能返回失败。但由于我们已经在循环中伪失败只是多一次重试。weak 版本在某些架构如 ARM上能生成更高效的指令。消费者行为说明pop函数的行为是如果队列为空head tail立即返回false如果队列非空但槽位数据未就绪自旋等待。这种设计假设生产者写入很快完成自旋时间很短。如果需要纯非阻塞版本可以在ready检查失败时也返回false让调用者决定如何处理。总结MPSC 队列相比 SPSC 的主要变化是生产者端使用 CAS 竞争槽位以及引入ready标志解决写入可见性问题。消费者端逻辑相对简单因为只有一个消费者不需要竞争。这种设计在多个生产者、单个消费者的场景下提供了高效的无锁通信机制。