sync 包:Mutex、WaitGroup、Once、Pool
sync 包 — Mutex、WaitGroup、Once、Pool
Go 的并发模型以 CSP(通信顺序进程)为核心,channel 是首选的同步方式。但现实中,不是所有并发问题都适合用 channel 解决。当多个 goroutine 需要访问共享数据结构时,直接加锁往往比通过 channel 传递更简洁高效。Go 标准库的 sync 包提供了一组经过精心设计的低级同步原语,它们是构建高性能并发程序的基础工具。
sync 包的设计哲学是"宁缺毋滥"——它只提供最核心的原语,每一个都有清晰的使用场景。正如 Russ Cox 在 Go 2017 的讨论中所说:"sync 包是为那些 channel 解决不了或解决起来太别扭的场景准备的。"
Level 1:你需要知道的
Mutex:互斥锁
sync.Mutex 是最基本的同步原语——确保同一时刻只有一个 goroutine 能访问临界区。
type SafeCounter struct {
mu sync.Mutex
count int
}
func (c *SafeCounter) Increment() {
c.mu.Lock()
c.count++
c.mu.Unlock()
}
func (c *SafeCounter) Get() int {
c.mu.Lock()
defer c.mu.Unlock()
return c.count
}
关键规则:
Lock()获取锁,如果锁已被持有则阻塞Unlock()释放锁,如果锁未被持有则 panic- 锁不与 goroutine 绑定——goroutine A 加锁,goroutine B 可以解锁(但这是坏实践)
- 总是用
defer解锁,除非你有明确的性能原因不这样做
func (c *SafeCounter) IncrementBad() {
c.mu.Lock()
// 如果这里 panic 了,锁永远不会被释放——死锁!
riskyOperation()
c.mu.Unlock()
}
func (c *SafeCounter) IncrementGood() {
c.mu.Lock()
defer c.mu.Unlock() // 即使 panic 也会释放锁
riskyOperation()
}
完整实战:线程安全的 Map
标准库的 map 不是并发安全的。在 Go 1.6+ 中,并发读写 map 会直接 panic(不是数据竞争的未定义行为,而是显式检测到后 fatal)。
type SafeMap[K comparable, V any] struct {
mu sync.Mutex
m map[K]V
}
func NewSafeMap[K comparable, V any]() *SafeMap[K, V] {
return &SafeMap[K, V]{m: make(map[K]V)}
}
func (sm *SafeMap[K, V]) Get(key K) (V, bool) {
sm.mu.Lock()
defer sm.mu.Unlock()
v, ok := sm.m[key]
return v, ok
}
func (sm *SafeMap[K, V]) Set(key K, value V) {
sm.mu.Lock()
defer sm.mu.Unlock()
sm.m[key] = value
}
func (sm *SafeMap[K, V]) Delete(key K) {
sm.mu.Lock()
defer sm.mu.Unlock()
delete(sm.m, key)
}
RWMutex:读写锁
如果读操作远多于写操作,用 sync.Mutex 会让所有读操作互相阻塞——这很浪费。sync.RWMutex 允许多个读操作并发进行,只在写操作时互斥。
type Config struct {
mu sync.RWMutex
data map[string]string
}
func (c *Config) Get(key string) string {
c.mu.RLock() // 读锁:多个读可以并发
defer c.mu.RUnlock()
return c.data[key]
}
func (c *Config) Set(key, value string) {
c.mu.Lock() // 写锁:独占访问
defer c.mu.Unlock()
c.data[key] = value
}
读写锁的语义:
RLock(): 获取读锁。如果有写锁被持有,阻塞;否则成功(可以与其他读锁共存)RUnlock(): 释放读锁Lock(): 获取写锁。如果有任何锁(读或写)被持有,阻塞Unlock(): 释放写锁
什么时候用 RWMutex?
经验法则:读写比超过 10:1 时,RWMutex 才有意义。如果读写比接近 1:1,RWMutex 的额外开销(内部维护读者计数器)反而让它比普通 Mutex 更慢。
读写比 推荐
1:1 sync.Mutex
5:1 sync.Mutex(边界情况,需要 benchmark)
10:1+ sync.RWMutex
100:1+ 考虑 sync.Map 或 atomic
WaitGroup:等待一组 goroutine
sync.WaitGroup 用于等待一组 goroutine 全部完成。它是 "fork-join" 并发模型的核心工具。
func fetchAll(urls []string) []string {
var wg sync.WaitGroup
results := make([]string, len(urls))
for i, url := range urls {
wg.Add(1) // 在启动 goroutine 前调用 Add
go func(idx int, u string) {
defer wg.Done() // goroutine 完成时调用 Done
resp, err := http.Get(u)
if err != nil {
results[idx] = "error"
return
}
defer resp.Body.Close()
body, _ := io.ReadAll(resp.Body)
results[idx] = string(body)
}(i, url)
}
wg.Wait() // 阻塞直到所有 goroutine 调用了 Done
return results
}
关键规则:
Add(n)必须在go语句之前调用(否则 Wait 可能在 Add 之前返回)Done()等价于Add(-1)- 计数器变为负数会 panic
- WaitGroup 可以重用(计数器回到 0 后可以再次 Add)
常见错误:在 goroutine 内部调用 Add
// 错误!可能导致 Wait 提前返回
for _, url := range urls {
go func(u string) {
wg.Add(1) // 太晚了!main goroutine 可能已经到了 Wait()
defer wg.Done()
fetch(u)
}(url)
}
wg.Wait()
正确做法:
for _, url := range urls {
wg.Add(1) // 在启动 goroutine 前 Add
go func(u string) {
defer wg.Done()
fetch(u)
}(url)
}
wg.Wait()
sync.Once:保证只执行一次
sync.Once 确保一个函数无论被多少个 goroutine 调用,都只执行一次。最常见的用途是单例初始化(singleton initialization)。
var (
instance *Database
once sync.Once
)
func GetDB() *Database {
once.Do(func() {
// 这个函数只会被执行一次,即使 1000 个 goroutine 同时调用 GetDB
instance = &Database{
conn: connectToDB(),
}
})
return instance
}
sync.Once 的保证:
- 函数只执行一次(即使并发调用)
- 所有调用者都会等待第一次执行完成后才返回
- 第一次执行完成后的所有 Do 调用立即返回(几乎零开销)
注意:如果 Once.Do 中的函数 panic 了,Once 仍然认为已经"执行过"了。 后续调用不会再次执行:
var once sync.Once
once.Do(func() {
panic("oops") // panic 了
})
once.Do(func() {
fmt.Println("this will never print") // 不会执行
})
从 Go 1.21 开始,新增了 sync.OnceFunc、sync.OnceValue 和 sync.OnceValues,提供更方便的 API:
// Go 1.21+
getDB := sync.OnceValue(func() *Database {
return &Database{conn: connectToDB()}
})
db := getDB() // 第一次调用执行初始化,之后直接返回缓存值
实战:并发安全的缓存
综合运用 Mutex、WaitGroup、Once 构建一个生产级缓存:
type Cache struct {
mu sync.RWMutex
items map[string]*cacheItem
}
type cacheItem struct {
value interface{}
expiry time.Time
}
func NewCache() *Cache {
c := &Cache{items: make(map[string]*cacheItem)}
// 启动后台清理过期项的 goroutine
go c.janitor()
return c
}
func (c *Cache) Get(key string) (interface{}, bool) {
c.mu.RLock()
defer c.mu.RUnlock()
item, exists := c.items[key]
if !exists {
return nil, false
}
if time.Now().After(item.expiry) {
return nil, false // 已过期,视为不存在
}
return item.value, true
}
func (c *Cache) Set(key string, value interface{}, ttl time.Duration) {
c.mu.Lock()
defer c.mu.Unlock()
c.items[key] = &cacheItem{
value: value,
expiry: time.Now().Add(ttl),
}
}
func (c *Cache) janitor() {
ticker := time.NewTicker(1 * time.Minute)
defer ticker.Stop()
for range ticker.C {
c.mu.Lock()
for key, item := range c.items {
if time.Now().After(item.expiry) {
delete(c.items, key)
}
}
c.mu.Unlock()
}
}
Level 2:它是怎么运行的
sync.Pool:对象复用
sync.Pool 是一个临时对象池,用于缓存已分配的对象以供后续复用,减少内存分配和 GC 压力。
var bufPool = sync.Pool{
New: func() interface{} {
return new(bytes.Buffer)
},
}
func processRequest(data []byte) string {
buf := bufPool.Get().(*bytes.Buffer) // 从池中获取
buf.Reset() // 重置状态
defer bufPool.Put(buf) // 用完放回
buf.Write(data)
buf.WriteString(" processed")
return buf.String()
}
sync.Pool 的特性:
- Get(): 从池中获取一个对象。如果池为空,调用 New 函数创建新对象
- Put(): 将对象放回池中
- GC 时清空: 每次 GC 时,Pool 中的所有对象可能被清除(不保证存活)
- 无大小限制: 池会根据需要增长,GC 时回收
关键约束:Pool 中的对象随时可能被回收。 不要在 Pool 中存储需要持久保存的数据,也不要依赖 Pool 的大小。
标准库中的实际使用——fmt 包:
// fmt/print.go(简化)
var ppFree = sync.Pool{
New: func() interface{} { return new(pp) },
}
func Fprintf(w io.Writer, format string, a ...interface{}) (n int, err error) {
p := ppFree.Get().(*pp)
p.doPrintf(format, a)
n, err = w.Write(p.buf)
p.free() // 内部调用 ppFree.Put(p)
return
}
fmt.Printf 每次调用都需要一个 pp 结构体来格式化输出。如果每次都 new,在高频调用下会产生大量垃圾。用 Pool 复用后,GC 压力大幅降低。
性能对比:
// Benchmark: 不使用 Pool
func BenchmarkNoPool(b *testing.B) {
for i := 0; i < b.N; i++ {
buf := new(bytes.Buffer)
buf.WriteString("hello")
_ = buf.String()
}
}
// Benchmark: 使用 Pool
func BenchmarkWithPool(b *testing.B) {
pool := sync.Pool{New: func() interface{} { return new(bytes.Buffer) }}
for i := 0; i < b.N; i++ {
buf := pool.Get().(*bytes.Buffer)
buf.Reset()
buf.WriteString("hello")
_ = buf.String()
pool.Put(buf)
}
}
// 典型结果(Go 1.21, Apple M1):
// BenchmarkNoPool-8 30000000 42 ns/op 64 B/op 1 allocs/op
// BenchmarkWithPool-8 50000000 28 ns/op 0 B/op 0 allocs/op
Pool 减少了约 33% 的操作时间,更重要的是 0 allocs/op——不给 GC 增加负担。
sync.Map:并发安全的 Map
sync.Map 是 Go 1.9 引入的并发安全 map,针对特定场景优化。
var cache sync.Map
// 存储
cache.Store("key1", "value1")
// 加载
if val, ok := cache.Load("key1"); ok {
fmt.Println(val.(string))
}
// 加载或存储(原子操作)
actual, loaded := cache.LoadOrStore("key2", "value2")
// loaded = false: 之前不存在,已存储 "value2"
// loaded = true: 之前已存在,返回旧值
// 删除
cache.Delete("key1")
// 遍历
cache.Range(func(key, value interface{}) bool {
fmt.Println(key, value)
return true // 返回 false 则停止遍历
})
sync.Map 适用的两种场景(来自官方文档):
- Entry 只写一次但读很多次(如不断增长的缓存)
- 多个 goroutine 读写不相交的 key 集合
在其他场景下,sync.Mutex + 普通 map 通常更快。
为什么? sync.Map 内部使用了两个 map——一个只读的 read map 和一个需要加锁的 dirty map。读操作首先检查 read map(无锁,原子操作),只有 miss 时才加锁访问 dirty map。如果 key 集合稳定(很少新增),大部分操作都走无锁路径。
// sync.Map 内部结构(简化)
type Map struct {
mu Mutex
read atomic.Pointer[readOnly] // 无锁读取
dirty map[interface{}]*entry // 需要加锁
misses int
}
type readOnly struct {
m map[interface{}]*entry
amended bool // dirty 中是否有 read 中没有的 key
}
性能对比:不同场景下的选择
场景 sync.Map Mutex+Map
读多写少(99:1) 快 2-5x 慢
读写均衡(50:50) 慢 1-3x 快
Key 集合不断增长 慢 快
Key 固定,多 goroutine 操作不同 key 快 3-10x 慢
Mutex 与 Channel 的选择
什么时候用 Mutex,什么时候用 Channel?这是 Go 开发者最常问的问题之一。
用 Mutex 的场景:
- 保护共享数据结构(map、slice、struct 字段)
- 简单的计数器、标志位
- 短临界区(几行代码)
- 性能敏感的热路径
用 Channel 的场景:
- 在 goroutine 之间传递数据的所有权
- 协调多个 goroutine 的执行顺序
- 实现超时、取消
- Fan-out/fan-in 等复杂模式
Rob Pike 的建议:"如果你在保护一个数据结构,用 mutex。如果你在协调工作流,用 channel。"
// Mutex:保护共享状态
type Counter struct {
mu sync.Mutex
n int
}
// Channel:协调工作流
func pipeline(input <-chan int) <-chan int {
output := make(chan int)
go func() {
defer close(output)
for v := range input {
output <- transform(v)
}
}()
return output
}
WaitGroup 的内部实现
sync.WaitGroup 的核心是一个 64 位的原子计数器和一个信号量:
// 简化的 WaitGroup 结构
type WaitGroup struct {
// 高 32 位:计数器(counter)
// 低 32 位:等待者数量(waiter count)
state atomic.Uint64
sema uint32 // 信号量,用于阻塞/唤醒
}
Add(n): 原子地将计数器加 nDone(): 原子地将计数器减 1;如果计数器归零,唤醒所有等待者Wait(): 如果计数器 > 0,将等待者数量加 1,然后调用runtime_Semacquire阻塞
计数器和等待者数量被打包在一个 64 位整数中,这样 Add 可以通过单次原子操作同时检查计数器是否归零和是否有等待者——比分别用两个变量更高效。
Cond:条件变量
sync.Cond 是一个相对少用但在特定场景非常有价值的原语。它允许 goroutine 等待某个条件为真。
type BoundedQueue struct {
mu sync.Mutex
notEmpty *sync.Cond
notFull *sync.Cond
buf []int
capacity int
}
func NewBoundedQueue(cap int) *BoundedQueue {
q := &BoundedQueue{
buf: make([]int, 0, cap),
capacity: cap,
}
q.notEmpty = sync.NewCond(&q.mu)
q.notFull = sync.NewCond(&q.mu)
return q
}
func (q *BoundedQueue) Put(val int) {
q.mu.Lock()
defer q.mu.Unlock()
for len(q.buf) == q.capacity {
q.notFull.Wait() // 释放锁并等待,被唤醒后重新获取锁
}
q.buf = append(q.buf, val)
q.notEmpty.Signal() // 通知一个等待的消费者
}
func (q *BoundedQueue) Get() int {
q.mu.Lock()
defer q.mu.Unlock()
for len(q.buf) == 0 {
q.notEmpty.Wait()
}
val := q.buf[0]
q.buf = q.buf[1:]
q.notFull.Signal()
return val
}
Cond.Wait() 的三步操作(原子执行):
- 释放关联的锁
- 挂起当前 goroutine
- 被唤醒后重新获取锁
为什么要用 for 循环而不是 if? 因为 Wait 被唤醒后条件可能已经不再为真(其他 goroutine 可能先行处理了)。这叫做"虚假唤醒"(spurious wakeup),虽然 Go 的实现不会产生真正的虚假唤醒,但规范建议始终在循环中检查条件。
Signal() 唤醒一个等待者,Broadcast() 唤醒所有等待者。
原子操作:sync/atomic
对于简单的计数器和标志位,原子操作比 Mutex 更轻量:
import "sync/atomic"
var counter int64
func increment() {
atomic.AddInt64(&counter, 1)
}
func get() int64 {
return atomic.LoadInt64(&counter)
}
Go 1.19 引入了类型化的原子变量,更安全易用:
var counter atomic.Int64
func increment() {
counter.Add(1)
}
func get() int64 {
return counter.Load()
}
atomic 操作列表:
| 操作 | 函数 | Go 1.19+ 类型方法 |
|---|---|---|
| 读取 | LoadInt64(&x) |
x.Load() |
| 写入 | StoreInt64(&x, v) |
x.Store(v) |
| 加 | AddInt64(&x, n) |
x.Add(n) |
| CAS | CompareAndSwapInt64(&x, old, new) |
x.CompareAndSwap(old, new) |
| 交换 | SwapInt64(&x, new) |
x.Swap(new) |
什么时候用 atomic vs Mutex?
- 单个变量的简单操作 → atomic
- 多个变量需要一起更新 → Mutex
- 需要复杂逻辑(if-then-update) → Mutex
Level 3:规范怎么定义的
Mutex 的实现:从自旋到信号量
Go 的 Mutex 实现经历了多次演进。当前实现(Go 1.9+)结合了自旋(spinning)和信号量(semaphore),并引入了饥饿模式(starvation mode)。
// sync/mutex.go(简化)
type Mutex struct {
state int32 // 锁状态(多个标志位)
sema uint32 // 信号量
}
const (
mutexLocked = 1 << iota // 1: 锁被持有
mutexWoken // 2: 有 goroutine 被唤醒
mutexStarving // 4: 饥饿模式
mutexWaiterShift = iota // 3: 等待者计数的起始位
)
Lock() 的完整流程:
-
快速路径(Fast path): CAS 尝试将 state 从 0 设为
mutexLocked。如果成功,直接返回——这是无竞争情况下的路径,只需一次原子操作。 -
慢路径(Slow path): 如果快速路径失败(锁已被持有),进入
lockSlow():- 自旋阶段:如果锁被持有且处于正常模式,goroutine 会自旋等待(类似于 SpinLock)。自旋条件:
- 运行在多核机器上
- 当前 GOMAXPROCS > 1
- 至少有一个其他 P(处理器)在运行
- 自旋次数 < 4
- 信号量阶段:自旋超过限制后,goroutine 调用
runtime_SemacquireMutex睡眠
- 自旋阶段:如果锁被持有且处于正常模式,goroutine 会自旋等待(类似于 SpinLock)。自旋条件:
-
唤醒后:被信号量唤醒的 goroutine 需要与新到达的 goroutine 竞争锁。
为什么先自旋后信号量? 自旋避免了线程切换的开销(约 1-2 微秒),对于短临界区(几十纳秒),自旋等到锁释放比睡眠唤醒快得多。但无限自旋会浪费 CPU,所以超过 4 次后切换到信号量。
饥饿模式(Go 1.9+)
Go 1.9 之前的 Mutex 有一个严重问题:新到达的 goroutine 比已等待的 goroutine 更容易获取锁(因为新 goroutine 正在 CPU 上运行,可以立即自旋竞争)。这导致已等待的 goroutine 可能被"饿死"——等待时间无上限。
Go 1.9 引入了饥饿模式来解决这个问题:
正常模式(Normal mode):
- 等待者按 FIFO 排队
- 被唤醒的等待者与新到达的 goroutine 竞争
- 新到达的 goroutine 有优势(已在 CPU 上运行)
饥饿模式(Starvation mode):
- 触发条件:一个等待者等待超过 1ms
- 行为:锁直接移交给等待队列头部的 goroutine,新到达的 goroutine 不参与竞争
- 退出条件:当前获得锁的 goroutine 是队列中最后一个,或等待时间 < 1ms
时间线(正常模式下的饥饿问题):
G1 持有锁
G2 等待... (100μs)
G3 到达 → 自旋 → 抢到锁(G2 继续等待)
G4 到达 → 自旋 → 抢到锁(G2 继续等待)
...
G2 可能等待非常久
时间线(饥饿模式):
G1 持有锁
G2 等待... (>1ms) → 触发饥饿模式
G1 解锁 → 直接交给 G2(G3, G4 必须排队)
Dmitry Vyukov 在 Go issue #13086 中提出了这个改进,commit message:"sync: make Mutex more fair"。基准测试显示,饥饿模式将最坏情况延迟从数百毫秒降低到约 1ms,但平均吞吐量略有下降(因为减少了自旋的机会)。
sync.Pool 与 GC 的交互
sync.Pool 的生命周期与 GC 紧密耦合。其内部实现使用了分 P(processor)的本地池来减少锁竞争:
// sync/pool.go(简化)
type Pool struct {
noCopy noCopy
local unsafe.Pointer // [P]poolLocal 数组
localSize uintptr
victim unsafe.Pointer // 上一轮 GC 的 local
victimSize uintptr
New func() interface{}
}
type poolLocal struct {
poolLocalInternal
pad [128 - unsafe.Sizeof(poolLocalInternal{})%128]byte // 防止 false sharing
}
type poolLocalInternal struct {
private interface{} // 只有当前 P 可以访问(无锁)
shared poolChain // 其他 P 也可以偷取(lock-free)
}
Get() 的流程:
- 固定当前 goroutine 到 P(
pin()) - 检查当前 P 的
private字段——无锁 - 如果 private 为空,从当前 P 的
shared链表头部弹出 - 如果 shared 也为空,从其他 P 的 shared 链表尾部偷取(work-stealing)
- 如果都为空,检查
victimpool(上一轮 GC 的残留) - 如果全部为空,调用
New()
GC 时的清理:
- 每次 GC 时:
victim = local; local = nil - 这意味着对象最多存活两轮 GC:第一轮从 local 移到 victim,第二轮清除 victim
- 这种 "双缓冲" 策略避免了对象在 GC 后立即被全部清除导致的性能悬崖
为什么 Pool 不适合做连接池?
// 错误用法:数据库连接池
var connPool = sync.Pool{
New: func() interface{} {
conn, _ := sql.Open("mysql", dsn)
return conn
},
}
// 问题:GC 后连接被清除,下次 Get 需要重新建连接(昂贵操作)
// 正确做法:用 sql.DB 自带的连接池,或自己实现基于 channel 的池
sync.Pool 适合的是"创建成本低但频率高"的临时对象(如 bytes.Buffer),不适合"创建成本高但频率低"的长期资源(如 DB 连接)。
sync.Once 的实现
Once 的实现看似简单,实则有精妙的性能考量:
// sync/once.go
type Once struct {
done atomic.Uint32
m Mutex
}
func (o *Once) Do(f func()) {
// 快速路径:已经执行过,直接返回
if o.done.Load() == 1 {
return
}
// 慢路径:第一次调用(或第一次正在执行中)
o.doSlow(f)
}
func (o *Once) doSlow(f func()) {
o.m.Lock()
defer o.m.Unlock()
if o.done.Load() == 0 {
defer o.done.Store(1)
f()
}
}
为什么不直接用 CAS 实现?
// 错误的实现(概念性)
func (o *Once) Do(f func()) {
if atomic.CompareAndSwapUint32(&o.done, 0, 1) {
f()
}
}
问题:如果 goroutine A 赢得了 CAS 并开始执行 f(),goroutine B 看到 done=1 后立即返回——但 f() 可能还没执行完!B 可能使用了尚未初始化完成的对象。
正确实现用 Mutex 确保:所有并发调用者都会等待 f() 完成后才返回。这比 "只执行一次" 的语义更强——它保证 "执行完成后才让其他人看到"。
Memory Model 中的 sync 保证
Go Memory Model 对 sync 包的 happens-before 保证:
- Mutex: 第 n 次
Unlock()happens-before 第 n+1 次Lock()返回 - RWMutex: 对于任何
RLock()调用,存在某个 n 使得第 n 次Unlock()happens-before 该RLock()返回,且对应的RUnlock()happens-before 第 n+1 次Lock()返回 - Once:
once.Do(f)中 f 的完成 happens-before 任何once.Do的返回 - WaitGroup:
wg.Done()happens-before 对应的wg.Wait()返回 - atomic: Go 1.19+ 明确了 atomic 操作产生 happens-before 关系(之前是未定义的)
这些保证意味着:
var data string
var mu sync.Mutex
// Goroutine A
mu.Lock()
data = "hello"
mu.Unlock()
// Goroutine B(在 A 解锁后获取锁)
mu.Lock()
fmt.Println(data) // 保证看到 "hello"
mu.Unlock()
没有 Mutex,即使 A 先执行,B 也不保证能看到 A 写入的值(因为 CPU 缓存和编译器优化)。
RWMutex 的实现细节
RWMutex 使用一个计数器追踪读者数量,加上一个 Mutex 保护写操作:
// sync/rwmutex.go(简化)
type RWMutex struct {
w Mutex // 写锁互斥
writerSem uint32 // 写者信号量
readerSem uint32 // 读者信号量
readerCount atomic.Int32 // 读者数量(可能为负)
readerWait atomic.Int32 // 等待完成的读者数量
}
const rwmutexMaxReaders = 1 << 30
RLock() 的实现:
func (rw *RWMutex) RLock() {
if rw.readerCount.Add(1) < 0 {
// 有写者在等待或持有写锁,阻塞
runtime_SemacquireRWMutexR(&rw.readerSem, false, 0)
}
}
Lock()(写锁)的实现:
func (rw *RWMutex) Lock() {
rw.w.Lock() // 排斥其他写者
// 通知读者有写者到来:将 readerCount 减去 rwmutexMaxReaders
r := rw.readerCount.Add(-rwmutexMaxReaders) + rwmutexMaxReaders
// 如果还有活跃的读者,等待它们完成
if r != 0 && rw.readerWait.Add(r) != 0 {
runtime_SemacquireRWMutex(&rw.writerSem, false, 0)
}
}
巧妙之处在于:readerCount 变为负数作为"有写者在等"的信号。新的 RLock() 调用看到负数就知道要等待。
Level 4:边界与陷阱
陷阱 1:锁拷贝(Mutex/WaitGroup 不可复制)
这是 Go 初学者最常见的错误之一:
type Service struct {
mu sync.Mutex
// ... fields
}
// 错误!值传递会复制 Mutex
func process(s Service) {
s.mu.Lock()
// ... 操作的是副本的锁,原始对象没有被保护
s.mu.Unlock()
}
// 正确:传递指针
func process(s *Service) {
s.mu.Lock()
defer s.mu.Unlock()
// ...
}
为什么锁不能复制? Mutex 的内部状态(锁是否被持有、等待队列)是特定于那个实例的。复制一个正在被持有的锁会导致两个 goroutine 同时认为自己持有了"同一把锁"——但实际上是两把不同的锁。
go vet 检测: Go 自带的 go vet 工具可以检测锁拷贝:
$ go vet ./...
# example.com/myapp
./main.go:15:17: process passes lock by value: example.com/myapp.Service contains sync.Mutex
所有 sync 类型都不可复制: Mutex、RWMutex、WaitGroup、Once、Cond、Pool、Map。
实现这个约束的机制是 noCopy 结构体(一个空 struct,实现了 sync.Locker 接口),go vet 会检测包含 noCopy 字段的结构体是否被复制。
陷阱 2:死锁模式
模式 1:自己锁自己(递归锁死)
Go 的 Mutex 不是可重入的(reentrant)——同一个 goroutine 对同一个 Mutex 加两次锁会死锁:
func (s *Service) A() {
s.mu.Lock()
defer s.mu.Unlock()
s.B() // 死锁!B 也要加锁
}
func (s *Service) B() {
s.mu.Lock() // 永远阻塞——锁已被当前 goroutine 持有
defer s.mu.Unlock()
// ...
}
为什么 Go 不做可重入锁? Russ Cox 在 Go issue #14939 中明确解释:"Recursive mutexes do not protect invariants. Mutual exclusion locks protect invariants. If the lock protects some invariant, then no reentrant call is safe to make while the invariant may be broken."
即:如果 A 修改了共享数据到一半就调用了 B,B 再获取锁进入后会看到不一致的中间状态。可重入锁掩盖了这个问题而非解决了它。
修复方式:
// 方式 1:拆分为内部不加锁版本
func (s *Service) A() {
s.mu.Lock()
defer s.mu.Unlock()
s.bLocked() // 内部版本不加锁
}
func (s *Service) B() {
s.mu.Lock()
defer s.mu.Unlock()
s.bLocked()
}
func (s *Service) bLocked() {
// 假设调用者已持有锁
// ...
}
模式 2:AB-BA 死锁
// Goroutine 1 Goroutine 2
mu1.Lock() mu2.Lock()
mu2.Lock() // 等 G2 mu1.Lock() // 等 G1
// 死锁!
修复:始终按固定顺序加锁
// 约定:总是先锁 mu1,后锁 mu2
func transferLocked(mu1, mu2 *sync.Mutex) {
// 按地址排序确保全局一致
if uintptr(unsafe.Pointer(mu1)) > uintptr(unsafe.Pointer(mu2)) {
mu1, mu2 = mu2, mu1
}
mu1.Lock()
mu2.Lock()
// ...
mu2.Unlock()
mu1.Unlock()
}
模式 3:Lock 在 goroutine 中忘记释放
func bad(mu *sync.Mutex) {
mu.Lock()
if someCondition {
return // 忘记 Unlock!
}
mu.Unlock()
}
解决:始终用 defer
陷阱 3:RWMutex 写锁饥饿
当读操作非常频繁时,写者可能永远获取不到锁:
// 场景:100 个 goroutine 不停地 RLock
// 1 个 goroutine 尝试 Lock
// 如果读者总是不间断,写者永远等不到所有读者释放的那一刻
Go 的 RWMutex 对此有保护: 当写者到来时(调用 Lock),新的读者会被阻塞(因为 readerCount 变为负数)。已有的读者可以继续完成,但不会有新读者加入。这确保了写者最终能获取锁。
但是,如果已有的读者在临界区内做了很长时间的操作(比如网络请求),写者仍然需要等待很久。
最佳实践:
- 读临界区要尽量短
- 如果读操作涉及 IO,先在锁内复制需要的数据,然后解锁后再做 IO
// 错误:在读锁内做网络请求
func (c *Cache) GetAndFetch(key string) (string, error) {
c.mu.RLock()
defer c.mu.RUnlock()
if val, ok := c.data[key]; ok {
return val, nil
}
// 在读锁内做网络请求——会阻塞写者很久
return http.Get("http://example.com/" + key)
}
// 正确:最小化临界区
func (c *Cache) GetAndFetch(key string) (string, error) {
c.mu.RLock()
val, ok := c.data[key]
c.mu.RUnlock() // 立即释放
if ok {
return val, nil
}
// 在锁外做网络请求
return http.Get("http://example.com/" + key)
}
陷阱 4:sync.Pool 的使用错误
错误 1:忘记 Reset
var bufPool = sync.Pool{
New: func() interface{} { return new(bytes.Buffer) },
}
func process(data string) string {
buf := bufPool.Get().(*bytes.Buffer)
// 忘记 buf.Reset()!
// buf 中可能还有上一次使用留下的数据
buf.WriteString(data)
result := buf.String() // 可能包含上一次的残留数据
bufPool.Put(buf)
return result
}
错误 2:Put 后继续使用
func process() {
buf := bufPool.Get().(*bytes.Buffer)
buf.Reset()
buf.WriteString("hello")
bufPool.Put(buf)
// 错误!buf 已经回到池中,可能被其他 goroutine 获取并修改
fmt.Println(buf.String()) // 数据竞争!
}
错误 3:在 Pool 中存储带指针的大对象
// 大 slice 被 Pool 引用,GC 无法回收其底层数组
var bigBufPool = sync.Pool{
New: func() interface{} {
buf := make([]byte, 0, 1<<20) // 1MB
return &buf
},
}
// 更好的做法:限制放回池中的对象大小
func putBuf(buf *[]byte) {
if cap(*buf) > 1<<20 {
return // 太大了,让 GC 回收
}
*buf = (*buf)[:0]
bigBufPool.Put(buf)
}
陷阱 5:WaitGroup 重用导致竞态
var wg sync.WaitGroup
// 第一轮
wg.Add(2)
go func() { defer wg.Done(); work1() }()
go func() { defer wg.Done(); work2() }()
wg.Wait()
// 第二轮——注意:这里必须确保第一轮完全结束后才开始第二轮
// 如果 wg.Wait() 返回后、wg.Add(2) 之前,
// 某个慢速 goroutine 的 Done() 还没执行完(race condition),
// 会 panic "sync: negative WaitGroup counter"
wg.Add(2) // 安全的:Wait 返回意味着所有 Done 已完成
实际上 Go 的 WaitGroup 实现保证了 Wait() 返回时所有 Done() 已经完成,所以上面的代码是安全的。但如果你在 Wait() 返回后和 Add() 之间有其他 goroutine 可能调用 Done()(设计错误),就会出问题。
陷阱 6:sync.Map 的类型安全
sync.Map 使用 interface{} 作为 key 和 value 类型,失去了编译期类型检查:
var m sync.Map
m.Store("count", 42)
m.Store("count", "not a number") // 运行时才发现类型不匹配
val, _ := m.Load("count")
n := val.(int) // 如果存的是 string,这里 panic
Go 1.18+ 的解决方案——用泛型包装:
type TypedMap[K comparable, V any] struct {
m sync.Map
}
func (tm *TypedMap[K, V]) Store(key K, value V) {
tm.m.Store(key, value)
}
func (tm *TypedMap[K, V]) Load(key K) (V, bool) {
val, ok := tm.m.Load(key)
if !ok {
var zero V
return zero, false
}
return val.(V), true
}
真实案例:Docker 中的死锁 Bug
Docker 曾经有一个著名的死锁 bug(docker/docker#22507):容器的 Mutex 和网络的 Mutex 形成了 AB-BA 死锁。简化版:
// container.go
func (c *Container) Stop() {
c.mu.Lock() // 锁 A
defer c.mu.Unlock()
c.network.Disconnect(c) // 内部需要锁 B
}
// network.go
func (n *Network) Disconnect(c *Container) {
n.mu.Lock() // 锁 B
defer n.mu.Unlock()
c.UpdateState() // 需要锁 A → 死锁!
}
修复方式:将锁的范围缩小,避免在持有一个锁时调用可能获取另一个锁的函数。
面试考点
-
sync.Mutex 是可重入的吗?为什么?
- 不是。可重入锁不能保护不变量——在锁内调用的函数可能看到中间状态
-
sync.Pool 中的对象什么时候被回收?
- 每次 GC 时可能被清除。具体实现是双缓冲:local → victim → 清除
-
什么场景用 sync.Map 比 Mutex+map 快?
- 读多写少,或多个 goroutine 操作不相交的 key 集合
-
如何检测锁拷贝?
go vet工具自动检测包含sync.Mutex等字段的结构体被值传递
-
Go 1.9 的 Mutex 饥饿模式解决了什么问题?
- 防止等待者被新到达的 goroutine 无限抢占。等待超过 1ms 触发饥饿模式,锁直接移交
-
sync.Once 和 atomic.CompareAndSwap 的区别?
- Once 保证函数执行完成后才让其他调用者返回;CAS 只保证只有一个执行者,不等执行完
总结
sync 包是 Go 并发工具箱中"低级但高效"的部分。它的每个原语都有明确的使用场景:
| 原语 | 核心用途 | 注意事项 |
|---|---|---|
| Mutex | 保护共享数据 | 不可重入、不可复制 |
| RWMutex | 读多写少的共享数据 | 读临界区要短 |
| WaitGroup | 等待一组 goroutine 完成 | Add 在 go 之前 |
| Once | 单次初始化 | panic 也算 "已执行" |
| Pool | 减少频繁的小对象分配 | 不适合连接池 |
| Map | 特定模式的并发 map | 读多写少时使用 |
| Cond | 等待条件成立 | 用 for 循环检查 |
| atomic | 单变量原子操作 | 不能保护多变量 |
选择准则:
- 需要传递所有权 → channel
- 需要保护共享状态 → mutex
- 需要减少 GC → Pool
- 需要等待完成 → WaitGroup
- 需要只做一次 → Once