第 10 章

Channel:goroutine 间的通信

Channel — goroutine 间的通信

Go 语言最著名的一句格言是:"Don't communicate by sharing memory; share memory by communicating."(不要通过共享内存来通信,而要通过通信来共享内存。)这句话出自 Rob Pike 在 2012 年的演讲"Concurrency is not Parallelism",它的理论根基来自 Tony Hoare 1978 年发表的论文"Communicating Sequential Processes"(CSP)。Channel 正是这一哲学在 Go 中的核心实现——它让 goroutine 之间通过消息传递而非锁来协调工作。

这一章,我们从 channel 的基本操作出发,逐步深入到 select 多路复用、经典并发模式,最终剖析 channel 使用中的各种陷阱。理解 channel 是写好 Go 并发程序的关键一步。

Level 1:你需要知道的

Channel 是什么

Channel 是 Go 语言中用于 goroutine 之间通信的管道。你可以把它想象成一条传送带:一端放东西(send),另一端取东西(receive)。这条传送带有类型——只能传送特定类型的数据。

// 创建一个传递 int 类型数据的 channel
ch := make(chan int)

Channel 的核心特性是同步——默认情况下,发送和接收操作会阻塞,直到对方准备好。这种同步机制天然地保证了数据安全,不需要额外的锁。

创建 Channel:make

Channel 是引用类型,必须用 make 创建后才能使用。未初始化的 channel 是 nil,对 nil channel 的操作有特殊行为(Level 4 会详细讨论)。

// 无缓冲 channel(synchronous)
ch1 := make(chan int)

// 有缓冲 channel(capacity = 5)
ch2 := make(chan int, 5)

// 只声明不初始化——这是 nil channel
var ch3 chan int // ch3 == nil

发送与接收:<- 操作符

Go 用 <- 操作符来表示数据的流动方向:

ch <- value  // 发送:数据流入 channel
v := <-ch    // 接收:数据从 channel 流出
<-ch         // 接收并丢弃值(常用于等待信号)

一个完整的例子:

func main() {
    ch := make(chan string)

    go func() {
        ch <- "hello from goroutine"
    }()

    msg := <-ch
    fmt.Println(msg) // 输出:hello from goroutine
}

这里 main goroutine 在 <-ch 处阻塞,直到另一个 goroutine 发送数据。这种阻塞不是浪费——它是一种同步点,确保发送方和接收方在同一时刻交换数据。

无缓冲 vs 有缓冲

这是 channel 最重要的区分。

无缓冲 channel(Unbuffered): 发送操作必须等待接收方准备好,反之亦然。它像一个没有中间存储的交接窗口——你必须手递手把东西交出去。

ch := make(chan int) // 无缓冲

// goroutine A
ch <- 42 // 阻塞,直到有人接收

// goroutine B
v := <-ch // 阻塞,直到有人发送

无缓冲 channel 提供了最强的同步保证:当发送操作完成时,你可以确定接收方已经拿到了数据。

有缓冲 channel(Buffered): 内部有一个固定大小的队列。发送操作只在队列满时才阻塞,接收操作只在队列空时才阻塞。

ch := make(chan int, 3) // 缓冲容量为 3

ch <- 1 // 不阻塞,队列:[1]
ch <- 2 // 不阻塞,队列:[1, 2]
ch <- 3 // 不阻塞,队列:[1, 2, 3]
ch <- 4 // 阻塞!队列已满,等待有人接收

什么时候用哪种?

场景 选择 理由
需要严格同步 无缓冲 保证发送完成 = 接收完成
信号通知(done/quit) 无缓冲 语义清晰,不需要缓冲
生产者速率 > 消费者 有缓冲 用缓冲吸收短期峰值
限制并发数(信号量) 有缓冲 容量 = 最大并发数

关闭 Channel:close

发送方可以关闭 channel,表示"我不会再发送数据了"。

ch := make(chan int, 5)
ch <- 1
ch <- 2
ch <- 3
close(ch)

// 关闭后仍然可以接收已缓冲的数据
fmt.Println(<-ch) // 1
fmt.Println(<-ch) // 2
fmt.Println(<-ch) // 3
fmt.Println(<-ch) // 0(零值),channel 已关闭且为空

检测 channel 是否已关闭:

v, ok := <-ch
if !ok {
    fmt.Println("channel is closed")
}

