881 lines
20 KiB
Go
881 lines
20 KiB
Go
/*
|
||
03-select.go - Go 语言 Select 语句详解
|
||
|
||
学习目标:
|
||
1. 理解 select 语句的概念和作用
|
||
2. 掌握 select 的基本语法和用法
|
||
3. 学会使用 select 处理多个 channel
|
||
4. 了解 select 的非阻塞操作
|
||
5. 掌握 select 的常见应用模式
|
||
|
||
知识点:
|
||
- select 语句的基本语法
|
||
- select 的随机选择特性
|
||
- default 分支的使用
|
||
- select 与超时控制
|
||
- select 的常见模式
|
||
- select 的最佳实践
|
||
*/
|
||
|
||
package main
|
||
|
||
import (
|
||
"fmt"
|
||
"math/rand"
|
||
"reflect"
|
||
"strings"
|
||
"time"
|
||
)
|
||
|
||
func main() {
|
||
fmt.Println("=== Go 语言 Select 语句详解 ===\n")
|
||
|
||
// 演示基本的 select 语句
|
||
demonstrateBasicSelect()
|
||
|
||
// 演示 select 的随机选择
|
||
demonstrateRandomSelection()
|
||
|
||
// 演示 select 的非阻塞操作
|
||
demonstrateNonBlockingSelect()
|
||
|
||
// 演示 select 的超时控制
|
||
demonstrateSelectTimeout()
|
||
|
||
// 演示 select 的常见模式
|
||
demonstrateSelectPatterns()
|
||
|
||
// 演示 select 的高级用法
|
||
demonstrateAdvancedSelect()
|
||
|
||
// 演示 select 的实际应用
|
||
demonstratePracticalSelect()
|
||
}
|
||
|
||
// demonstrateBasicSelect 演示基本的 select 语句
|
||
func demonstrateBasicSelect() {
|
||
fmt.Println("1. 基本的 Select 语句:")
|
||
|
||
// select 的基本概念
|
||
fmt.Printf(" Select 的基本概念:\n")
|
||
fmt.Printf(" - 类似于 switch,但用于 channel 操作\n")
|
||
fmt.Printf(" - 可以同时等待多个 channel 操作\n")
|
||
fmt.Printf(" - 随机选择一个可执行的 case\n")
|
||
fmt.Printf(" - 如果没有 case 可执行,会阻塞等待\n")
|
||
fmt.Printf(" - default 分支提供非阻塞行为\n")
|
||
|
||
// 基本 select 示例
|
||
fmt.Printf(" 基本 select 示例:\n")
|
||
|
||
ch1 := make(chan string)
|
||
ch2 := make(chan string)
|
||
|
||
// 启动两个 goroutine 发送数据
|
||
go func() {
|
||
time.Sleep(100 * time.Millisecond)
|
||
ch1 <- "来自 channel 1"
|
||
}()
|
||
|
||
go func() {
|
||
time.Sleep(200 * time.Millisecond)
|
||
ch2 <- "来自 channel 2"
|
||
}()
|
||
|
||
// 使用 select 等待任一 channel
|
||
select {
|
||
case msg1 := <-ch1:
|
||
fmt.Printf(" 接收到: %s\n", msg1)
|
||
case msg2 := <-ch2:
|
||
fmt.Printf(" 接收到: %s\n", msg2)
|
||
}
|
||
|
||
// 接收剩余的消息
|
||
select {
|
||
case msg1 := <-ch1:
|
||
fmt.Printf(" 接收到: %s\n", msg1)
|
||
case msg2 := <-ch2:
|
||
fmt.Printf(" 接收到: %s\n", msg2)
|
||
}
|
||
|
||
// 多个 channel 的 select
|
||
fmt.Printf(" 多个 channel 的 select:\n")
|
||
|
||
numbers := make(chan int)
|
||
strings := make(chan string)
|
||
booleans := make(chan bool)
|
||
|
||
// 启动发送者
|
||
go func() {
|
||
numbers <- 42
|
||
strings <- "Hello"
|
||
booleans <- true
|
||
}()
|
||
|
||
// 接收所有消息
|
||
for i := 0; i < 3; i++ {
|
||
select {
|
||
case num := <-numbers:
|
||
fmt.Printf(" 接收到数字: %d\n", num)
|
||
case str := <-strings:
|
||
fmt.Printf(" 接收到字符串: %s\n", str)
|
||
case b := <-booleans:
|
||
fmt.Printf(" 接收到布尔值: %t\n", b)
|
||
}
|
||
}
|
||
|
||
fmt.Println()
|
||
}
|
||
|
||
// demonstrateRandomSelection 演示 select 的随机选择
|
||
func demonstrateRandomSelection() {
|
||
fmt.Println("2. Select 的随机选择:")
|
||
|
||
// 多个 case 同时就绪时的随机选择
|
||
fmt.Printf(" 多个 case 同时就绪时的随机选择:\n")
|
||
|
||
ch1 := make(chan string, 1)
|
||
ch2 := make(chan string, 1)
|
||
ch3 := make(chan string, 1)
|
||
|
||
// 预先填充所有 channel
|
||
ch1 <- "Channel 1"
|
||
ch2 <- "Channel 2"
|
||
ch3 <- "Channel 3"
|
||
|
||
// 多次执行 select,观察随机选择
|
||
fmt.Printf(" 执行 10 次 select,观察随机选择:\n")
|
||
for i := 0; i < 10; i++ {
|
||
// 重新填充 channel
|
||
select {
|
||
case <-ch1:
|
||
case <-ch2:
|
||
case <-ch3:
|
||
}
|
||
|
||
ch1 <- "Channel 1"
|
||
ch2 <- "Channel 2"
|
||
ch3 <- "Channel 3"
|
||
|
||
select {
|
||
case msg := <-ch1:
|
||
fmt.Printf(" 第 %d 次: %s\n", i+1, msg)
|
||
case msg := <-ch2:
|
||
fmt.Printf(" 第 %d 次: %s\n", i+1, msg)
|
||
case msg := <-ch3:
|
||
fmt.Printf(" 第 %d 次: %s\n", i+1, msg)
|
||
}
|
||
}
|
||
|
||
// 公平调度示例
|
||
fmt.Printf(" 公平调度示例:\n")
|
||
|
||
worker1 := make(chan string, 5)
|
||
worker2 := make(chan string, 5)
|
||
|
||
// 填充任务
|
||
for i := 1; i <= 5; i++ {
|
||
worker1 <- fmt.Sprintf("Worker1-Task%d", i)
|
||
worker2 <- fmt.Sprintf("Worker2-Task%d", i)
|
||
}
|
||
|
||
// 公平地处理两个 worker 的任务
|
||
for i := 0; i < 10; i++ {
|
||
select {
|
||
case task := <-worker1:
|
||
fmt.Printf(" 处理: %s\n", task)
|
||
case task := <-worker2:
|
||
fmt.Printf(" 处理: %s\n", task)
|
||
}
|
||
}
|
||
|
||
fmt.Println()
|
||
}
|
||
|
||
// demonstrateNonBlockingSelect 演示 select 的非阻塞操作
|
||
func demonstrateNonBlockingSelect() {
|
||
fmt.Println("3. Select 的非阻塞操作:")
|
||
|
||
// 使用 default 分支实现非阻塞
|
||
fmt.Printf(" 使用 default 分支实现非阻塞:\n")
|
||
|
||
ch := make(chan string)
|
||
|
||
// 非阻塞接收
|
||
select {
|
||
case msg := <-ch:
|
||
fmt.Printf(" 接收到消息: %s\n", msg)
|
||
default:
|
||
fmt.Printf(" 没有消息可接收\n")
|
||
}
|
||
|
||
// 非阻塞发送
|
||
select {
|
||
case ch <- "Hello":
|
||
fmt.Printf(" 消息发送成功\n")
|
||
default:
|
||
fmt.Printf(" 消息发送失败,channel 未准备好\n")
|
||
}
|
||
|
||
// 非阻塞发送到缓冲 channel
|
||
fmt.Printf(" 非阻塞发送到缓冲 channel:\n")
|
||
|
||
buffered := make(chan int, 2)
|
||
|
||
// 发送到未满的缓冲 channel
|
||
for i := 1; i <= 3; i++ {
|
||
select {
|
||
case buffered <- i:
|
||
fmt.Printf(" 成功发送: %d\n", i)
|
||
default:
|
||
fmt.Printf(" 发送失败: %d (channel 已满)\n", i)
|
||
}
|
||
}
|
||
|
||
// 非阻塞接收
|
||
for i := 0; i < 3; i++ {
|
||
select {
|
||
case value := <-buffered:
|
||
fmt.Printf(" 接收到: %d\n", value)
|
||
default:
|
||
fmt.Printf(" 没有数据可接收\n")
|
||
}
|
||
}
|
||
|
||
// 轮询模式
|
||
fmt.Printf(" 轮询模式:\n")
|
||
|
||
status := make(chan string, 1)
|
||
|
||
// 启动状态更新器
|
||
go func() {
|
||
statuses := []string{"初始化", "运行中", "暂停", "完成"}
|
||
for _, s := range statuses {
|
||
time.Sleep(150 * time.Millisecond)
|
||
select {
|
||
case status <- s:
|
||
default:
|
||
// 如果 channel 满了,跳过这次更新
|
||
}
|
||
}
|
||
}()
|
||
|
||
// 轮询状态
|
||
for i := 0; i < 10; i++ {
|
||
select {
|
||
case s := <-status:
|
||
fmt.Printf(" 状态更新: %s\n", s)
|
||
default:
|
||
fmt.Printf(" 状态检查: 无更新\n")
|
||
}
|
||
time.Sleep(100 * time.Millisecond)
|
||
}
|
||
|
||
fmt.Println()
|
||
}
|
||
|
||
// demonstrateSelectTimeout 演示 select 的超时控制
|
||
func demonstrateSelectTimeout() {
|
||
fmt.Println("4. Select 的超时控制:")
|
||
|
||
// 基本超时控制
|
||
fmt.Printf(" 基本超时控制:\n")
|
||
|
||
slowCh := make(chan string)
|
||
|
||
// 启动一个慢速响应的 goroutine
|
||
go func() {
|
||
time.Sleep(300 * time.Millisecond)
|
||
slowCh <- "慢速响应"
|
||
}()
|
||
|
||
select {
|
||
case result := <-slowCh:
|
||
fmt.Printf(" 接收到结果: %s\n", result)
|
||
case <-time.After(200 * time.Millisecond):
|
||
fmt.Printf(" 操作超时\n")
|
||
}
|
||
|
||
// 多级超时控制
|
||
fmt.Printf(" 多级超时控制:\n")
|
||
|
||
fastCh := make(chan string)
|
||
mediumCh := make(chan string)
|
||
slowCh2 := make(chan string)
|
||
|
||
// 启动不同速度的响应
|
||
go func() {
|
||
time.Sleep(50 * time.Millisecond)
|
||
fastCh <- "快速响应"
|
||
}()
|
||
|
||
go func() {
|
||
time.Sleep(150 * time.Millisecond)
|
||
mediumCh <- "中速响应"
|
||
}()
|
||
|
||
go func() {
|
||
time.Sleep(350 * time.Millisecond)
|
||
slowCh2 <- "慢速响应"
|
||
}()
|
||
|
||
timeout1 := time.After(100 * time.Millisecond)
|
||
timeout2 := time.After(200 * time.Millisecond)
|
||
timeout3 := time.After(300 * time.Millisecond)
|
||
|
||
for i := 0; i < 3; i++ {
|
||
select {
|
||
case result := <-fastCh:
|
||
fmt.Printf(" 快速: %s\n", result)
|
||
case result := <-mediumCh:
|
||
fmt.Printf(" 中速: %s\n", result)
|
||
case result := <-slowCh2:
|
||
fmt.Printf(" 慢速: %s\n", result)
|
||
case <-timeout1:
|
||
fmt.Printf(" 第一级超时 (100ms)\n")
|
||
timeout1 = nil // 防止重复触发
|
||
case <-timeout2:
|
||
fmt.Printf(" 第二级超时 (200ms)\n")
|
||
timeout2 = nil
|
||
case <-timeout3:
|
||
fmt.Printf(" 第三级超时 (300ms)\n")
|
||
timeout3 = nil
|
||
}
|
||
}
|
||
|
||
// 心跳超时检测
|
||
fmt.Printf(" 心跳超时检测:\n")
|
||
|
||
heartbeat := make(chan bool)
|
||
|
||
// 启动心跳发送器
|
||
go func() {
|
||
ticker := time.NewTicker(80 * time.Millisecond)
|
||
defer ticker.Stop()
|
||
|
||
for i := 0; i < 5; i++ {
|
||
select {
|
||
case <-ticker.C:
|
||
heartbeat <- true
|
||
}
|
||
}
|
||
}()
|
||
|
||
// 监控心跳
|
||
for i := 0; i < 8; i++ {
|
||
select {
|
||
case <-heartbeat:
|
||
fmt.Printf(" 心跳正常 %d\n", i+1)
|
||
case <-time.After(100 * time.Millisecond):
|
||
fmt.Printf(" 心跳超时 %d\n", i+1)
|
||
}
|
||
}
|
||
|
||
fmt.Println()
|
||
}
|
||
|
||
// demonstrateSelectPatterns 演示 select 的常见模式
|
||
func demonstrateSelectPatterns() {
|
||
fmt.Println("5. Select 的常见模式:")
|
||
|
||
// 扇入模式
|
||
fmt.Printf(" 扇入模式:\n")
|
||
|
||
input1 := make(chan int)
|
||
input2 := make(chan int)
|
||
input3 := make(chan int)
|
||
|
||
// 启动数据源
|
||
go func() {
|
||
for i := 1; i <= 3; i++ {
|
||
input1 <- i
|
||
time.Sleep(100 * time.Millisecond)
|
||
}
|
||
close(input1)
|
||
}()
|
||
|
||
go func() {
|
||
for i := 10; i <= 12; i++ {
|
||
input2 <- i
|
||
time.Sleep(150 * time.Millisecond)
|
||
}
|
||
close(input2)
|
||
}()
|
||
|
||
go func() {
|
||
for i := 100; i <= 102; i++ {
|
||
input3 <- i
|
||
time.Sleep(200 * time.Millisecond)
|
||
}
|
||
close(input3)
|
||
}()
|
||
|
||
// 扇入合并
|
||
for {
|
||
select {
|
||
case val, ok := <-input1:
|
||
if ok {
|
||
fmt.Printf(" 来源1: %d\n", val)
|
||
} else {
|
||
input1 = nil
|
||
}
|
||
case val, ok := <-input2:
|
||
if ok {
|
||
fmt.Printf(" 来源2: %d\n", val)
|
||
} else {
|
||
input2 = nil
|
||
}
|
||
case val, ok := <-input3:
|
||
if ok {
|
||
fmt.Printf(" 来源3: %d\n", val)
|
||
} else {
|
||
input3 = nil
|
||
}
|
||
}
|
||
|
||
// 所有 channel 都关闭时退出
|
||
if input1 == nil && input2 == nil && input3 == nil {
|
||
break
|
||
}
|
||
}
|
||
|
||
// 优雅关闭模式
|
||
fmt.Printf(" 优雅关闭模式:\n")
|
||
|
||
data := make(chan int)
|
||
done := make(chan bool)
|
||
|
||
// 启动工作 goroutine
|
||
go func() {
|
||
defer close(data)
|
||
for i := 1; i <= 10; i++ {
|
||
select {
|
||
case data <- i:
|
||
fmt.Printf(" 发送数据: %d\n", i)
|
||
time.Sleep(50 * time.Millisecond)
|
||
case <-done:
|
||
fmt.Printf(" 接收到关闭信号,停止发送\n")
|
||
return
|
||
}
|
||
}
|
||
}()
|
||
|
||
// 接收数据,然后发送关闭信号
|
||
count := 0
|
||
for value := range data {
|
||
fmt.Printf(" 接收数据: %d\n", value)
|
||
count++
|
||
if count >= 5 {
|
||
fmt.Printf(" 发送关闭信号\n")
|
||
close(done)
|
||
}
|
||
}
|
||
|
||
// 多路复用模式
|
||
fmt.Printf(" 多路复用模式:\n")
|
||
|
||
requests := make(chan Request, 5)
|
||
responses := make(chan Response, 5)
|
||
errors := make(chan error, 5)
|
||
|
||
// 启动请求处理器
|
||
go requestProcessor(requests, responses, errors)
|
||
|
||
// 发送请求
|
||
for i := 1; i <= 5; i++ {
|
||
requests <- Request{ID: i, Data: fmt.Sprintf("请求-%d", i)}
|
||
}
|
||
close(requests)
|
||
|
||
// 处理响应和错误
|
||
for i := 0; i < 5; i++ {
|
||
select {
|
||
case resp := <-responses:
|
||
fmt.Printf(" 响应: ID=%d, Result=%s\n", resp.ID, resp.Result)
|
||
case err := <-errors:
|
||
fmt.Printf(" 错误: %v\n", err)
|
||
}
|
||
}
|
||
|
||
fmt.Println()
|
||
}
|
||
|
||
// demonstrateAdvancedSelect 演示 select 的高级用法
|
||
func demonstrateAdvancedSelect() {
|
||
fmt.Println("6. Select 的高级用法:")
|
||
|
||
// 动态 case 选择
|
||
fmt.Printf(" 动态 case 选择:\n")
|
||
|
||
channels := []chan int{
|
||
make(chan int, 1),
|
||
make(chan int, 1),
|
||
make(chan int, 1),
|
||
}
|
||
|
||
// 填充数据
|
||
for i, ch := range channels {
|
||
ch <- (i + 1) * 10
|
||
}
|
||
|
||
// 动态选择 channel
|
||
for i := 0; i < 3; i++ {
|
||
selectFromChannels(channels)
|
||
}
|
||
|
||
// 优先级选择
|
||
fmt.Printf(" 优先级选择:\n")
|
||
|
||
highPriority := make(chan string, 2)
|
||
lowPriority := make(chan string, 2)
|
||
|
||
// 填充不同优先级的任务
|
||
highPriority <- "高优先级任务1"
|
||
lowPriority <- "低优先级任务1"
|
||
highPriority <- "高优先级任务2"
|
||
lowPriority <- "低优先级任务2"
|
||
|
||
// 优先处理高优先级任务
|
||
for i := 0; i < 4; i++ {
|
||
select {
|
||
case task := <-highPriority:
|
||
fmt.Printf(" 处理: %s\n", task)
|
||
default:
|
||
select {
|
||
case task := <-lowPriority:
|
||
fmt.Printf(" 处理: %s\n", task)
|
||
default:
|
||
fmt.Printf(" 没有任务\n")
|
||
}
|
||
}
|
||
}
|
||
|
||
// 速率限制
|
||
fmt.Printf(" 速率限制:\n")
|
||
|
||
rateLimiter := time.NewTicker(100 * time.Millisecond)
|
||
defer rateLimiter.Stop()
|
||
|
||
tasks := make(chan string, 5)
|
||
for i := 1; i <= 5; i++ {
|
||
tasks <- fmt.Sprintf("任务%d", i)
|
||
}
|
||
close(tasks)
|
||
|
||
for task := range tasks {
|
||
select {
|
||
case <-rateLimiter.C:
|
||
fmt.Printf(" 执行: %s\n", task)
|
||
}
|
||
}
|
||
|
||
// 断路器模式
|
||
fmt.Printf(" 断路器模式:\n")
|
||
|
||
service := make(chan string)
|
||
failures := 0
|
||
maxFailures := 3
|
||
|
||
// 模拟服务调用
|
||
go func() {
|
||
defer close(service)
|
||
for i := 1; i <= 6; i++ {
|
||
time.Sleep(50 * time.Millisecond)
|
||
if rand.Float32() < 0.5 { // 50% 失败率
|
||
service <- fmt.Sprintf("成功响应%d", i)
|
||
} else {
|
||
service <- "ERROR"
|
||
}
|
||
}
|
||
}()
|
||
|
||
for response := range service {
|
||
if response == "ERROR" {
|
||
failures++
|
||
fmt.Printf(" 服务失败 (%d/%d)\n", failures, maxFailures)
|
||
if failures >= maxFailures {
|
||
fmt.Printf(" 断路器打开,停止调用服务\n")
|
||
break
|
||
}
|
||
} else {
|
||
failures = 0 // 重置失败计数
|
||
fmt.Printf(" %s\n", response)
|
||
}
|
||
}
|
||
|
||
fmt.Println()
|
||
}
|
||
|
||
// demonstratePracticalSelect 演示 select 的实际应用
|
||
func demonstratePracticalSelect() {
|
||
fmt.Println("7. Select 的实际应用:")
|
||
|
||
// Web 服务器超时处理
|
||
fmt.Printf(" Web 服务器超时处理:\n")
|
||
|
||
requests := make(chan WebRequest, 3)
|
||
|
||
// 模拟 Web 请求
|
||
go func() {
|
||
requests <- WebRequest{ID: 1, URL: "/api/fast", ProcessTime: 50 * time.Millisecond}
|
||
requests <- WebRequest{ID: 2, URL: "/api/slow", ProcessTime: 300 * time.Millisecond}
|
||
requests <- WebRequest{ID: 3, URL: "/api/medium", ProcessTime: 150 * time.Millisecond}
|
||
close(requests)
|
||
}()
|
||
|
||
for req := range requests {
|
||
handleWebRequest(req)
|
||
}
|
||
|
||
// 数据库连接池
|
||
fmt.Printf(" 数据库连接池:\n")
|
||
|
||
connectionPool := make(chan *DBConnection, 2)
|
||
|
||
// 初始化连接池
|
||
for i := 1; i <= 2; i++ {
|
||
connectionPool <- &DBConnection{ID: i}
|
||
}
|
||
|
||
// 模拟数据库操作
|
||
for i := 1; i <= 5; i++ {
|
||
go performDBOperation(i, connectionPool)
|
||
}
|
||
|
||
time.Sleep(500 * time.Millisecond)
|
||
|
||
// 消息队列处理
|
||
fmt.Printf(" 消息队列处理:\n")
|
||
|
||
messageQueue := make(chan Message, 10)
|
||
deadLetterQueue := make(chan Message, 10)
|
||
|
||
// 启动消息处理器
|
||
go messageProcessor(messageQueue, deadLetterQueue)
|
||
|
||
// 发送消息
|
||
messages := []Message{
|
||
{ID: 1, Content: "正常消息1", RetryCount: 0},
|
||
{ID: 2, Content: "错误消息", RetryCount: 0},
|
||
{ID: 3, Content: "正常消息2", RetryCount: 0},
|
||
}
|
||
|
||
for _, msg := range messages {
|
||
messageQueue <- msg
|
||
}
|
||
close(messageQueue)
|
||
|
||
// 处理死信队列
|
||
time.Sleep(200 * time.Millisecond)
|
||
close(deadLetterQueue)
|
||
|
||
fmt.Printf(" 死信队列中的消息:\n")
|
||
for deadMsg := range deadLetterQueue {
|
||
fmt.Printf(" 死信: ID=%d, Content=%s, Retries=%d\n",
|
||
deadMsg.ID, deadMsg.Content, deadMsg.RetryCount)
|
||
}
|
||
|
||
fmt.Println()
|
||
}
|
||
|
||
// ========== 辅助函数和类型定义 ==========
|
||
|
||
// Request 请求结构
|
||
type Request struct {
|
||
ID int
|
||
Data string
|
||
}
|
||
|
||
// Response 响应结构
|
||
type Response struct {
|
||
ID int
|
||
Result string
|
||
}
|
||
|
||
// requestProcessor 请求处理器
|
||
func requestProcessor(requests <-chan Request, responses chan<- Response, errors chan<- error) {
|
||
for req := range requests {
|
||
// 模拟处理时间
|
||
time.Sleep(50 * time.Millisecond)
|
||
|
||
// 模拟随机错误
|
||
if rand.Float32() < 0.2 { // 20% 错误率
|
||
errors <- fmt.Errorf("处理请求 %d 失败", req.ID)
|
||
} else {
|
||
responses <- Response{
|
||
ID: req.ID,
|
||
Result: fmt.Sprintf("处理完成: %s", req.Data),
|
||
}
|
||
}
|
||
}
|
||
close(responses)
|
||
close(errors)
|
||
}
|
||
|
||
// selectFromChannels 动态选择 channel
|
||
func selectFromChannels(channels []chan int) {
|
||
// 构建 select cases
|
||
cases := make([]reflect.SelectCase, len(channels))
|
||
for i, ch := range channels {
|
||
cases[i] = reflect.SelectCase{
|
||
Dir: reflect.SelectRecv,
|
||
Chan: reflect.ValueOf(ch),
|
||
}
|
||
}
|
||
|
||
// 执行 select
|
||
chosen, value, ok := reflect.Select(cases)
|
||
if ok {
|
||
fmt.Printf(" 从 channel %d 接收到: %d\n", chosen, value.Int())
|
||
} else {
|
||
fmt.Printf(" channel %d 已关闭\n", chosen)
|
||
}
|
||
}
|
||
|
||
// WebRequest Web 请求结构
|
||
type WebRequest struct {
|
||
ID int
|
||
URL string
|
||
ProcessTime time.Duration
|
||
}
|
||
|
||
// handleWebRequest 处理 Web 请求
|
||
func handleWebRequest(req WebRequest) {
|
||
fmt.Printf(" 处理请求 %d: %s\n", req.ID, req.URL)
|
||
|
||
result := make(chan string)
|
||
|
||
// 启动请求处理
|
||
go func() {
|
||
time.Sleep(req.ProcessTime)
|
||
result <- fmt.Sprintf("请求 %d 处理完成", req.ID)
|
||
}()
|
||
|
||
// 设置超时
|
||
select {
|
||
case response := <-result:
|
||
fmt.Printf(" %s\n", response)
|
||
case <-time.After(200 * time.Millisecond):
|
||
fmt.Printf(" 请求 %d 超时\n", req.ID)
|
||
}
|
||
}
|
||
|
||
// DBConnection 数据库连接
|
||
type DBConnection struct {
|
||
ID int
|
||
}
|
||
|
||
// performDBOperation 执行数据库操作
|
||
func performDBOperation(operationID int, pool chan *DBConnection) {
|
||
select {
|
||
case conn := <-pool:
|
||
fmt.Printf(" 操作 %d 获取连接 %d\n", operationID, conn.ID)
|
||
|
||
// 模拟数据库操作
|
||
time.Sleep(100 * time.Millisecond)
|
||
|
||
fmt.Printf(" 操作 %d 完成,释放连接 %d\n", operationID, conn.ID)
|
||
pool <- conn
|
||
|
||
case <-time.After(50 * time.Millisecond):
|
||
fmt.Printf(" 操作 %d 获取连接超时\n", operationID)
|
||
}
|
||
}
|
||
|
||
// Message 消息结构
|
||
type Message struct {
|
||
ID int
|
||
Content string
|
||
RetryCount int
|
||
}
|
||
|
||
// messageProcessor 消息处理器
|
||
func messageProcessor(messages <-chan Message, deadLetter chan<- Message) {
|
||
for msg := range messages {
|
||
fmt.Printf(" 处理消息 %d: %s\n", msg.ID, msg.Content)
|
||
|
||
// 模拟处理
|
||
success := !strings.Contains(msg.Content, "错误")
|
||
|
||
if success {
|
||
fmt.Printf(" 消息 %d 处理成功\n", msg.ID)
|
||
} else {
|
||
msg.RetryCount++
|
||
if msg.RetryCount < 3 {
|
||
fmt.Printf(" 消息 %d 处理失败,重试 %d\n", msg.ID, msg.RetryCount)
|
||
// 在实际应用中,这里会重新放入队列
|
||
} else {
|
||
fmt.Printf(" 消息 %d 重试次数超限,放入死信队列\n", msg.ID)
|
||
deadLetter <- msg
|
||
}
|
||
}
|
||
}
|
||
}
|
||
|
||
/*
|
||
运行这个程序:
|
||
go run 03-select.go
|
||
|
||
学习要点:
|
||
1. Select 语句用于处理多个 channel 操作
|
||
2. Select 会随机选择一个可执行的 case
|
||
3. Default 分支提供非阻塞行为
|
||
4. Select 常用于超时控制和多路复用
|
||
5. Select 是实现复杂并发模式的重要工具
|
||
|
||
Select 的特性:
|
||
1. 多路选择:可以同时等待多个 channel 操作
|
||
2. 随机选择:当多个 case 同时就绪时随机选择
|
||
3. 阻塞行为:没有 case 就绪时会阻塞等待
|
||
4. 非阻塞:default 分支提供非阻塞行为
|
||
5. 公平调度:避免某个 channel 被饿死
|
||
|
||
Select 语法:
|
||
1. 基本语法:select { case <-ch: ... }
|
||
2. 接收操作:case value := <-ch:
|
||
3. 发送操作:case ch <- value:
|
||
4. 默认分支:default:
|
||
5. 超时控制:case <-time.After(duration):
|
||
|
||
常见模式:
|
||
1. 超时控制:使用 time.After 设置超时
|
||
2. 非阻塞操作:使用 default 分支
|
||
3. 扇入合并:合并多个输入源
|
||
4. 优雅关闭:使用 done channel 通知退出
|
||
5. 多路复用:同时处理多种类型的消息
|
||
|
||
高级用法:
|
||
1. 动态 case:使用 reflect.Select 动态构建 case
|
||
2. 优先级选择:嵌套 select 实现优先级
|
||
3. 速率限制:结合 time.Ticker 控制频率
|
||
4. 断路器:实现服务降级和熔断
|
||
5. 负载均衡:在多个服务间分发请求
|
||
|
||
实际应用:
|
||
1. Web 服务器:请求超时处理
|
||
2. 数据库:连接池管理
|
||
3. 消息队列:消息路由和处理
|
||
4. 微服务:服务间通信
|
||
5. 监控系统:多源数据收集
|
||
|
||
最佳实践:
|
||
1. 避免在 select 中执行耗时操作
|
||
2. 合理设置超时时间
|
||
3. 正确处理 channel 关闭
|
||
4. 使用 default 避免死锁
|
||
5. 注意 select 的随机性
|
||
|
||
性能考虑:
|
||
1. Select 的开销比单个 channel 操作大
|
||
2. Case 数量影响性能
|
||
3. 随机选择有一定开销
|
||
4. 合理使用缓冲 channel
|
||
5. 避免过度复杂的 select 结构
|
||
|
||
注意事项:
|
||
1. Select 中的 case 必须是 channel 操作
|
||
2. nil channel 的 case 永远不会被选中
|
||
3. 关闭的 channel 可以被选中
|
||
4. Default 分支不能与其他阻塞操作共存
|
||
5. Select 语句本身不是循环
|
||
*/
|