Files
2025-08-24 11:24:52 +08:00

881 lines
20 KiB
Go
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

/*
03-select.go - Go 语言 Select 语句详解
学习目标:
1. 理解 select 语句的概念和作用
2. 掌握 select 的基本语法和用法
3. 学会使用 select 处理多个 channel
4. 了解 select 的非阻塞操作
5. 掌握 select 的常见应用模式
知识点:
- select 语句的基本语法
- select 的随机选择特性
- default 分支的使用
- select 与超时控制
- select 的常见模式
- select 的最佳实践
*/
package main
import (
"fmt"
"math/rand"
"reflect"
"strings"
"time"
)
func main() {
fmt.Println("=== Go 语言 Select 语句详解 ===\n")
// 演示基本的 select 语句
demonstrateBasicSelect()
// 演示 select 的随机选择
demonstrateRandomSelection()
// 演示 select 的非阻塞操作
demonstrateNonBlockingSelect()
// 演示 select 的超时控制
demonstrateSelectTimeout()
// 演示 select 的常见模式
demonstrateSelectPatterns()
// 演示 select 的高级用法
demonstrateAdvancedSelect()
// 演示 select 的实际应用
demonstratePracticalSelect()
}
// demonstrateBasicSelect 演示基本的 select 语句
func demonstrateBasicSelect() {
fmt.Println("1. 基本的 Select 语句:")
// select 的基本概念
fmt.Printf(" Select 的基本概念:\n")
fmt.Printf(" - 类似于 switch但用于 channel 操作\n")
fmt.Printf(" - 可以同时等待多个 channel 操作\n")
fmt.Printf(" - 随机选择一个可执行的 case\n")
fmt.Printf(" - 如果没有 case 可执行,会阻塞等待\n")
fmt.Printf(" - default 分支提供非阻塞行为\n")
// 基本 select 示例
fmt.Printf(" 基本 select 示例:\n")
ch1 := make(chan string)
ch2 := make(chan string)
// 启动两个 goroutine 发送数据
go func() {
time.Sleep(100 * time.Millisecond)
ch1 <- "来自 channel 1"
}()
go func() {
time.Sleep(200 * time.Millisecond)
ch2 <- "来自 channel 2"
}()
// 使用 select 等待任一 channel
select {
case msg1 := <-ch1:
fmt.Printf(" 接收到: %s\n", msg1)
case msg2 := <-ch2:
fmt.Printf(" 接收到: %s\n", msg2)
}
// 接收剩余的消息
select {
case msg1 := <-ch1:
fmt.Printf(" 接收到: %s\n", msg1)
case msg2 := <-ch2:
fmt.Printf(" 接收到: %s\n", msg2)
}
// 多个 channel 的 select
fmt.Printf(" 多个 channel 的 select:\n")
numbers := make(chan int)
strings := make(chan string)
booleans := make(chan bool)
// 启动发送者
go func() {
numbers <- 42
strings <- "Hello"
booleans <- true
}()
// 接收所有消息
for i := 0; i < 3; i++ {
select {
case num := <-numbers:
fmt.Printf(" 接收到数字: %d\n", num)
case str := <-strings:
fmt.Printf(" 接收到字符串: %s\n", str)
case b := <-booleans:
fmt.Printf(" 接收到布尔值: %t\n", b)
}
}
fmt.Println()
}
// demonstrateRandomSelection 演示 select 的随机选择
func demonstrateRandomSelection() {
fmt.Println("2. Select 的随机选择:")
// 多个 case 同时就绪时的随机选择
fmt.Printf(" 多个 case 同时就绪时的随机选择:\n")
ch1 := make(chan string, 1)
ch2 := make(chan string, 1)
ch3 := make(chan string, 1)
// 预先填充所有 channel
ch1 <- "Channel 1"
ch2 <- "Channel 2"
ch3 <- "Channel 3"
// 多次执行 select观察随机选择
fmt.Printf(" 执行 10 次 select观察随机选择:\n")
for i := 0; i < 10; i++ {
// 重新填充 channel
select {
case <-ch1:
case <-ch2:
case <-ch3:
}
ch1 <- "Channel 1"
ch2 <- "Channel 2"
ch3 <- "Channel 3"
select {
case msg := <-ch1:
fmt.Printf(" 第 %d 次: %s\n", i+1, msg)
case msg := <-ch2:
fmt.Printf(" 第 %d 次: %s\n", i+1, msg)
case msg := <-ch3:
fmt.Printf(" 第 %d 次: %s\n", i+1, msg)
}
}
// 公平调度示例
fmt.Printf(" 公平调度示例:\n")
worker1 := make(chan string, 5)
worker2 := make(chan string, 5)
// 填充任务
for i := 1; i <= 5; i++ {
worker1 <- fmt.Sprintf("Worker1-Task%d", i)
worker2 <- fmt.Sprintf("Worker2-Task%d", i)
}
// 公平地处理两个 worker 的任务
for i := 0; i < 10; i++ {
select {
case task := <-worker1:
fmt.Printf(" 处理: %s\n", task)
case task := <-worker2:
fmt.Printf(" 处理: %s\n", task)
}
}
fmt.Println()
}
// demonstrateNonBlockingSelect 演示 select 的非阻塞操作
func demonstrateNonBlockingSelect() {
fmt.Println("3. Select 的非阻塞操作:")
// 使用 default 分支实现非阻塞
fmt.Printf(" 使用 default 分支实现非阻塞:\n")
ch := make(chan string)
// 非阻塞接收
select {
case msg := <-ch:
fmt.Printf(" 接收到消息: %s\n", msg)
default:
fmt.Printf(" 没有消息可接收\n")
}
// 非阻塞发送
select {
case ch <- "Hello":
fmt.Printf(" 消息发送成功\n")
default:
fmt.Printf(" 消息发送失败channel 未准备好\n")
}
// 非阻塞发送到缓冲 channel
fmt.Printf(" 非阻塞发送到缓冲 channel:\n")
buffered := make(chan int, 2)
// 发送到未满的缓冲 channel
for i := 1; i <= 3; i++ {
select {
case buffered <- i:
fmt.Printf(" 成功发送: %d\n", i)
default:
fmt.Printf(" 发送失败: %d (channel 已满)\n", i)
}
}
// 非阻塞接收
for i := 0; i < 3; i++ {
select {
case value := <-buffered:
fmt.Printf(" 接收到: %d\n", value)
default:
fmt.Printf(" 没有数据可接收\n")
}
}
// 轮询模式
fmt.Printf(" 轮询模式:\n")
status := make(chan string, 1)
// 启动状态更新器
go func() {
statuses := []string{"初始化", "运行中", "暂停", "完成"}
for _, s := range statuses {
time.Sleep(150 * time.Millisecond)
select {
case status <- s:
default:
// 如果 channel 满了,跳过这次更新
}
}
}()
// 轮询状态
for i := 0; i < 10; i++ {
select {
case s := <-status:
fmt.Printf(" 状态更新: %s\n", s)
default:
fmt.Printf(" 状态检查: 无更新\n")
}
time.Sleep(100 * time.Millisecond)
}
fmt.Println()
}
// demonstrateSelectTimeout 演示 select 的超时控制
func demonstrateSelectTimeout() {
fmt.Println("4. Select 的超时控制:")
// 基本超时控制
fmt.Printf(" 基本超时控制:\n")
slowCh := make(chan string)
// 启动一个慢速响应的 goroutine
go func() {
time.Sleep(300 * time.Millisecond)
slowCh <- "慢速响应"
}()
select {
case result := <-slowCh:
fmt.Printf(" 接收到结果: %s\n", result)
case <-time.After(200 * time.Millisecond):
fmt.Printf(" 操作超时\n")
}
// 多级超时控制
fmt.Printf(" 多级超时控制:\n")
fastCh := make(chan string)
mediumCh := make(chan string)
slowCh2 := make(chan string)
// 启动不同速度的响应
go func() {
time.Sleep(50 * time.Millisecond)
fastCh <- "快速响应"
}()
go func() {
time.Sleep(150 * time.Millisecond)
mediumCh <- "中速响应"
}()
go func() {
time.Sleep(350 * time.Millisecond)
slowCh2 <- "慢速响应"
}()
timeout1 := time.After(100 * time.Millisecond)
timeout2 := time.After(200 * time.Millisecond)
timeout3 := time.After(300 * time.Millisecond)
for i := 0; i < 3; i++ {
select {
case result := <-fastCh:
fmt.Printf(" 快速: %s\n", result)
case result := <-mediumCh:
fmt.Printf(" 中速: %s\n", result)
case result := <-slowCh2:
fmt.Printf(" 慢速: %s\n", result)
case <-timeout1:
fmt.Printf(" 第一级超时 (100ms)\n")
timeout1 = nil // 防止重复触发
case <-timeout2:
fmt.Printf(" 第二级超时 (200ms)\n")
timeout2 = nil
case <-timeout3:
fmt.Printf(" 第三级超时 (300ms)\n")
timeout3 = nil
}
}
// 心跳超时检测
fmt.Printf(" 心跳超时检测:\n")
heartbeat := make(chan bool)
// 启动心跳发送器
go func() {
ticker := time.NewTicker(80 * time.Millisecond)
defer ticker.Stop()
for i := 0; i < 5; i++ {
select {
case <-ticker.C:
heartbeat <- true
}
}
}()
// 监控心跳
for i := 0; i < 8; i++ {
select {
case <-heartbeat:
fmt.Printf(" 心跳正常 %d\n", i+1)
case <-time.After(100 * time.Millisecond):
fmt.Printf(" 心跳超时 %d\n", i+1)
}
}
fmt.Println()
}
// demonstrateSelectPatterns 演示 select 的常见模式
func demonstrateSelectPatterns() {
fmt.Println("5. Select 的常见模式:")
// 扇入模式
fmt.Printf(" 扇入模式:\n")
input1 := make(chan int)
input2 := make(chan int)
input3 := make(chan int)
// 启动数据源
go func() {
for i := 1; i <= 3; i++ {
input1 <- i
time.Sleep(100 * time.Millisecond)
}
close(input1)
}()
go func() {
for i := 10; i <= 12; i++ {
input2 <- i
time.Sleep(150 * time.Millisecond)
}
close(input2)
}()
go func() {
for i := 100; i <= 102; i++ {
input3 <- i
time.Sleep(200 * time.Millisecond)
}
close(input3)
}()
// 扇入合并
for {
select {
case val, ok := <-input1:
if ok {
fmt.Printf(" 来源1: %d\n", val)
} else {
input1 = nil
}
case val, ok := <-input2:
if ok {
fmt.Printf(" 来源2: %d\n", val)
} else {
input2 = nil
}
case val, ok := <-input3:
if ok {
fmt.Printf(" 来源3: %d\n", val)
} else {
input3 = nil
}
}
// 所有 channel 都关闭时退出
if input1 == nil && input2 == nil && input3 == nil {
break
}
}
// 优雅关闭模式
fmt.Printf(" 优雅关闭模式:\n")
data := make(chan int)
done := make(chan bool)
// 启动工作 goroutine
go func() {
defer close(data)
for i := 1; i <= 10; i++ {
select {
case data <- i:
fmt.Printf(" 发送数据: %d\n", i)
time.Sleep(50 * time.Millisecond)
case <-done:
fmt.Printf(" 接收到关闭信号,停止发送\n")
return
}
}
}()
// 接收数据,然后发送关闭信号
count := 0
for value := range data {
fmt.Printf(" 接收数据: %d\n", value)
count++
if count >= 5 {
fmt.Printf(" 发送关闭信号\n")
close(done)
}
}
// 多路复用模式
fmt.Printf(" 多路复用模式:\n")
requests := make(chan Request, 5)
responses := make(chan Response, 5)
errors := make(chan error, 5)
// 启动请求处理器
go requestProcessor(requests, responses, errors)
// 发送请求
for i := 1; i <= 5; i++ {
requests <- Request{ID: i, Data: fmt.Sprintf("请求-%d", i)}
}
close(requests)
// 处理响应和错误
for i := 0; i < 5; i++ {
select {
case resp := <-responses:
fmt.Printf(" 响应: ID=%d, Result=%s\n", resp.ID, resp.Result)
case err := <-errors:
fmt.Printf(" 错误: %v\n", err)
}
}
fmt.Println()
}
// demonstrateAdvancedSelect 演示 select 的高级用法
func demonstrateAdvancedSelect() {
fmt.Println("6. Select 的高级用法:")
// 动态 case 选择
fmt.Printf(" 动态 case 选择:\n")
channels := []chan int{
make(chan int, 1),
make(chan int, 1),
make(chan int, 1),
}
// 填充数据
for i, ch := range channels {
ch <- (i + 1) * 10
}
// 动态选择 channel
for i := 0; i < 3; i++ {
selectFromChannels(channels)
}
// 优先级选择
fmt.Printf(" 优先级选择:\n")
highPriority := make(chan string, 2)
lowPriority := make(chan string, 2)
// 填充不同优先级的任务
highPriority <- "高优先级任务1"
lowPriority <- "低优先级任务1"
highPriority <- "高优先级任务2"
lowPriority <- "低优先级任务2"
// 优先处理高优先级任务
for i := 0; i < 4; i++ {
select {
case task := <-highPriority:
fmt.Printf(" 处理: %s\n", task)
default:
select {
case task := <-lowPriority:
fmt.Printf(" 处理: %s\n", task)
default:
fmt.Printf(" 没有任务\n")
}
}
}
// 速率限制
fmt.Printf(" 速率限制:\n")
rateLimiter := time.NewTicker(100 * time.Millisecond)
defer rateLimiter.Stop()
tasks := make(chan string, 5)
for i := 1; i <= 5; i++ {
tasks <- fmt.Sprintf("任务%d", i)
}
close(tasks)
for task := range tasks {
select {
case <-rateLimiter.C:
fmt.Printf(" 执行: %s\n", task)
}
}
// 断路器模式
fmt.Printf(" 断路器模式:\n")
service := make(chan string)
failures := 0
maxFailures := 3
// 模拟服务调用
go func() {
defer close(service)
for i := 1; i <= 6; i++ {
time.Sleep(50 * time.Millisecond)
if rand.Float32() < 0.5 { // 50% 失败率
service <- fmt.Sprintf("成功响应%d", i)
} else {
service <- "ERROR"
}
}
}()
for response := range service {
if response == "ERROR" {
failures++
fmt.Printf(" 服务失败 (%d/%d)\n", failures, maxFailures)
if failures >= maxFailures {
fmt.Printf(" 断路器打开,停止调用服务\n")
break
}
} else {
failures = 0 // 重置失败计数
fmt.Printf(" %s\n", response)
}
}
fmt.Println()
}
// demonstratePracticalSelect 演示 select 的实际应用
func demonstratePracticalSelect() {
fmt.Println("7. Select 的实际应用:")
// Web 服务器超时处理
fmt.Printf(" Web 服务器超时处理:\n")
requests := make(chan WebRequest, 3)
// 模拟 Web 请求
go func() {
requests <- WebRequest{ID: 1, URL: "/api/fast", ProcessTime: 50 * time.Millisecond}
requests <- WebRequest{ID: 2, URL: "/api/slow", ProcessTime: 300 * time.Millisecond}
requests <- WebRequest{ID: 3, URL: "/api/medium", ProcessTime: 150 * time.Millisecond}
close(requests)
}()
for req := range requests {
handleWebRequest(req)
}
// 数据库连接池
fmt.Printf(" 数据库连接池:\n")
connectionPool := make(chan *DBConnection, 2)
// 初始化连接池
for i := 1; i <= 2; i++ {
connectionPool <- &DBConnection{ID: i}
}
// 模拟数据库操作
for i := 1; i <= 5; i++ {
go performDBOperation(i, connectionPool)
}
time.Sleep(500 * time.Millisecond)
// 消息队列处理
fmt.Printf(" 消息队列处理:\n")
messageQueue := make(chan Message, 10)
deadLetterQueue := make(chan Message, 10)
// 启动消息处理器
go messageProcessor(messageQueue, deadLetterQueue)
// 发送消息
messages := []Message{
{ID: 1, Content: "正常消息1", RetryCount: 0},
{ID: 2, Content: "错误消息", RetryCount: 0},
{ID: 3, Content: "正常消息2", RetryCount: 0},
}
for _, msg := range messages {
messageQueue <- msg
}
close(messageQueue)
// 处理死信队列
time.Sleep(200 * time.Millisecond)
close(deadLetterQueue)
fmt.Printf(" 死信队列中的消息:\n")
for deadMsg := range deadLetterQueue {
fmt.Printf(" 死信: ID=%d, Content=%s, Retries=%d\n",
deadMsg.ID, deadMsg.Content, deadMsg.RetryCount)
}
fmt.Println()
}
// ========== 辅助函数和类型定义 ==========
// Request 请求结构
type Request struct {
ID int
Data string
}
// Response 响应结构
type Response struct {
ID int
Result string
}
// requestProcessor 请求处理器
func requestProcessor(requests <-chan Request, responses chan<- Response, errors chan<- error) {
for req := range requests {
// 模拟处理时间
time.Sleep(50 * time.Millisecond)
// 模拟随机错误
if rand.Float32() < 0.2 { // 20% 错误率
errors <- fmt.Errorf("处理请求 %d 失败", req.ID)
} else {
responses <- Response{
ID: req.ID,
Result: fmt.Sprintf("处理完成: %s", req.Data),
}
}
}
close(responses)
close(errors)
}
// selectFromChannels 动态选择 channel
func selectFromChannels(channels []chan int) {
// 构建 select cases
cases := make([]reflect.SelectCase, len(channels))
for i, ch := range channels {
cases[i] = reflect.SelectCase{
Dir: reflect.SelectRecv,
Chan: reflect.ValueOf(ch),
}
}
// 执行 select
chosen, value, ok := reflect.Select(cases)
if ok {
fmt.Printf(" 从 channel %d 接收到: %d\n", chosen, value.Int())
} else {
fmt.Printf(" channel %d 已关闭\n", chosen)
}
}
// WebRequest Web 请求结构
type WebRequest struct {
ID int
URL string
ProcessTime time.Duration
}
// handleWebRequest 处理 Web 请求
func handleWebRequest(req WebRequest) {
fmt.Printf(" 处理请求 %d: %s\n", req.ID, req.URL)
result := make(chan string)
// 启动请求处理
go func() {
time.Sleep(req.ProcessTime)
result <- fmt.Sprintf("请求 %d 处理完成", req.ID)
}()
// 设置超时
select {
case response := <-result:
fmt.Printf(" %s\n", response)
case <-time.After(200 * time.Millisecond):
fmt.Printf(" 请求 %d 超时\n", req.ID)
}
}
// DBConnection 数据库连接
type DBConnection struct {
ID int
}
// performDBOperation 执行数据库操作
func performDBOperation(operationID int, pool chan *DBConnection) {
select {
case conn := <-pool:
fmt.Printf(" 操作 %d 获取连接 %d\n", operationID, conn.ID)
// 模拟数据库操作
time.Sleep(100 * time.Millisecond)
fmt.Printf(" 操作 %d 完成,释放连接 %d\n", operationID, conn.ID)
pool <- conn
case <-time.After(50 * time.Millisecond):
fmt.Printf(" 操作 %d 获取连接超时\n", operationID)
}
}
// Message 消息结构
type Message struct {
ID int
Content string
RetryCount int
}
// messageProcessor 消息处理器
func messageProcessor(messages <-chan Message, deadLetter chan<- Message) {
for msg := range messages {
fmt.Printf(" 处理消息 %d: %s\n", msg.ID, msg.Content)
// 模拟处理
success := !strings.Contains(msg.Content, "错误")
if success {
fmt.Printf(" 消息 %d 处理成功\n", msg.ID)
} else {
msg.RetryCount++
if msg.RetryCount < 3 {
fmt.Printf(" 消息 %d 处理失败,重试 %d\n", msg.ID, msg.RetryCount)
// 在实际应用中,这里会重新放入队列
} else {
fmt.Printf(" 消息 %d 重试次数超限,放入死信队列\n", msg.ID)
deadLetter <- msg
}
}
}
}
/*
运行这个程序:
go run 03-select.go
学习要点:
1. Select 语句用于处理多个 channel 操作
2. Select 会随机选择一个可执行的 case
3. Default 分支提供非阻塞行为
4. Select 常用于超时控制和多路复用
5. Select 是实现复杂并发模式的重要工具
Select 的特性:
1. 多路选择:可以同时等待多个 channel 操作
2. 随机选择:当多个 case 同时就绪时随机选择
3. 阻塞行为:没有 case 就绪时会阻塞等待
4. 非阻塞default 分支提供非阻塞行为
5. 公平调度:避免某个 channel 被饿死
Select 语法:
1. 基本语法select { case <-ch: ... }
2. 接收操作case value := <-ch:
3. 发送操作case ch <- value:
4. 默认分支default:
5. 超时控制case <-time.After(duration):
常见模式:
1. 超时控制:使用 time.After 设置超时
2. 非阻塞操作:使用 default 分支
3. 扇入合并:合并多个输入源
4. 优雅关闭:使用 done channel 通知退出
5. 多路复用:同时处理多种类型的消息
高级用法:
1. 动态 case使用 reflect.Select 动态构建 case
2. 优先级选择:嵌套 select 实现优先级
3. 速率限制:结合 time.Ticker 控制频率
4. 断路器:实现服务降级和熔断
5. 负载均衡:在多个服务间分发请求
实际应用:
1. Web 服务器:请求超时处理
2. 数据库:连接池管理
3. 消息队列:消息路由和处理
4. 微服务:服务间通信
5. 监控系统:多源数据收集
最佳实践:
1. 避免在 select 中执行耗时操作
2. 合理设置超时时间
3. 正确处理 channel 关闭
4. 使用 default 避免死锁
5. 注意 select 的随机性
性能考虑:
1. Select 的开销比单个 channel 操作大
2. Case 数量影响性能
3. 随机选择有一定开销
4. 合理使用缓冲 channel
5. 避免过度复杂的 select 结构
注意事项:
1. Select 中的 case 必须是 channel 操作
2. nil channel 的 case 永远不会被选中
3. 关闭的 channel 可以被选中
4. Default 分支不能与其他阻塞操作共存
5. Select 语句本身不是循环
*/