关键规则:

用 for-range 遍历 Channel

for range 可以持续从 channel 接收数据,直到 channel 被关闭:

func producer(ch chan<- int) {
    for i := 0; i < 5; i++ {
        ch <- i
    }
    close(ch) // 必须关闭,否则 range 会永远阻塞
}

func consumer(ch <-chan int) {
    for v := range ch {
        fmt.Println(v)
    }
    // 到这里说明 channel 已关闭
    fmt.Println("done")
}

func main() {
    ch := make(chan int, 2)
    go producer(ch)
    consumer(ch)
}

这个模式非常常用——生产者发完数据后关闭 channel,消费者用 for range 优雅地读取所有数据。

常见错误

错误 1:忘记关闭 channel 导致 goroutine 泄漏

func bad() <-chan int {
    ch := make(chan int)
    go func() {
        for i := 0; ; i++ {
            ch <- i // 如果没有人接收,这里永远阻塞
        }
        // 假设这里有 close(ch),但代码永远到不了
    }()
    return ch
}

func main() {
    ch := bad()
    fmt.Println(<-ch) // 0
    fmt.Println(<-ch) // 1
    // 不再接收了——goroutine 泄漏!
}

修复:使用 done channel 通知退出

func good(done <-chan struct{}) <-chan int {
    ch := make(chan int)
    go func() {
        defer close(ch)
        for i := 0; ; i++ {
            select {
            case ch <- i:
            case <-done:
                return
            }
        }
    }()
    return ch
}

func main() {
    done := make(chan struct{})
    ch := good(done)
    fmt.Println(<-ch) // 0
    fmt.Println(<-ch) // 1
    close(done) // 通知退出
}

错误 2:多个 goroutine 关闭同一个 channel

// 错误!可能 panic
func bad() {
    ch := make(chan int)
    for i := 0; i < 3; i++ {
        go func() {
            // ... 做一些工作
            close(ch) // 第二个 goroutine 关闭时会 panic
        }()
    }
}

修复:用 sync.Once 确保只关闭一次

func good() {
    ch := make(chan int)
    var once sync.Once
    closeCh := func() { once.Do(func() { close(ch) }) }
    
    for i := 0; i < 3; i++ {
        go func() {
            // ... 做一些工作
            closeCh() // 安全:只会执行一次
        }()
    }
}

实战:并发下载器

func download(urls []string) []string {
    results := make(chan string, len(urls))
    
    for _, url := range urls {
        go func(u string) {
            resp, err := http.Get(u)
            if err != nil {
                results <- fmt.Sprintf("error: %s", u)
                return
            }
            defer resp.Body.Close()
            body, _ := io.ReadAll(resp.Body)
            results <- string(body)
        }(url)
    }
    
    var bodies []string
    for range urls {
        bodies = append(bodies, <-results)
    }
    return bodies
}

注意这里 channel 的缓冲容量等于 URL 数量——这确保所有 goroutine 都能发送而不阻塞,即使接收方还没开始消费。

Level 2:它是怎么运行的

select 多路复用

select 语句让一个 goroutine 能同时等待多个 channel 操作,类似于 UNIX 的 select(2) 系统调用对文件描述符的多路复用。

select {
case v := <-ch1:
    fmt.Println("received from ch1:", v)
case ch2 <- 42:
    fmt.Println("sent to ch2")
case <-time.After(1 * time.Second):
    fmt.Println("timeout")
default:
    fmt.Println("no channel ready")
}

select 的核心语义:

  1. 所有 case 同时求值:进入 select 时,所有 channel 表达式和发送的值都会被求值
  2. 随机选择:如果多个 case 同时就绪,Go 运行时会伪随机选择一个(不是按顺序)
  3. 阻塞等待:如果没有 case 就绪且没有 default,select 会阻塞
  4. default 立即执行:如果有 default 且没有 case 就绪,立即执行 default

为什么是随机选择? 这是 Go 团队的有意设计(Rob Pike 在多次演讲中强调过)。如果按顺序选择,排在前面的 case 会"饿死"后面的 case,导致不公平调度。随机选择保证了所有就绪的 channel 都有被服务的机会。

超时控制

