/* 05-worker-pools.go - Go 语言工作池模式详解 学习目标: 1. 理解工作池模式的概念和优势 2. 掌握固定大小工作池的实现 3. 学会动态工作池的设计 4. 了解工作池的优雅关闭 5. 掌握工作池的实际应用场景 知识点: - 工作池模式的基本概念 - 固定大小工作池 - 动态工作池 - 工作池的生命周期管理 - 错误处理和监控 - 性能优化技巧 */ package main import ( "context" "fmt" "math/rand" "runtime" "sync" "time" ) func main() { fmt.Println("=== Go 语言工作池模式详解 ===\\n") // 演示基本工作池 demonstrateBasicWorkerPool() // 演示缓冲工作池 demonstrateBufferedWorkerPool() // 演示动态工作池 demonstrateDynamicWorkerPool() // 演示带上下文的工作池 demonstrateContextWorkerPool() // 演示工作池的错误处理 demonstrateErrorHandling() // 演示工作池的监控 demonstrateWorkerPoolMonitoring() // 演示实际应用场景 demonstratePracticalApplications() } // demonstrateBasicWorkerPool 演示基本工作池 func demonstrateBasicWorkerPool() { fmt.Println("1. 基本工作池:") // 工作池的基本概念 fmt.Printf(" 工作池的基本概念:\\n") fmt.Printf(" - 预先创建固定数量的工作 goroutine\\n") fmt.Printf(" - 通过 channel 分发任务给工作者\\n") fmt.Printf(" - 控制并发数量,避免创建过多 goroutine\\n") fmt.Printf(" - 提高资源利用率和系统稳定性\\n") fmt.Printf(" - 适用于任务数量大且不确定的场景\\n") // 基本工作池示例 fmt.Printf(" 基本工作池示例:\\n") const numWorkers = 3 const numJobs = 10 jobs := make(chan Job, numJobs) results := make(chan Result, numJobs) // 启动工作者 for w := 1; w <= numWorkers; w++ { go worker(w, jobs, results) } // 发送任务 for j := 1; j <= numJobs; j++ { jobs <- Job{ ID: j, Data: fmt.Sprintf("任务数据-%d", j), } } close(jobs) // 收集结果 for r := 1; r <= numJobs; r++ { result := <-results fmt.Printf(" %s\\n", result.Message) } // 简化的工作池实现 fmt.Printf(" 简化的工作池实现:\\n") simplePool := NewSimpleWorkerPool(4) simplePool.Start() // 提交任务 for i := 1; i <= 8; i++ { taskID := i simplePool.Submit(func() { fmt.Printf(" 执行简单任务 %d\\n", taskID) time.Sleep(100 * time.Millisecond) }) } simplePool.Stop() fmt.Println() } // demonstrateBufferedWorkerPool 演示缓冲工作池 func demonstrateBufferedWorkerPool() { fmt.Println("2. 缓冲工作池:") // 缓冲工作池的优势 fmt.Printf(" 缓冲工作池的优势:\\n") fmt.Printf(" - 任务队列缓冲,减少阻塞\\n") fmt.Printf(" - 平滑处理突发任务\\n") fmt.Printf(" - 提高系统吞吐量\\n") fmt.Printf(" - 更好的负载均衡\\n") // 缓冲工作池示例 fmt.Printf(" 缓冲工作池示例:\\n") bufferedPool := NewBufferedWorkerPool(3, 10) // 3个工作者,10个缓冲 bufferedPool.Start() // 快速提交大量任务 for i := 1; i <= 15; i++ { taskID := i success := bufferedPool.TrySubmit(func() { fmt.Printf(" 缓冲任务 %d 执行\\n", taskID) time.Sleep(50 * time.Millisecond) }) if !success { fmt.Printf(" 任务 %d 提交失败,队列已满\\n", taskID) } } time.Sleep(1 * time.Second) // 等待任务完成 bufferedPool.Stop() // 优先级工作池 fmt.Printf(" 优先级工作池:\\n") priorityPool := NewPriorityWorkerPool(2) priorityPool.Start() // 提交不同优先级的任务 tasks := []PriorityTask{ {Priority: 1, Name: "低优先级任务1"}, {Priority: 3, Name: "高优先级任务1"}, {Priority: 2, Name: "中优先级任务1"}, {Priority: 3, Name: "高优先级任务2"}, {Priority: 1, Name: "低优先级任务2"}, } for _, task := range tasks { priorityPool.Submit(task) } time.Sleep(500 * time.Millisecond) priorityPool.Stop() fmt.Println() } // demonstrateDynamicWorkerPool 演示动态工作池 func demonstrateDynamicWorkerPool() { fmt.Println("3. 动态工作池:") // 动态工作池的特点 fmt.Printf(" 动态工作池的特点:\\n") fmt.Printf(" - 根据负载动态调整工作者数量\\n") fmt.Printf(" - 自动扩容和缩容\\n") fmt.Printf(" - 更好的资源利用率\\n") fmt.Printf(" - 适应不同的工作负载\\n") // 动态工作池示例 fmt.Printf(" 动态工作池示例:\\n") dynamicPool := NewDynamicWorkerPool(2, 8) // 最小2个,最大8个工作者 dynamicPool.Start() // 模拟不同的负载模式 fmt.Printf(" 阶段1: 轻负载\\n") for i := 1; i <= 3; i++ { taskID := i dynamicPool.Submit(func() { fmt.Printf(" 轻负载任务 %d\\n", taskID) time.Sleep(200 * time.Millisecond) }) } time.Sleep(300 * time.Millisecond) fmt.Printf(" 当前工作者数量: %d\\n", dynamicPool.WorkerCount()) fmt.Printf(" 阶段2: 重负载\\n") for i := 1; i <= 12; i++ { taskID := i dynamicPool.Submit(func() { fmt.Printf(" 重负载任务 %d\\n", taskID) time.Sleep(100 * time.Millisecond) }) } time.Sleep(200 * time.Millisecond) fmt.Printf(" 当前工作者数量: %d\\n", dynamicPool.WorkerCount()) fmt.Printf(" 阶段3: 负载降低\\n") time.Sleep(1 * time.Second) fmt.Printf(" 当前工作者数量: %d\\n", dynamicPool.WorkerCount()) dynamicPool.Stop() // 自适应工作池 fmt.Printf(" 自适应工作池:\\n") adaptivePool := NewAdaptiveWorkerPool() adaptivePool.Start() // 提交任务并观察自适应行为 for i := 1; i <= 20; i++ { taskID := i adaptivePool.Submit(func() { // 模拟不同执行时间的任务 duration := time.Duration(rand.Intn(300)) * time.Millisecond time.Sleep(duration) fmt.Printf(" 自适应任务 %d 完成 (耗时: %v)\\n", taskID, duration) }) if i%5 == 0 { fmt.Printf(" 提交 %d 个任务后,工作者数量: %d\\n", i, adaptivePool.WorkerCount()) } } time.Sleep(2 * time.Second) adaptivePool.Stop() fmt.Println() } // demonstrateContextWorkerPool 演示带上下文的工作池 func demonstrateContextWorkerPool() { fmt.Println("4. 带上下文的工作池:") // 上下文工作池的优势 fmt.Printf(" 上下文工作池的优势:\\n") fmt.Printf(" - 支持取消和超时控制\\n") fmt.Printf(" - 优雅的关闭机制\\n") fmt.Printf(" - 更好的错误处理\\n") fmt.Printf(" - 支持请求追踪\\n") // 带取消的工作池 fmt.Printf(" 带取消的工作池:\\n") ctx, cancel := context.WithCancel(context.Background()) contextPool := NewContextWorkerPool(3) contextPool.Start(ctx) // 提交任务 for i := 1; i <= 10; i++ { taskID := i contextPool.Submit(ContextTask{ ID: taskID, Fn: func(ctx context.Context) error { select { case <-ctx.Done(): fmt.Printf(" 任务 %d 被取消\\n", taskID) return ctx.Err() case <-time.After(200 * time.Millisecond): fmt.Printf(" 任务 %d 完成\\n", taskID) return nil } }, }) } // 3秒后取消所有任务 time.Sleep(300 * time.Millisecond) fmt.Printf(" 取消所有任务\\n") cancel() time.Sleep(100 * time.Millisecond) // 带超时的工作池 fmt.Printf(" 带超时的工作池:\\n") timeoutCtx, timeoutCancel := context.WithTimeout(context.Background(), 500*time.Millisecond) defer timeoutCancel() timeoutPool := NewContextWorkerPool(2) timeoutPool.Start(timeoutCtx) // 提交一些长时间运行的任务 for i := 1; i <= 5; i++ { taskID := i timeoutPool.Submit(ContextTask{ ID: taskID, Fn: func(ctx context.Context) error { select { case <-ctx.Done(): fmt.Printf(" 超时任务 %d 被取消: %v\\n", taskID, ctx.Err()) return ctx.Err() case <-time.After(300 * time.Millisecond): fmt.Printf(" 超时任务 %d 完成\\n", taskID) return nil } }, }) } time.Sleep(600 * time.Millisecond) fmt.Println() } // demonstrateErrorHandling 演示工作池的错误处理 func demonstrateErrorHandling() { fmt.Println("5. 工作池的错误处理:") // 错误处理的重要性 fmt.Printf(" 错误处理的重要性:\\n") fmt.Printf(" - 防止单个任务错误影响整个系统\\n") fmt.Printf(" - 提供错误恢复机制\\n") fmt.Printf(" - 记录和监控错误\\n") fmt.Printf(" - 支持重试机制\\n") // 带错误处理的工作池 fmt.Printf(" 带错误处理的工作池:\\n") errorPool := NewErrorHandlingWorkerPool(3) errorPool.Start() // 提交一些可能失败的任务 tasks := []ErrorTask{ {ID: 1, ShouldFail: false, Data: "正常任务1"}, {ID: 2, ShouldFail: true, Data: "错误任务1"}, {ID: 3, ShouldFail: false, Data: "正常任务2"}, {ID: 4, ShouldFail: true, Data: "错误任务2"}, {ID: 5, ShouldFail: false, Data: "正常任务3"}, } for _, task := range tasks { errorPool.Submit(task) } time.Sleep(500 * time.Millisecond) // 获取错误统计 stats := errorPool.GetStats() fmt.Printf(" 处理统计: 成功=%d, 失败=%d, 重试=%d\\n", stats.Success, stats.Failed, stats.Retried) errorPool.Stop() // 带重试机制的工作池 fmt.Printf(" 带重试机制的工作池:\\n") retryPool := NewRetryWorkerPool(2, 3) // 2个工作者,最多重试3次 retryPool.Start() // 提交需要重试的任务 retryTasks := []RetryTask{ {ID: 1, MaxRetries: 2, Data: "重试任务1"}, {ID: 2, MaxRetries: 1, Data: "重试任务2"}, {ID: 3, MaxRetries: 3, Data: "重试任务3"}, } for _, task := range retryTasks { retryPool.Submit(task) } time.Sleep(1 * time.Second) retryPool.Stop() fmt.Println() } // demonstrateWorkerPoolMonitoring 演示工作池的监控 func demonstrateWorkerPoolMonitoring() { fmt.Println("6. 工作池的监控:") // 监控的重要性 fmt.Printf(" 监控的重要性:\\n") fmt.Printf(" - 实时了解工作池状态\\n") fmt.Printf(" - 性能调优的依据\\n") fmt.Printf(" - 及时发现问题\\n") fmt.Printf(" - 容量规划参考\\n") // 带监控的工作池 fmt.Printf(" 带监控的工作池:\\n") monitoredPool := NewMonitoredWorkerPool(4) monitoredPool.Start() // 启动监控 go func() { ticker := time.NewTicker(200 * time.Millisecond) defer ticker.Stop() for i := 0; i < 5; i++ { <-ticker.C metrics := monitoredPool.GetMetrics() fmt.Printf(" 监控: 活跃=%d, 队列=%d, 完成=%d, 平均耗时=%.2fms\\n", metrics.ActiveWorkers, metrics.QueueSize, metrics.CompletedTasks, metrics.AvgProcessTime) } }() // 提交任务 for i := 1; i <= 20; i++ { taskID := i monitoredPool.Submit(MonitoredTask{ ID: taskID, Fn: func() { // 模拟不同执行时间 duration := time.Duration(rand.Intn(300)) * time.Millisecond time.Sleep(duration) }, }) } time.Sleep(1200 * time.Millisecond) // 最终统计 finalMetrics := monitoredPool.GetMetrics() fmt.Printf(" 最终统计: 总任务=%d, 完成=%d, 平均耗时=%.2fms\\n", finalMetrics.TotalTasks, finalMetrics.CompletedTasks, finalMetrics.AvgProcessTime) monitoredPool.Stop() fmt.Println() } // demonstratePracticalApplications 演示实际应用场景 func demonstratePracticalApplications() { fmt.Println("7. 实际应用场景:") // Web 服务器请求处理 fmt.Printf(" Web 服务器请求处理:\\n") webPool := NewWebServerPool(runtime.NumCPU()) webPool.Start() // 模拟HTTP请求 requests := []HTTPRequest{ {ID: 1, Method: "GET", URL: "/api/users", Body: ""}, {ID: 2, Method: "POST", URL: "/api/users", Body: `{"name":"Alice"}`}, {ID: 3, Method: "GET", URL: "/api/orders", Body: ""}, {ID: 4, Method: "PUT", URL: "/api/users/1", Body: `{"name":"Bob"}`}, {ID: 5, Method: "DELETE", URL: "/api/users/2", Body: ""}, } for _, req := range requests { webPool.HandleRequest(req) } time.Sleep(300 * time.Millisecond) webPool.Stop() // 图片处理服务 fmt.Printf(" 图片处理服务:\\n") imagePool := NewImageProcessingPool(3) imagePool.Start() // 模拟图片处理任务 images := []ImageTask{ {ID: 1, Filename: "photo1.jpg", Operations: []string{"resize", "compress"}}, {ID: 2, Filename: "photo2.png", Operations: []string{"crop", "filter"}}, {ID: 3, Filename: "photo3.gif", Operations: []string{"resize", "watermark"}}, {ID: 4, Filename: "photo4.jpg", Operations: []string{"rotate", "compress"}}, } for _, img := range images { imagePool.ProcessImage(img) } time.Sleep(500 * time.Millisecond) imagePool.Stop() // 数据处理管道 fmt.Printf(" 数据处理管道:\\n") pipeline := NewDataPipeline(2, 2, 2) // 读取、处理、写入各2个工作者 pipeline.Start() // 模拟数据处理 dataItems := []DataItem{ {ID: 1, Content: "数据1", Type: "json"}, {ID: 2, Content: "数据2", Type: "xml"}, {ID: 3, Content: "数据3", Type: "csv"}, {ID: 4, Content: "数据4", Type: "json"}, {ID: 5, Content: "数据5", Type: "xml"}, } for _, item := range dataItems { pipeline.ProcessData(item) } time.Sleep(800 * time.Millisecond) pipeline.Stop() // 批量任务处理 fmt.Printf(" 批量任务处理:\\n") batchPool := NewBatchWorkerPool(3, 5) // 3个工作者,批量大小5 batchPool.Start() // 提交大量小任务 for i := 1; i <= 23; i++ { batchPool.Submit(BatchItem{ ID: i, Data: fmt.Sprintf("批量数据-%d", i), }) } time.Sleep(600 * time.Millisecond) batchPool.Stop() fmt.Println() } // ========== 类型定义和辅助函数 ========== // Job 基本任务 type Job struct { ID int Data string } // Result 任务结果 type Result struct { JobID int Message string } // worker 基本工作者函数 func worker(id int, jobs <-chan Job, results chan<- Result) { for job := range jobs { fmt.Printf(" 工作者 %d 开始处理任务 %d\\n", id, job.ID) time.Sleep(100 * time.Millisecond) // 模拟工作 results <- Result{ JobID: job.ID, Message: fmt.Sprintf("工作者 %d 完成任务 %d: %s", id, job.ID, job.Data), } } } // SimpleWorkerPool 简单工作池 type SimpleWorkerPool struct { workerCount int taskQueue chan func() quit chan bool wg sync.WaitGroup } func NewSimpleWorkerPool(workerCount int) *SimpleWorkerPool { return &SimpleWorkerPool{ workerCount: workerCount, taskQueue: make(chan func()), quit: make(chan bool), } } func (p *SimpleWorkerPool) Start() { for i := 0; i < p.workerCount; i++ { p.wg.Add(1) go p.worker(i + 1) } } func (p *SimpleWorkerPool) Submit(task func()) { p.taskQueue <- task } func (p *SimpleWorkerPool) Stop() { close(p.taskQueue) p.wg.Wait() } func (p *SimpleWorkerPool) worker(id int) { defer p.wg.Done() for task := range p.taskQueue { task() } } // BufferedWorkerPool 缓冲工作池 type BufferedWorkerPool struct { workerCount int taskQueue chan func() quit chan bool wg sync.WaitGroup } func NewBufferedWorkerPool(workerCount, bufferSize int) *BufferedWorkerPool { return &BufferedWorkerPool{ workerCount: workerCount, taskQueue: make(chan func(), bufferSize), quit: make(chan bool), } } func (p *BufferedWorkerPool) Start() { for i := 0; i < p.workerCount; i++ { p.wg.Add(1) go p.worker(i + 1) } } func (p *BufferedWorkerPool) TrySubmit(task func()) bool { select { case p.taskQueue <- task: return true default: return false } } func (p *BufferedWorkerPool) Stop() { close(p.taskQueue) p.wg.Wait() } func (p *BufferedWorkerPool) worker(id int) { defer p.wg.Done() for task := range p.taskQueue { task() } } // PriorityTask 优先级任务 type PriorityTask struct { Priority int Name string } // PriorityWorkerPool 优先级工作池 type PriorityWorkerPool struct { workerCount int taskQueue chan PriorityTask wg sync.WaitGroup } func NewPriorityWorkerPool(workerCount int) *PriorityWorkerPool { return &PriorityWorkerPool{ workerCount: workerCount, taskQueue: make(chan PriorityTask, 100), } } func (p *PriorityWorkerPool) Start() { for i := 0; i < p.workerCount; i++ { p.wg.Add(1) go p.worker(i + 1) } } func (p *PriorityWorkerPool) Submit(task PriorityTask) { p.taskQueue <- task } func (p *PriorityWorkerPool) Stop() { close(p.taskQueue) p.wg.Wait() } func (p *PriorityWorkerPool) worker(id int) { defer p.wg.Done() for task := range p.taskQueue { fmt.Printf(" 工作者 %d 执行 %s (优先级: %d)\\n", id, task.Name, task.Priority) time.Sleep(100 * time.Millisecond) } } // DynamicWorkerPool 动态工作池 type DynamicWorkerPool struct { minWorkers int maxWorkers int currentWorkers int taskQueue chan func() workerQueue chan chan func() quit chan bool wg sync.WaitGroup mu sync.RWMutex } func NewDynamicWorkerPool(minWorkers, maxWorkers int) *DynamicWorkerPool { return &DynamicWorkerPool{ minWorkers: minWorkers, maxWorkers: maxWorkers, taskQueue: make(chan func()), workerQueue: make(chan chan func(), maxWorkers), quit: make(chan bool), } } func (p *DynamicWorkerPool) Start() { // 启动最小数量的工作者 for i := 0; i < p.minWorkers; i++ { p.addWorker() } // 启动调度器 go p.dispatcher() } func (p *DynamicWorkerPool) Submit(task func()) { p.taskQueue <- task } func (p *DynamicWorkerPool) WorkerCount() int { p.mu.RLock() defer p.mu.RUnlock() return p.currentWorkers } func (p *DynamicWorkerPool) Stop() { close(p.quit) p.wg.Wait() } func (p *DynamicWorkerPool) addWorker() { p.mu.Lock() defer p.mu.Unlock() if p.currentWorkers < p.maxWorkers { p.currentWorkers++ p.wg.Add(1) go p.worker(p.currentWorkers) } } func (p *DynamicWorkerPool) worker(id int) { defer p.wg.Done() defer func() { p.mu.Lock() p.currentWorkers-- p.mu.Unlock() }() taskChan := make(chan func()) for { // 注册工作者 p.workerQueue <- taskChan select { case task := <-taskChan: task() case <-p.quit: return } } } func (p *DynamicWorkerPool) dispatcher() { for { select { case task := <-p.taskQueue: select { case workerTaskChan := <-p.workerQueue: workerTaskChan <- task default: // 没有可用工作者,尝试添加新工作者 p.addWorker() workerTaskChan := <-p.workerQueue workerTaskChan <- task } case <-p.quit: return } } } // AdaptiveWorkerPool 自适应工作池 type AdaptiveWorkerPool struct { workers map[int]*AdaptiveWorker taskQueue chan func() workerQueue chan chan func() quit chan bool wg sync.WaitGroup mu sync.RWMutex nextID int } type AdaptiveWorker struct { id int taskChan chan func() quit chan bool pool *AdaptiveWorkerPool } func NewAdaptiveWorkerPool() *AdaptiveWorkerPool { return &AdaptiveWorkerPool{ workers: make(map[int]*AdaptiveWorker), taskQueue: make(chan func()), workerQueue: make(chan chan func(), 100), quit: make(chan bool), } } func (p *AdaptiveWorkerPool) Start() { // 启动初始工作者 p.addWorker() go p.dispatcher() go p.monitor() } func (p *AdaptiveWorkerPool) Submit(task func()) { p.taskQueue <- task } func (p *AdaptiveWorkerPool) WorkerCount() int { p.mu.RLock() defer p.mu.RUnlock() return len(p.workers) } func (p *AdaptiveWorkerPool) Stop() { close(p.quit) p.wg.Wait() } func (p *AdaptiveWorkerPool) addWorker() { p.mu.Lock() defer p.mu.Unlock() p.nextID++ worker := &AdaptiveWorker{ id: p.nextID, taskChan: make(chan func()), quit: make(chan bool), pool: p, } p.workers[worker.id] = worker p.wg.Add(1) go worker.run() } func (p *AdaptiveWorkerPool) removeWorker(id int) { p.mu.Lock() defer p.mu.Unlock() if worker, exists := p.workers[id]; exists { close(worker.quit) delete(p.workers, id) } } func (p *AdaptiveWorkerPool) dispatcher() { for { select { case task := <-p.taskQueue: select { case workerTaskChan := <-p.workerQueue: workerTaskChan <- task default: // 需要更多工作者 p.addWorker() workerTaskChan := <-p.workerQueue workerTaskChan <- task } case <-p.quit: return } } } func (p *AdaptiveWorkerPool) monitor() { ticker := time.NewTicker(1 * time.Second) defer ticker.Stop() for { select { case <-ticker.C: // 简单的自适应逻辑:如果队列为空且工作者多于1个,减少工作者 if len(p.taskQueue) == 0 && len(p.workers) > 1 { p.mu.RLock() var workerID int for id := range p.workers { workerID = id break } p.mu.RUnlock() p.removeWorker(workerID) } case <-p.quit: return } } } func (w *AdaptiveWorker) run() { defer w.pool.wg.Done() for { // 注册到工作者队列 w.pool.workerQueue <- w.taskChan select { case task := <-w.taskChan: task() case <-w.quit: return case <-w.pool.quit: return } } } // ContextTask 上下文任务 type ContextTask struct { ID int Fn func(context.Context) error } // ContextWorkerPool 上下文工作池 type ContextWorkerPool struct { workerCount int taskQueue chan ContextTask wg sync.WaitGroup } func NewContextWorkerPool(workerCount int) *ContextWorkerPool { return &ContextWorkerPool{ workerCount: workerCount, taskQueue: make(chan ContextTask), } } func (p *ContextWorkerPool) Start(ctx context.Context) { for i := 0; i < p.workerCount; i++ { p.wg.Add(1) go p.worker(ctx, i+1) } } func (p *ContextWorkerPool) Submit(task ContextTask) { p.taskQueue <- task } func (p *ContextWorkerPool) worker(ctx context.Context, id int) { defer p.wg.Done() for { select { case task := <-p.taskQueue: task.Fn(ctx) case <-ctx.Done(): return } } } // ErrorTask 错误任务 type ErrorTask struct { ID int ShouldFail bool Data string } // ErrorStats 错误统计 type ErrorStats struct { Success int Failed int Retried int } // ErrorHandlingWorkerPool 错误处理工作池 type ErrorHandlingWorkerPool struct { workerCount int taskQueue chan ErrorTask wg sync.WaitGroup stats ErrorStats statsMu sync.Mutex } func NewErrorHandlingWorkerPool(workerCount int) *ErrorHandlingWorkerPool { return &ErrorHandlingWorkerPool{ workerCount: workerCount, taskQueue: make(chan ErrorTask), } } func (p *ErrorHandlingWorkerPool) Start() { for i := 0; i < p.workerCount; i++ { p.wg.Add(1) go p.worker(i + 1) } } func (p *ErrorHandlingWorkerPool) Submit(task ErrorTask) { p.taskQueue <- task } func (p *ErrorHandlingWorkerPool) GetStats() ErrorStats { p.statsMu.Lock() defer p.statsMu.Unlock() return p.stats } func (p *ErrorHandlingWorkerPool) Stop() { close(p.taskQueue) p.wg.Wait() } func (p *ErrorHandlingWorkerPool) worker(id int) { defer p.wg.Done() for task := range p.taskQueue { err := p.processTask(task) p.statsMu.Lock() if err != nil { p.stats.Failed++ fmt.Printf(" 工作者 %d 任务 %d 失败: %v\\n", id, task.ID, err) } else { p.stats.Success++ fmt.Printf(" 工作者 %d 任务 %d 成功\\n", id, task.ID) } p.statsMu.Unlock() } } func (p *ErrorHandlingWorkerPool) processTask(task ErrorTask) error { time.Sleep(50 * time.Millisecond) if task.ShouldFail { return fmt.Errorf("任务 %d 模拟失败", task.ID) } return nil } // RetryTask 重试任务 type RetryTask struct { ID int MaxRetries int Data string attempts int } // RetryWorkerPool 重试工作池 type RetryWorkerPool struct { workerCount int maxRetries int taskQueue chan RetryTask retryQueue chan RetryTask wg sync.WaitGroup } func NewRetryWorkerPool(workerCount, maxRetries int) *RetryWorkerPool { return &RetryWorkerPool{ workerCount: workerCount, maxRetries: maxRetries, taskQueue: make(chan RetryTask), retryQueue: make(chan RetryTask, 100), } } func (p *RetryWorkerPool) Start() { for i := 0; i < p.workerCount; i++ { p.wg.Add(1) go p.worker(i + 1) } // 启动重试处理器 p.wg.Add(1) go p.retryHandler() } func (p *RetryWorkerPool) Submit(task RetryTask) { p.taskQueue <- task } func (p *RetryWorkerPool) Stop() { close(p.taskQueue) close(p.retryQueue) p.wg.Wait() } func (p *RetryWorkerPool) worker(id int) { defer p.wg.Done() for task := range p.taskQueue { task.attempts++ // 模拟任务执行(30%成功率) success := rand.Float32() < 0.3 if success { fmt.Printf(" 工作者 %d 任务 %d 成功 (尝试 %d 次)\\n", id, task.ID, task.attempts) } else if task.attempts < task.MaxRetries { fmt.Printf(" 工作者 %d 任务 %d 失败,准备重试 (尝试 %d/%d)\\n", id, task.ID, task.attempts, task.MaxRetries) p.retryQueue <- task } else { fmt.Printf(" 工作者 %d 任务 %d 最终失败 (尝试 %d 次)\\n", id, task.ID, task.attempts) } } } func (p *RetryWorkerPool) retryHandler() { defer p.wg.Done() for task := range p.retryQueue { // 延迟重试 time.Sleep(100 * time.Millisecond) p.taskQueue <- task } } // MonitoredTask 监控任务 type MonitoredTask struct { ID int Fn func() } // WorkerPoolMetrics 工作池指标 type WorkerPoolMetrics struct { ActiveWorkers int QueueSize int TotalTasks int CompletedTasks int AvgProcessTime float64 } // MonitoredWorkerPool 监控工作池 type MonitoredWorkerPool struct { workerCount int taskQueue chan MonitoredTask wg sync.WaitGroup metrics WorkerPoolMetrics metricsMu sync.RWMutex processTimes []time.Duration activeWorkers int } func NewMonitoredWorkerPool(workerCount int) *MonitoredWorkerPool { return &MonitoredWorkerPool{ workerCount: workerCount, taskQueue: make(chan MonitoredTask, 100), processTimes: make([]time.Duration, 0), } } func (p *MonitoredWorkerPool) Start() { for i := 0; i < p.workerCount; i++ { p.wg.Add(1) go p.worker(i + 1) } } func (p *MonitoredWorkerPool) Submit(task MonitoredTask) { p.metricsMu.Lock() p.metrics.TotalTasks++ p.metricsMu.Unlock() p.taskQueue <- task } func (p *MonitoredWorkerPool) GetMetrics() WorkerPoolMetrics { p.metricsMu.RLock() defer p.metricsMu.RUnlock() metrics := p.metrics metrics.QueueSize = len(p.taskQueue) metrics.ActiveWorkers = p.activeWorkers // 计算平均处理时间 if len(p.processTimes) > 0 { var total time.Duration for _, t := range p.processTimes { total += t } metrics.AvgProcessTime = float64(total.Nanoseconds()) / float64(len(p.processTimes)) / 1e6 } return metrics } func (p *MonitoredWorkerPool) Stop() { close(p.taskQueue) p.wg.Wait() } func (p *MonitoredWorkerPool) worker(id int) { defer p.wg.Done() for task := range p.taskQueue { p.metricsMu.Lock() p.activeWorkers++ p.metricsMu.Unlock() start := time.Now() task.Fn() duration := time.Since(start) p.metricsMu.Lock() p.activeWorkers-- p.metrics.CompletedTasks++ p.processTimes = append(p.processTimes, duration) // 保持最近100个处理时间 if len(p.processTimes) > 100 { p.processTimes = p.processTimes[1:] } p.metricsMu.Unlock() } } // HTTPRequest HTTP请求 type HTTPRequest struct { ID int Method string URL string Body string } // WebServerPool Web服务器池 type WebServerPool struct { workerCount int requestChan chan HTTPRequest wg sync.WaitGroup } func NewWebServerPool(workerCount int) *WebServerPool { return &WebServerPool{ workerCount: workerCount, requestChan: make(chan HTTPRequest, 100), } } func (p *WebServerPool) Start() { for i := 0; i < p.workerCount; i++ { p.wg.Add(1) go p.worker(i + 1) } } func (p *WebServerPool) HandleRequest(req HTTPRequest) { p.requestChan <- req } func (p *WebServerPool) Stop() { close(p.requestChan) p.wg.Wait() } func (p *WebServerPool) worker(id int) { defer p.wg.Done() for req := range p.requestChan { // 模拟请求处理 processingTime := time.Duration(rand.Intn(100)) * time.Millisecond time.Sleep(processingTime) fmt.Printf(" 工作者 %d 处理请求 %d: %s %s (耗时: %v)\\n", id, req.ID, req.Method, req.URL, processingTime) } } // ImageTask 图片任务 type ImageTask struct { ID int Filename string Operations []string } // ImageProcessingPool 图片处理池 type ImageProcessingPool struct { workerCount int taskChan chan ImageTask wg sync.WaitGroup } func NewImageProcessingPool(workerCount int) *ImageProcessingPool { return &ImageProcessingPool{ workerCount: workerCount, taskChan: make(chan ImageTask, 50), } } func (p *ImageProcessingPool) Start() { for i := 0; i < p.workerCount; i++ { p.wg.Add(1) go p.worker(i + 1) } } func (p *ImageProcessingPool) ProcessImage(task ImageTask) { p.taskChan <- task } func (p *ImageProcessingPool) Stop() { close(p.taskChan) p.wg.Wait() } func (p *ImageProcessingPool) worker(id int) { defer p.wg.Done() for task := range p.taskChan { fmt.Printf(" 工作者 %d 处理图片 %s:\\n", id, task.Filename) for _, op := range task.Operations { // 模拟图片操作 time.Sleep(50 * time.Millisecond) fmt.Printf(" - 执行操作: %s\\n", op) } fmt.Printf(" 图片 %s 处理完成\\n", task.Filename) } } // DataItem 数据项 type DataItem struct { ID int Content string Type string } // DataPipeline 数据处理管道 type DataPipeline struct { readWorkers int processWorkers int writeWorkers int inputChan chan DataItem processedChan chan DataItem outputChan chan DataItem wg sync.WaitGroup } func NewDataPipeline(readWorkers, processWorkers, writeWorkers int) *DataPipeline { return &DataPipeline{ readWorkers: readWorkers, processWorkers: processWorkers, writeWorkers: writeWorkers, inputChan: make(chan DataItem, 20), processedChan: make(chan DataItem, 20), outputChan: make(chan DataItem, 20), } } func (p *DataPipeline) Start() { // 启动读取工作者 for i := 0; i < p.readWorkers; i++ { p.wg.Add(1) go p.readWorker(i + 1) } // 启动处理工作者 for i := 0; i < p.processWorkers; i++ { p.wg.Add(1) go p.processWorker(i + 1) } // 启动写入工作者 for i := 0; i < p.writeWorkers; i++ { p.wg.Add(1) go p.writeWorker(i + 1) } } func (p *DataPipeline) ProcessData(item DataItem) { p.inputChan <- item } func (p *DataPipeline) Stop() { close(p.inputChan) p.wg.Wait() } func (p *DataPipeline) readWorker(id int) { defer p.wg.Done() for item := range p.inputChan { // 模拟读取处理 time.Sleep(30 * time.Millisecond) fmt.Printf(" 读取工作者 %d 读取数据 %d\\n", id, item.ID) p.processedChan <- item } if id == 1 { // 只让第一个工作者关闭下一阶段 close(p.processedChan) } } func (p *DataPipeline) processWorker(id int) { defer p.wg.Done() for item := range p.processedChan { // 模拟数据处理 time.Sleep(50 * time.Millisecond) item.Content = fmt.Sprintf("处理后的%s", item.Content) fmt.Printf(" 处理工作者 %d 处理数据 %d\\n", id, item.ID) p.outputChan <- item } if id == 1 { // 只让第一个工作者关闭下一阶段 close(p.outputChan) } } func (p *DataPipeline) writeWorker(id int) { defer p.wg.Done() for item := range p.outputChan { // 模拟写入处理 time.Sleep(20 * time.Millisecond) fmt.Printf(" 写入工作者 %d 写入数据 %d: %s\\n", id, item.ID, item.Content) } } // BatchItem 批量项 type BatchItem struct { ID int Data string } // BatchWorkerPool 批量工作池 type BatchWorkerPool struct { workerCount int batchSize int itemChan chan BatchItem wg sync.WaitGroup } func NewBatchWorkerPool(workerCount, batchSize int) *BatchWorkerPool { return &BatchWorkerPool{ workerCount: workerCount, batchSize: batchSize, itemChan: make(chan BatchItem, 100), } } func (p *BatchWorkerPool) Start() { for i := 0; i < p.workerCount; i++ { p.wg.Add(1) go p.worker(i + 1) } } func (p *BatchWorkerPool) Submit(item BatchItem) { p.itemChan <- item } func (p *BatchWorkerPool) Stop() { close(p.itemChan) p.wg.Wait() } func (p *BatchWorkerPool) worker(id int) { defer p.wg.Done() batch := make([]BatchItem, 0, p.batchSize) for item := range p.itemChan { batch = append(batch, item) if len(batch) >= p.batchSize { p.processBatch(id, batch) batch = batch[:0] // 重置批次 } } // 处理剩余的项目 if len(batch) > 0 { p.processBatch(id, batch) } } func (p *BatchWorkerPool) processBatch(workerID int, batch []BatchItem) { fmt.Printf(" 工作者 %d 处理批次 (大小: %d):\\n", workerID, len(batch)) for _, item := range batch { fmt.Printf(" - 处理项目 %d: %s\\n", item.ID, item.Data) } // 模拟批量处理时间 time.Sleep(100 * time.Millisecond) fmt.Printf(" 工作者 %d 批次处理完成\\n", workerID) } /* 运行这个程序: go run 05-worker-pools.go 学习要点: 1. 工作池模式是控制并发的重要技术 2. 可以根据需求实现不同类型的工作池 3. 工作池提供了更好的资源控制和系统稳定性 4. 支持动态调整、错误处理、监控等高级特性 5. 在实际应用中有广泛的使用场景 工作池的优势: 1. 控制并发数量,避免资源耗尽 2. 重用 goroutine,减少创建销毁开销 3. 提供任务队列,平滑处理突发负载 4. 支持优雅关闭和错误处理 5. 便于监控和调优 工作池类型: 1. 固定大小工作池:预先创建固定数量的工作者 2. 缓冲工作池:带有任务队列缓冲 3. 动态工作池:根据负载动态调整工作者数量 4. 优先级工作池:支持任务优先级 5. 上下文工作池:支持取消和超时 高级特性: 1. 错误处理:捕获和处理任务错误 2. 重试机制:自动重试失败的任务 3. 监控指标:实时监控工作池状态 4. 优雅关闭:安全地停止工作池 5. 负载均衡:合理分配任务 实际应用: 1. Web 服务器:处理 HTTP 请求 2. 图片处理:批量处理图片 3. 数据处理:ETL 数据管道 4. 批量任务:批量处理业务逻辑 5. 消息队列:处理消息 设计考虑: 1. 工作者数量:根据 CPU 核心数和任务类型 2. 队列大小:平衡内存使用和响应时间 3. 任务粒度:避免任务过大或过小 4. 错误策略:决定如何处理失败任务 5. 监控指标:选择合适的监控维度 性能优化: 1. 合理设置工作者数量 2. 使用缓冲队列减少阻塞 3. 批量处理提高效率 4. 避免频繁创建销毁 5. 监控和调优关键指标 最佳实践: 1. 根据任务特性选择合适的工作池类型 2. 实现优雅关闭机制 3. 添加监控和日志 4. 处理 panic 和错误 5. 进行性能测试和调优 */