Files
2025-08-24 11:24:52 +08:00

841 lines
18 KiB
Go
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

/*
02-channels.go - Go 语言 Channels 详解
学习目标:
1. 理解 channel 的概念和特性
2. 掌握 channel 的创建和使用
3. 学会不同类型的 channel
4. 了解 channel 的方向性
5. 掌握 channel 的关闭和检测
知识点:
- channel 的基本概念
- 无缓冲和有缓冲 channel
- channel 的方向性
- channel 的关闭
- range 和 select 与 channel
- channel 的常见模式
*/
package main
import (
"fmt"
"math/rand"
"sync"
"time"
)
func main() {
fmt.Println("=== Go 语言 Channels 详解 ===\n")
// 演示基本的 channel
demonstrateBasicChannels()
// 演示缓冲 channel
demonstrateBufferedChannels()
// 演示 channel 的方向性
demonstrateChannelDirections()
// 演示 channel 的关闭
demonstrateChannelClosing()
// 演示 channel 与 range
demonstrateChannelRange()
// 演示 channel 的常见模式
demonstrateChannelPatterns()
// 演示 channel 的高级用法
demonstrateAdvancedChannelUsage()
}
// demonstrateBasicChannels 演示基本的 channel
func demonstrateBasicChannels() {
fmt.Println("1. 基本的 Channel:")
// channel 的基本概念
fmt.Printf(" Channel 的基本概念:\n")
fmt.Printf(" - Go 语言的核心特性,用于 goroutine 间通信\n")
fmt.Printf(" - 类型安全的管道,可以传递特定类型的数据\n")
fmt.Printf(" - 默认是无缓冲的,发送和接收会阻塞\n")
fmt.Printf(" - 遵循 'Don't communicate by sharing memory; share memory by communicating'\n")
// 创建和使用 channel
fmt.Printf(" 创建和使用 channel:\n")
// 创建一个 int 类型的 channel
ch := make(chan int)
// 在 goroutine 中发送数据
go func() {
fmt.Printf(" 发送数据到 channel\n")
ch <- 42
}()
// 从 channel 接收数据
value := <-ch
fmt.Printf(" 从 channel 接收到: %d\n", value)
// 双向通信
fmt.Printf(" 双向通信:\n")
messages := make(chan string)
responses := make(chan string)
// 启动一个 echo 服务
go echoServer(messages, responses)
// 发送消息并接收响应
messages <- "Hello"
response := <-responses
fmt.Printf(" 发送: Hello, 接收: %s\n", response)
messages <- "World"
response = <-responses
fmt.Printf(" 发送: World, 接收: %s\n", response)
// 关闭 channels
close(messages)
close(responses)
// 同步使用 channel
fmt.Printf(" 同步使用 channel:\n")
done := make(chan bool)
go func() {
fmt.Printf(" 执行异步任务...\n")
time.Sleep(200 * time.Millisecond)
fmt.Printf(" 异步任务完成\n")
done <- true
}()
fmt.Printf(" 等待异步任务完成...\n")
<-done
fmt.Printf(" 主程序继续执行\n")
fmt.Println()
}
// demonstrateBufferedChannels 演示缓冲 channel
func demonstrateBufferedChannels() {
fmt.Println("2. 缓冲 Channel:")
// 无缓冲 vs 有缓冲
fmt.Printf(" 无缓冲 vs 有缓冲:\n")
// 无缓冲 channel同步
fmt.Printf(" 无缓冲 channel (同步):\n")
unbuffered := make(chan string)
go func() {
fmt.Printf(" 发送到无缓冲 channel\n")
unbuffered <- "sync message"
fmt.Printf(" 无缓冲 channel 发送完成\n")
}()
time.Sleep(100 * time.Millisecond) // 让 goroutine 先运行
msg := <-unbuffered
fmt.Printf(" 接收到: %s\n", msg)
// 有缓冲 channel异步
fmt.Printf(" 有缓冲 channel (异步):\n")
buffered := make(chan string, 2)
fmt.Printf(" 发送到有缓冲 channel\n")
buffered <- "async message 1"
buffered <- "async message 2"
fmt.Printf(" 有缓冲 channel 发送完成(未阻塞)\n")
fmt.Printf(" 接收到: %s\n", <-buffered)
fmt.Printf(" 接收到: %s\n", <-buffered)
// 缓冲区满时的行为
fmt.Printf(" 缓冲区满时的行为:\n")
fullBuffer := make(chan int, 2)
// 填满缓冲区
fullBuffer <- 1
fullBuffer <- 2
fmt.Printf(" 缓冲区已满 (2/2)\n")
// 尝试再发送一个值(会阻塞)
go func() {
fmt.Printf(" 尝试发送第三个值(会阻塞)\n")
fullBuffer <- 3
fmt.Printf(" 第三个值发送成功\n")
}()
time.Sleep(100 * time.Millisecond)
fmt.Printf(" 接收一个值: %d\n", <-fullBuffer)
time.Sleep(100 * time.Millisecond)
fmt.Printf(" 接收一个值: %d\n", <-fullBuffer)
fmt.Printf(" 接收一个值: %d\n", <-fullBuffer)
// 生产者-消费者模式
fmt.Printf(" 生产者-消费者模式:\n")
const bufferSize = 5
const numProducers = 2
const numConsumers = 3
const numItems = 10
items := make(chan int, bufferSize)
var wg sync.WaitGroup
// 启动生产者
for i := 0; i < numProducers; i++ {
wg.Add(1)
go producer(i+1, items, numItems/numProducers, &wg)
}
// 启动消费者
for i := 0; i < numConsumers; i++ {
wg.Add(1)
go consumer(i+1, items, &wg)
}
// 等待所有生产者完成,然后关闭 channel
go func() {
wg.Wait()
close(items)
}()
// 等待一段时间让消费者处理完所有项目
time.Sleep(500 * time.Millisecond)
fmt.Println()
}
// demonstrateChannelDirections 演示 channel 的方向性
func demonstrateChannelDirections() {
fmt.Println("3. Channel 的方向性:")
// 双向 channel
fmt.Printf(" 双向 channel:\n")
bidirectional := make(chan string)
go func() {
bidirectional <- "双向消息"
}()
message := <-bidirectional
fmt.Printf(" 接收到: %s\n", message)
// 只发送 channel
fmt.Printf(" 只发送 channel:\n")
sendOnly := make(chan int)
go sender(sendOnly)
value := <-sendOnly
fmt.Printf(" 从只发送 channel 接收到: %d\n", value)
// 只接收 channel
fmt.Printf(" 只接收 channel:\n")
receiveOnly := make(chan int, 1)
receiveOnly <- 100
go receiver(receiveOnly)
time.Sleep(100 * time.Millisecond)
// 管道模式
fmt.Printf(" 管道模式:\n")
numbers := make(chan int)
squares := make(chan int)
cubes := make(chan int)
// 启动管道
go generateNumbers(numbers)
go squareNumbers(numbers, squares)
go cubeNumbers(squares, cubes)
// 读取最终结果
for i := 0; i < 5; i++ {
result := <-cubes
fmt.Printf(" 管道结果: %d\n", result)
}
fmt.Println()
}
// demonstrateChannelClosing 演示 channel 的关闭
func demonstrateChannelClosing() {
fmt.Println("4. Channel 的关闭:")
// 基本的 channel 关闭
fmt.Printf(" 基本的 channel 关闭:\n")
ch := make(chan int, 3)
// 发送一些值
ch <- 1
ch <- 2
ch <- 3
// 关闭 channel
close(ch)
// 从已关闭的 channel 读取
for i := 0; i < 4; i++ {
value, ok := <-ch
if ok {
fmt.Printf(" 接收到值: %d\n", value)
} else {
fmt.Printf(" channel 已关闭,接收到零值: %d\n", value)
}
}
// 检测 channel 是否关闭
fmt.Printf(" 检测 channel 是否关闭:\n")
status := make(chan string, 2)
status <- "active"
status <- "inactive"
close(status)
for {
value, ok := <-status
if !ok {
fmt.Printf(" channel 已关闭,退出循环\n")
break
}
fmt.Printf(" 状态: %s\n", value)
}
// 多个发送者的关闭模式
fmt.Printf(" 多个发送者的关闭模式:\n")
data := make(chan int)
done := make(chan bool)
// 启动多个发送者
for i := 1; i <= 3; i++ {
go multipleSender(i, data, done)
}
// 接收数据
go func() {
for value := range data {
fmt.Printf(" 接收到: %d\n", value)
}
fmt.Printf(" 数据接收完成\n")
}()
// 等待一段时间后通知所有发送者停止
time.Sleep(300 * time.Millisecond)
close(done)
// 等待发送者停止后关闭数据 channel
time.Sleep(100 * time.Millisecond)
close(data)
time.Sleep(100 * time.Millisecond)
fmt.Println()
}
// demonstrateChannelRange 演示 channel 与 range
func demonstrateChannelRange() {
fmt.Println("5. Channel 与 Range:")
// 使用 range 遍历 channel
fmt.Printf(" 使用 range 遍历 channel:\n")
numbers := make(chan int)
// 发送数据
go func() {
for i := 1; i <= 5; i++ {
numbers <- i
time.Sleep(50 * time.Millisecond)
}
close(numbers)
}()
// 使用 range 接收数据
for num := range numbers {
fmt.Printf(" 接收到数字: %d\n", num)
}
// 斐波那契数列生成器
fmt.Printf(" 斐波那契数列生成器:\n")
fib := fibonacci(10)
for value := range fib {
fmt.Printf(" 斐波那契: %d\n", value)
}
// 素数生成器
fmt.Printf(" 素数生成器:\n")
primes := sieve(30)
fmt.Printf(" 30 以内的素数: ")
for prime := range primes {
fmt.Printf("%d ", prime)
}
fmt.Printf("\n")
fmt.Println()
}
// demonstrateChannelPatterns 演示 channel 的常见模式
func demonstrateChannelPatterns() {
fmt.Println("6. Channel 的常见模式:")
// 扇出模式(一个输入,多个输出)
fmt.Printf(" 扇出模式:\n")
input := make(chan int)
output1 := make(chan int)
output2 := make(chan int)
output3 := make(chan int)
// 扇出
go fanOut(input, output1, output2, output3)
// 发送数据
go func() {
for i := 1; i <= 6; i++ {
input <- i
}
close(input)
}()
// 接收数据
var wg sync.WaitGroup
wg.Add(3)
go func() {
defer wg.Done()
for value := range output1 {
fmt.Printf(" 输出1: %d\n", value)
}
}()
go func() {
defer wg.Done()
for value := range output2 {
fmt.Printf(" 输出2: %d\n", value)
}
}()
go func() {
defer wg.Done()
for value := range output3 {
fmt.Printf(" 输出3: %d\n", value)
}
}()
wg.Wait()
// 扇入模式(多个输入,一个输出)
fmt.Printf(" 扇入模式:\n")
input1 := make(chan string)
input2 := make(chan string)
input3 := make(chan string)
// 启动输入源
go func() {
for i := 1; i <= 3; i++ {
input1 <- fmt.Sprintf("源1-%d", i)
time.Sleep(100 * time.Millisecond)
}
close(input1)
}()
go func() {
for i := 1; i <= 3; i++ {
input2 <- fmt.Sprintf("源2-%d", i)
time.Sleep(150 * time.Millisecond)
}
close(input2)
}()
go func() {
for i := 1; i <= 3; i++ {
input3 <- fmt.Sprintf("源3-%d", i)
time.Sleep(200 * time.Millisecond)
}
close(input3)
}()
// 扇入
merged := fanInMultiple(input1, input2, input3)
fmt.Printf(" 合并结果:\n")
for result := range merged {
fmt.Printf(" %s\n", result)
}
// 工作池模式
fmt.Printf(" 工作池模式:\n")
jobs := make(chan WorkJob, 10)
results := make(chan WorkResult, 10)
// 启动工作池
const numWorkers = 3
for w := 1; w <= numWorkers; w++ {
go workPoolWorker(w, jobs, results)
}
// 发送任务
for j := 1; j <= 9; j++ {
jobs <- WorkJob{ID: j, Data: fmt.Sprintf("任务-%d", j)}
}
close(jobs)
// 收集结果
for r := 1; r <= 9; r++ {
result := <-results
fmt.Printf(" %s\n", result.Message)
}
fmt.Println()
}
// demonstrateAdvancedChannelUsage 演示 channel 的高级用法
func demonstrateAdvancedChannelUsage() {
fmt.Println("7. Channel 的高级用法:")
// 超时模式
fmt.Printf(" 超时模式:\n")
slowCh := make(chan string)
go func() {
time.Sleep(300 * time.Millisecond)
slowCh <- "慢速响应"
}()
select {
case result := <-slowCh:
fmt.Printf(" 接收到: %s\n", result)
case <-time.After(200 * time.Millisecond):
fmt.Printf(" 操作超时\n")
}
// 非阻塞操作
fmt.Printf(" 非阻塞操作:\n")
nonBlockingCh := make(chan int, 1)
// 非阻塞发送
select {
case nonBlockingCh <- 42:
fmt.Printf(" 非阻塞发送成功\n")
default:
fmt.Printf(" 非阻塞发送失败channel 已满\n")
}
// 非阻塞接收
select {
case value := <-nonBlockingCh:
fmt.Printf(" 非阻塞接收成功: %d\n", value)
default:
fmt.Printf(" 非阻塞接收失败channel 为空\n")
}
// 心跳模式
fmt.Printf(" 心跳模式:\n")
heartbeat := time.NewTicker(100 * time.Millisecond)
defer heartbeat.Stop()
timeout := time.After(350 * time.Millisecond)
for {
select {
case <-heartbeat.C:
fmt.Printf(" 心跳\n")
case <-timeout:
fmt.Printf(" 心跳监控结束\n")
goto heartbeatEnd
}
}
heartbeatEnd:
// 速率限制
fmt.Printf(" 速率限制:\n")
rateLimiter := time.NewTicker(100 * time.Millisecond)
defer rateLimiter.Stop()
requests := make(chan int, 5)
for i := 1; i <= 5; i++ {
requests <- i
}
close(requests)
for req := range requests {
<-rateLimiter.C // 等待速率限制器
fmt.Printf(" 处理请求 %d (限速)\n", req)
}
fmt.Println()
}
// ========== 辅助函数和类型定义 ==========
// echoServer 简单的回声服务器
func echoServer(messages <-chan string, responses chan<- string) {
for msg := range messages {
responses <- "Echo: " + msg
}
}
// producer 生产者函数
func producer(id int, items chan<- int, count int, wg *sync.WaitGroup) {
defer wg.Done()
for i := 0; i < count; i++ {
item := id*100 + i
items <- item
fmt.Printf(" 生产者 %d 生产: %d\n", id, item)
time.Sleep(time.Duration(rand.Intn(100)) * time.Millisecond)
}
fmt.Printf(" 生产者 %d 完成\n", id)
}
// consumer 消费者函数
func consumer(id int, items <-chan int, wg *sync.WaitGroup) {
defer wg.Done()
for item := range items {
fmt.Printf(" 消费者 %d 消费: %d\n", id, item)
time.Sleep(time.Duration(rand.Intn(150)) * time.Millisecond)
}
fmt.Printf(" 消费者 %d 完成\n", id)
}
// sender 只发送函数
func sender(ch chan<- int) {
ch <- 42
close(ch)
}
// receiver 只接收函数
func receiver(ch <-chan int) {
value := <-ch
fmt.Printf(" 只接收 channel 接收到: %d\n", value)
}
// generateNumbers 生成数字
func generateNumbers(out chan<- int) {
for i := 1; i <= 5; i++ {
out <- i
}
close(out)
}
// squareNumbers 计算平方
func squareNumbers(in <-chan int, out chan<- int) {
for num := range in {
out <- num * num
}
close(out)
}
// cubeNumbers 计算立方
func cubeNumbers(in <-chan int, out chan<- int) {
for num := range in {
out <- num * num * num
}
close(out)
}
// multipleSender 多发送者函数
func multipleSender(id int, data chan<- int, done <-chan bool) {
for {
select {
case data <- id*10 + rand.Intn(10):
time.Sleep(100 * time.Millisecond)
case <-done:
fmt.Printf(" 发送者 %d 停止\n", id)
return
}
}
}
// fibonacci 斐波那契生成器
func fibonacci(n int) <-chan int {
ch := make(chan int)
go func() {
defer close(ch)
a, b := 0, 1
for i := 0; i < n; i++ {
ch <- a
a, b = b, a+b
}
}()
return ch
}
// sieve 埃拉托斯特尼筛法生成素数
func sieve(max int) <-chan int {
ch := make(chan int)
go func() {
defer close(ch)
isPrime := make([]bool, max+1)
for i := 2; i <= max; i++ {
isPrime[i] = true
}
for i := 2; i*i <= max; i++ {
if isPrime[i] {
for j := i * i; j <= max; j += i {
isPrime[j] = false
}
}
}
for i := 2; i <= max; i++ {
if isPrime[i] {
ch <- i
}
}
}()
return ch
}
// fanOut 扇出函数
func fanOut(input <-chan int, outputs ...chan<- int) {
go func() {
defer func() {
for _, output := range outputs {
close(output)
}
}()
i := 0
for value := range input {
outputs[i%len(outputs)] <- value
i++
}
}()
}
// fanInMultiple 多路扇入函数
func fanInMultiple(inputs ...<-chan string) <-chan string {
output := make(chan string)
var wg sync.WaitGroup
// 为每个输入启动一个 goroutine
for _, input := range inputs {
wg.Add(1)
go func(ch <-chan string) {
defer wg.Done()
for value := range ch {
output <- value
}
}(input)
}
// 等待所有输入完成后关闭输出
go func() {
wg.Wait()
close(output)
}()
return output
}
// WorkJob 工作任务
type WorkJob struct {
ID int
Data string
}
// WorkResult 工作结果
type WorkResult struct {
JobID int
Message string
}
// workPoolWorker 工作池工作者
func workPoolWorker(id int, jobs <-chan WorkJob, results chan<- WorkResult) {
for job := range jobs {
fmt.Printf(" 工作者 %d 开始处理任务 %d\n", id, job.ID)
time.Sleep(time.Duration(rand.Intn(200)) * time.Millisecond)
results <- WorkResult{
JobID: job.ID,
Message: fmt.Sprintf("工作者 %d 完成任务 %d: %s", id, job.ID, job.Data),
}
}
}
/*
运行这个程序:
go run 02-channels.go
学习要点:
1. Channel 是 Go 语言中 goroutine 间通信的主要方式
2. Channel 有无缓冲和有缓冲两种类型
3. Channel 具有方向性,可以限制为只发送或只接收
4. Channel 可以被关闭,关闭后不能再发送数据
5. 使用 range 可以方便地遍历 channel 中的数据
Channel 的特性:
1. 类型安全:只能传递指定类型的数据
2. 同步机制:无缓冲 channel 提供同步语义
3. 异步机制:有缓冲 channel 提供异步语义
4. 方向性:可以限制 channel 的使用方向
5. 可关闭:发送方可以关闭 channel 通知接收方
Channel 类型:
1. 无缓冲 channelmake(chan T)
2. 有缓冲 channelmake(chan T, size)
3. 只发送 channelchan<- T
4. 只接收 channel<-chan T
5. 双向 channelchan T
Channel 操作:
1. 发送ch <- value
2. 接收value := <-ch
3. 接收并检查value, ok := <-ch
4. 关闭close(ch)
5. 遍历for value := range ch
常见模式:
1. 生产者-消费者:使用缓冲 channel 解耦生产和消费
2. 扇出:一个输入分发到多个输出
3. 扇入:多个输入合并到一个输出
4. 管道:链式处理数据
5. 工作池:固定数量的 worker 处理任务
高级用法:
1. 超时控制:使用 time.After
2. 非阻塞操作:使用 select 的 default 分支
3. 心跳机制:使用 time.Ticker
4. 速率限制:控制操作频率
5. 信号通知:使用 channel 作为信号
最佳实践:
1. 发送方负责关闭 channel
2. 不要从接收方关闭 channel
3. 不要关闭已关闭的 channel
4. 使用 range 遍历 channel
5. 使用 select 处理多个 channel
注意事项:
1. 向已关闭的 channel 发送数据会 panic
2. 关闭已关闭的 channel 会 panic
3. 从已关闭的 channel 接收会得到零值
4. nil channel 的发送和接收都会阻塞
5. 无缓冲 channel 的发送和接收必须同时准备好
性能考虑:
1. 无缓冲 channel 有同步开销
2. 有缓冲 channel 可以减少阻塞
3. Channel 操作比直接内存访问慢
4. 合理选择缓冲区大小
5. 避免创建过多的 channel
*/