841 lines
18 KiB
Go
841 lines
18 KiB
Go
/*
|
||
02-channels.go - Go 语言 Channels 详解
|
||
|
||
学习目标:
|
||
1. 理解 channel 的概念和特性
|
||
2. 掌握 channel 的创建和使用
|
||
3. 学会不同类型的 channel
|
||
4. 了解 channel 的方向性
|
||
5. 掌握 channel 的关闭和检测
|
||
|
||
知识点:
|
||
- channel 的基本概念
|
||
- 无缓冲和有缓冲 channel
|
||
- channel 的方向性
|
||
- channel 的关闭
|
||
- range 和 select 与 channel
|
||
- channel 的常见模式
|
||
*/
|
||
|
||
package main
|
||
|
||
import (
|
||
"fmt"
|
||
"math/rand"
|
||
"sync"
|
||
"time"
|
||
)
|
||
|
||
func main() {
|
||
fmt.Println("=== Go 语言 Channels 详解 ===\n")
|
||
|
||
// 演示基本的 channel
|
||
demonstrateBasicChannels()
|
||
|
||
// 演示缓冲 channel
|
||
demonstrateBufferedChannels()
|
||
|
||
// 演示 channel 的方向性
|
||
demonstrateChannelDirections()
|
||
|
||
// 演示 channel 的关闭
|
||
demonstrateChannelClosing()
|
||
|
||
// 演示 channel 与 range
|
||
demonstrateChannelRange()
|
||
|
||
// 演示 channel 的常见模式
|
||
demonstrateChannelPatterns()
|
||
|
||
// 演示 channel 的高级用法
|
||
demonstrateAdvancedChannelUsage()
|
||
}
|
||
|
||
// demonstrateBasicChannels 演示基本的 channel
|
||
func demonstrateBasicChannels() {
|
||
fmt.Println("1. 基本的 Channel:")
|
||
|
||
// channel 的基本概念
|
||
fmt.Printf(" Channel 的基本概念:\n")
|
||
fmt.Printf(" - Go 语言的核心特性,用于 goroutine 间通信\n")
|
||
fmt.Printf(" - 类型安全的管道,可以传递特定类型的数据\n")
|
||
fmt.Printf(" - 默认是无缓冲的,发送和接收会阻塞\n")
|
||
fmt.Printf(" - 遵循 'Don't communicate by sharing memory; share memory by communicating'\n")
|
||
|
||
// 创建和使用 channel
|
||
fmt.Printf(" 创建和使用 channel:\n")
|
||
|
||
// 创建一个 int 类型的 channel
|
||
ch := make(chan int)
|
||
|
||
// 在 goroutine 中发送数据
|
||
go func() {
|
||
fmt.Printf(" 发送数据到 channel\n")
|
||
ch <- 42
|
||
}()
|
||
|
||
// 从 channel 接收数据
|
||
value := <-ch
|
||
fmt.Printf(" 从 channel 接收到: %d\n", value)
|
||
|
||
// 双向通信
|
||
fmt.Printf(" 双向通信:\n")
|
||
|
||
messages := make(chan string)
|
||
responses := make(chan string)
|
||
|
||
// 启动一个 echo 服务
|
||
go echoServer(messages, responses)
|
||
|
||
// 发送消息并接收响应
|
||
messages <- "Hello"
|
||
response := <-responses
|
||
fmt.Printf(" 发送: Hello, 接收: %s\n", response)
|
||
|
||
messages <- "World"
|
||
response = <-responses
|
||
fmt.Printf(" 发送: World, 接收: %s\n", response)
|
||
|
||
// 关闭 channels
|
||
close(messages)
|
||
close(responses)
|
||
|
||
// 同步使用 channel
|
||
fmt.Printf(" 同步使用 channel:\n")
|
||
|
||
done := make(chan bool)
|
||
|
||
go func() {
|
||
fmt.Printf(" 执行异步任务...\n")
|
||
time.Sleep(200 * time.Millisecond)
|
||
fmt.Printf(" 异步任务完成\n")
|
||
done <- true
|
||
}()
|
||
|
||
fmt.Printf(" 等待异步任务完成...\n")
|
||
<-done
|
||
fmt.Printf(" 主程序继续执行\n")
|
||
|
||
fmt.Println()
|
||
}
|
||
|
||
// demonstrateBufferedChannels 演示缓冲 channel
|
||
func demonstrateBufferedChannels() {
|
||
fmt.Println("2. 缓冲 Channel:")
|
||
|
||
// 无缓冲 vs 有缓冲
|
||
fmt.Printf(" 无缓冲 vs 有缓冲:\n")
|
||
|
||
// 无缓冲 channel(同步)
|
||
fmt.Printf(" 无缓冲 channel (同步):\n")
|
||
unbuffered := make(chan string)
|
||
|
||
go func() {
|
||
fmt.Printf(" 发送到无缓冲 channel\n")
|
||
unbuffered <- "sync message"
|
||
fmt.Printf(" 无缓冲 channel 发送完成\n")
|
||
}()
|
||
|
||
time.Sleep(100 * time.Millisecond) // 让 goroutine 先运行
|
||
msg := <-unbuffered
|
||
fmt.Printf(" 接收到: %s\n", msg)
|
||
|
||
// 有缓冲 channel(异步)
|
||
fmt.Printf(" 有缓冲 channel (异步):\n")
|
||
buffered := make(chan string, 2)
|
||
|
||
fmt.Printf(" 发送到有缓冲 channel\n")
|
||
buffered <- "async message 1"
|
||
buffered <- "async message 2"
|
||
fmt.Printf(" 有缓冲 channel 发送完成(未阻塞)\n")
|
||
|
||
fmt.Printf(" 接收到: %s\n", <-buffered)
|
||
fmt.Printf(" 接收到: %s\n", <-buffered)
|
||
|
||
// 缓冲区满时的行为
|
||
fmt.Printf(" 缓冲区满时的行为:\n")
|
||
|
||
fullBuffer := make(chan int, 2)
|
||
|
||
// 填满缓冲区
|
||
fullBuffer <- 1
|
||
fullBuffer <- 2
|
||
fmt.Printf(" 缓冲区已满 (2/2)\n")
|
||
|
||
// 尝试再发送一个值(会阻塞)
|
||
go func() {
|
||
fmt.Printf(" 尝试发送第三个值(会阻塞)\n")
|
||
fullBuffer <- 3
|
||
fmt.Printf(" 第三个值发送成功\n")
|
||
}()
|
||
|
||
time.Sleep(100 * time.Millisecond)
|
||
fmt.Printf(" 接收一个值: %d\n", <-fullBuffer)
|
||
time.Sleep(100 * time.Millisecond)
|
||
fmt.Printf(" 接收一个值: %d\n", <-fullBuffer)
|
||
fmt.Printf(" 接收一个值: %d\n", <-fullBuffer)
|
||
|
||
// 生产者-消费者模式
|
||
fmt.Printf(" 生产者-消费者模式:\n")
|
||
|
||
const bufferSize = 5
|
||
const numProducers = 2
|
||
const numConsumers = 3
|
||
const numItems = 10
|
||
|
||
items := make(chan int, bufferSize)
|
||
var wg sync.WaitGroup
|
||
|
||
// 启动生产者
|
||
for i := 0; i < numProducers; i++ {
|
||
wg.Add(1)
|
||
go producer(i+1, items, numItems/numProducers, &wg)
|
||
}
|
||
|
||
// 启动消费者
|
||
for i := 0; i < numConsumers; i++ {
|
||
wg.Add(1)
|
||
go consumer(i+1, items, &wg)
|
||
}
|
||
|
||
// 等待所有生产者完成,然后关闭 channel
|
||
go func() {
|
||
wg.Wait()
|
||
close(items)
|
||
}()
|
||
|
||
// 等待一段时间让消费者处理完所有项目
|
||
time.Sleep(500 * time.Millisecond)
|
||
|
||
fmt.Println()
|
||
}
|
||
|
||
// demonstrateChannelDirections 演示 channel 的方向性
|
||
func demonstrateChannelDirections() {
|
||
fmt.Println("3. Channel 的方向性:")
|
||
|
||
// 双向 channel
|
||
fmt.Printf(" 双向 channel:\n")
|
||
|
||
bidirectional := make(chan string)
|
||
|
||
go func() {
|
||
bidirectional <- "双向消息"
|
||
}()
|
||
|
||
message := <-bidirectional
|
||
fmt.Printf(" 接收到: %s\n", message)
|
||
|
||
// 只发送 channel
|
||
fmt.Printf(" 只发送 channel:\n")
|
||
|
||
sendOnly := make(chan int)
|
||
go sender(sendOnly)
|
||
|
||
value := <-sendOnly
|
||
fmt.Printf(" 从只发送 channel 接收到: %d\n", value)
|
||
|
||
// 只接收 channel
|
||
fmt.Printf(" 只接收 channel:\n")
|
||
|
||
receiveOnly := make(chan int, 1)
|
||
receiveOnly <- 100
|
||
|
||
go receiver(receiveOnly)
|
||
time.Sleep(100 * time.Millisecond)
|
||
|
||
// 管道模式
|
||
fmt.Printf(" 管道模式:\n")
|
||
|
||
numbers := make(chan int)
|
||
squares := make(chan int)
|
||
cubes := make(chan int)
|
||
|
||
// 启动管道
|
||
go generateNumbers(numbers)
|
||
go squareNumbers(numbers, squares)
|
||
go cubeNumbers(squares, cubes)
|
||
|
||
// 读取最终结果
|
||
for i := 0; i < 5; i++ {
|
||
result := <-cubes
|
||
fmt.Printf(" 管道结果: %d\n", result)
|
||
}
|
||
|
||
fmt.Println()
|
||
}
|
||
|
||
// demonstrateChannelClosing 演示 channel 的关闭
|
||
func demonstrateChannelClosing() {
|
||
fmt.Println("4. Channel 的关闭:")
|
||
|
||
// 基本的 channel 关闭
|
||
fmt.Printf(" 基本的 channel 关闭:\n")
|
||
|
||
ch := make(chan int, 3)
|
||
|
||
// 发送一些值
|
||
ch <- 1
|
||
ch <- 2
|
||
ch <- 3
|
||
|
||
// 关闭 channel
|
||
close(ch)
|
||
|
||
// 从已关闭的 channel 读取
|
||
for i := 0; i < 4; i++ {
|
||
value, ok := <-ch
|
||
if ok {
|
||
fmt.Printf(" 接收到值: %d\n", value)
|
||
} else {
|
||
fmt.Printf(" channel 已关闭,接收到零值: %d\n", value)
|
||
}
|
||
}
|
||
|
||
// 检测 channel 是否关闭
|
||
fmt.Printf(" 检测 channel 是否关闭:\n")
|
||
|
||
status := make(chan string, 2)
|
||
status <- "active"
|
||
status <- "inactive"
|
||
close(status)
|
||
|
||
for {
|
||
value, ok := <-status
|
||
if !ok {
|
||
fmt.Printf(" channel 已关闭,退出循环\n")
|
||
break
|
||
}
|
||
fmt.Printf(" 状态: %s\n", value)
|
||
}
|
||
|
||
// 多个发送者的关闭模式
|
||
fmt.Printf(" 多个发送者的关闭模式:\n")
|
||
|
||
data := make(chan int)
|
||
done := make(chan bool)
|
||
|
||
// 启动多个发送者
|
||
for i := 1; i <= 3; i++ {
|
||
go multipleSender(i, data, done)
|
||
}
|
||
|
||
// 接收数据
|
||
go func() {
|
||
for value := range data {
|
||
fmt.Printf(" 接收到: %d\n", value)
|
||
}
|
||
fmt.Printf(" 数据接收完成\n")
|
||
}()
|
||
|
||
// 等待一段时间后通知所有发送者停止
|
||
time.Sleep(300 * time.Millisecond)
|
||
close(done)
|
||
|
||
// 等待发送者停止后关闭数据 channel
|
||
time.Sleep(100 * time.Millisecond)
|
||
close(data)
|
||
|
||
time.Sleep(100 * time.Millisecond)
|
||
|
||
fmt.Println()
|
||
}
|
||
|
||
// demonstrateChannelRange 演示 channel 与 range
|
||
func demonstrateChannelRange() {
|
||
fmt.Println("5. Channel 与 Range:")
|
||
|
||
// 使用 range 遍历 channel
|
||
fmt.Printf(" 使用 range 遍历 channel:\n")
|
||
|
||
numbers := make(chan int)
|
||
|
||
// 发送数据
|
||
go func() {
|
||
for i := 1; i <= 5; i++ {
|
||
numbers <- i
|
||
time.Sleep(50 * time.Millisecond)
|
||
}
|
||
close(numbers)
|
||
}()
|
||
|
||
// 使用 range 接收数据
|
||
for num := range numbers {
|
||
fmt.Printf(" 接收到数字: %d\n", num)
|
||
}
|
||
|
||
// 斐波那契数列生成器
|
||
fmt.Printf(" 斐波那契数列生成器:\n")
|
||
|
||
fib := fibonacci(10)
|
||
for value := range fib {
|
||
fmt.Printf(" 斐波那契: %d\n", value)
|
||
}
|
||
|
||
// 素数生成器
|
||
fmt.Printf(" 素数生成器:\n")
|
||
|
||
primes := sieve(30)
|
||
fmt.Printf(" 30 以内的素数: ")
|
||
for prime := range primes {
|
||
fmt.Printf("%d ", prime)
|
||
}
|
||
fmt.Printf("\n")
|
||
|
||
fmt.Println()
|
||
}
|
||
|
||
// demonstrateChannelPatterns 演示 channel 的常见模式
|
||
func demonstrateChannelPatterns() {
|
||
fmt.Println("6. Channel 的常见模式:")
|
||
|
||
// 扇出模式(一个输入,多个输出)
|
||
fmt.Printf(" 扇出模式:\n")
|
||
|
||
input := make(chan int)
|
||
output1 := make(chan int)
|
||
output2 := make(chan int)
|
||
output3 := make(chan int)
|
||
|
||
// 扇出
|
||
go fanOut(input, output1, output2, output3)
|
||
|
||
// 发送数据
|
||
go func() {
|
||
for i := 1; i <= 6; i++ {
|
||
input <- i
|
||
}
|
||
close(input)
|
||
}()
|
||
|
||
// 接收数据
|
||
var wg sync.WaitGroup
|
||
wg.Add(3)
|
||
|
||
go func() {
|
||
defer wg.Done()
|
||
for value := range output1 {
|
||
fmt.Printf(" 输出1: %d\n", value)
|
||
}
|
||
}()
|
||
|
||
go func() {
|
||
defer wg.Done()
|
||
for value := range output2 {
|
||
fmt.Printf(" 输出2: %d\n", value)
|
||
}
|
||
}()
|
||
|
||
go func() {
|
||
defer wg.Done()
|
||
for value := range output3 {
|
||
fmt.Printf(" 输出3: %d\n", value)
|
||
}
|
||
}()
|
||
|
||
wg.Wait()
|
||
|
||
// 扇入模式(多个输入,一个输出)
|
||
fmt.Printf(" 扇入模式:\n")
|
||
|
||
input1 := make(chan string)
|
||
input2 := make(chan string)
|
||
input3 := make(chan string)
|
||
|
||
// 启动输入源
|
||
go func() {
|
||
for i := 1; i <= 3; i++ {
|
||
input1 <- fmt.Sprintf("源1-%d", i)
|
||
time.Sleep(100 * time.Millisecond)
|
||
}
|
||
close(input1)
|
||
}()
|
||
|
||
go func() {
|
||
for i := 1; i <= 3; i++ {
|
||
input2 <- fmt.Sprintf("源2-%d", i)
|
||
time.Sleep(150 * time.Millisecond)
|
||
}
|
||
close(input2)
|
||
}()
|
||
|
||
go func() {
|
||
for i := 1; i <= 3; i++ {
|
||
input3 <- fmt.Sprintf("源3-%d", i)
|
||
time.Sleep(200 * time.Millisecond)
|
||
}
|
||
close(input3)
|
||
}()
|
||
|
||
// 扇入
|
||
merged := fanInMultiple(input1, input2, input3)
|
||
|
||
fmt.Printf(" 合并结果:\n")
|
||
for result := range merged {
|
||
fmt.Printf(" %s\n", result)
|
||
}
|
||
|
||
// 工作池模式
|
||
fmt.Printf(" 工作池模式:\n")
|
||
|
||
jobs := make(chan WorkJob, 10)
|
||
results := make(chan WorkResult, 10)
|
||
|
||
// 启动工作池
|
||
const numWorkers = 3
|
||
for w := 1; w <= numWorkers; w++ {
|
||
go workPoolWorker(w, jobs, results)
|
||
}
|
||
|
||
// 发送任务
|
||
for j := 1; j <= 9; j++ {
|
||
jobs <- WorkJob{ID: j, Data: fmt.Sprintf("任务-%d", j)}
|
||
}
|
||
close(jobs)
|
||
|
||
// 收集结果
|
||
for r := 1; r <= 9; r++ {
|
||
result := <-results
|
||
fmt.Printf(" %s\n", result.Message)
|
||
}
|
||
|
||
fmt.Println()
|
||
}
|
||
|
||
// demonstrateAdvancedChannelUsage 演示 channel 的高级用法
|
||
func demonstrateAdvancedChannelUsage() {
|
||
fmt.Println("7. Channel 的高级用法:")
|
||
|
||
// 超时模式
|
||
fmt.Printf(" 超时模式:\n")
|
||
|
||
slowCh := make(chan string)
|
||
|
||
go func() {
|
||
time.Sleep(300 * time.Millisecond)
|
||
slowCh <- "慢速响应"
|
||
}()
|
||
|
||
select {
|
||
case result := <-slowCh:
|
||
fmt.Printf(" 接收到: %s\n", result)
|
||
case <-time.After(200 * time.Millisecond):
|
||
fmt.Printf(" 操作超时\n")
|
||
}
|
||
|
||
// 非阻塞操作
|
||
fmt.Printf(" 非阻塞操作:\n")
|
||
|
||
nonBlockingCh := make(chan int, 1)
|
||
|
||
// 非阻塞发送
|
||
select {
|
||
case nonBlockingCh <- 42:
|
||
fmt.Printf(" 非阻塞发送成功\n")
|
||
default:
|
||
fmt.Printf(" 非阻塞发送失败,channel 已满\n")
|
||
}
|
||
|
||
// 非阻塞接收
|
||
select {
|
||
case value := <-nonBlockingCh:
|
||
fmt.Printf(" 非阻塞接收成功: %d\n", value)
|
||
default:
|
||
fmt.Printf(" 非阻塞接收失败,channel 为空\n")
|
||
}
|
||
|
||
// 心跳模式
|
||
fmt.Printf(" 心跳模式:\n")
|
||
|
||
heartbeat := time.NewTicker(100 * time.Millisecond)
|
||
defer heartbeat.Stop()
|
||
|
||
timeout := time.After(350 * time.Millisecond)
|
||
|
||
for {
|
||
select {
|
||
case <-heartbeat.C:
|
||
fmt.Printf(" 心跳\n")
|
||
case <-timeout:
|
||
fmt.Printf(" 心跳监控结束\n")
|
||
goto heartbeatEnd
|
||
}
|
||
}
|
||
heartbeatEnd:
|
||
|
||
// 速率限制
|
||
fmt.Printf(" 速率限制:\n")
|
||
|
||
rateLimiter := time.NewTicker(100 * time.Millisecond)
|
||
defer rateLimiter.Stop()
|
||
|
||
requests := make(chan int, 5)
|
||
for i := 1; i <= 5; i++ {
|
||
requests <- i
|
||
}
|
||
close(requests)
|
||
|
||
for req := range requests {
|
||
<-rateLimiter.C // 等待速率限制器
|
||
fmt.Printf(" 处理请求 %d (限速)\n", req)
|
||
}
|
||
|
||
fmt.Println()
|
||
}
|
||
|
||
// ========== 辅助函数和类型定义 ==========
|
||
|
||
// echoServer 简单的回声服务器
|
||
func echoServer(messages <-chan string, responses chan<- string) {
|
||
for msg := range messages {
|
||
responses <- "Echo: " + msg
|
||
}
|
||
}
|
||
|
||
// producer 生产者函数
|
||
func producer(id int, items chan<- int, count int, wg *sync.WaitGroup) {
|
||
defer wg.Done()
|
||
for i := 0; i < count; i++ {
|
||
item := id*100 + i
|
||
items <- item
|
||
fmt.Printf(" 生产者 %d 生产: %d\n", id, item)
|
||
time.Sleep(time.Duration(rand.Intn(100)) * time.Millisecond)
|
||
}
|
||
fmt.Printf(" 生产者 %d 完成\n", id)
|
||
}
|
||
|
||
// consumer 消费者函数
|
||
func consumer(id int, items <-chan int, wg *sync.WaitGroup) {
|
||
defer wg.Done()
|
||
for item := range items {
|
||
fmt.Printf(" 消费者 %d 消费: %d\n", id, item)
|
||
time.Sleep(time.Duration(rand.Intn(150)) * time.Millisecond)
|
||
}
|
||
fmt.Printf(" 消费者 %d 完成\n", id)
|
||
}
|
||
|
||
// sender 只发送函数
|
||
func sender(ch chan<- int) {
|
||
ch <- 42
|
||
close(ch)
|
||
}
|
||
|
||
// receiver 只接收函数
|
||
func receiver(ch <-chan int) {
|
||
value := <-ch
|
||
fmt.Printf(" 只接收 channel 接收到: %d\n", value)
|
||
}
|
||
|
||
// generateNumbers 生成数字
|
||
func generateNumbers(out chan<- int) {
|
||
for i := 1; i <= 5; i++ {
|
||
out <- i
|
||
}
|
||
close(out)
|
||
}
|
||
|
||
// squareNumbers 计算平方
|
||
func squareNumbers(in <-chan int, out chan<- int) {
|
||
for num := range in {
|
||
out <- num * num
|
||
}
|
||
close(out)
|
||
}
|
||
|
||
// cubeNumbers 计算立方
|
||
func cubeNumbers(in <-chan int, out chan<- int) {
|
||
for num := range in {
|
||
out <- num * num * num
|
||
}
|
||
close(out)
|
||
}
|
||
|
||
// multipleSender 多发送者函数
|
||
func multipleSender(id int, data chan<- int, done <-chan bool) {
|
||
for {
|
||
select {
|
||
case data <- id*10 + rand.Intn(10):
|
||
time.Sleep(100 * time.Millisecond)
|
||
case <-done:
|
||
fmt.Printf(" 发送者 %d 停止\n", id)
|
||
return
|
||
}
|
||
}
|
||
}
|
||
|
||
// fibonacci 斐波那契生成器
|
||
func fibonacci(n int) <-chan int {
|
||
ch := make(chan int)
|
||
go func() {
|
||
defer close(ch)
|
||
a, b := 0, 1
|
||
for i := 0; i < n; i++ {
|
||
ch <- a
|
||
a, b = b, a+b
|
||
}
|
||
}()
|
||
return ch
|
||
}
|
||
|
||
// sieve 埃拉托斯特尼筛法生成素数
|
||
func sieve(max int) <-chan int {
|
||
ch := make(chan int)
|
||
go func() {
|
||
defer close(ch)
|
||
isPrime := make([]bool, max+1)
|
||
for i := 2; i <= max; i++ {
|
||
isPrime[i] = true
|
||
}
|
||
|
||
for i := 2; i*i <= max; i++ {
|
||
if isPrime[i] {
|
||
for j := i * i; j <= max; j += i {
|
||
isPrime[j] = false
|
||
}
|
||
}
|
||
}
|
||
|
||
for i := 2; i <= max; i++ {
|
||
if isPrime[i] {
|
||
ch <- i
|
||
}
|
||
}
|
||
}()
|
||
return ch
|
||
}
|
||
|
||
// fanOut 扇出函数
|
||
func fanOut(input <-chan int, outputs ...chan<- int) {
|
||
go func() {
|
||
defer func() {
|
||
for _, output := range outputs {
|
||
close(output)
|
||
}
|
||
}()
|
||
|
||
i := 0
|
||
for value := range input {
|
||
outputs[i%len(outputs)] <- value
|
||
i++
|
||
}
|
||
}()
|
||
}
|
||
|
||
// fanInMultiple 多路扇入函数
|
||
func fanInMultiple(inputs ...<-chan string) <-chan string {
|
||
output := make(chan string)
|
||
var wg sync.WaitGroup
|
||
|
||
// 为每个输入启动一个 goroutine
|
||
for _, input := range inputs {
|
||
wg.Add(1)
|
||
go func(ch <-chan string) {
|
||
defer wg.Done()
|
||
for value := range ch {
|
||
output <- value
|
||
}
|
||
}(input)
|
||
}
|
||
|
||
// 等待所有输入完成后关闭输出
|
||
go func() {
|
||
wg.Wait()
|
||
close(output)
|
||
}()
|
||
|
||
return output
|
||
}
|
||
|
||
// WorkJob 工作任务
|
||
type WorkJob struct {
|
||
ID int
|
||
Data string
|
||
}
|
||
|
||
// WorkResult 工作结果
|
||
type WorkResult struct {
|
||
JobID int
|
||
Message string
|
||
}
|
||
|
||
// workPoolWorker 工作池工作者
|
||
func workPoolWorker(id int, jobs <-chan WorkJob, results chan<- WorkResult) {
|
||
for job := range jobs {
|
||
fmt.Printf(" 工作者 %d 开始处理任务 %d\n", id, job.ID)
|
||
time.Sleep(time.Duration(rand.Intn(200)) * time.Millisecond)
|
||
|
||
results <- WorkResult{
|
||
JobID: job.ID,
|
||
Message: fmt.Sprintf("工作者 %d 完成任务 %d: %s", id, job.ID, job.Data),
|
||
}
|
||
}
|
||
}
|
||
|
||
/*
|
||
运行这个程序:
|
||
go run 02-channels.go
|
||
|
||
学习要点:
|
||
1. Channel 是 Go 语言中 goroutine 间通信的主要方式
|
||
2. Channel 有无缓冲和有缓冲两种类型
|
||
3. Channel 具有方向性,可以限制为只发送或只接收
|
||
4. Channel 可以被关闭,关闭后不能再发送数据
|
||
5. 使用 range 可以方便地遍历 channel 中的数据
|
||
|
||
Channel 的特性:
|
||
1. 类型安全:只能传递指定类型的数据
|
||
2. 同步机制:无缓冲 channel 提供同步语义
|
||
3. 异步机制:有缓冲 channel 提供异步语义
|
||
4. 方向性:可以限制 channel 的使用方向
|
||
5. 可关闭:发送方可以关闭 channel 通知接收方
|
||
|
||
Channel 类型:
|
||
1. 无缓冲 channel:make(chan T)
|
||
2. 有缓冲 channel:make(chan T, size)
|
||
3. 只发送 channel:chan<- T
|
||
4. 只接收 channel:<-chan T
|
||
5. 双向 channel:chan T
|
||
|
||
Channel 操作:
|
||
1. 发送:ch <- value
|
||
2. 接收:value := <-ch
|
||
3. 接收并检查:value, ok := <-ch
|
||
4. 关闭:close(ch)
|
||
5. 遍历:for value := range ch
|
||
|
||
常见模式:
|
||
1. 生产者-消费者:使用缓冲 channel 解耦生产和消费
|
||
2. 扇出:一个输入分发到多个输出
|
||
3. 扇入:多个输入合并到一个输出
|
||
4. 管道:链式处理数据
|
||
5. 工作池:固定数量的 worker 处理任务
|
||
|
||
高级用法:
|
||
1. 超时控制:使用 time.After
|
||
2. 非阻塞操作:使用 select 的 default 分支
|
||
3. 心跳机制:使用 time.Ticker
|
||
4. 速率限制:控制操作频率
|
||
5. 信号通知:使用 channel 作为信号
|
||
|
||
最佳实践:
|
||
1. 发送方负责关闭 channel
|
||
2. 不要从接收方关闭 channel
|
||
3. 不要关闭已关闭的 channel
|
||
4. 使用 range 遍历 channel
|
||
5. 使用 select 处理多个 channel
|
||
|
||
注意事项:
|
||
1. 向已关闭的 channel 发送数据会 panic
|
||
2. 关闭已关闭的 channel 会 panic
|
||
3. 从已关闭的 channel 接收会得到零值
|
||
4. nil channel 的发送和接收都会阻塞
|
||
5. 无缓冲 channel 的发送和接收必须同时准备好
|
||
|
||
性能考虑:
|
||
1. 无缓冲 channel 有同步开销
|
||
2. 有缓冲 channel 可以减少阻塞
|
||
3. Channel 操作比直接内存访问慢
|
||
4. 合理选择缓冲区大小
|
||
5. 避免创建过多的 channel
|
||
*/
|