第 17 章

Channel 源码:发送、接收与 select

第十七章:Channel 源码:发送、接收与 select

"不要通过共享内存来通信,要通过通信来共享内存。"

这句话出自 Rob Pike,是 Go 并发哲学的灵魂。许多程序员第一次读到这句话时,会觉得这不过是一句营销口号——毕竟最终数据还是在内存里传递,channel 怎么可能真的绕开"共享内存"?

本章将从源码层面剖析 channel 的实现,解答这个疑问,并揭示为什么 channel 在正确使用时,能让并发代码比锁更安全、更易于推理。但我们也不回避它的代价和陷阱:channel 并非银弹,在某些场景下它比 mutex 慢十倍。理解背后的原因,才能在正确的场景选择正确的工具。


Level 1 · 哲学:为什么是 CSP

并发的两种范式

在系统软件的历史上,并发有两种主流范式:

共享内存(Shared Memory):线程通过访问同一块内存来协作。你用 mutex 保护临界区,用条件变量通知等待方,用原子操作做无锁同步。C、C++、Java、Python 的 threading 模块都遵循这个模式。

问题是:共享内存将"谁拥有这块数据"的所有权变成了隐式的、运行时的概念。锁保护的不是数据本身,而是对数据的访问权。你必须记住哪些数据需要保护,在哪里加锁,在哪里解锁。遗漏一处,就是数据竞争;锁顺序错误,就是死锁。这种隐式所有权极难在大型代码库中保持正确。

消息传递(Message Passing):进程/协程通过发送消息来协作,数据的所有权随消息转移。Erlang、Elixir、Rust 的 channel 都遵循这个模式。

Go 选择的是 CSP(Communicating Sequential Processes),由 Tony Hoare 在 1978 年提出。CSP 的核心观点是:并发实体之间的协作应该通过同步通信来表达,而不是通过共享状态。

CSP 的核心洞见

CSP 并非说"不使用内存",而是说"数据的所有权应当是显式的、单一的"。当你通过 channel 发送一个值时:

  1. 如果发送的是值(value),接收方得到的是一份副本,双方各自拥有独立的数据,不存在共享。
  2. 如果发送的是指针,Go 的约定(不是强制)是:发送后,发送方不再使用该指针,所有权已转移。

这将隐式的"锁保护区"变成了显式的"所有权转移"。代码读起来更像是在描述"这份数据从 A 流向 B",而不是"在访问这份数据前先取锁"。

这就是"通过通信来共享内存"的真正含义:用通信的时序来隐含地保证所有权的唯一性,而不是用锁来显式地保护共享访问。

channel 的语义

Go 的 channel 有三种状态和两种类型:

类型 语义
无缓冲 channelmake(chan T) 发送方和接收方必须同时就绪,才能完成一次传递(同步 rendezvous)
有缓冲 channelmake(chan T, n) 缓冲区未满时发送不阻塞;缓冲区非空时接收不阻塞

无缓冲 channel 是 CSP 的纯粹形态:发送和接收是同步的握手。有缓冲 channel 则引入了异步性,缓冲区充当了一个容量有限的队列。


Level 2 · 原理:hchan 的内部结构

hchan 结构体

每个 make(chan T, n) 调用都会在堆上分配一个 hchan 结构体(runtime/chan.go):

hchan {
    qcount   uint           // 当前缓冲区中的元素数量
    dataqsiz uint           // 缓冲区容量(make 时的第二个参数)
    buf      unsafe.Pointer // 指向环形缓冲区的指针
    elemsize uint16         // 单个元素的大小(字节)
    closed   uint32         // 是否已关闭(0=open, 1=closed)
    elemtype *_type         // 元素的类型信息(用于 GC 扫描)
    sendx    uint           // 发送索引(环形缓冲区的写位置)
    recvx    uint           // 接收索引(环形缓冲区的读位置)
    recvq    waitq          // 等待接收的 goroutine 队列(sudog 链表)
    sendq    waitq          // 等待发送的 goroutine 队列(sudog 链表)
    lock     mutex          // 保护 hchan 所有字段的互斥锁
}

这个结构体揭示了一个关键事实:channel 本质上是一个带锁的环形队列,附加了两个等待队列

环形缓冲区的内存布局

对于有缓冲 channel,buf 指向一段连续的内存区域,大小为 elemsize * dataqsiz

