Files
golang/golang-learning/06-concurrency/05-worker-pools.go
2025-08-24 11:24:52 +08:00

1562 lines
34 KiB
Go
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

/*
05-worker-pools.go - Go 语言工作池模式详解
学习目标:
1. 理解工作池模式的概念和优势
2. 掌握固定大小工作池的实现
3. 学会动态工作池的设计
4. 了解工作池的优雅关闭
5. 掌握工作池的实际应用场景
知识点:
- 工作池模式的基本概念
- 固定大小工作池
- 动态工作池
- 工作池的生命周期管理
- 错误处理和监控
- 性能优化技巧
*/
package main
import (
"context"
"fmt"
"math/rand"
"runtime"
"sync"
"time"
)
func main() {
fmt.Println("=== Go 语言工作池模式详解 ===\\n")
// 演示基本工作池
demonstrateBasicWorkerPool()
// 演示缓冲工作池
demonstrateBufferedWorkerPool()
// 演示动态工作池
demonstrateDynamicWorkerPool()
// 演示带上下文的工作池
demonstrateContextWorkerPool()
// 演示工作池的错误处理
demonstrateErrorHandling()
// 演示工作池的监控
demonstrateWorkerPoolMonitoring()
// 演示实际应用场景
demonstratePracticalApplications()
}
// demonstrateBasicWorkerPool 演示基本工作池
func demonstrateBasicWorkerPool() {
fmt.Println("1. 基本工作池:")
// 工作池的基本概念
fmt.Printf(" 工作池的基本概念:\\n")
fmt.Printf(" - 预先创建固定数量的工作 goroutine\\n")
fmt.Printf(" - 通过 channel 分发任务给工作者\\n")
fmt.Printf(" - 控制并发数量,避免创建过多 goroutine\\n")
fmt.Printf(" - 提高资源利用率和系统稳定性\\n")
fmt.Printf(" - 适用于任务数量大且不确定的场景\\n")
// 基本工作池示例
fmt.Printf(" 基本工作池示例:\\n")
const numWorkers = 3
const numJobs = 10
jobs := make(chan Job, numJobs)
results := make(chan Result, numJobs)
// 启动工作者
for w := 1; w <= numWorkers; w++ {
go worker(w, jobs, results)
}
// 发送任务
for j := 1; j <= numJobs; j++ {
jobs <- Job{
ID: j,
Data: fmt.Sprintf("任务数据-%d", j),
}
}
close(jobs)
// 收集结果
for r := 1; r <= numJobs; r++ {
result := <-results
fmt.Printf(" %s\\n", result.Message)
}
// 简化的工作池实现
fmt.Printf(" 简化的工作池实现:\\n")
simplePool := NewSimpleWorkerPool(4)
simplePool.Start()
// 提交任务
for i := 1; i <= 8; i++ {
taskID := i
simplePool.Submit(func() {
fmt.Printf(" 执行简单任务 %d\\n", taskID)
time.Sleep(100 * time.Millisecond)
})
}
simplePool.Stop()
fmt.Println()
}
// demonstrateBufferedWorkerPool 演示缓冲工作池
func demonstrateBufferedWorkerPool() {
fmt.Println("2. 缓冲工作池:")
// 缓冲工作池的优势
fmt.Printf(" 缓冲工作池的优势:\\n")
fmt.Printf(" - 任务队列缓冲,减少阻塞\\n")
fmt.Printf(" - 平滑处理突发任务\\n")
fmt.Printf(" - 提高系统吞吐量\\n")
fmt.Printf(" - 更好的负载均衡\\n")
// 缓冲工作池示例
fmt.Printf(" 缓冲工作池示例:\\n")
bufferedPool := NewBufferedWorkerPool(3, 10) // 3个工作者10个缓冲
bufferedPool.Start()
// 快速提交大量任务
for i := 1; i <= 15; i++ {
taskID := i
success := bufferedPool.TrySubmit(func() {
fmt.Printf(" 缓冲任务 %d 执行\\n", taskID)
time.Sleep(50 * time.Millisecond)
})
if !success {
fmt.Printf(" 任务 %d 提交失败,队列已满\\n", taskID)
}
}
time.Sleep(1 * time.Second) // 等待任务完成
bufferedPool.Stop()
// 优先级工作池
fmt.Printf(" 优先级工作池:\\n")
priorityPool := NewPriorityWorkerPool(2)
priorityPool.Start()
// 提交不同优先级的任务
tasks := []PriorityTask{
{Priority: 1, Name: "低优先级任务1"},
{Priority: 3, Name: "高优先级任务1"},
{Priority: 2, Name: "中优先级任务1"},
{Priority: 3, Name: "高优先级任务2"},
{Priority: 1, Name: "低优先级任务2"},
}
for _, task := range tasks {
priorityPool.Submit(task)
}
time.Sleep(500 * time.Millisecond)
priorityPool.Stop()
fmt.Println()
}
// demonstrateDynamicWorkerPool 演示动态工作池
func demonstrateDynamicWorkerPool() {
fmt.Println("3. 动态工作池:")
// 动态工作池的特点
fmt.Printf(" 动态工作池的特点:\\n")
fmt.Printf(" - 根据负载动态调整工作者数量\\n")
fmt.Printf(" - 自动扩容和缩容\\n")
fmt.Printf(" - 更好的资源利用率\\n")
fmt.Printf(" - 适应不同的工作负载\\n")
// 动态工作池示例
fmt.Printf(" 动态工作池示例:\\n")
dynamicPool := NewDynamicWorkerPool(2, 8) // 最小2个最大8个工作者
dynamicPool.Start()
// 模拟不同的负载模式
fmt.Printf(" 阶段1: 轻负载\\n")
for i := 1; i <= 3; i++ {
taskID := i
dynamicPool.Submit(func() {
fmt.Printf(" 轻负载任务 %d\\n", taskID)
time.Sleep(200 * time.Millisecond)
})
}
time.Sleep(300 * time.Millisecond)
fmt.Printf(" 当前工作者数量: %d\\n", dynamicPool.WorkerCount())
fmt.Printf(" 阶段2: 重负载\\n")
for i := 1; i <= 12; i++ {
taskID := i
dynamicPool.Submit(func() {
fmt.Printf(" 重负载任务 %d\\n", taskID)
time.Sleep(100 * time.Millisecond)
})
}
time.Sleep(200 * time.Millisecond)
fmt.Printf(" 当前工作者数量: %d\\n", dynamicPool.WorkerCount())
fmt.Printf(" 阶段3: 负载降低\\n")
time.Sleep(1 * time.Second)
fmt.Printf(" 当前工作者数量: %d\\n", dynamicPool.WorkerCount())
dynamicPool.Stop()
// 自适应工作池
fmt.Printf(" 自适应工作池:\\n")
adaptivePool := NewAdaptiveWorkerPool()
adaptivePool.Start()
// 提交任务并观察自适应行为
for i := 1; i <= 20; i++ {
taskID := i
adaptivePool.Submit(func() {
// 模拟不同执行时间的任务
duration := time.Duration(rand.Intn(300)) * time.Millisecond
time.Sleep(duration)
fmt.Printf(" 自适应任务 %d 完成 (耗时: %v)\\n", taskID, duration)
})
if i%5 == 0 {
fmt.Printf(" 提交 %d 个任务后,工作者数量: %d\\n", i, adaptivePool.WorkerCount())
}
}
time.Sleep(2 * time.Second)
adaptivePool.Stop()
fmt.Println()
}
// demonstrateContextWorkerPool 演示带上下文的工作池
func demonstrateContextWorkerPool() {
fmt.Println("4. 带上下文的工作池:")
// 上下文工作池的优势
fmt.Printf(" 上下文工作池的优势:\\n")
fmt.Printf(" - 支持取消和超时控制\\n")
fmt.Printf(" - 优雅的关闭机制\\n")
fmt.Printf(" - 更好的错误处理\\n")
fmt.Printf(" - 支持请求追踪\\n")
// 带取消的工作池
fmt.Printf(" 带取消的工作池:\\n")
ctx, cancel := context.WithCancel(context.Background())
contextPool := NewContextWorkerPool(3)
contextPool.Start(ctx)
// 提交任务
for i := 1; i <= 10; i++ {
taskID := i
contextPool.Submit(ContextTask{
ID: taskID,
Fn: func(ctx context.Context) error {
select {
case <-ctx.Done():
fmt.Printf(" 任务 %d 被取消\\n", taskID)
return ctx.Err()
case <-time.After(200 * time.Millisecond):
fmt.Printf(" 任务 %d 完成\\n", taskID)
return nil
}
},
})
}
// 3秒后取消所有任务
time.Sleep(300 * time.Millisecond)
fmt.Printf(" 取消所有任务\\n")
cancel()
time.Sleep(100 * time.Millisecond)
// 带超时的工作池
fmt.Printf(" 带超时的工作池:\\n")
timeoutCtx, timeoutCancel := context.WithTimeout(context.Background(), 500*time.Millisecond)
defer timeoutCancel()
timeoutPool := NewContextWorkerPool(2)
timeoutPool.Start(timeoutCtx)
// 提交一些长时间运行的任务
for i := 1; i <= 5; i++ {
taskID := i
timeoutPool.Submit(ContextTask{
ID: taskID,
Fn: func(ctx context.Context) error {
select {
case <-ctx.Done():
fmt.Printf(" 超时任务 %d 被取消: %v\\n", taskID, ctx.Err())
return ctx.Err()
case <-time.After(300 * time.Millisecond):
fmt.Printf(" 超时任务 %d 完成\\n", taskID)
return nil
}
},
})
}
time.Sleep(600 * time.Millisecond)
fmt.Println()
}
// demonstrateErrorHandling 演示工作池的错误处理
func demonstrateErrorHandling() {
fmt.Println("5. 工作池的错误处理:")
// 错误处理的重要性
fmt.Printf(" 错误处理的重要性:\\n")
fmt.Printf(" - 防止单个任务错误影响整个系统\\n")
fmt.Printf(" - 提供错误恢复机制\\n")
fmt.Printf(" - 记录和监控错误\\n")
fmt.Printf(" - 支持重试机制\\n")
// 带错误处理的工作池
fmt.Printf(" 带错误处理的工作池:\\n")
errorPool := NewErrorHandlingWorkerPool(3)
errorPool.Start()
// 提交一些可能失败的任务
tasks := []ErrorTask{
{ID: 1, ShouldFail: false, Data: "正常任务1"},
{ID: 2, ShouldFail: true, Data: "错误任务1"},
{ID: 3, ShouldFail: false, Data: "正常任务2"},
{ID: 4, ShouldFail: true, Data: "错误任务2"},
{ID: 5, ShouldFail: false, Data: "正常任务3"},
}
for _, task := range tasks {
errorPool.Submit(task)
}
time.Sleep(500 * time.Millisecond)
// 获取错误统计
stats := errorPool.GetStats()
fmt.Printf(" 处理统计: 成功=%d, 失败=%d, 重试=%d\\n",
stats.Success, stats.Failed, stats.Retried)
errorPool.Stop()
// 带重试机制的工作池
fmt.Printf(" 带重试机制的工作池:\\n")
retryPool := NewRetryWorkerPool(2, 3) // 2个工作者最多重试3次
retryPool.Start()
// 提交需要重试的任务
retryTasks := []RetryTask{
{ID: 1, MaxRetries: 2, Data: "重试任务1"},
{ID: 2, MaxRetries: 1, Data: "重试任务2"},
{ID: 3, MaxRetries: 3, Data: "重试任务3"},
}
for _, task := range retryTasks {
retryPool.Submit(task)
}
time.Sleep(1 * time.Second)
retryPool.Stop()
fmt.Println()
}
// demonstrateWorkerPoolMonitoring 演示工作池的监控
func demonstrateWorkerPoolMonitoring() {
fmt.Println("6. 工作池的监控:")
// 监控的重要性
fmt.Printf(" 监控的重要性:\\n")
fmt.Printf(" - 实时了解工作池状态\\n")
fmt.Printf(" - 性能调优的依据\\n")
fmt.Printf(" - 及时发现问题\\n")
fmt.Printf(" - 容量规划参考\\n")
// 带监控的工作池
fmt.Printf(" 带监控的工作池:\\n")
monitoredPool := NewMonitoredWorkerPool(4)
monitoredPool.Start()
// 启动监控
go func() {
ticker := time.NewTicker(200 * time.Millisecond)
defer ticker.Stop()
for i := 0; i < 5; i++ {
<-ticker.C
metrics := monitoredPool.GetMetrics()
fmt.Printf(" 监控: 活跃=%d, 队列=%d, 完成=%d, 平均耗时=%.2fms\\n",
metrics.ActiveWorkers, metrics.QueueSize,
metrics.CompletedTasks, metrics.AvgProcessTime)
}
}()
// 提交任务
for i := 1; i <= 20; i++ {
taskID := i
monitoredPool.Submit(MonitoredTask{
ID: taskID,
Fn: func() {
// 模拟不同执行时间
duration := time.Duration(rand.Intn(300)) * time.Millisecond
time.Sleep(duration)
},
})
}
time.Sleep(1200 * time.Millisecond)
// 最终统计
finalMetrics := monitoredPool.GetMetrics()
fmt.Printf(" 最终统计: 总任务=%d, 完成=%d, 平均耗时=%.2fms\\n",
finalMetrics.TotalTasks, finalMetrics.CompletedTasks,
finalMetrics.AvgProcessTime)
monitoredPool.Stop()
fmt.Println()
}
// demonstratePracticalApplications 演示实际应用场景
func demonstratePracticalApplications() {
fmt.Println("7. 实际应用场景:")
// Web 服务器请求处理
fmt.Printf(" Web 服务器请求处理:\\n")
webPool := NewWebServerPool(runtime.NumCPU())
webPool.Start()
// 模拟HTTP请求
requests := []HTTPRequest{
{ID: 1, Method: "GET", URL: "/api/users", Body: ""},
{ID: 2, Method: "POST", URL: "/api/users", Body: `{"name":"Alice"}`},
{ID: 3, Method: "GET", URL: "/api/orders", Body: ""},
{ID: 4, Method: "PUT", URL: "/api/users/1", Body: `{"name":"Bob"}`},
{ID: 5, Method: "DELETE", URL: "/api/users/2", Body: ""},
}
for _, req := range requests {
webPool.HandleRequest(req)
}
time.Sleep(300 * time.Millisecond)
webPool.Stop()
// 图片处理服务
fmt.Printf(" 图片处理服务:\\n")
imagePool := NewImageProcessingPool(3)
imagePool.Start()
// 模拟图片处理任务
images := []ImageTask{
{ID: 1, Filename: "photo1.jpg", Operations: []string{"resize", "compress"}},
{ID: 2, Filename: "photo2.png", Operations: []string{"crop", "filter"}},
{ID: 3, Filename: "photo3.gif", Operations: []string{"resize", "watermark"}},
{ID: 4, Filename: "photo4.jpg", Operations: []string{"rotate", "compress"}},
}
for _, img := range images {
imagePool.ProcessImage(img)
}
time.Sleep(500 * time.Millisecond)
imagePool.Stop()
// 数据处理管道
fmt.Printf(" 数据处理管道:\\n")
pipeline := NewDataPipeline(2, 2, 2) // 读取、处理、写入各2个工作者
pipeline.Start()
// 模拟数据处理
dataItems := []DataItem{
{ID: 1, Content: "数据1", Type: "json"},
{ID: 2, Content: "数据2", Type: "xml"},
{ID: 3, Content: "数据3", Type: "csv"},
{ID: 4, Content: "数据4", Type: "json"},
{ID: 5, Content: "数据5", Type: "xml"},
}
for _, item := range dataItems {
pipeline.ProcessData(item)
}
time.Sleep(800 * time.Millisecond)
pipeline.Stop()
// 批量任务处理
fmt.Printf(" 批量任务处理:\\n")
batchPool := NewBatchWorkerPool(3, 5) // 3个工作者批量大小5
batchPool.Start()
// 提交大量小任务
for i := 1; i <= 23; i++ {
batchPool.Submit(BatchItem{
ID: i,
Data: fmt.Sprintf("批量数据-%d", i),
})
}
time.Sleep(600 * time.Millisecond)
batchPool.Stop()
fmt.Println()
}
// ========== 类型定义和辅助函数 ==========
// Job 基本任务
type Job struct {
ID int
Data string
}
// Result 任务结果
type Result struct {
JobID int
Message string
}
// worker 基本工作者函数
func worker(id int, jobs <-chan Job, results chan<- Result) {
for job := range jobs {
fmt.Printf(" 工作者 %d 开始处理任务 %d\\n", id, job.ID)
time.Sleep(100 * time.Millisecond) // 模拟工作
results <- Result{
JobID: job.ID,
Message: fmt.Sprintf("工作者 %d 完成任务 %d: %s", id, job.ID, job.Data),
}
}
}
// SimpleWorkerPool 简单工作池
type SimpleWorkerPool struct {
workerCount int
taskQueue chan func()
quit chan bool
wg sync.WaitGroup
}
func NewSimpleWorkerPool(workerCount int) *SimpleWorkerPool {
return &SimpleWorkerPool{
workerCount: workerCount,
taskQueue: make(chan func()),
quit: make(chan bool),
}
}
func (p *SimpleWorkerPool) Start() {
for i := 0; i < p.workerCount; i++ {
p.wg.Add(1)
go p.worker(i + 1)
}
}
func (p *SimpleWorkerPool) Submit(task func()) {
p.taskQueue <- task
}
func (p *SimpleWorkerPool) Stop() {
close(p.taskQueue)
p.wg.Wait()
}
func (p *SimpleWorkerPool) worker(id int) {
defer p.wg.Done()
for task := range p.taskQueue {
task()
}
}
// BufferedWorkerPool 缓冲工作池
type BufferedWorkerPool struct {
workerCount int
taskQueue chan func()
quit chan bool
wg sync.WaitGroup
}
func NewBufferedWorkerPool(workerCount, bufferSize int) *BufferedWorkerPool {
return &BufferedWorkerPool{
workerCount: workerCount,
taskQueue: make(chan func(), bufferSize),
quit: make(chan bool),
}
}
func (p *BufferedWorkerPool) Start() {
for i := 0; i < p.workerCount; i++ {
p.wg.Add(1)
go p.worker(i + 1)
}
}
func (p *BufferedWorkerPool) TrySubmit(task func()) bool {
select {
case p.taskQueue <- task:
return true
default:
return false
}
}
func (p *BufferedWorkerPool) Stop() {
close(p.taskQueue)
p.wg.Wait()
}
func (p *BufferedWorkerPool) worker(id int) {
defer p.wg.Done()
for task := range p.taskQueue {
task()
}
}
// PriorityTask 优先级任务
type PriorityTask struct {
Priority int
Name string
}
// PriorityWorkerPool 优先级工作池
type PriorityWorkerPool struct {
workerCount int
taskQueue chan PriorityTask
wg sync.WaitGroup
}
func NewPriorityWorkerPool(workerCount int) *PriorityWorkerPool {
return &PriorityWorkerPool{
workerCount: workerCount,
taskQueue: make(chan PriorityTask, 100),
}
}
func (p *PriorityWorkerPool) Start() {
for i := 0; i < p.workerCount; i++ {
p.wg.Add(1)
go p.worker(i + 1)
}
}
func (p *PriorityWorkerPool) Submit(task PriorityTask) {
p.taskQueue <- task
}
func (p *PriorityWorkerPool) Stop() {
close(p.taskQueue)
p.wg.Wait()
}
func (p *PriorityWorkerPool) worker(id int) {
defer p.wg.Done()
for task := range p.taskQueue {
fmt.Printf(" 工作者 %d 执行 %s (优先级: %d)\\n",
id, task.Name, task.Priority)
time.Sleep(100 * time.Millisecond)
}
}
// DynamicWorkerPool 动态工作池
type DynamicWorkerPool struct {
minWorkers int
maxWorkers int
currentWorkers int
taskQueue chan func()
workerQueue chan chan func()
quit chan bool
wg sync.WaitGroup
mu sync.RWMutex
}
func NewDynamicWorkerPool(minWorkers, maxWorkers int) *DynamicWorkerPool {
return &DynamicWorkerPool{
minWorkers: minWorkers,
maxWorkers: maxWorkers,
taskQueue: make(chan func()),
workerQueue: make(chan chan func(), maxWorkers),
quit: make(chan bool),
}
}
func (p *DynamicWorkerPool) Start() {
// 启动最小数量的工作者
for i := 0; i < p.minWorkers; i++ {
p.addWorker()
}
// 启动调度器
go p.dispatcher()
}
func (p *DynamicWorkerPool) Submit(task func()) {
p.taskQueue <- task
}
func (p *DynamicWorkerPool) WorkerCount() int {
p.mu.RLock()
defer p.mu.RUnlock()
return p.currentWorkers
}
func (p *DynamicWorkerPool) Stop() {
close(p.quit)
p.wg.Wait()
}
func (p *DynamicWorkerPool) addWorker() {
p.mu.Lock()
defer p.mu.Unlock()
if p.currentWorkers < p.maxWorkers {
p.currentWorkers++
p.wg.Add(1)
go p.worker(p.currentWorkers)
}
}
func (p *DynamicWorkerPool) worker(id int) {
defer p.wg.Done()
defer func() {
p.mu.Lock()
p.currentWorkers--
p.mu.Unlock()
}()
taskChan := make(chan func())
for {
// 注册工作者
p.workerQueue <- taskChan
select {
case task := <-taskChan:
task()
case <-p.quit:
return
}
}
}
func (p *DynamicWorkerPool) dispatcher() {
for {
select {
case task := <-p.taskQueue:
select {
case workerTaskChan := <-p.workerQueue:
workerTaskChan <- task
default:
// 没有可用工作者,尝试添加新工作者
p.addWorker()
workerTaskChan := <-p.workerQueue
workerTaskChan <- task
}
case <-p.quit:
return
}
}
}
// AdaptiveWorkerPool 自适应工作池
type AdaptiveWorkerPool struct {
workers map[int]*AdaptiveWorker
taskQueue chan func()
workerQueue chan chan func()
quit chan bool
wg sync.WaitGroup
mu sync.RWMutex
nextID int
}
type AdaptiveWorker struct {
id int
taskChan chan func()
quit chan bool
pool *AdaptiveWorkerPool
}
func NewAdaptiveWorkerPool() *AdaptiveWorkerPool {
return &AdaptiveWorkerPool{
workers: make(map[int]*AdaptiveWorker),
taskQueue: make(chan func()),
workerQueue: make(chan chan func(), 100),
quit: make(chan bool),
}
}
func (p *AdaptiveWorkerPool) Start() {
// 启动初始工作者
p.addWorker()
go p.dispatcher()
go p.monitor()
}
func (p *AdaptiveWorkerPool) Submit(task func()) {
p.taskQueue <- task
}
func (p *AdaptiveWorkerPool) WorkerCount() int {
p.mu.RLock()
defer p.mu.RUnlock()
return len(p.workers)
}
func (p *AdaptiveWorkerPool) Stop() {
close(p.quit)
p.wg.Wait()
}
func (p *AdaptiveWorkerPool) addWorker() {
p.mu.Lock()
defer p.mu.Unlock()
p.nextID++
worker := &AdaptiveWorker{
id: p.nextID,
taskChan: make(chan func()),
quit: make(chan bool),
pool: p,
}
p.workers[worker.id] = worker
p.wg.Add(1)
go worker.run()
}
func (p *AdaptiveWorkerPool) removeWorker(id int) {
p.mu.Lock()
defer p.mu.Unlock()
if worker, exists := p.workers[id]; exists {
close(worker.quit)
delete(p.workers, id)
}
}
func (p *AdaptiveWorkerPool) dispatcher() {
for {
select {
case task := <-p.taskQueue:
select {
case workerTaskChan := <-p.workerQueue:
workerTaskChan <- task
default:
// 需要更多工作者
p.addWorker()
workerTaskChan := <-p.workerQueue
workerTaskChan <- task
}
case <-p.quit:
return
}
}
}
func (p *AdaptiveWorkerPool) monitor() {
ticker := time.NewTicker(1 * time.Second)
defer ticker.Stop()
for {
select {
case <-ticker.C:
// 简单的自适应逻辑如果队列为空且工作者多于1个减少工作者
if len(p.taskQueue) == 0 && len(p.workers) > 1 {
p.mu.RLock()
var workerID int
for id := range p.workers {
workerID = id
break
}
p.mu.RUnlock()
p.removeWorker(workerID)
}
case <-p.quit:
return
}
}
}
func (w *AdaptiveWorker) run() {
defer w.pool.wg.Done()
for {
// 注册到工作者队列
w.pool.workerQueue <- w.taskChan
select {
case task := <-w.taskChan:
task()
case <-w.quit:
return
case <-w.pool.quit:
return
}
}
}
// ContextTask 上下文任务
type ContextTask struct {
ID int
Fn func(context.Context) error
}
// ContextWorkerPool 上下文工作池
type ContextWorkerPool struct {
workerCount int
taskQueue chan ContextTask
wg sync.WaitGroup
}
func NewContextWorkerPool(workerCount int) *ContextWorkerPool {
return &ContextWorkerPool{
workerCount: workerCount,
taskQueue: make(chan ContextTask),
}
}
func (p *ContextWorkerPool) Start(ctx context.Context) {
for i := 0; i < p.workerCount; i++ {
p.wg.Add(1)
go p.worker(ctx, i+1)
}
}
func (p *ContextWorkerPool) Submit(task ContextTask) {
p.taskQueue <- task
}
func (p *ContextWorkerPool) worker(ctx context.Context, id int) {
defer p.wg.Done()
for {
select {
case task := <-p.taskQueue:
task.Fn(ctx)
case <-ctx.Done():
return
}
}
}
// ErrorTask 错误任务
type ErrorTask struct {
ID int
ShouldFail bool
Data string
}
// ErrorStats 错误统计
type ErrorStats struct {
Success int
Failed int
Retried int
}
// ErrorHandlingWorkerPool 错误处理工作池
type ErrorHandlingWorkerPool struct {
workerCount int
taskQueue chan ErrorTask
wg sync.WaitGroup
stats ErrorStats
statsMu sync.Mutex
}
func NewErrorHandlingWorkerPool(workerCount int) *ErrorHandlingWorkerPool {
return &ErrorHandlingWorkerPool{
workerCount: workerCount,
taskQueue: make(chan ErrorTask),
}
}
func (p *ErrorHandlingWorkerPool) Start() {
for i := 0; i < p.workerCount; i++ {
p.wg.Add(1)
go p.worker(i + 1)
}
}
func (p *ErrorHandlingWorkerPool) Submit(task ErrorTask) {
p.taskQueue <- task
}
func (p *ErrorHandlingWorkerPool) GetStats() ErrorStats {
p.statsMu.Lock()
defer p.statsMu.Unlock()
return p.stats
}
func (p *ErrorHandlingWorkerPool) Stop() {
close(p.taskQueue)
p.wg.Wait()
}
func (p *ErrorHandlingWorkerPool) worker(id int) {
defer p.wg.Done()
for task := range p.taskQueue {
err := p.processTask(task)
p.statsMu.Lock()
if err != nil {
p.stats.Failed++
fmt.Printf(" 工作者 %d 任务 %d 失败: %v\\n", id, task.ID, err)
} else {
p.stats.Success++
fmt.Printf(" 工作者 %d 任务 %d 成功\\n", id, task.ID)
}
p.statsMu.Unlock()
}
}
func (p *ErrorHandlingWorkerPool) processTask(task ErrorTask) error {
time.Sleep(50 * time.Millisecond)
if task.ShouldFail {
return fmt.Errorf("任务 %d 模拟失败", task.ID)
}
return nil
}
// RetryTask 重试任务
type RetryTask struct {
ID int
MaxRetries int
Data string
attempts int
}
// RetryWorkerPool 重试工作池
type RetryWorkerPool struct {
workerCount int
maxRetries int
taskQueue chan RetryTask
retryQueue chan RetryTask
wg sync.WaitGroup
}
func NewRetryWorkerPool(workerCount, maxRetries int) *RetryWorkerPool {
return &RetryWorkerPool{
workerCount: workerCount,
maxRetries: maxRetries,
taskQueue: make(chan RetryTask),
retryQueue: make(chan RetryTask, 100),
}
}
func (p *RetryWorkerPool) Start() {
for i := 0; i < p.workerCount; i++ {
p.wg.Add(1)
go p.worker(i + 1)
}
// 启动重试处理器
p.wg.Add(1)
go p.retryHandler()
}
func (p *RetryWorkerPool) Submit(task RetryTask) {
p.taskQueue <- task
}
func (p *RetryWorkerPool) Stop() {
close(p.taskQueue)
close(p.retryQueue)
p.wg.Wait()
}
func (p *RetryWorkerPool) worker(id int) {
defer p.wg.Done()
for task := range p.taskQueue {
task.attempts++
// 模拟任务执行30%成功率)
success := rand.Float32() < 0.3
if success {
fmt.Printf(" 工作者 %d 任务 %d 成功 (尝试 %d 次)\\n",
id, task.ID, task.attempts)
} else if task.attempts < task.MaxRetries {
fmt.Printf(" 工作者 %d 任务 %d 失败,准备重试 (尝试 %d/%d)\\n",
id, task.ID, task.attempts, task.MaxRetries)
p.retryQueue <- task
} else {
fmt.Printf(" 工作者 %d 任务 %d 最终失败 (尝试 %d 次)\\n",
id, task.ID, task.attempts)
}
}
}
func (p *RetryWorkerPool) retryHandler() {
defer p.wg.Done()
for task := range p.retryQueue {
// 延迟重试
time.Sleep(100 * time.Millisecond)
p.taskQueue <- task
}
}
// MonitoredTask 监控任务
type MonitoredTask struct {
ID int
Fn func()
}
// WorkerPoolMetrics 工作池指标
type WorkerPoolMetrics struct {
ActiveWorkers int
QueueSize int
TotalTasks int
CompletedTasks int
AvgProcessTime float64
}
// MonitoredWorkerPool 监控工作池
type MonitoredWorkerPool struct {
workerCount int
taskQueue chan MonitoredTask
wg sync.WaitGroup
metrics WorkerPoolMetrics
metricsMu sync.RWMutex
processTimes []time.Duration
activeWorkers int
}
func NewMonitoredWorkerPool(workerCount int) *MonitoredWorkerPool {
return &MonitoredWorkerPool{
workerCount: workerCount,
taskQueue: make(chan MonitoredTask, 100),
processTimes: make([]time.Duration, 0),
}
}
func (p *MonitoredWorkerPool) Start() {
for i := 0; i < p.workerCount; i++ {
p.wg.Add(1)
go p.worker(i + 1)
}
}
func (p *MonitoredWorkerPool) Submit(task MonitoredTask) {
p.metricsMu.Lock()
p.metrics.TotalTasks++
p.metricsMu.Unlock()
p.taskQueue <- task
}
func (p *MonitoredWorkerPool) GetMetrics() WorkerPoolMetrics {
p.metricsMu.RLock()
defer p.metricsMu.RUnlock()
metrics := p.metrics
metrics.QueueSize = len(p.taskQueue)
metrics.ActiveWorkers = p.activeWorkers
// 计算平均处理时间
if len(p.processTimes) > 0 {
var total time.Duration
for _, t := range p.processTimes {
total += t
}
metrics.AvgProcessTime = float64(total.Nanoseconds()) / float64(len(p.processTimes)) / 1e6
}
return metrics
}
func (p *MonitoredWorkerPool) Stop() {
close(p.taskQueue)
p.wg.Wait()
}
func (p *MonitoredWorkerPool) worker(id int) {
defer p.wg.Done()
for task := range p.taskQueue {
p.metricsMu.Lock()
p.activeWorkers++
p.metricsMu.Unlock()
start := time.Now()
task.Fn()
duration := time.Since(start)
p.metricsMu.Lock()
p.activeWorkers--
p.metrics.CompletedTasks++
p.processTimes = append(p.processTimes, duration)
// 保持最近100个处理时间
if len(p.processTimes) > 100 {
p.processTimes = p.processTimes[1:]
}
p.metricsMu.Unlock()
}
}
// HTTPRequest HTTP请求
type HTTPRequest struct {
ID int
Method string
URL string
Body string
}
// WebServerPool Web服务器池
type WebServerPool struct {
workerCount int
requestChan chan HTTPRequest
wg sync.WaitGroup
}
func NewWebServerPool(workerCount int) *WebServerPool {
return &WebServerPool{
workerCount: workerCount,
requestChan: make(chan HTTPRequest, 100),
}
}
func (p *WebServerPool) Start() {
for i := 0; i < p.workerCount; i++ {
p.wg.Add(1)
go p.worker(i + 1)
}
}
func (p *WebServerPool) HandleRequest(req HTTPRequest) {
p.requestChan <- req
}
func (p *WebServerPool) Stop() {
close(p.requestChan)
p.wg.Wait()
}
func (p *WebServerPool) worker(id int) {
defer p.wg.Done()
for req := range p.requestChan {
// 模拟请求处理
processingTime := time.Duration(rand.Intn(100)) * time.Millisecond
time.Sleep(processingTime)
fmt.Printf(" 工作者 %d 处理请求 %d: %s %s (耗时: %v)\\n",
id, req.ID, req.Method, req.URL, processingTime)
}
}
// ImageTask 图片任务
type ImageTask struct {
ID int
Filename string
Operations []string
}
// ImageProcessingPool 图片处理池
type ImageProcessingPool struct {
workerCount int
taskChan chan ImageTask
wg sync.WaitGroup
}
func NewImageProcessingPool(workerCount int) *ImageProcessingPool {
return &ImageProcessingPool{
workerCount: workerCount,
taskChan: make(chan ImageTask, 50),
}
}
func (p *ImageProcessingPool) Start() {
for i := 0; i < p.workerCount; i++ {
p.wg.Add(1)
go p.worker(i + 1)
}
}
func (p *ImageProcessingPool) ProcessImage(task ImageTask) {
p.taskChan <- task
}
func (p *ImageProcessingPool) Stop() {
close(p.taskChan)
p.wg.Wait()
}
func (p *ImageProcessingPool) worker(id int) {
defer p.wg.Done()
for task := range p.taskChan {
fmt.Printf(" 工作者 %d 处理图片 %s:\\n", id, task.Filename)
for _, op := range task.Operations {
// 模拟图片操作
time.Sleep(50 * time.Millisecond)
fmt.Printf(" - 执行操作: %s\\n", op)
}
fmt.Printf(" 图片 %s 处理完成\\n", task.Filename)
}
}
// DataItem 数据项
type DataItem struct {
ID int
Content string
Type string
}
// DataPipeline 数据处理管道
type DataPipeline struct {
readWorkers int
processWorkers int
writeWorkers int
inputChan chan DataItem
processedChan chan DataItem
outputChan chan DataItem
wg sync.WaitGroup
}
func NewDataPipeline(readWorkers, processWorkers, writeWorkers int) *DataPipeline {
return &DataPipeline{
readWorkers: readWorkers,
processWorkers: processWorkers,
writeWorkers: writeWorkers,
inputChan: make(chan DataItem, 20),
processedChan: make(chan DataItem, 20),
outputChan: make(chan DataItem, 20),
}
}
func (p *DataPipeline) Start() {
// 启动读取工作者
for i := 0; i < p.readWorkers; i++ {
p.wg.Add(1)
go p.readWorker(i + 1)
}
// 启动处理工作者
for i := 0; i < p.processWorkers; i++ {
p.wg.Add(1)
go p.processWorker(i + 1)
}
// 启动写入工作者
for i := 0; i < p.writeWorkers; i++ {
p.wg.Add(1)
go p.writeWorker(i + 1)
}
}
func (p *DataPipeline) ProcessData(item DataItem) {
p.inputChan <- item
}
func (p *DataPipeline) Stop() {
close(p.inputChan)
p.wg.Wait()
}
func (p *DataPipeline) readWorker(id int) {
defer p.wg.Done()
for item := range p.inputChan {
// 模拟读取处理
time.Sleep(30 * time.Millisecond)
fmt.Printf(" 读取工作者 %d 读取数据 %d\\n", id, item.ID)
p.processedChan <- item
}
if id == 1 { // 只让第一个工作者关闭下一阶段
close(p.processedChan)
}
}
func (p *DataPipeline) processWorker(id int) {
defer p.wg.Done()
for item := range p.processedChan {
// 模拟数据处理
time.Sleep(50 * time.Millisecond)
item.Content = fmt.Sprintf("处理后的%s", item.Content)
fmt.Printf(" 处理工作者 %d 处理数据 %d\\n", id, item.ID)
p.outputChan <- item
}
if id == 1 { // 只让第一个工作者关闭下一阶段
close(p.outputChan)
}
}
func (p *DataPipeline) writeWorker(id int) {
defer p.wg.Done()
for item := range p.outputChan {
// 模拟写入处理
time.Sleep(20 * time.Millisecond)
fmt.Printf(" 写入工作者 %d 写入数据 %d: %s\\n", id, item.ID, item.Content)
}
}
// BatchItem 批量项
type BatchItem struct {
ID int
Data string
}
// BatchWorkerPool 批量工作池
type BatchWorkerPool struct {
workerCount int
batchSize int
itemChan chan BatchItem
wg sync.WaitGroup
}
func NewBatchWorkerPool(workerCount, batchSize int) *BatchWorkerPool {
return &BatchWorkerPool{
workerCount: workerCount,
batchSize: batchSize,
itemChan: make(chan BatchItem, 100),
}
}
func (p *BatchWorkerPool) Start() {
for i := 0; i < p.workerCount; i++ {
p.wg.Add(1)
go p.worker(i + 1)
}
}
func (p *BatchWorkerPool) Submit(item BatchItem) {
p.itemChan <- item
}
func (p *BatchWorkerPool) Stop() {
close(p.itemChan)
p.wg.Wait()
}
func (p *BatchWorkerPool) worker(id int) {
defer p.wg.Done()
batch := make([]BatchItem, 0, p.batchSize)
for item := range p.itemChan {
batch = append(batch, item)
if len(batch) >= p.batchSize {
p.processBatch(id, batch)
batch = batch[:0] // 重置批次
}
}
// 处理剩余的项目
if len(batch) > 0 {
p.processBatch(id, batch)
}
}
func (p *BatchWorkerPool) processBatch(workerID int, batch []BatchItem) {
fmt.Printf(" 工作者 %d 处理批次 (大小: %d):\\n", workerID, len(batch))
for _, item := range batch {
fmt.Printf(" - 处理项目 %d: %s\\n", item.ID, item.Data)
}
// 模拟批量处理时间
time.Sleep(100 * time.Millisecond)
fmt.Printf(" 工作者 %d 批次处理完成\\n", workerID)
}
/*
运行这个程序:
go run 05-worker-pools.go
学习要点:
1. 工作池模式是控制并发的重要技术
2. 可以根据需求实现不同类型的工作池
3. 工作池提供了更好的资源控制和系统稳定性
4. 支持动态调整、错误处理、监控等高级特性
5. 在实际应用中有广泛的使用场景
工作池的优势:
1. 控制并发数量,避免资源耗尽
2. 重用 goroutine减少创建销毁开销
3. 提供任务队列,平滑处理突发负载
4. 支持优雅关闭和错误处理
5. 便于监控和调优
工作池类型:
1. 固定大小工作池:预先创建固定数量的工作者
2. 缓冲工作池:带有任务队列缓冲
3. 动态工作池:根据负载动态调整工作者数量
4. 优先级工作池:支持任务优先级
5. 上下文工作池:支持取消和超时
高级特性:
1. 错误处理:捕获和处理任务错误
2. 重试机制:自动重试失败的任务
3. 监控指标:实时监控工作池状态
4. 优雅关闭:安全地停止工作池
5. 负载均衡:合理分配任务
实际应用:
1. Web 服务器:处理 HTTP 请求
2. 图片处理:批量处理图片
3. 数据处理ETL 数据管道
4. 批量任务:批量处理业务逻辑
5. 消息队列:处理消息
设计考虑:
1. 工作者数量:根据 CPU 核心数和任务类型
2. 队列大小:平衡内存使用和响应时间
3. 任务粒度:避免任务过大或过小
4. 错误策略:决定如何处理失败任务
5. 监控指标:选择合适的监控维度
性能优化:
1. 合理设置工作者数量
2. 使用缓冲队列减少阻塞
3. 批量处理提高效率
4. 避免频繁创建销毁
5. 监控和调优关键指标
最佳实践:
1. 根据任务特性选择合适的工作池类型
2. 实现优雅关闭机制
3. 添加监控和日志
4. 处理 panic 和错误
5. 进行性能测试和调优
*/