/* 02-channels.go - Go 语言 Channels 详解 学习目标: 1. 理解 channel 的概念和特性 2. 掌握 channel 的创建和使用 3. 学会不同类型的 channel 4. 了解 channel 的方向性 5. 掌握 channel 的关闭和检测 知识点: - channel 的基本概念 - 无缓冲和有缓冲 channel - channel 的方向性 - channel 的关闭 - range 和 select 与 channel - channel 的常见模式 */ package main import ( "fmt" "math/rand" "sync" "time" ) func main() { fmt.Println("=== Go 语言 Channels 详解 ===\n") // 演示基本的 channel demonstrateBasicChannels() // 演示缓冲 channel demonstrateBufferedChannels() // 演示 channel 的方向性 demonstrateChannelDirections() // 演示 channel 的关闭 demonstrateChannelClosing() // 演示 channel 与 range demonstrateChannelRange() // 演示 channel 的常见模式 demonstrateChannelPatterns() // 演示 channel 的高级用法 demonstrateAdvancedChannelUsage() } // demonstrateBasicChannels 演示基本的 channel func demonstrateBasicChannels() { fmt.Println("1. 基本的 Channel:") // channel 的基本概念 fmt.Printf(" Channel 的基本概念:\n") fmt.Printf(" - Go 语言的核心特性,用于 goroutine 间通信\n") fmt.Printf(" - 类型安全的管道,可以传递特定类型的数据\n") fmt.Printf(" - 默认是无缓冲的,发送和接收会阻塞\n") fmt.Printf(" - 遵循 'Don't communicate by sharing memory; share memory by communicating'\n") // 创建和使用 channel fmt.Printf(" 创建和使用 channel:\n") // 创建一个 int 类型的 channel ch := make(chan int) // 在 goroutine 中发送数据 go func() { fmt.Printf(" 发送数据到 channel\n") ch <- 42 }() // 从 channel 接收数据 value := <-ch fmt.Printf(" 从 channel 接收到: %d\n", value) // 双向通信 fmt.Printf(" 双向通信:\n") messages := make(chan string) responses := make(chan string) // 启动一个 echo 服务 go echoServer(messages, responses) // 发送消息并接收响应 messages <- "Hello" response := <-responses fmt.Printf(" 发送: Hello, 接收: %s\n", response) messages <- "World" response = <-responses fmt.Printf(" 发送: World, 接收: %s\n", response) // 关闭 channels close(messages) close(responses) // 同步使用 channel fmt.Printf(" 同步使用 channel:\n") done := make(chan bool) go func() { fmt.Printf(" 执行异步任务...\n") time.Sleep(200 * time.Millisecond) fmt.Printf(" 异步任务完成\n") done <- true }() fmt.Printf(" 等待异步任务完成...\n") <-done fmt.Printf(" 主程序继续执行\n") fmt.Println() } // demonstrateBufferedChannels 演示缓冲 channel func demonstrateBufferedChannels() { fmt.Println("2. 缓冲 Channel:") // 无缓冲 vs 有缓冲 fmt.Printf(" 无缓冲 vs 有缓冲:\n") // 无缓冲 channel(同步) fmt.Printf(" 无缓冲 channel (同步):\n") unbuffered := make(chan string) go func() { fmt.Printf(" 发送到无缓冲 channel\n") unbuffered <- "sync message" fmt.Printf(" 无缓冲 channel 发送完成\n") }() time.Sleep(100 * time.Millisecond) // 让 goroutine 先运行 msg := <-unbuffered fmt.Printf(" 接收到: %s\n", msg) // 有缓冲 channel(异步) fmt.Printf(" 有缓冲 channel (异步):\n") buffered := make(chan string, 2) fmt.Printf(" 发送到有缓冲 channel\n") buffered <- "async message 1" buffered <- "async message 2" fmt.Printf(" 有缓冲 channel 发送完成(未阻塞)\n") fmt.Printf(" 接收到: %s\n", <-buffered) fmt.Printf(" 接收到: %s\n", <-buffered) // 缓冲区满时的行为 fmt.Printf(" 缓冲区满时的行为:\n") fullBuffer := make(chan int, 2) // 填满缓冲区 fullBuffer <- 1 fullBuffer <- 2 fmt.Printf(" 缓冲区已满 (2/2)\n") // 尝试再发送一个值(会阻塞) go func() { fmt.Printf(" 尝试发送第三个值(会阻塞)\n") fullBuffer <- 3 fmt.Printf(" 第三个值发送成功\n") }() time.Sleep(100 * time.Millisecond) fmt.Printf(" 接收一个值: %d\n", <-fullBuffer) time.Sleep(100 * time.Millisecond) fmt.Printf(" 接收一个值: %d\n", <-fullBuffer) fmt.Printf(" 接收一个值: %d\n", <-fullBuffer) // 生产者-消费者模式 fmt.Printf(" 生产者-消费者模式:\n") const bufferSize = 5 const numProducers = 2 const numConsumers = 3 const numItems = 10 items := make(chan int, bufferSize) var wg sync.WaitGroup // 启动生产者 for i := 0; i < numProducers; i++ { wg.Add(1) go producer(i+1, items, numItems/numProducers, &wg) } // 启动消费者 for i := 0; i < numConsumers; i++ { wg.Add(1) go consumer(i+1, items, &wg) } // 等待所有生产者完成,然后关闭 channel go func() { wg.Wait() close(items) }() // 等待一段时间让消费者处理完所有项目 time.Sleep(500 * time.Millisecond) fmt.Println() } // demonstrateChannelDirections 演示 channel 的方向性 func demonstrateChannelDirections() { fmt.Println("3. Channel 的方向性:") // 双向 channel fmt.Printf(" 双向 channel:\n") bidirectional := make(chan string) go func() { bidirectional <- "双向消息" }() message := <-bidirectional fmt.Printf(" 接收到: %s\n", message) // 只发送 channel fmt.Printf(" 只发送 channel:\n") sendOnly := make(chan int) go sender(sendOnly) value := <-sendOnly fmt.Printf(" 从只发送 channel 接收到: %d\n", value) // 只接收 channel fmt.Printf(" 只接收 channel:\n") receiveOnly := make(chan int, 1) receiveOnly <- 100 go receiver(receiveOnly) time.Sleep(100 * time.Millisecond) // 管道模式 fmt.Printf(" 管道模式:\n") numbers := make(chan int) squares := make(chan int) cubes := make(chan int) // 启动管道 go generateNumbers(numbers) go squareNumbers(numbers, squares) go cubeNumbers(squares, cubes) // 读取最终结果 for i := 0; i < 5; i++ { result := <-cubes fmt.Printf(" 管道结果: %d\n", result) } fmt.Println() } // demonstrateChannelClosing 演示 channel 的关闭 func demonstrateChannelClosing() { fmt.Println("4. Channel 的关闭:") // 基本的 channel 关闭 fmt.Printf(" 基本的 channel 关闭:\n") ch := make(chan int, 3) // 发送一些值 ch <- 1 ch <- 2 ch <- 3 // 关闭 channel close(ch) // 从已关闭的 channel 读取 for i := 0; i < 4; i++ { value, ok := <-ch if ok { fmt.Printf(" 接收到值: %d\n", value) } else { fmt.Printf(" channel 已关闭,接收到零值: %d\n", value) } } // 检测 channel 是否关闭 fmt.Printf(" 检测 channel 是否关闭:\n") status := make(chan string, 2) status <- "active" status <- "inactive" close(status) for { value, ok := <-status if !ok { fmt.Printf(" channel 已关闭,退出循环\n") break } fmt.Printf(" 状态: %s\n", value) } // 多个发送者的关闭模式 fmt.Printf(" 多个发送者的关闭模式:\n") data := make(chan int) done := make(chan bool) // 启动多个发送者 for i := 1; i <= 3; i++ { go multipleSender(i, data, done) } // 接收数据 go func() { for value := range data { fmt.Printf(" 接收到: %d\n", value) } fmt.Printf(" 数据接收完成\n") }() // 等待一段时间后通知所有发送者停止 time.Sleep(300 * time.Millisecond) close(done) // 等待发送者停止后关闭数据 channel time.Sleep(100 * time.Millisecond) close(data) time.Sleep(100 * time.Millisecond) fmt.Println() } // demonstrateChannelRange 演示 channel 与 range func demonstrateChannelRange() { fmt.Println("5. Channel 与 Range:") // 使用 range 遍历 channel fmt.Printf(" 使用 range 遍历 channel:\n") numbers := make(chan int) // 发送数据 go func() { for i := 1; i <= 5; i++ { numbers <- i time.Sleep(50 * time.Millisecond) } close(numbers) }() // 使用 range 接收数据 for num := range numbers { fmt.Printf(" 接收到数字: %d\n", num) } // 斐波那契数列生成器 fmt.Printf(" 斐波那契数列生成器:\n") fib := fibonacci(10) for value := range fib { fmt.Printf(" 斐波那契: %d\n", value) } // 素数生成器 fmt.Printf(" 素数生成器:\n") primes := sieve(30) fmt.Printf(" 30 以内的素数: ") for prime := range primes { fmt.Printf("%d ", prime) } fmt.Printf("\n") fmt.Println() } // demonstrateChannelPatterns 演示 channel 的常见模式 func demonstrateChannelPatterns() { fmt.Println("6. Channel 的常见模式:") // 扇出模式(一个输入,多个输出) fmt.Printf(" 扇出模式:\n") input := make(chan int) output1 := make(chan int) output2 := make(chan int) output3 := make(chan int) // 扇出 go fanOut(input, output1, output2, output3) // 发送数据 go func() { for i := 1; i <= 6; i++ { input <- i } close(input) }() // 接收数据 var wg sync.WaitGroup wg.Add(3) go func() { defer wg.Done() for value := range output1 { fmt.Printf(" 输出1: %d\n", value) } }() go func() { defer wg.Done() for value := range output2 { fmt.Printf(" 输出2: %d\n", value) } }() go func() { defer wg.Done() for value := range output3 { fmt.Printf(" 输出3: %d\n", value) } }() wg.Wait() // 扇入模式(多个输入,一个输出) fmt.Printf(" 扇入模式:\n") input1 := make(chan string) input2 := make(chan string) input3 := make(chan string) // 启动输入源 go func() { for i := 1; i <= 3; i++ { input1 <- fmt.Sprintf("源1-%d", i) time.Sleep(100 * time.Millisecond) } close(input1) }() go func() { for i := 1; i <= 3; i++ { input2 <- fmt.Sprintf("源2-%d", i) time.Sleep(150 * time.Millisecond) } close(input2) }() go func() { for i := 1; i <= 3; i++ { input3 <- fmt.Sprintf("源3-%d", i) time.Sleep(200 * time.Millisecond) } close(input3) }() // 扇入 merged := fanInMultiple(input1, input2, input3) fmt.Printf(" 合并结果:\n") for result := range merged { fmt.Printf(" %s\n", result) } // 工作池模式 fmt.Printf(" 工作池模式:\n") jobs := make(chan WorkJob, 10) results := make(chan WorkResult, 10) // 启动工作池 const numWorkers = 3 for w := 1; w <= numWorkers; w++ { go workPoolWorker(w, jobs, results) } // 发送任务 for j := 1; j <= 9; j++ { jobs <- WorkJob{ID: j, Data: fmt.Sprintf("任务-%d", j)} } close(jobs) // 收集结果 for r := 1; r <= 9; r++ { result := <-results fmt.Printf(" %s\n", result.Message) } fmt.Println() } // demonstrateAdvancedChannelUsage 演示 channel 的高级用法 func demonstrateAdvancedChannelUsage() { fmt.Println("7. Channel 的高级用法:") // 超时模式 fmt.Printf(" 超时模式:\n") slowCh := make(chan string) go func() { time.Sleep(300 * time.Millisecond) slowCh <- "慢速响应" }() select { case result := <-slowCh: fmt.Printf(" 接收到: %s\n", result) case <-time.After(200 * time.Millisecond): fmt.Printf(" 操作超时\n") } // 非阻塞操作 fmt.Printf(" 非阻塞操作:\n") nonBlockingCh := make(chan int, 1) // 非阻塞发送 select { case nonBlockingCh <- 42: fmt.Printf(" 非阻塞发送成功\n") default: fmt.Printf(" 非阻塞发送失败,channel 已满\n") } // 非阻塞接收 select { case value := <-nonBlockingCh: fmt.Printf(" 非阻塞接收成功: %d\n", value) default: fmt.Printf(" 非阻塞接收失败,channel 为空\n") } // 心跳模式 fmt.Printf(" 心跳模式:\n") heartbeat := time.NewTicker(100 * time.Millisecond) defer heartbeat.Stop() timeout := time.After(350 * time.Millisecond) for { select { case <-heartbeat.C: fmt.Printf(" 心跳\n") case <-timeout: fmt.Printf(" 心跳监控结束\n") goto heartbeatEnd } } heartbeatEnd: // 速率限制 fmt.Printf(" 速率限制:\n") rateLimiter := time.NewTicker(100 * time.Millisecond) defer rateLimiter.Stop() requests := make(chan int, 5) for i := 1; i <= 5; i++ { requests <- i } close(requests) for req := range requests { <-rateLimiter.C // 等待速率限制器 fmt.Printf(" 处理请求 %d (限速)\n", req) } fmt.Println() } // ========== 辅助函数和类型定义 ========== // echoServer 简单的回声服务器 func echoServer(messages <-chan string, responses chan<- string) { for msg := range messages { responses <- "Echo: " + msg } } // producer 生产者函数 func producer(id int, items chan<- int, count int, wg *sync.WaitGroup) { defer wg.Done() for i := 0; i < count; i++ { item := id*100 + i items <- item fmt.Printf(" 生产者 %d 生产: %d\n", id, item) time.Sleep(time.Duration(rand.Intn(100)) * time.Millisecond) } fmt.Printf(" 生产者 %d 完成\n", id) } // consumer 消费者函数 func consumer(id int, items <-chan int, wg *sync.WaitGroup) { defer wg.Done() for item := range items { fmt.Printf(" 消费者 %d 消费: %d\n", id, item) time.Sleep(time.Duration(rand.Intn(150)) * time.Millisecond) } fmt.Printf(" 消费者 %d 完成\n", id) } // sender 只发送函数 func sender(ch chan<- int) { ch <- 42 close(ch) } // receiver 只接收函数 func receiver(ch <-chan int) { value := <-ch fmt.Printf(" 只接收 channel 接收到: %d\n", value) } // generateNumbers 生成数字 func generateNumbers(out chan<- int) { for i := 1; i <= 5; i++ { out <- i } close(out) } // squareNumbers 计算平方 func squareNumbers(in <-chan int, out chan<- int) { for num := range in { out <- num * num } close(out) } // cubeNumbers 计算立方 func cubeNumbers(in <-chan int, out chan<- int) { for num := range in { out <- num * num * num } close(out) } // multipleSender 多发送者函数 func multipleSender(id int, data chan<- int, done <-chan bool) { for { select { case data <- id*10 + rand.Intn(10): time.Sleep(100 * time.Millisecond) case <-done: fmt.Printf(" 发送者 %d 停止\n", id) return } } } // fibonacci 斐波那契生成器 func fibonacci(n int) <-chan int { ch := make(chan int) go func() { defer close(ch) a, b := 0, 1 for i := 0; i < n; i++ { ch <- a a, b = b, a+b } }() return ch } // sieve 埃拉托斯特尼筛法生成素数 func sieve(max int) <-chan int { ch := make(chan int) go func() { defer close(ch) isPrime := make([]bool, max+1) for i := 2; i <= max; i++ { isPrime[i] = true } for i := 2; i*i <= max; i++ { if isPrime[i] { for j := i * i; j <= max; j += i { isPrime[j] = false } } } for i := 2; i <= max; i++ { if isPrime[i] { ch <- i } } }() return ch } // fanOut 扇出函数 func fanOut(input <-chan int, outputs ...chan<- int) { go func() { defer func() { for _, output := range outputs { close(output) } }() i := 0 for value := range input { outputs[i%len(outputs)] <- value i++ } }() } // fanInMultiple 多路扇入函数 func fanInMultiple(inputs ...<-chan string) <-chan string { output := make(chan string) var wg sync.WaitGroup // 为每个输入启动一个 goroutine for _, input := range inputs { wg.Add(1) go func(ch <-chan string) { defer wg.Done() for value := range ch { output <- value } }(input) } // 等待所有输入完成后关闭输出 go func() { wg.Wait() close(output) }() return output } // WorkJob 工作任务 type WorkJob struct { ID int Data string } // WorkResult 工作结果 type WorkResult struct { JobID int Message string } // workPoolWorker 工作池工作者 func workPoolWorker(id int, jobs <-chan WorkJob, results chan<- WorkResult) { for job := range jobs { fmt.Printf(" 工作者 %d 开始处理任务 %d\n", id, job.ID) time.Sleep(time.Duration(rand.Intn(200)) * time.Millisecond) results <- WorkResult{ JobID: job.ID, Message: fmt.Sprintf("工作者 %d 完成任务 %d: %s", id, job.ID, job.Data), } } } /* 运行这个程序: go run 02-channels.go 学习要点: 1. Channel 是 Go 语言中 goroutine 间通信的主要方式 2. Channel 有无缓冲和有缓冲两种类型 3. Channel 具有方向性,可以限制为只发送或只接收 4. Channel 可以被关闭,关闭后不能再发送数据 5. 使用 range 可以方便地遍历 channel 中的数据 Channel 的特性: 1. 类型安全:只能传递指定类型的数据 2. 同步机制:无缓冲 channel 提供同步语义 3. 异步机制:有缓冲 channel 提供异步语义 4. 方向性:可以限制 channel 的使用方向 5. 可关闭:发送方可以关闭 channel 通知接收方 Channel 类型: 1. 无缓冲 channel:make(chan T) 2. 有缓冲 channel:make(chan T, size) 3. 只发送 channel:chan<- T 4. 只接收 channel:<-chan T 5. 双向 channel:chan T Channel 操作: 1. 发送:ch <- value 2. 接收:value := <-ch 3. 接收并检查:value, ok := <-ch 4. 关闭:close(ch) 5. 遍历:for value := range ch 常见模式: 1. 生产者-消费者:使用缓冲 channel 解耦生产和消费 2. 扇出:一个输入分发到多个输出 3. 扇入:多个输入合并到一个输出 4. 管道:链式处理数据 5. 工作池:固定数量的 worker 处理任务 高级用法: 1. 超时控制:使用 time.After 2. 非阻塞操作:使用 select 的 default 分支 3. 心跳机制:使用 time.Ticker 4. 速率限制:控制操作频率 5. 信号通知:使用 channel 作为信号 最佳实践: 1. 发送方负责关闭 channel 2. 不要从接收方关闭 channel 3. 不要关闭已关闭的 channel 4. 使用 range 遍历 channel 5. 使用 select 处理多个 channel 注意事项: 1. 向已关闭的 channel 发送数据会 panic 2. 关闭已关闭的 channel 会 panic 3. 从已关闭的 channel 接收会得到零值 4. nil channel 的发送和接收都会阻塞 5. 无缓冲 channel 的发送和接收必须同时准备好 性能考虑: 1. 无缓冲 channel 有同步开销 2. 有缓冲 channel 可以减少阻塞 3. Channel 操作比直接内存访问慢 4. 合理选择缓冲区大小 5. 避免创建过多的 channel */