buf (elemsize=8, dataqsiz=4)

  recvx=1          sendx=3
     ↓                 ↓
  ┌─────┬─────┬─────┬─────┐
  │ [0] │ [1] │ [2] │ [3] │
  │  -  │  A  │  B  │  -  │   qcount=2
  └─────┴─────┴─────┴─────┘
           ↑
       下一个出队元素

sendx 和 recvx 作为游标在数组上循环推进(通过取模实现)。这个设计的优势在于:内存连续,cache-friendly;不需要动态分配节点,只需要在固定大小的数组上移动指针。

sudog:goroutine 的等待代理

当一个 goroutine 尝试向满缓冲区发送(或从空缓冲区接收),它不能继续执行,必须"停车"(park)。这时,运行时会创建(或从池中取出)一个 sudog 结构体:

type sudog struct {
    g        *g            // 等待的 goroutine
    next     *sudog        // 链表下一个节点
    prev     *sudog        // 链表上一个节点
    elem     unsafe.Pointer // 指向要发送/接收的数据
    c        *hchan        // 等待的 channel
    // ... 其他字段(select 相关)
}

sendqrecvq 都是 waitq 类型,它是一个 sudog 的双向链表:

recvq:
  head → [sudog: g=G1, elem=&x1] → [sudog: g=G2, elem=&x2] → nil
         ↑
       最先等待的 goroutine 在队头(FIFO 顺序)

sudog 从全局池(runtime.sudog pool)获取,避免频繁分配。每个 goroutine 同时最多只能等待一个 channel(select 例外,下文详述)。

三条发送路径

当执行 ch <- val 时,runtime 的 chansend 函数按优先级依次检查三种情况:

路径一:直接递交(Direct Send)

如果 recvq 不为空,说明有 goroutine 正在等待接收。这时可以绕过缓冲区,直接把数据写入等待方的内存位置:

G_sender  --[直接写入 elem]--> G_receiver 的栈变量
                                     ↑
                              sudog.elem 指向这里

这是最快的路径,因为数据只拷贝一次(从发送方到接收方),缓冲区完全没有参与。之后 runtime 调用 goready(gp) 将接收方从 recvq 中移出并唤醒。

路径二:写入缓冲区

如果缓冲区有空位(qcount < dataqsiz),将数据拷贝到 buf[sendx],递增 sendx(取模),递增 qcount,立即返回,发送方不阻塞。

路径三:阻塞等待

缓冲区满(或无缓冲),且 recvq 为空。创建 sudog,记录发送方 goroutine 和数据地址,加入 sendq,然后调用 gopark 将当前 goroutine 挂起,释放 M(操作系统线程)去调度其他 goroutine。

三条接收路径

chanrecv 的逻辑是 chansend 的镜像:

  1. sendq 非空且无缓冲:直接从发送方 sudog 复制数据,唤醒发送方。
  2. sendq 非空且有缓冲:从缓冲区头部取出数据,将 sendq 队头的发送方的数据写入缓冲区尾部(保持 FIFO),唤醒发送方。
  3. 缓冲区有数据:从缓冲区取数据,不涉及任何 goroutine 切换。
  4. 阻塞:缓冲区空且 sendq 为空,创建 sudog 加入 recvq,调用 gopark。

goroutine 的停车与唤醒

goparkgoready 是 channel 阻塞机制的核心:

gopark(unlockf, lock, reason, ...):
  1. 将当前 goroutine 状态从 _Grunning 改为 _Gwaiting
  2. 调用 unlockf 释放 hchan.lock(原子地"停车同时解锁")
  3. 调用 schedule(),让出 M,让调度器运行其他 goroutine

goready(gp, ...):
  1. 将 gp 状态从 _Gwaiting 改为 _Grunnable
  2. 将 gp 放入当前 P 的本地运行队列(或全局队列)
  3. 如果有空闲 P,可能触发 newproc1 来唤醒更多 M

关键点:goroutine 的"阻塞"不是线程阻塞,而是协作式地让出执行权。阻塞的 goroutine 的 M 会继续执行其他 goroutine,这是 Go 并发模型高效的基础。


Level 3 · 代码实践

模式一:Pipeline(流水线)

Pipeline 是 channel 最经典的用法,将处理过程分解为独立的阶段:

package main

import "fmt"