func fetchWithTimeout(url string, timeout time.Duration) (string, error) {
    ch := make(chan string, 1) // 缓冲为1,避免 goroutine 泄漏
    
    go func() {
        resp, err := http.Get(url)
        if err != nil {
            return
        }
        defer resp.Body.Close()
        body, _ := io.ReadAll(resp.Body)
        ch <- string(body)
    }()
    
    select {
    case result := <-ch:
        return result, nil
    case <-time.After(timeout):
        return "", fmt.Errorf("timeout after %v", timeout)
    }
}

注意:ch 的缓冲容量为 1 是关键。如果超时后 goroutine 完成了请求,它仍能将结果写入 channel 而不会永远阻塞。如果用无缓冲 channel,超时后 goroutine 会因为没人接收而泄漏。

非阻塞操作

// 非阻塞发送
select {
case ch <- value:
    // 发送成功
default:
    // channel 满了或无人接收,跳过
}

// 非阻塞接收
select {
case v := <-ch:
    // 接收成功
default:
    // channel 空了或无人发送,跳过
}

Channel 方向

Go 允许在函数签名中声明 channel 的方向,这是编译期的类型安全检查:

// 只写 channel(只能发送)
func producer(ch chan<- int) {
    ch <- 42
    // <-ch  // 编译错误!不能从只写 channel 接收
}

// 只读 channel(只能接收)
func consumer(ch <-chan int) {
    v := <-ch
    // ch <- 1  // 编译错误!不能向只读 channel 发送
    // close(ch) // 编译错误!不能关闭只读 channel
    fmt.Println(v)
}

func main() {
    ch := make(chan int) // 双向 channel
    go producer(ch)     // 隐式转换为 chan<- int
    consumer(ch)        // 隐式转换为 <-chan int
}

为什么要声明方向? 两个原因:

  1. 文档作用:一看函数签名就知道谁是发送方谁是接收方
  2. 编译检查:阻止你在不该发送的地方发送,在不该关闭的地方关闭

转换规则:双向 channel 可以隐式转换为单向 channel,但反过来不行。

for-range channel 的退出条件

for v := range ch {
    process(v)
}

这个循环等价于:

for {
    v, ok := <-ch
    if !ok {
        break // channel 已关闭且为空
    }
    process(v)
}

关键点:for range 只在 channel 关闭且缓冲区为空 时退出。如果 channel 没有被关闭,即使缓冲区暂时为空,for range 也会阻塞等待。

Channel 的内部结构

Go runtime 中 channel 的实现位于 runtime/chan.go。其核心数据结构是 hchan

// runtime/chan.go(简化版)
type hchan struct {
    qcount   uint           // 当前队列中的元素数量
    dataqsiz uint           // 环形缓冲区容量
    buf      unsafe.Pointer // 环形缓冲区指针
    elemsize uint16         // 每个元素的大小
    closed   uint32         // 是否已关闭
    elemtype *_type         // 元素类型
    sendx    uint           // 发送索引
    recvx    uint           // 接收索引
    recvq    waitq          // 等待接收的 goroutine 队列
    sendq    waitq          // 等待发送的 goroutine 队列
    lock     mutex          // 互斥锁
}

有缓冲 channel 使用环形缓冲区(Ring Buffer):

                  sendx
                    ↓
buf: [ 5 | 7 | _ | _ | 3 ]
              ↑
            recvx

容量 = 5,当前 3 个元素:3, 5, 7

sendx 追上 recvx(队列满)时,发送 goroutine 被挂起并放入 sendq;当 recvx 追上 sendx(队列空)时,接收 goroutine 被挂起并放入 recvq

无缓冲 channel 的 buf 为 nildataqsiz 为 0。每次通信都是直接从发送方的栈拷贝到接收方的栈(zero-copy optimization in some cases)。

发送和接收的完整流程

发送操作 ch <- value

  1. 加锁 hchan.lock
  2. 如果 recvq 中有等待的接收者:
    • 直接把数据拷贝给接收者(跳过缓冲区)
    • 唤醒该接收者
  3. 否则,如果缓冲区未满:
    • 把数据拷贝到 buf[sendx]
    • sendx++
  4. 否则(缓冲区满或无缓冲):
    • 将当前 goroutine 打包为 sudog 放入 sendq
    • 调用 gopark 挂起当前 goroutine
  5. 解锁

