Channel 源码:发送、接收与 select
第十七章:Channel 源码:发送、接收与 select
"不要通过共享内存来通信,要通过通信来共享内存。"
这句话出自 Rob Pike,是 Go 并发哲学的灵魂。许多程序员第一次读到这句话时,会觉得这不过是一句营销口号——毕竟最终数据还是在内存里传递,channel 怎么可能真的绕开"共享内存"?
本章将从源码层面剖析 channel 的实现,解答这个疑问,并揭示为什么 channel 在正确使用时,能让并发代码比锁更安全、更易于推理。但我们也不回避它的代价和陷阱:channel 并非银弹,在某些场景下它比 mutex 慢十倍。理解背后的原因,才能在正确的场景选择正确的工具。
Level 1 · 哲学:为什么是 CSP
并发的两种范式
在系统软件的历史上,并发有两种主流范式:
共享内存(Shared Memory):线程通过访问同一块内存来协作。你用 mutex 保护临界区,用条件变量通知等待方,用原子操作做无锁同步。C、C++、Java、Python 的 threading 模块都遵循这个模式。
问题是:共享内存将"谁拥有这块数据"的所有权变成了隐式的、运行时的概念。锁保护的不是数据本身,而是对数据的访问权。你必须记住哪些数据需要保护,在哪里加锁,在哪里解锁。遗漏一处,就是数据竞争;锁顺序错误,就是死锁。这种隐式所有权极难在大型代码库中保持正确。
消息传递(Message Passing):进程/协程通过发送消息来协作,数据的所有权随消息转移。Erlang、Elixir、Rust 的 channel 都遵循这个模式。
Go 选择的是 CSP(Communicating Sequential Processes),由 Tony Hoare 在 1978 年提出。CSP 的核心观点是:并发实体之间的协作应该通过同步通信来表达,而不是通过共享状态。
CSP 的核心洞见
CSP 并非说"不使用内存",而是说"数据的所有权应当是显式的、单一的"。当你通过 channel 发送一个值时:
- 如果发送的是值(value),接收方得到的是一份副本,双方各自拥有独立的数据,不存在共享。
- 如果发送的是指针,Go 的约定(不是强制)是:发送后,发送方不再使用该指针,所有权已转移。
这将隐式的"锁保护区"变成了显式的"所有权转移"。代码读起来更像是在描述"这份数据从 A 流向 B",而不是"在访问这份数据前先取锁"。
这就是"通过通信来共享内存"的真正含义:用通信的时序来隐含地保证所有权的唯一性,而不是用锁来显式地保护共享访问。
channel 的语义
Go 的 channel 有三种状态和两种类型:
| 类型 | 语义 |
|---|---|
无缓冲 channel(make(chan T)) |
发送方和接收方必须同时就绪,才能完成一次传递(同步 rendezvous) |
有缓冲 channel(make(chan T, n)) |
缓冲区未满时发送不阻塞;缓冲区非空时接收不阻塞 |
无缓冲 channel 是 CSP 的纯粹形态:发送和接收是同步的握手。有缓冲 channel 则引入了异步性,缓冲区充当了一个容量有限的队列。
Level 2 · 原理:hchan 的内部结构
hchan 结构体
每个 make(chan T, n) 调用都会在堆上分配一个 hchan 结构体(runtime/chan.go):
hchan {
qcount uint // 当前缓冲区中的元素数量
dataqsiz uint // 缓冲区容量(make 时的第二个参数)
buf unsafe.Pointer // 指向环形缓冲区的指针
elemsize uint16 // 单个元素的大小(字节)
closed uint32 // 是否已关闭(0=open, 1=closed)
elemtype *_type // 元素的类型信息(用于 GC 扫描)
sendx uint // 发送索引(环形缓冲区的写位置)
recvx uint // 接收索引(环形缓冲区的读位置)
recvq waitq // 等待接收的 goroutine 队列(sudog 链表)
sendq waitq // 等待发送的 goroutine 队列(sudog 链表)
lock mutex // 保护 hchan 所有字段的互斥锁
}
这个结构体揭示了一个关键事实:channel 本质上是一个带锁的环形队列,附加了两个等待队列。
环形缓冲区的内存布局
对于有缓冲 channel,buf 指向一段连续的内存区域,大小为 elemsize * dataqsiz:
buf (elemsize=8, dataqsiz=4)
recvx=1 sendx=3
↓ ↓
┌─────┬─────┬─────┬─────┐
│ [0] │ [1] │ [2] │ [3] │
│ - │ A │ B │ - │ qcount=2
└─────┴─────┴─────┴─────┘
↑
下一个出队元素
sendx 和 recvx 作为游标在数组上循环推进(通过取模实现)。这个设计的优势在于:内存连续,cache-friendly;不需要动态分配节点,只需要在固定大小的数组上移动指针。
sudog:goroutine 的等待代理
当一个 goroutine 尝试向满缓冲区发送(或从空缓冲区接收),它不能继续执行,必须"停车"(park)。这时,运行时会创建(或从池中取出)一个 sudog 结构体:
type sudog struct {
g *g // 等待的 goroutine
next *sudog // 链表下一个节点
prev *sudog // 链表上一个节点
elem unsafe.Pointer // 指向要发送/接收的数据
c *hchan // 等待的 channel
// ... 其他字段(select 相关)
}
sendq 和 recvq 都是 waitq 类型,它是一个 sudog 的双向链表:
recvq:
head → [sudog: g=G1, elem=&x1] → [sudog: g=G2, elem=&x2] → nil
↑
最先等待的 goroutine 在队头(FIFO 顺序)
sudog 从全局池(runtime.sudog pool)获取,避免频繁分配。每个 goroutine 同时最多只能等待一个 channel(select 例外,下文详述)。
三条发送路径
当执行 ch <- val 时,runtime 的 chansend 函数按优先级依次检查三种情况:
路径一:直接递交(Direct Send)
如果 recvq 不为空,说明有 goroutine 正在等待接收。这时可以绕过缓冲区,直接把数据写入等待方的内存位置:
G_sender --[直接写入 elem]--> G_receiver 的栈变量
↑
sudog.elem 指向这里
这是最快的路径,因为数据只拷贝一次(从发送方到接收方),缓冲区完全没有参与。之后 runtime 调用 goready(gp) 将接收方从 recvq 中移出并唤醒。
路径二:写入缓冲区
如果缓冲区有空位(qcount < dataqsiz),将数据拷贝到 buf[sendx],递增 sendx(取模),递增 qcount,立即返回,发送方不阻塞。
路径三:阻塞等待
缓冲区满(或无缓冲),且 recvq 为空。创建 sudog,记录发送方 goroutine 和数据地址,加入 sendq,然后调用 gopark 将当前 goroutine 挂起,释放 M(操作系统线程)去调度其他 goroutine。
三条接收路径
chanrecv 的逻辑是 chansend 的镜像:
- sendq 非空且无缓冲:直接从发送方 sudog 复制数据,唤醒发送方。
- sendq 非空且有缓冲:从缓冲区头部取出数据,将 sendq 队头的发送方的数据写入缓冲区尾部(保持 FIFO),唤醒发送方。
- 缓冲区有数据:从缓冲区取数据,不涉及任何 goroutine 切换。
- 阻塞:缓冲区空且 sendq 为空,创建 sudog 加入 recvq,调用 gopark。
goroutine 的停车与唤醒
gopark 和 goready 是 channel 阻塞机制的核心:
gopark(unlockf, lock, reason, ...):
1. 将当前 goroutine 状态从 _Grunning 改为 _Gwaiting
2. 调用 unlockf 释放 hchan.lock(原子地"停车同时解锁")
3. 调用 schedule(),让出 M,让调度器运行其他 goroutine
goready(gp, ...):
1. 将 gp 状态从 _Gwaiting 改为 _Grunnable
2. 将 gp 放入当前 P 的本地运行队列(或全局队列)
3. 如果有空闲 P,可能触发 newproc1 来唤醒更多 M
关键点:goroutine 的"阻塞"不是线程阻塞,而是协作式地让出执行权。阻塞的 goroutine 的 M 会继续执行其他 goroutine,这是 Go 并发模型高效的基础。
Level 3 · 代码实践
模式一:Pipeline(流水线)
Pipeline 是 channel 最经典的用法,将处理过程分解为独立的阶段:
package main
import "fmt"
// generate 产生整数序列
func generate(nums ...int) <-chan int {
out := make(chan int)
go func() {
for _, n := range nums {
out <- n
}
close(out)
}()
return out
}
// square 对每个整数求平方
func square(in <-chan int) <-chan int {
out := make(chan int)
go func() {
for n := range in {
out <- n * n
}
close(out)
}()
return out
}
func main() {
// 搭建 pipeline: generate -> square -> square
c := generate(2, 3, 4, 5)
c = square(c)
c = square(c)
for n := range c {
fmt.Println(n) // 16, 81, 256, 625
}
}
Pipeline 的关键规则:生产者负责 close channel,消费者用 range 迭代(range 在 channel 关闭后自动退出)。永远不要在接收方关闭 channel,否则发送方会 panic。
模式二:Fan-out(扇出)与 Fan-in(扇入)
Fan-out 将一个 channel 的工作分发给多个 worker,Fan-in 将多个 channel 的结果合并:
package main
import (
"fmt"
"sync"
)
// fanOut 将输入分发给 numWorkers 个 worker
func fanOut(in <-chan int, numWorkers int) []<-chan int {
outputs := make([]<-chan int, numWorkers)
for i := 0; i < numWorkers; i++ {
outputs[i] = worker(in)
}
return outputs
}
func worker(in <-chan int) <-chan int {
out := make(chan int)
go func() {
for n := range in {
out <- n * n // 模拟计算密集型任务
}
close(out)
}()
return out
}
// fanIn 将多个 channel 合并为一个
func fanIn(inputs ...<-chan int) <-chan int {
var wg sync.WaitGroup
merged := make(chan int)
output := func(c <-chan int) {
defer wg.Done()
for n := range c {
merged <- n
}
}
wg.Add(len(inputs))
for _, c := range inputs {
go output(c)
}
// 所有 input 关闭后,关闭 merged
go func() {
wg.Wait()
close(merged)
}()
return merged
}
func main() {
in := make(chan int)
go func() {
for i := 0; i < 10; i++ {
in <- i
}
close(in)
}()
outputs := fanOut(in, 3)
merged := fanIn(outputs...)
for n := range merged {
fmt.Println(n)
}
}
模式三:超时与取消(select + time.After)
package main
import (
"context"
"fmt"
"time"
)
func fetchData(ctx context.Context) (string, error) {
result := make(chan string, 1)
go func() {
// 模拟耗时操作
time.Sleep(200 * time.Millisecond)
result <- "data from server"
}()
select {
case data := <-result:
return data, nil
case <-ctx.Done():
return "", ctx.Err() // context.DeadlineExceeded 或 Canceled
}
}
func main() {
// 方式一:time.After(简单场景)
ch := make(chan int, 1)
select {
case v := <-ch:
fmt.Println("received:", v)
case <-time.After(100 * time.Millisecond):
fmt.Println("timeout!")
}
// 方式二:context(推荐,可以传递取消信号)
ctx, cancel := context.WithTimeout(context.Background(), 500*time.Millisecond)
defer cancel()
data, err := fetchData(ctx)
if err != nil {
fmt.Println("error:", err)
return
}
fmt.Println("got:", data)
}
注意:time.After 每次调用都会创建一个新的 Timer,在高频循环中使用会造成 Timer 泄漏(直到触发才被 GC)。在循环中应使用 time.NewTimer + timer.Reset:
timer := time.NewTimer(100 * time.Millisecond)
defer timer.Stop()
for {
timer.Reset(100 * time.Millisecond)
select {
case v := <-ch:
_ = v
case <-timer.C:
// timeout
}
}
检测 channel 泄漏
channel 泄漏是指:goroutine 永远阻塞在 channel 的发送或接收上,无法退出,导致 goroutine 数量持续增长。
// 泄漏示例:没有退出机制
func leaky(ch chan int) {
for {
v := <-ch // 如果没有人发送数据,这里永远阻塞
_ = v
}
}
// 正确写法:带 done channel 的退出机制
func notLeaky(ch <-chan int, done <-chan struct{}) {
for {
select {
case v := <-ch:
_ = v
case <-done:
return // 外部通知退出
}
}
}
检测泄漏的工具:
import "runtime"
// 打印当前 goroutine 数量
fmt.Println(runtime.NumGoroutine())
// 使用 goleak 库(推荐用于测试)
// go get go.uber.org/goleak
func TestNoLeak(t *testing.T) {
defer goleak.VerifyNone(t)
// ... 测试代码
}
向已关闭的 channel 发送数据会 panic
ch := make(chan int, 1)
close(ch)
ch <- 1 // panic: send on closed channel
// 安全发送模式(使用 recover,但不推荐作为常规手段)
func safeSend(ch chan int, val int) (closed bool) {
defer func() {
if r := recover(); r != nil {
closed = true
}
}()
ch <- val
return false
}
更好的做法是从架构上保证只有发送方负责关闭,接收方永远不关闭 channel。使用 sync.Once 确保只关闭一次:
type SafeChan struct {
ch chan int
once sync.Once
}
func (s *SafeChan) Close() {
s.once.Do(func() { close(s.ch) })
}
Level 4 · 进阶:select 内部机制与性能陷阱
select 的随机化原理
当多个 case 同时就绪时,Go 的 select 随机选择一个,而非按代码顺序选择第一个。这个随机化是有意为之的,目的是避免饥饿(starvation):如果 select 总是选第一个就绪的 case,某些 channel 可能永远得不到处理机会。
// 演示 select 的随机性
ch1 := make(chan string, 1)
ch2 := make(chan string, 1)
ch1 <- "one"
ch2 <- "two"
// 多次运行,select 会随机选择 ch1 或 ch2
select {
case v := <-ch1:
fmt.Println("ch1:", v)
case v := <-ch2:
fmt.Println("ch2:", v)
}
runtime 实现中(runtime/select.go 的 selectgo 函数),select 的执行流程:
- lockAll:对所有参与 select 的 channel 加锁(按地址排序,避免死锁)。
- 扫描就绪 case:遍历所有 case,检查是否有可以立即执行的(channel 有数据/空间)。
- 随机打乱 case 顺序:使用
fastrandn生成随机排列。 - 有就绪 case:随机选一个,执行,unlockAll,返回。
- 没有就绪 case(且无 default):为每个 case 创建 sudog,加入对应 channel 的等待队列,unlockAll,gopark 挂起。
- 唤醒后:从所有等待队列中移除 sudog,执行被选中的 case,unlockAll,返回。
select 的加锁顺序(按 hchan 地址排序):
case <-ch3 (addr: 0xc00001a080)
case <-ch1 (addr: 0xc00001a0a0) 排序后:ch1, ch2, ch3
case <-ch2 (addr: 0xc00001a060) ↑
按地址从小到大加锁
这个排序至关重要:如果两个 goroutine 各自运行 select 并选择相同的 channel 集合,如果加锁顺序不一致,就会死锁。按地址排序保证了全局一致的加锁顺序。
nil channel 在 select 中的妙用
向 nil channel 发送或接收会永久阻塞。但在 select 中,nil channel 的 case 会被永远跳过(不会被选中)。利用这个特性,可以动态地启用/禁用某个 case:
func merge(ch1, ch2 <-chan int) <-chan int {
out := make(chan int)
go func() {
defer close(out)
for ch1 != nil || ch2 != nil {
select {
case v, ok := <-ch1:
if !ok {
ch1 = nil // 将已关闭的 channel 设为 nil,禁用这个 case
continue
}
out <- v
case v, ok := <-ch2:
if !ok {
ch2 = nil
continue
}
out <- v
}
}
}()
return out
}
这是一个优雅的 fan-in 实现:当某个 channel 关闭时,将其设为 nil,select 就不再尝试从它接收,避免了无限接收零值的问题。
channel vs mutex:性能对比与选型
channel 和 mutex 都可以实现并发安全,但性能特征完全不同:
// 基准测试:有缓冲 channel vs mutex 计数器
// BenchmarkChannel-8 5000000 302 ns/op
// BenchmarkMutex-8 20000000 62 ns/op
// channel 实现的计数器
func chanCounter(n int) {
ch := make(chan int, 1)
ch <- 0
var wg sync.WaitGroup
for i := 0; i < n; i++ {
wg.Add(1)
go func() {
defer wg.Done()
v := <-ch
ch <- v + 1
}()
}
wg.Wait()
}
// mutex 实现的计数器
func mutexCounter(n int) {
var mu sync.Mutex
count := 0
var wg sync.WaitGroup
for i := 0; i < n; i++ {
wg.Add(1)
go func() {
defer wg.Done()
mu.Lock()
count++
mu.Unlock()
}()
}
wg.Wait()
}
为什么 channel 慢于 mutex(在计数器这个场景)?
- channel 操作涉及更多内存操作:数据要从栈拷贝到环形缓冲区,再拷贝回来。mutex 只需要修改一个整型的计数器。
- channel 可能触发调度器:阻塞时会调用 gopark/goready,涉及上下文切换。mutex 在无竞争时只是一次原子操作。
- channel 需要持有锁:hchan.lock 是一个完整的互斥锁,而很多 mutex 实现在无竞争时可以用 CAS 完成。
选型原则:
| 场景 | 推荐 |
|---|---|
| 传递数据所有权 | channel |
| goroutine 协调(通知、信号) | channel |
| 保护共享状态(简单计数器、缓存) | mutex |
| 并行计算结果收集 | channel(fan-in) |
| 高频、低延迟的临界区保护 | mutex 或 atomic |
高性能场景:无锁环形队列
当 channel 的开销成为瓶颈时(每秒数百万操作),可以考虑使用基于 CAS 的无锁环形队列:
// 简化版无锁单生产者单消费者队列(SPSC)
type RingBuffer struct {
buf []int64
head uint64 // 消费者读取位置(原子)
_ [56]byte // cache line padding
tail uint64 // 生产者写入位置(原子)
_ [56]byte
}
func NewRingBuffer(size uint64) *RingBuffer {
return &RingBuffer{buf: make([]int64, size)}
}
func (r *RingBuffer) Push(val int64) bool {
tail := atomic.LoadUint64(&r.tail)
head := atomic.LoadUint64(&r.head)
if tail-head >= uint64(len(r.buf)) {
return false // full
}
r.buf[tail%uint64(len(r.buf))] = val
atomic.StoreUint64(&r.tail, tail+1)
return true
}
func (r *RingBuffer) Pop() (int64, bool) {
head := atomic.LoadUint64(&r.head)
tail := atomic.LoadUint64(&r.tail)
if head >= tail {
return 0, false // empty
}
val := r.buf[head%uint64(len(r.buf))]
atomic.StoreUint64(&r.head, head+1)
return val, true
}
注意 [56]byte 的 padding:这是为了防止 head 和 tail 落在同一个 CPU cache line(64 字节)上,避免 false sharing。这种细节在高性能场景下可以带来 2-3 倍的性能提升。
select 与 goroutine 泄漏的微妙关系
一个常见的陷阱:select 在多个 goroutine 等待同一个 channel 时,只有一个能收到值:
// 错误:N 个 goroutine 竞争一个 done channel,close 后全部退出
done := make(chan struct{})
for i := 0; i < 5; i++ {
go func(id int) {
select {
case <-done:
fmt.Printf("goroutine %d exiting\n", id)
}
}(i)
}
// close 会广播给所有等待的 goroutine(从已关闭 channel 接收立即返回零值)
close(done) // 正确!所有 goroutine 都会退出
关闭 channel 是一种广播机制:所有等待在该 channel 上的 goroutine 都会被唤醒并收到零值。这与发送一个值(只有一个 goroutine 收到)有本质区别。context.Context 的取消机制正是基于这个特性实现的。
小结
Channel 是 Go 并发模型的核心抽象。从实现层面看,它是一个带锁的环形队列 + 两个等待队列;从语义层面看,它是所有权转移的载体,将隐式的锁保护转换为显式的数据流动。
理解 hchan 的三条发送路径(直接递交、写入缓冲区、阻塞等待)和 select 的 lockAll/随机化机制,能帮助你写出正确且高效的并发代码。在性能敏感的场景中,channel 的开销(锁 + 可能的调度切换)可能是瓶颈,这时应优先考虑 mutex 或原子操作。
记住两条核心规则:发送方负责关闭 channel;每个 goroutine 都要有退出路径。