// generate 产生整数序列
func generate(nums ...int) <-chan int {
    out := make(chan int)
    go func() {
        for _, n := range nums {
            out <- n
        }
        close(out)
    }()
    return out
}

// square 对每个整数求平方
func square(in <-chan int) <-chan int {
    out := make(chan int)
    go func() {
        for n := range in {
            out <- n * n
        }
        close(out)
    }()
    return out
}

func main() {
    // 搭建 pipeline: generate -> square -> square
    c := generate(2, 3, 4, 5)
    c = square(c)
    c = square(c)

    for n := range c {
        fmt.Println(n)  // 16, 81, 256, 625
    }
}

Pipeline 的关键规则:生产者负责 close channel,消费者用 range 迭代(range 在 channel 关闭后自动退出)。永远不要在接收方关闭 channel,否则发送方会 panic。

模式二:Fan-out(扇出)与 Fan-in(扇入)

Fan-out 将一个 channel 的工作分发给多个 worker,Fan-in 将多个 channel 的结果合并:

package main

import (
    "fmt"
    "sync"
)

// fanOut 将输入分发给 numWorkers 个 worker
func fanOut(in <-chan int, numWorkers int) []<-chan int {
    outputs := make([]<-chan int, numWorkers)
    for i := 0; i < numWorkers; i++ {
        outputs[i] = worker(in)
    }
    return outputs
}

func worker(in <-chan int) <-chan int {
    out := make(chan int)
    go func() {
        for n := range in {
            out <- n * n  // 模拟计算密集型任务
        }
        close(out)
    }()
    return out
}

// fanIn 将多个 channel 合并为一个
func fanIn(inputs ...<-chan int) <-chan int {
    var wg sync.WaitGroup
    merged := make(chan int)

    output := func(c <-chan int) {
        defer wg.Done()
        for n := range c {
            merged <- n
        }
    }

    wg.Add(len(inputs))
    for _, c := range inputs {
        go output(c)
    }

    // 所有 input 关闭后,关闭 merged
    go func() {
        wg.Wait()
        close(merged)
    }()

    return merged
}

func main() {
    in := make(chan int)
    go func() {
        for i := 0; i < 10; i++ {
            in <- i
        }
        close(in)
    }()

    outputs := fanOut(in, 3)
    merged := fanIn(outputs...)

    for n := range merged {
        fmt.Println(n)
    }
}

模式三:超时与取消(select + time.After)

package main

import (
    "context"
    "fmt"
    "time"
)

func fetchData(ctx context.Context) (string, error) {
    result := make(chan string, 1)

    go func() {
        // 模拟耗时操作
        time.Sleep(200 * time.Millisecond)
        result <- "data from server"
    }()

    select {
    case data := <-result:
        return data, nil
    case <-ctx.Done():
        return "", ctx.Err()  // context.DeadlineExceeded 或 Canceled
    }
}

func main() {
    // 方式一:time.After(简单场景)
    ch := make(chan int, 1)
    select {
    case v := <-ch:
        fmt.Println("received:", v)
    case <-time.After(100 * time.Millisecond):
        fmt.Println("timeout!")
    }

    // 方式二:context(推荐,可以传递取消信号)
    ctx, cancel := context.WithTimeout(context.Background(), 500*time.Millisecond)
    defer cancel()

    data, err := fetchData(ctx)
    if err != nil {
        fmt.Println("error:", err)
        return
    }
    fmt.Println("got:", data)
}

注意time.After 每次调用都会创建一个新的 Timer,在高频循环中使用会造成 Timer 泄漏(直到触发才被 GC)。在循环中应使用 time.NewTimer + timer.Reset

timer := time.NewTimer(100 * time.Millisecond)
defer timer.Stop()

for {
    timer.Reset(100 * time.Millisecond)
    select {
    case v := <-ch:
        _ = v
    case <-timer.C:
        // timeout
    }
}

检测 channel 泄漏

channel 泄漏是指:goroutine 永远阻塞在 channel 的发送或接收上,无法退出,导致 goroutine 数量持续增长。

// 泄漏示例:没有退出机制
func leaky(ch chan int) {
    for {
        v := <-ch  // 如果没有人发送数据,这里永远阻塞
        _ = v
    }
}

// 正确写法:带 done channel 的退出机制
func notLeaky(ch <-chan int, done <-chan struct{}) {
    for {
        select {
        case v := <-ch:
            _ = v
        case <-done:
            return  // 外部通知退出
        }
    }
}

