2026/2/17 0:28:01
网站建设
项目流程
网站平台建设所需开发工具,外贸手工做兼职的网站,网站优点,宝塔做两个网站面试官必问#xff1a;Go WaitGroup 底层是怎么实现的#xff1f;源码拆解 原理分析_哔哩哔哩_bilibili
如果大家不想看文字版的可以去观看我b站对应的视频#xff0c;超详细#xff0c;欢迎大家观看#xff0c;链接在上面。
一、介绍waitGroup
waitGroup就像一个任务…面试官必问Go WaitGroup 底层是怎么实现的源码拆解 原理分析_哔哩哔哩_bilibili如果大家不想看文字版的可以去观看我b站对应的视频超详细欢迎大家观看链接在上面。一、介绍waitGroupwaitGroup就像一个任务完成计数器其有三个方法1. Add(3)表示还有3个任务要做2. Done()完成一个任务计数器-13. Wait()等待所有任务完成通过一个案例给大家介绍一下waitGroup的用法func main() { var wg sync.WaitGroup wg.Add(2) // 设置计数器数值即 goroutine的个数 // 第一个goroutine任务函数 go func() { // 开始任务, 模拟任务时长 time.Sleep(1 * time.Second) fmt.Println(Goroutine 1 finished!) wg.Done() // 第一个协程任务结束计数器-1变为1 }() // 第二个goroutine任务函数 go func() { defer wg.Done() // 任务完成计数器变为0 // 开始任务, 模拟任务时长 time.Sleep(2 * time.Second) fmt.Println(Goroutine 2 finished!) }() wg.Wait() // 主goroutine阻塞等待计数器变为0 fmt.Println(All Goroutine finished) }打印结果通过上面的例子以及结合定义说明waitGroup是 go 应用开发过程中经常使用的并发控制技术可理解为 wait-goroutine-group即等待一组goroutine结束比如某个 goroutine 需要等待其他几个 goroutine 全部完成那么使用 waitGroup可以轻松实现 。二、waitGroup的底层结构1. go 1.13版本及之前type WaitGroup struct { statel [3]uint32 }代码中字段是一个数量为3的数组1. statel [0]计数器counter记录还有多少个 goroutine任务没有完成2. statel [1]等待者数量waiter count记录有多少个协程在调用 wg.wait() 等待3. statel [2]信号量semaphore用于阻塞和唤醒等待的goroutine2. go 1.14版本及之后type WaitGroup struct { noCopy noCopy state atomic.Uint64 // high 32 bits are counter, low 32 bits are waiter count. sema uint32 }1. noCopy:此类型禁止复制是一个空的结构体包括 unlock() 和 lock() 方法确保go能检测到 waitGroup被复制的情况2. state状态字段。高32位是计数器低32位是等待者的数量3. sema信号量用于阻塞/唤醒等待的goroutine3. 工作机制:1)当 counter0时wg.wait()会阻塞在信号量上2)当 counter0时通过信号量唤醒所有等待的 goroutine4. 版本比较1. 1.14的waitGroup相比于之前的多了一个noCopy防止复制导致不同副本状态不一致问题2. 旧版本存在内存对齐问题新版的atomic.Uint64类型本身保证8字节对齐3. 清晰的字段分离旧版的需要文档说明每个元素的含义新版明确了存储状态三、Add()底层源码解析func (wg *WaitGroup) Add(delta int) { // 竞态检测检测是否存在数据竞态两个或多个协程并发访问同一块内存。且至少有一个是写操作 if race.Enabled { // 表示竞态检测已开启 if delta 0 { // 实现内存同步当Add方法被调用时此代码确保协程中在Done()之前的所有写操作 race.ReleaseMerge(unsafe.Pointer(wg)) } race.Disable() // 临时禁用当前协程的竞态检测 defer race.Enable() // 函数返回时重新启用竞态检测 } state : wg.state.Add(uint64(delta) 32) v : int32(state 32)// 左移32位任务计数器 w : uint32(state) // 等待着数量 if race.Enabled delta 0 v int32(delta) { race.Read(unsafe.Pointer(wg.sema)) } // 基本检查计数器不能为负如果为负数就意味着Done()被调用的次数超过了Add()的次数报panic if v 0 { panic(sync: negative WaitGroup counter) } // 并发使用检查w ! 0说明有等待者delta 0 v int32(delta) 说明计数器增加之前为0就像等待者以为所有工作都完成了但实际上可能还有工作刚刚被添加go语言选择让这种情况报panic如果忽略会导致难以调试的bug程序也会不稳定 if w ! 0 delta 0 v int32(delta) { panic(sync: WaitGroup misuse: Add called concurrently with Wait) } // 表示还有任务没有完成或者是根本没有人等待所以不需要唤醒直接返回 if v 0 || w 0 { return } // 唤醒前的最终检查检查是否发生并发修改在Add() 开始的时候记录状态的快照state在结束的时候再次读取当前状态如果两个值不同说明执行器件状态被其他协程给修改了 if wg.state.Load() ! state { panic(sync: WaitGroup misuse: Add called concurrently with Wait) } // 唤醒所有等待者 wg.state.Store(0) for ; w ! 0; w-- { runtime_Semrelease(wg.sema, false, 0) } }Add() 一般在任务函数执行之前调用表示本次程序总共有多少任务函数要执行四、Done() 底层源码解析func (wg *WaitGroup) Done() { wg.Add(-1) }Done就比较简单一般在一个任务结束时调用让计数器-1五、Wait() 底层源码解析func (wg *WaitGroup) Wait() { // 竞态检测开关在正式编译时这些代码不会执行只在go run -race时启用 if race.Enabled { race.Disable() } for { state : wg.state.Load() v : int32(state 32) // 计数器 w : uint32(state) // 等待者数量 // 计数器为0说明所有任务已经完成无需等待直接返回 if v 0 { // Counter is 0, no need to wait. if race.Enabled { // 竞态检测 race.Enable() race.Acquire(unsafe.Pointer(wg)) } return } // 将等待者数量加1通过原子比较并交换操作完成的确保线程安全 if wg.state.CompareAndSwap(state, state1) { if race.Enabled w 0 { race.Write(unsafe.Pointer(wg.sema)) } runtime_SemacquireWaitGroup(wg.sema) 再次检查状态如果状态不为0说明 waitGroup 被重新使用了会导致panic if wg.state.Load() ! 0 { panic(sync: WaitGroup is reused before previous Wait has returned) } // 竞态检测 if race.Enabled { race.Enable() race.Acquire(unsafe.Pointer(wg)) } return } } }wait方法等待直到 waitGroup 的计数器变为0表示所有任务完成如果计数器不是0则当前goroutine会被阻塞直到其他goroutine调用Done方法将计数器减到0一般在主goroutine最后调用