接收操作 v := <-ch

  1. 加锁 hchan.lock
  2. 如果 sendq 中有等待的发送者:
    • 对于无缓冲 channel:直接从发送者拷贝数据
    • 对于有缓冲 channel:从 buf[recvx] 取出数据,把发送者的数据放入 buf[sendx]
    • 唤醒该发送者
  3. 否则,如果缓冲区非空:
    • buf[recvx] 取出数据
    • recvx++
  4. 否则(缓冲区空):
    • 将当前 goroutine 打包为 sudog 放入 recvq
    • 调用 gopark 挂起当前 goroutine
  5. 解锁

第 2 步的优化很重要——当有等待者时,跳过缓冲区直接传递,减少了一次内存拷贝。

select 的实现原理

Go 编译器将 select 语句转换为对 runtime.selectgo() 的调用。其主要步骤:

  1. 打乱 case 顺序(Fisher-Yates shuffle)——实现随机选择语义
  2. 按 channel 地址排序,用于加锁(避免死锁)
  3. 第一轮检查:遍历所有 case,看是否有已就绪的
    • 如果有多个就绪,选择第一个(因为已经打乱过,等效于随机)
  4. 如果没有就绪的 case
    • 如果有 default:执行 default
    • 否则:将当前 goroutine 注册到所有 case 的 channel 的等待队列中,然后 gopark
  5. 被唤醒后:检查是哪个 case 触发的,清理其他 channel 的等待队列

步骤 2 是一个精巧的设计:多个 goroutine 可能在 select 中等待相同的 channel,如果加锁顺序不一致会导致死锁。按地址排序保证了全局一致的加锁顺序。

性能特征

channel 操作的性能成本主要来自:

  1. 互斥锁:每次发送/接收都要加锁
  2. 内存拷贝:数据从发送方栈 → channel 缓冲区 → 接收方栈
  3. goroutine 调度:阻塞时需要 park/unpark
操作                    大约耗时 (ns)
-----------------------------------
无缓冲 send+recv       ~200
有缓冲 send(未满)    ~60
有缓冲 recv(非空)    ~60
select 2 cases         ~300
select 4 cases         ~500
mutex lock+unlock      ~20

数据来源:在 Go 1.21, Apple M1 上的基准测试。具体数字取决于硬件和 Go 版本,但量级关系稳定。

结论: channel 比 mutex 慢 3-10 倍。如果你只需要保护一个共享变量,mutex 更高效。Channel 的优势在于编排复杂的 goroutine 交互模式

Level 3:规范怎么定义的

CSP 理论基础

Go 的 channel 模型直接来源于 Tony Hoare 的 CSP(Communicating Sequential Processes,通信顺序进程)理论,首次发表于 1978 年的论文"Communicating Sequential Processes"(Communications of the ACM, Vol. 21, No. 8)。

CSP 的核心思想:

  1. 进程(Process):独立的顺序计算单元(对应 Go 的 goroutine)
  2. 通道(Channel):进程间唯一的通信方式,通信是同步的
  3. 选择(Choice):一个进程可以等待多个通信事件(对应 Go 的 select)

Hoare 在论文中定义了 channel 通信的同步语义:发送和接收必须同时发生(rendezvous)。这正是 Go 无缓冲 channel 的行为。有缓冲 channel 是对 CSP 的扩展——缓冲区满时才会退化为同步通信。

Rob Pike 和 Ken Thompson 在设计 Go 之前的工作——Newsqueak(1989)和 Plan 9 的 Alef 语言(1995)——已经在实践 CSP 风格的并发。Go 的 channel 是这条技术路线 30 年演进的最终产物。

Go 语言规范中的 Channel 定义