检测泄漏的工具:

import "runtime"

// 打印当前 goroutine 数量
fmt.Println(runtime.NumGoroutine())

// 使用 goleak 库(推荐用于测试)
// go get go.uber.org/goleak
func TestNoLeak(t *testing.T) {
    defer goleak.VerifyNone(t)
    // ... 测试代码
}

向已关闭的 channel 发送数据会 panic

ch := make(chan int, 1)
close(ch)
ch <- 1  // panic: send on closed channel

// 安全发送模式(使用 recover,但不推荐作为常规手段)
func safeSend(ch chan int, val int) (closed bool) {
    defer func() {
        if r := recover(); r != nil {
            closed = true
        }
    }()
    ch <- val
    return false
}

更好的做法是从架构上保证只有发送方负责关闭,接收方永远不关闭 channel。使用 sync.Once 确保只关闭一次:

type SafeChan struct {
    ch   chan int
    once sync.Once
}

func (s *SafeChan) Close() {
    s.once.Do(func() { close(s.ch) })
}

Level 4 · 进阶:select 内部机制与性能陷阱

select 的随机化原理

当多个 case 同时就绪时,Go 的 select 随机选择一个,而非按代码顺序选择第一个。这个随机化是有意为之的,目的是避免饥饿(starvation):如果 select 总是选第一个就绪的 case,某些 channel 可能永远得不到处理机会。

// 演示 select 的随机性
ch1 := make(chan string, 1)
ch2 := make(chan string, 1)
ch1 <- "one"
ch2 <- "two"

// 多次运行,select 会随机选择 ch1 或 ch2
select {
case v := <-ch1:
    fmt.Println("ch1:", v)
case v := <-ch2:
    fmt.Println("ch2:", v)
}

runtime 实现中(runtime/select.goselectgo 函数),select 的执行流程:

  1. lockAll:对所有参与 select 的 channel 加锁(按地址排序,避免死锁)。
  2. 扫描就绪 case:遍历所有 case,检查是否有可以立即执行的(channel 有数据/空间)。
  3. 随机打乱 case 顺序:使用 fastrandn 生成随机排列。
  4. 有就绪 case:随机选一个,执行,unlockAll,返回。
  5. 没有就绪 case(且无 default):为每个 case 创建 sudog,加入对应 channel 的等待队列,unlockAll,gopark 挂起。
  6. 唤醒后:从所有等待队列中移除 sudog,执行被选中的 case,unlockAll,返回。
select 的加锁顺序(按 hchan 地址排序):

case <-ch3  (addr: 0xc00001a080)
case <-ch1  (addr: 0xc00001a0a0)   排序后:ch1, ch2, ch3
case <-ch2  (addr: 0xc00001a060)        ↑
                                    按地址从小到大加锁

这个排序至关重要:如果两个 goroutine 各自运行 select 并选择相同的 channel 集合,如果加锁顺序不一致,就会死锁。按地址排序保证了全局一致的加锁顺序。

nil channel 在 select 中的妙用

向 nil channel 发送或接收会永久阻塞。但在 select 中,nil channel 的 case 会被永远跳过(不会被选中)。利用这个特性,可以动态地启用/禁用某个 case:

func merge(ch1, ch2 <-chan int) <-chan int {
    out := make(chan int)
    go func() {
        defer close(out)
        for ch1 != nil || ch2 != nil {
            select {
            case v, ok := <-ch1:
                if !ok {
                    ch1 = nil  // 将已关闭的 channel 设为 nil,禁用这个 case
                    continue
                }
                out <- v
            case v, ok := <-ch2:
                if !ok {
                    ch2 = nil
                    continue
                }
                out <- v
            }
        }
    }()
    return out
}

这是一个优雅的 fan-in 实现:当某个 channel 关闭时,将其设为 nil,select 就不再尝试从它接收,避免了无限接收零值的问题。

channel vs mutex:性能对比与选型

channel 和 mutex 都可以实现并发安全,但性能特征完全不同:

// 基准测试:有缓冲 channel vs mutex 计数器
// BenchmarkChannel-8    5000000    302 ns/op
// BenchmarkMutex-8     20000000     62 ns/op

// channel 实现的计数器
func chanCounter(n int) {
    ch := make(chan int, 1)
    ch <- 0
    var wg sync.WaitGroup
    for i := 0; i < n; i++ {
        wg.Add(1)
        go func() {
            defer wg.Done()
            v := <-ch
            ch <- v + 1
        }()
    }
    wg.Wait()
}

// mutex 实现的计数器
func mutexCounter(n int) {
    var mu sync.Mutex
    count := 0
    var wg sync.WaitGroup
    for i := 0; i < n; i++ {
        wg.Add(1)
        go func() {
            defer wg.Done()
            mu.Lock()
            count++
            mu.Unlock()
        }()
    }
    wg.Wait()
}

为什么 channel 慢于 mutex(在计数器这个场景)?

  1. channel 操作涉及更多内存操作:数据要从栈拷贝到环形缓冲区,再拷贝回来。mutex 只需要修改一个整型的计数器。
  2. channel 可能触发调度器:阻塞时会调用 gopark/goready,涉及上下文切换。mutex 在无竞争时只是一次原子操作。
  3. channel 需要持有锁:hchan.lock 是一个完整的互斥锁,而很多 mutex 实现在无竞争时可以用 CAS 完成。

选型原则

场景 推荐
传递数据所有权 channel
goroutine 协调(通知、信号) channel
保护共享状态(简单计数器、缓存) mutex
并行计算结果收集 channel(fan-in)
高频、低延迟的临界区保护 mutex 或 atomic

高性能场景:无锁环形队列

当 channel 的开销成为瓶颈时(每秒数百万操作),可以考虑使用基于 CAS 的无锁环形队列:

// 简化版无锁单生产者单消费者队列(SPSC)
type RingBuffer struct {
    buf  []int64
    head uint64  // 消费者读取位置(原子)
    _    [56]byte // cache line padding
    tail uint64  // 生产者写入位置(原子)
    _    [56]byte
}

func NewRingBuffer(size uint64) *RingBuffer {
    return &RingBuffer{buf: make([]int64, size)}
}

func (r *RingBuffer) Push(val int64) bool {
    tail := atomic.LoadUint64(&r.tail)
    head := atomic.LoadUint64(&r.head)
    if tail-head >= uint64(len(r.buf)) {
        return false  // full
    }
    r.buf[tail%uint64(len(r.buf))] = val
    atomic.StoreUint64(&r.tail, tail+1)
    return true
}

func (r *RingBuffer) Pop() (int64, bool) {
    head := atomic.LoadUint64(&r.head)
    tail := atomic.LoadUint64(&r.tail)
    if head >= tail {
        return 0, false  // empty
    }
    val := r.buf[head%uint64(len(r.buf))]
    atomic.StoreUint64(&r.head, head+1)
    return val, true
}

注意 [56]byte 的 padding:这是为了防止 head 和 tail 落在同一个 CPU cache line(64 字节)上,避免 false sharing。这种细节在高性能场景下可以带来 2-3 倍的性能提升。

select 与 goroutine 泄漏的微妙关系

一个常见的陷阱:select 在多个 goroutine 等待同一个 channel 时,只有一个能收到值:

// 错误:N 个 goroutine 竞争一个 done channel,close 后全部退出
done := make(chan struct{})
for i := 0; i < 5; i++ {
    go func(id int) {
        select {
        case <-done:
            fmt.Printf("goroutine %d exiting\n", id)
        }
    }(i)
}

// close 会广播给所有等待的 goroutine(从已关闭 channel 接收立即返回零值)
close(done)  // 正确!所有 goroutine 都会退出

关闭 channel 是一种广播机制:所有等待在该 channel 上的 goroutine 都会被唤醒并收到零值。这与发送一个值(只有一个 goroutine 收到)有本质区别。context.Context 的取消机制正是基于这个特性实现的。


小结

Channel 是 Go 并发模型的核心抽象。从实现层面看,它是一个带锁的环形队列 + 两个等待队列;从语义层面看,它是所有权转移的载体,将隐式的锁保护转换为显式的数据流动。

理解 hchan 的三条发送路径(直接递交、写入缓冲区、阻塞等待)和 select 的 lockAll/随机化机制,能帮助你写出正确且高效的并发代码。在性能敏感的场景中,channel 的开销(锁 + 可能的调度切换)可能是瓶颈,这时应优先考虑 mutex 或原子操作。

记住两条核心规则:发送方负责关闭 channel每个 goroutine 都要有退出路径

本章评分
4.9  / 5  (17 评分)

💬 留言讨论