根据 Go Language Specification(https://go.dev/ref/spec)中 "Channel types" 一节:

A channel provides a mechanism for concurrently executing functions to communicate by sending and receiving values of a specified element type.

规范中对 channel 操作的关键定义:

发送语句(Send statements):

A send statement sends a value on a channel. The channel expression's core type must be a channel, the channel direction must permit send operations, and the type of the value to be sent must be assignable to the channel's element type.

Both the channel and the value expression are evaluated before communication begins. Communication blocks until the send can proceed. A send on an unbuffered channel can proceed if a receiver is ready. A send on a buffered channel can proceed if there is room in the buffer.

接收操作符(Receive operator):

For an operand ch whose core type is a channel, the value of the receive operation <-ch is the value received from the channel ch. The channel direction must permit receive operations. The type of a receive operation is the element type of the channel.

The expression blocks until a value is available.

关闭操作(Close):

规范明确指出以下行为是运行时 panic:

nil channel 的行为:

A receive from a nil channel blocks forever. A send to a nil channel blocks forever.

happens-before 语义

Go Memory Model(https://go.dev/ref/mem)定义了 channel 操作的 happens-before 关系,这是并发正确性的基础:

  1. 规则 1: 第 n 次向有容量 C 的 channel 发送,happens-before 第 n+C 次从该 channel 接收完成
  2. 规则 2: 从无缓冲 channel 接收,happens-before 对应的发送完成
  3. 规则 3: channel 的关闭 happens-before 接收到零值的返回

规则 2 特别有趣——对于无缓冲 channel,接收 happens-before 发送完成(而非发送 happens-before 接收)。这是因为接收方先准备好,发送方才能将数据拷贝过去并返回。

这些规则保证了通过 channel 传递的数据具有可见性:

var a string

func main() {
    ch := make(chan struct{})
    go func() {
        a = "hello"  // (1)
        <-ch         // (2) happens-before (3)
    }()
    ch <- struct{}{}  // (3) 发送完成意味着 (2) 已执行
    fmt.Println(a)    // 保证看到 "hello"
}

常见并发模式

这些模式是 Go 并发编程的"设计模式",出自 Rob Pike 的 "Go Concurrency Patterns"(Google I/O 2012)和 Sameer Ajmani 的 "Advanced Go Concurrency Patterns"(Google I/O 2013)。

Pipeline(流水线)

// 每个阶段:接收输入 → 处理 → 发送输出
func gen(nums ...int) <-chan int {
    out := make(chan int)
    go func() {
        for _, n := range nums {
            out <- n
        }
        close(out)
    }()
    return out
}

func sq(in <-chan int) <-chan int {
    out := make(chan int)
    go func() {
        for n := range in {
            out <- n * n
        }
        close(out)
    }()
    return out
}

func main() {
    // 组装流水线:gen -> sq -> print
    for v := range sq(sq(gen(2, 3, 4))) {
        fmt.Println(v) // 16, 81, 256((2²)², (3²)², (4²)²)
    }
}

Pipeline 模式的优势:

Fan-Out / Fan-In

Fan-Out: 多个 goroutine 从同一个 channel 读取,分摊工作。

Fan-In: 多个 channel 的输出合并到一个 channel。

// Fan-out: 启动多个 worker 从 jobs 读取
func fanOut(jobs <-chan int, numWorkers int) []<-chan int {
    workers := make([]<-chan int, numWorkers)
    for i := 0; i < numWorkers; i++ {
        workers[i] = worker(jobs)
    }
    return workers
}

func worker(jobs <-chan int) <-chan int {
    results := make(chan int)
    go func() {
        defer close(results)
        for j := range jobs {
            results <- process(j)
        }
    }()
    return results
}

// Fan-in: 把多个 channel 合并为一个
func fanIn(channels ...<-chan int) <-chan int {
    var wg sync.WaitGroup
    merged := make(chan int)

    for _, ch := range channels {
        wg.Add(1)
        go func(c <-chan int) {
            defer wg.Done()
            for v := range c {
                merged <- v
            }
        }(ch)
    }

    go func() {
        wg.Wait()
        close(merged)
    }()

    return merged
}

Worker Pool

这是生产环境中最常用的模式——固定数量的 worker goroutine 处理来自队列的任务:

func workerPool(numWorkers int, jobs <-chan Job, results chan<- Result) {
    var wg sync.WaitGroup
    for i := 0; i < numWorkers; i++ {
        wg.Add(1)
        go func(id int) {
            defer wg.Done()
            for job := range jobs {
                result := processJob(job)
                results <- result
            }
        }(i)
    }

    go func() {
        wg.Wait()
        close(results)
    }()
}

func main() {
    jobs := make(chan Job, 100)
    results := make(chan Result, 100)

    workerPool(5, jobs, results)

    // 发送任务
    go func() {
        for _, j := range getAllJobs() {
            jobs <- j
        }
        close(jobs) // 关闭 jobs channel,worker 会自动退出
    }()

    // 收集结果
    for r := range results {
        handleResult(r)
    }
}

Worker pool 的容量规划:

Done Channel(取消模式)

func cancellableWork(done <-chan struct{}) <-chan int {
    results := make(chan int)
    go func() {
        defer close(results)
        for i := 0; ; i++ {
            select {
            case results <- compute(i):
            case <-done:
                fmt.Println("cancelled, cleaning up...")
                return
            }
        }
    }()
    return results
}

func main() {
    done := make(chan struct{})
    results := cancellableWork(done)

    for i := 0; i < 5; i++ {
        fmt.Println(<-results)
    }
    close(done) // 广播取消信号
}

close(done) 是一种广播机制——关闭 channel 会使所有等待该 channel 的接收操作立即返回零值。这比向每个 goroutine 逐个发送取消信号高效得多。

Or Channel(多取消源合并)

当有多个取消信号源时,or 函数将它们合并为一个:

func or(channels ...<-chan struct{}) <-chan struct{} {
    switch len(channels) {
    case 0:
        return nil
    case 1:
        return channels[0]
    }

    orDone := make(chan struct{})
    go func() {
        defer close(orDone)
        switch len(channels) {
        case 2:
            select {
            case <-channels[0]:
            case <-channels[1]:
            }
        default:
            select {
            case <-channels[0]:
            case <-channels[1]:
            case <-channels[2]:
            case <-or(append(channels[3:], orDone)...):
            }
        }
    }()
    return orDone
}

这个递归实现来自 Katherine Cox-Buday 的《Concurrency in Go》(O'Reilly, 2017)。它的巧妙之处在于:当任何一个输入 channel 关闭时,orDone 也关闭,而 orDone 本身也被传递给递归调用,确保所有 goroutine 都能退出。

用缓冲 Channel 做信号量

type Semaphore chan struct{}

func NewSemaphore(n int) Semaphore {
    return make(Semaphore, n)
}

func (s Semaphore) Acquire() { s <- struct{}{} }
func (s Semaphore) Release() { <-s }

func main() {
    sem := NewSemaphore(3) // 最多 3 个并发

    for i := 0; i < 10; i++ {
        go func(id int) {
            sem.Acquire()
            defer sem.Release()
            
            // 这里最多同时有 3 个 goroutine
            doWork(id)
        }(i)
    }
}

与其他并发模型的比较

特性 Go Channel (CSP) Erlang (Actor) Java (Locks)
通信方式 命名 channel 进程 mailbox 共享内存 + 锁
通信对象 channel(可多对多) 进程 PID(一对一) 任何共享变量
同步机制 send/recv 阻塞 send 异步, recv 阻塞 lock/unlock
选择机制 select receive 模式匹配 无内置
错误处理 panic/recover link/monitor exception
分布式 不内置 原生支持 需要框架

Go 的 channel 选择了 CSP 的同步通信模型(rendezvous),而 Erlang 的 Actor 模型使用异步消息传递。同步模型更容易推理(发送完成意味着对方已接收),但对高延迟场景不友好;异步模型更灵活但需要额外处理 mailbox 溢出。

Level 4:边界与陷阱

陷阱 1:向已关闭的 Channel 发送导致 Panic

这是 Go channel 最臭名昭著的陷阱——它是一个运行时 panic,不是编译错误,而且通常在并发场景中随机出现,难以复现。

func danger() {
    ch := make(chan int, 1)
    
    go func() {
        time.Sleep(10 * time.Millisecond)
        ch <- 42 // PANIC: send on closed channel
    }()
    
    close(ch)
    time.Sleep(20 * time.Millisecond)
}

为什么 Go 要这样设计? 在 Go 的设计哲学中,向已关闭的 channel 发送数据表示一个逻辑错误——你的程序对 channel 的生命周期管理有 bug。而对于逻辑错误,Go 选择用 panic 立即暴露,而不是静默忽略(那样更危险,因为数据会丢失)。

安全模式: 只有发送方才应该关闭 channel。如果有多个发送方,用单独的协调机制:

func safeSenders() {
    ch := make(chan int)
    done := make(chan struct{})
    
    // 多个发送方
    var wg sync.WaitGroup
    for i := 0; i < 3; i++ {
        wg.Add(1)
        go func(id int) {
            defer wg.Done()
            for j := 0; j < 5; j++ {
                select {
                case ch <- id*10 + j:
                case <-done:
                    return
                }
            }
        }(i)
    }
    
    // 由独立的 goroutine 负责关闭
    go func() {
        wg.Wait()
        close(ch)
    }()
    
    // 接收方
    for v := range ch {
        fmt.Println(v)
    }
}

陷阱 2:nil Channel 永远阻塞

对 nil channel 的发送和接收都会永远阻塞,不会 panic:

var ch chan int // nil

go func() {
    ch <- 42 // 永远阻塞,不是 panic
}()

// <-ch  // 也是永远阻塞

这不是 bug,而是 feature。 nil channel 在 select 中有重要用途——动态禁用某个 case:

func merge(ch1, ch2 <-chan int) <-chan int {
    out := make(chan int)
    go func() {
        defer close(out)
        for ch1 != nil || ch2 != nil {
            select {
            case v, ok := <-ch1:
                if !ok {
                    ch1 = nil // 禁用这个 case
                    continue
                }
                out <- v
            case v, ok := <-ch2:
                if !ok {
                    ch2 = nil // 禁用这个 case
                    continue
                }
                out <- v
            }
        }
    }()
    return out
}

ch1 被设为 nil 后,case v, ok := <-ch1 永远不会被选中(因为它永远不会就绪)。这是一种优雅的方式来处理"一个输入已关闭,继续处理另一个"的场景。

陷阱 3:select 的随机选择

ch := make(chan int, 1)
ch <- 42

select {
case v := <-ch:
    fmt.Println("received:", v)
case ch <- 100:
    fmt.Println("sent")
}

这段代码的结果是不确定的。channel 既可读(有数据)又可写(有空间),select 会随机选择一个。

常见 bug: 依赖 select 中 case 的顺序来实现优先级。

// 错误:以为 done 有优先级
select {
case <-done:
    return
case result <- value:
    // ...
}

如果 doneresult 同时就绪(比如 done 刚被关闭、result 又有空间),Go 不保证选择 done

正确做法: 如果需要优先级,用嵌套 select 或先检查高优先级 channel:

// 方案 1:先检查 done
select {
case <-done:
    return
default:
}
// 如果 done 没有就绪,才进入正常 select
select {
case <-done:
    return
case result <- value:
    // ...
}
// 方案 2:在循环中加入优先级检查
for {
    select {
    case <-done:
        return
    case job := <-jobs:
        // 处理前再检查一次 done
        select {
        case <-done:
            return
        default:
        }
        process(job)
    }
}

陷阱 4:Goroutine 泄漏

Goroutine 泄漏是 Go 程序中最常见的资源泄漏——泄漏的 goroutine 永远不会被 GC 回收(因为它的栈上可能引用了其他对象),最终导致内存耗尽。

// 经典泄漏:没人消费的 goroutine
func leak() {
    ch := make(chan int)
    go func() {
        val := expensiveComputation()
        ch <- val // 如果没人接收,永远阻塞
    }()
    // 假设因为错误,提前 return 了
    return
}

诊断工具:

import "runtime"

// 定期检查 goroutine 数量
fmt.Println("goroutines:", runtime.NumGoroutine())

// 获取所有 goroutine 的栈——找出泄漏的 goroutine 在等什么
buf := make([]byte, 1<<16)
runtime.Stack(buf, true)
fmt.Println(string(buf))

生产环境中,可以用 net/http/pprof 查看 goroutine profile:

import _ "net/http/pprof"

go func() {
    http.ListenAndServe(":6060", nil)
}()
// 访问 http://localhost:6060/debug/pprof/goroutine?debug=1

最佳实践:每个 goroutine 都必须有明确的退出条件。 启动 goroutine 时就要想好:

陷阱 5:Channel 大小选择不当

缓冲为 0(无缓冲)的问题:

// 如果 handler 处理慢,所有发送方都阻塞
func eventLoop(events <-chan Event) {
    for e := range events {
        handleEvent(e) // 如果这里很慢...
    }
}

缓冲过大的问题:

// 10000 的缓冲——掩盖了消费者跟不上的问题
ch := make(chan *BigStruct, 10000)
// 缓冲满之前一切正常,满了之后突然卡死
// 更糟糕的是:10000 个 BigStruct 会占用大量内存

正确思路:

  1. 默认用无缓冲:强制发送方和接收方同步,更容易发现问题
  2. 需要缓冲时,从小开始:先用 1 或 numWorkers,通过压测调整
  3. 缓冲区不是"越大越好":它只是把问题延后了,不是解决了
  4. 如果你不确定,就用无缓冲:引用 Rob Pike 的话——"Channels are synchronization devices. The buffered channel is an optimization."

陷阱 6:在 select 中使用 time.After 导致内存泄漏

// 每次循环都创建一个新的 Timer——内存泄漏!
for {
    select {
    case v := <-ch:
        process(v)
    case <-time.After(5 * time.Second):
        fmt.Println("timeout")
        return
    }
}

time.After 每次调用都会创建一个新的 time.Timer。在高频循环中,如果 ch 不断有数据,每次进入循环都会创建一个新 Timer,旧 Timer 直到超时才会被 GC。如果循环频率是 1000 次/秒,5 秒的 Timer 意味着有 5000 个 Timer 同时存活。

修复:复用 Timer

timer := time.NewTimer(5 * time.Second)
defer timer.Stop()

for {
    select {
    case v := <-ch:
        // 收到数据,重置 timer
        if !timer.Stop() {
            select {
            case <-timer.C:
            default:
            }
        }
        timer.Reset(5 * time.Second)
        process(v)
    case <-timer.C:
        fmt.Println("timeout")
        return
    }
}

陷阱 7:range over closed channel 的时机

ch := make(chan int, 3)
ch <- 1
ch <- 2
ch <- 3
close(ch)

// 这是安全的——会读到 1, 2, 3 然后退出
for v := range ch {
    fmt.Println(v)
}

但是:

ch := make(chan int)
close(ch)

v := <-ch   // v = 0, 立即返回
v = <-ch    // v = 0, 再次立即返回
// 从已关闭的空 channel 读取永远返回零值,不阻塞

这意味着如果你忘记检查 ok,可能会处理大量的零值:

// bug:ch 关闭后会疯狂打印 0
for {
    fmt.Println(<-ch)
}

真实案例:Kubernetes 中的 Channel 使用

Kubernetes 的 controller-runtime 库大量使用 channel 模式。以 workqueue 为例(简化版):

type Queue struct {
    queue    []interface{}
    dirty    set
    processing set
    cond     *sync.Cond
    shutdown bool
}

func (q *Queue) Get() (interface{}, bool) {
    q.cond.L.Lock()
    for len(q.queue) == 0 && !q.shutdown {
        q.cond.Wait()
    }
    if len(q.queue) == 0 {
        q.cond.L.Unlock()
        return nil, true // shutdown
    }
    item := q.queue[0]
    q.queue = q.queue[1:]
    q.processing.insert(item)
    q.cond.L.Unlock()
    return item, false
}

注意 Kubernetes 在这里选择了 sync.Cond 而非 channel——因为它需要更精细的控制(dirty set、processing set),channel 无法方便地实现这些语义。这说明 channel 不是万能的,选择合适的同步工具很重要。

面试考点

  1. 无缓冲 channel 和有缓冲 channel 的区别是什么?什么时候用哪个?

    • 无缓冲提供同步保证(rendezvous),有缓冲提供异步解耦
    • 信号通知用无缓冲,削峰填谷用有缓冲
  2. 向已关闭的 channel 发送数据会怎样?从已关闭的 channel 接收呢?

    • 发送:panic
    • 接收:立即返回零值,ok = false
  3. 如何实现一个安全的 "只关闭一次" 的 channel?

    • sync.Once + close,或者让单一所有者负责关闭
  4. select 中多个 case 同时就绪时会怎样?

    • 随机选择(Go 运行时通过 Fisher-Yates shuffle 实现)
  5. 什么是 goroutine 泄漏?如何避免?

    • goroutine 阻塞在 channel 操作上且永远不会被解除
    • 使用 context 或 done channel 确保可取消
  6. channel 的底层数据结构是什么?

    • hchan:包含环形缓冲区 buf、发送/接收索引、等待队列 sendq/recvq、互斥锁

总结

Channel 是 Go 并发模型的核心抽象。它不只是一个数据结构——它是一种思维方式:把并发程序分解为独立的顺序进程,通过消息传递来协调。

关键要点:

本章评分
4.5  / 5  (42 评分)

💬 留言讨论