1562 lines
34 KiB
Go
1562 lines
34 KiB
Go
/*
|
||
05-worker-pools.go - Go 语言工作池模式详解
|
||
|
||
学习目标:
|
||
1. 理解工作池模式的概念和优势
|
||
2. 掌握固定大小工作池的实现
|
||
3. 学会动态工作池的设计
|
||
4. 了解工作池的优雅关闭
|
||
5. 掌握工作池的实际应用场景
|
||
|
||
知识点:
|
||
- 工作池模式的基本概念
|
||
- 固定大小工作池
|
||
- 动态工作池
|
||
- 工作池的生命周期管理
|
||
- 错误处理和监控
|
||
- 性能优化技巧
|
||
*/
|
||
|
||
package main
|
||
|
||
import (
|
||
"context"
|
||
"fmt"
|
||
"math/rand"
|
||
"runtime"
|
||
"sync"
|
||
"time"
|
||
)
|
||
|
||
func main() {
|
||
fmt.Println("=== Go 语言工作池模式详解 ===\\n")
|
||
|
||
// 演示基本工作池
|
||
demonstrateBasicWorkerPool()
|
||
|
||
// 演示缓冲工作池
|
||
demonstrateBufferedWorkerPool()
|
||
|
||
// 演示动态工作池
|
||
demonstrateDynamicWorkerPool()
|
||
|
||
// 演示带上下文的工作池
|
||
demonstrateContextWorkerPool()
|
||
|
||
// 演示工作池的错误处理
|
||
demonstrateErrorHandling()
|
||
|
||
// 演示工作池的监控
|
||
demonstrateWorkerPoolMonitoring()
|
||
|
||
// 演示实际应用场景
|
||
demonstratePracticalApplications()
|
||
}
|
||
|
||
// demonstrateBasicWorkerPool 演示基本工作池
|
||
func demonstrateBasicWorkerPool() {
|
||
fmt.Println("1. 基本工作池:")
|
||
|
||
// 工作池的基本概念
|
||
fmt.Printf(" 工作池的基本概念:\\n")
|
||
fmt.Printf(" - 预先创建固定数量的工作 goroutine\\n")
|
||
fmt.Printf(" - 通过 channel 分发任务给工作者\\n")
|
||
fmt.Printf(" - 控制并发数量,避免创建过多 goroutine\\n")
|
||
fmt.Printf(" - 提高资源利用率和系统稳定性\\n")
|
||
fmt.Printf(" - 适用于任务数量大且不确定的场景\\n")
|
||
|
||
// 基本工作池示例
|
||
fmt.Printf(" 基本工作池示例:\\n")
|
||
|
||
const numWorkers = 3
|
||
const numJobs = 10
|
||
|
||
jobs := make(chan Job, numJobs)
|
||
results := make(chan Result, numJobs)
|
||
|
||
// 启动工作者
|
||
for w := 1; w <= numWorkers; w++ {
|
||
go worker(w, jobs, results)
|
||
}
|
||
|
||
// 发送任务
|
||
for j := 1; j <= numJobs; j++ {
|
||
jobs <- Job{
|
||
ID: j,
|
||
Data: fmt.Sprintf("任务数据-%d", j),
|
||
}
|
||
}
|
||
close(jobs)
|
||
|
||
// 收集结果
|
||
for r := 1; r <= numJobs; r++ {
|
||
result := <-results
|
||
fmt.Printf(" %s\\n", result.Message)
|
||
}
|
||
|
||
// 简化的工作池实现
|
||
fmt.Printf(" 简化的工作池实现:\\n")
|
||
|
||
simplePool := NewSimpleWorkerPool(4)
|
||
simplePool.Start()
|
||
|
||
// 提交任务
|
||
for i := 1; i <= 8; i++ {
|
||
taskID := i
|
||
simplePool.Submit(func() {
|
||
fmt.Printf(" 执行简单任务 %d\\n", taskID)
|
||
time.Sleep(100 * time.Millisecond)
|
||
})
|
||
}
|
||
|
||
simplePool.Stop()
|
||
|
||
fmt.Println()
|
||
}
|
||
|
||
// demonstrateBufferedWorkerPool 演示缓冲工作池
|
||
func demonstrateBufferedWorkerPool() {
|
||
fmt.Println("2. 缓冲工作池:")
|
||
|
||
// 缓冲工作池的优势
|
||
fmt.Printf(" 缓冲工作池的优势:\\n")
|
||
fmt.Printf(" - 任务队列缓冲,减少阻塞\\n")
|
||
fmt.Printf(" - 平滑处理突发任务\\n")
|
||
fmt.Printf(" - 提高系统吞吐量\\n")
|
||
fmt.Printf(" - 更好的负载均衡\\n")
|
||
|
||
// 缓冲工作池示例
|
||
fmt.Printf(" 缓冲工作池示例:\\n")
|
||
|
||
bufferedPool := NewBufferedWorkerPool(3, 10) // 3个工作者,10个缓冲
|
||
bufferedPool.Start()
|
||
|
||
// 快速提交大量任务
|
||
for i := 1; i <= 15; i++ {
|
||
taskID := i
|
||
success := bufferedPool.TrySubmit(func() {
|
||
fmt.Printf(" 缓冲任务 %d 执行\\n", taskID)
|
||
time.Sleep(50 * time.Millisecond)
|
||
})
|
||
|
||
if !success {
|
||
fmt.Printf(" 任务 %d 提交失败,队列已满\\n", taskID)
|
||
}
|
||
}
|
||
|
||
time.Sleep(1 * time.Second) // 等待任务完成
|
||
bufferedPool.Stop()
|
||
|
||
// 优先级工作池
|
||
fmt.Printf(" 优先级工作池:\\n")
|
||
|
||
priorityPool := NewPriorityWorkerPool(2)
|
||
priorityPool.Start()
|
||
|
||
// 提交不同优先级的任务
|
||
tasks := []PriorityTask{
|
||
{Priority: 1, Name: "低优先级任务1"},
|
||
{Priority: 3, Name: "高优先级任务1"},
|
||
{Priority: 2, Name: "中优先级任务1"},
|
||
{Priority: 3, Name: "高优先级任务2"},
|
||
{Priority: 1, Name: "低优先级任务2"},
|
||
}
|
||
|
||
for _, task := range tasks {
|
||
priorityPool.Submit(task)
|
||
}
|
||
|
||
time.Sleep(500 * time.Millisecond)
|
||
priorityPool.Stop()
|
||
|
||
fmt.Println()
|
||
}
|
||
|
||
// demonstrateDynamicWorkerPool 演示动态工作池
|
||
func demonstrateDynamicWorkerPool() {
|
||
fmt.Println("3. 动态工作池:")
|
||
|
||
// 动态工作池的特点
|
||
fmt.Printf(" 动态工作池的特点:\\n")
|
||
fmt.Printf(" - 根据负载动态调整工作者数量\\n")
|
||
fmt.Printf(" - 自动扩容和缩容\\n")
|
||
fmt.Printf(" - 更好的资源利用率\\n")
|
||
fmt.Printf(" - 适应不同的工作负载\\n")
|
||
|
||
// 动态工作池示例
|
||
fmt.Printf(" 动态工作池示例:\\n")
|
||
|
||
dynamicPool := NewDynamicWorkerPool(2, 8) // 最小2个,最大8个工作者
|
||
dynamicPool.Start()
|
||
|
||
// 模拟不同的负载模式
|
||
fmt.Printf(" 阶段1: 轻负载\\n")
|
||
for i := 1; i <= 3; i++ {
|
||
taskID := i
|
||
dynamicPool.Submit(func() {
|
||
fmt.Printf(" 轻负载任务 %d\\n", taskID)
|
||
time.Sleep(200 * time.Millisecond)
|
||
})
|
||
}
|
||
|
||
time.Sleep(300 * time.Millisecond)
|
||
fmt.Printf(" 当前工作者数量: %d\\n", dynamicPool.WorkerCount())
|
||
|
||
fmt.Printf(" 阶段2: 重负载\\n")
|
||
for i := 1; i <= 12; i++ {
|
||
taskID := i
|
||
dynamicPool.Submit(func() {
|
||
fmt.Printf(" 重负载任务 %d\\n", taskID)
|
||
time.Sleep(100 * time.Millisecond)
|
||
})
|
||
}
|
||
|
||
time.Sleep(200 * time.Millisecond)
|
||
fmt.Printf(" 当前工作者数量: %d\\n", dynamicPool.WorkerCount())
|
||
|
||
fmt.Printf(" 阶段3: 负载降低\\n")
|
||
time.Sleep(1 * time.Second)
|
||
fmt.Printf(" 当前工作者数量: %d\\n", dynamicPool.WorkerCount())
|
||
|
||
dynamicPool.Stop()
|
||
|
||
// 自适应工作池
|
||
fmt.Printf(" 自适应工作池:\\n")
|
||
|
||
adaptivePool := NewAdaptiveWorkerPool()
|
||
adaptivePool.Start()
|
||
|
||
// 提交任务并观察自适应行为
|
||
for i := 1; i <= 20; i++ {
|
||
taskID := i
|
||
adaptivePool.Submit(func() {
|
||
// 模拟不同执行时间的任务
|
||
duration := time.Duration(rand.Intn(300)) * time.Millisecond
|
||
time.Sleep(duration)
|
||
fmt.Printf(" 自适应任务 %d 完成 (耗时: %v)\\n", taskID, duration)
|
||
})
|
||
|
||
if i%5 == 0 {
|
||
fmt.Printf(" 提交 %d 个任务后,工作者数量: %d\\n", i, adaptivePool.WorkerCount())
|
||
}
|
||
}
|
||
|
||
time.Sleep(2 * time.Second)
|
||
adaptivePool.Stop()
|
||
|
||
fmt.Println()
|
||
}
|
||
|
||
// demonstrateContextWorkerPool 演示带上下文的工作池
|
||
func demonstrateContextWorkerPool() {
|
||
fmt.Println("4. 带上下文的工作池:")
|
||
|
||
// 上下文工作池的优势
|
||
fmt.Printf(" 上下文工作池的优势:\\n")
|
||
fmt.Printf(" - 支持取消和超时控制\\n")
|
||
fmt.Printf(" - 优雅的关闭机制\\n")
|
||
fmt.Printf(" - 更好的错误处理\\n")
|
||
fmt.Printf(" - 支持请求追踪\\n")
|
||
|
||
// 带取消的工作池
|
||
fmt.Printf(" 带取消的工作池:\\n")
|
||
|
||
ctx, cancel := context.WithCancel(context.Background())
|
||
contextPool := NewContextWorkerPool(3)
|
||
contextPool.Start(ctx)
|
||
|
||
// 提交任务
|
||
for i := 1; i <= 10; i++ {
|
||
taskID := i
|
||
contextPool.Submit(ContextTask{
|
||
ID: taskID,
|
||
Fn: func(ctx context.Context) error {
|
||
select {
|
||
case <-ctx.Done():
|
||
fmt.Printf(" 任务 %d 被取消\\n", taskID)
|
||
return ctx.Err()
|
||
case <-time.After(200 * time.Millisecond):
|
||
fmt.Printf(" 任务 %d 完成\\n", taskID)
|
||
return nil
|
||
}
|
||
},
|
||
})
|
||
}
|
||
|
||
// 3秒后取消所有任务
|
||
time.Sleep(300 * time.Millisecond)
|
||
fmt.Printf(" 取消所有任务\\n")
|
||
cancel()
|
||
|
||
time.Sleep(100 * time.Millisecond)
|
||
|
||
// 带超时的工作池
|
||
fmt.Printf(" 带超时的工作池:\\n")
|
||
|
||
timeoutCtx, timeoutCancel := context.WithTimeout(context.Background(), 500*time.Millisecond)
|
||
defer timeoutCancel()
|
||
|
||
timeoutPool := NewContextWorkerPool(2)
|
||
timeoutPool.Start(timeoutCtx)
|
||
|
||
// 提交一些长时间运行的任务
|
||
for i := 1; i <= 5; i++ {
|
||
taskID := i
|
||
timeoutPool.Submit(ContextTask{
|
||
ID: taskID,
|
||
Fn: func(ctx context.Context) error {
|
||
select {
|
||
case <-ctx.Done():
|
||
fmt.Printf(" 超时任务 %d 被取消: %v\\n", taskID, ctx.Err())
|
||
return ctx.Err()
|
||
case <-time.After(300 * time.Millisecond):
|
||
fmt.Printf(" 超时任务 %d 完成\\n", taskID)
|
||
return nil
|
||
}
|
||
},
|
||
})
|
||
}
|
||
|
||
time.Sleep(600 * time.Millisecond)
|
||
|
||
fmt.Println()
|
||
}
|
||
|
||
// demonstrateErrorHandling 演示工作池的错误处理
|
||
func demonstrateErrorHandling() {
|
||
fmt.Println("5. 工作池的错误处理:")
|
||
|
||
// 错误处理的重要性
|
||
fmt.Printf(" 错误处理的重要性:\\n")
|
||
fmt.Printf(" - 防止单个任务错误影响整个系统\\n")
|
||
fmt.Printf(" - 提供错误恢复机制\\n")
|
||
fmt.Printf(" - 记录和监控错误\\n")
|
||
fmt.Printf(" - 支持重试机制\\n")
|
||
|
||
// 带错误处理的工作池
|
||
fmt.Printf(" 带错误处理的工作池:\\n")
|
||
|
||
errorPool := NewErrorHandlingWorkerPool(3)
|
||
errorPool.Start()
|
||
|
||
// 提交一些可能失败的任务
|
||
tasks := []ErrorTask{
|
||
{ID: 1, ShouldFail: false, Data: "正常任务1"},
|
||
{ID: 2, ShouldFail: true, Data: "错误任务1"},
|
||
{ID: 3, ShouldFail: false, Data: "正常任务2"},
|
||
{ID: 4, ShouldFail: true, Data: "错误任务2"},
|
||
{ID: 5, ShouldFail: false, Data: "正常任务3"},
|
||
}
|
||
|
||
for _, task := range tasks {
|
||
errorPool.Submit(task)
|
||
}
|
||
|
||
time.Sleep(500 * time.Millisecond)
|
||
|
||
// 获取错误统计
|
||
stats := errorPool.GetStats()
|
||
fmt.Printf(" 处理统计: 成功=%d, 失败=%d, 重试=%d\\n",
|
||
stats.Success, stats.Failed, stats.Retried)
|
||
|
||
errorPool.Stop()
|
||
|
||
// 带重试机制的工作池
|
||
fmt.Printf(" 带重试机制的工作池:\\n")
|
||
|
||
retryPool := NewRetryWorkerPool(2, 3) // 2个工作者,最多重试3次
|
||
retryPool.Start()
|
||
|
||
// 提交需要重试的任务
|
||
retryTasks := []RetryTask{
|
||
{ID: 1, MaxRetries: 2, Data: "重试任务1"},
|
||
{ID: 2, MaxRetries: 1, Data: "重试任务2"},
|
||
{ID: 3, MaxRetries: 3, Data: "重试任务3"},
|
||
}
|
||
|
||
for _, task := range retryTasks {
|
||
retryPool.Submit(task)
|
||
}
|
||
|
||
time.Sleep(1 * time.Second)
|
||
retryPool.Stop()
|
||
|
||
fmt.Println()
|
||
}
|
||
|
||
// demonstrateWorkerPoolMonitoring 演示工作池的监控
|
||
func demonstrateWorkerPoolMonitoring() {
|
||
fmt.Println("6. 工作池的监控:")
|
||
|
||
// 监控的重要性
|
||
fmt.Printf(" 监控的重要性:\\n")
|
||
fmt.Printf(" - 实时了解工作池状态\\n")
|
||
fmt.Printf(" - 性能调优的依据\\n")
|
||
fmt.Printf(" - 及时发现问题\\n")
|
||
fmt.Printf(" - 容量规划参考\\n")
|
||
|
||
// 带监控的工作池
|
||
fmt.Printf(" 带监控的工作池:\\n")
|
||
|
||
monitoredPool := NewMonitoredWorkerPool(4)
|
||
monitoredPool.Start()
|
||
|
||
// 启动监控
|
||
go func() {
|
||
ticker := time.NewTicker(200 * time.Millisecond)
|
||
defer ticker.Stop()
|
||
|
||
for i := 0; i < 5; i++ {
|
||
<-ticker.C
|
||
metrics := monitoredPool.GetMetrics()
|
||
fmt.Printf(" 监控: 活跃=%d, 队列=%d, 完成=%d, 平均耗时=%.2fms\\n",
|
||
metrics.ActiveWorkers, metrics.QueueSize,
|
||
metrics.CompletedTasks, metrics.AvgProcessTime)
|
||
}
|
||
}()
|
||
|
||
// 提交任务
|
||
for i := 1; i <= 20; i++ {
|
||
taskID := i
|
||
monitoredPool.Submit(MonitoredTask{
|
||
ID: taskID,
|
||
Fn: func() {
|
||
// 模拟不同执行时间
|
||
duration := time.Duration(rand.Intn(300)) * time.Millisecond
|
||
time.Sleep(duration)
|
||
},
|
||
})
|
||
}
|
||
|
||
time.Sleep(1200 * time.Millisecond)
|
||
|
||
// 最终统计
|
||
finalMetrics := monitoredPool.GetMetrics()
|
||
fmt.Printf(" 最终统计: 总任务=%d, 完成=%d, 平均耗时=%.2fms\\n",
|
||
finalMetrics.TotalTasks, finalMetrics.CompletedTasks,
|
||
finalMetrics.AvgProcessTime)
|
||
|
||
monitoredPool.Stop()
|
||
|
||
fmt.Println()
|
||
}
|
||
|
||
// demonstratePracticalApplications 演示实际应用场景
|
||
func demonstratePracticalApplications() {
|
||
fmt.Println("7. 实际应用场景:")
|
||
|
||
// Web 服务器请求处理
|
||
fmt.Printf(" Web 服务器请求处理:\\n")
|
||
|
||
webPool := NewWebServerPool(runtime.NumCPU())
|
||
webPool.Start()
|
||
|
||
// 模拟HTTP请求
|
||
requests := []HTTPRequest{
|
||
{ID: 1, Method: "GET", URL: "/api/users", Body: ""},
|
||
{ID: 2, Method: "POST", URL: "/api/users", Body: `{"name":"Alice"}`},
|
||
{ID: 3, Method: "GET", URL: "/api/orders", Body: ""},
|
||
{ID: 4, Method: "PUT", URL: "/api/users/1", Body: `{"name":"Bob"}`},
|
||
{ID: 5, Method: "DELETE", URL: "/api/users/2", Body: ""},
|
||
}
|
||
|
||
for _, req := range requests {
|
||
webPool.HandleRequest(req)
|
||
}
|
||
|
||
time.Sleep(300 * time.Millisecond)
|
||
webPool.Stop()
|
||
|
||
// 图片处理服务
|
||
fmt.Printf(" 图片处理服务:\\n")
|
||
|
||
imagePool := NewImageProcessingPool(3)
|
||
imagePool.Start()
|
||
|
||
// 模拟图片处理任务
|
||
images := []ImageTask{
|
||
{ID: 1, Filename: "photo1.jpg", Operations: []string{"resize", "compress"}},
|
||
{ID: 2, Filename: "photo2.png", Operations: []string{"crop", "filter"}},
|
||
{ID: 3, Filename: "photo3.gif", Operations: []string{"resize", "watermark"}},
|
||
{ID: 4, Filename: "photo4.jpg", Operations: []string{"rotate", "compress"}},
|
||
}
|
||
|
||
for _, img := range images {
|
||
imagePool.ProcessImage(img)
|
||
}
|
||
|
||
time.Sleep(500 * time.Millisecond)
|
||
imagePool.Stop()
|
||
|
||
// 数据处理管道
|
||
fmt.Printf(" 数据处理管道:\\n")
|
||
|
||
pipeline := NewDataPipeline(2, 2, 2) // 读取、处理、写入各2个工作者
|
||
pipeline.Start()
|
||
|
||
// 模拟数据处理
|
||
dataItems := []DataItem{
|
||
{ID: 1, Content: "数据1", Type: "json"},
|
||
{ID: 2, Content: "数据2", Type: "xml"},
|
||
{ID: 3, Content: "数据3", Type: "csv"},
|
||
{ID: 4, Content: "数据4", Type: "json"},
|
||
{ID: 5, Content: "数据5", Type: "xml"},
|
||
}
|
||
|
||
for _, item := range dataItems {
|
||
pipeline.ProcessData(item)
|
||
}
|
||
|
||
time.Sleep(800 * time.Millisecond)
|
||
pipeline.Stop()
|
||
|
||
// 批量任务处理
|
||
fmt.Printf(" 批量任务处理:\\n")
|
||
|
||
batchPool := NewBatchWorkerPool(3, 5) // 3个工作者,批量大小5
|
||
batchPool.Start()
|
||
|
||
// 提交大量小任务
|
||
for i := 1; i <= 23; i++ {
|
||
batchPool.Submit(BatchItem{
|
||
ID: i,
|
||
Data: fmt.Sprintf("批量数据-%d", i),
|
||
})
|
||
}
|
||
|
||
time.Sleep(600 * time.Millisecond)
|
||
batchPool.Stop()
|
||
|
||
fmt.Println()
|
||
}
|
||
|
||
// ========== 类型定义和辅助函数 ==========
|
||
|
||
// Job 基本任务
|
||
type Job struct {
|
||
ID int
|
||
Data string
|
||
}
|
||
|
||
// Result 任务结果
|
||
type Result struct {
|
||
JobID int
|
||
Message string
|
||
}
|
||
|
||
// worker 基本工作者函数
|
||
func worker(id int, jobs <-chan Job, results chan<- Result) {
|
||
for job := range jobs {
|
||
fmt.Printf(" 工作者 %d 开始处理任务 %d\\n", id, job.ID)
|
||
time.Sleep(100 * time.Millisecond) // 模拟工作
|
||
results <- Result{
|
||
JobID: job.ID,
|
||
Message: fmt.Sprintf("工作者 %d 完成任务 %d: %s", id, job.ID, job.Data),
|
||
}
|
||
}
|
||
}
|
||
|
||
// SimpleWorkerPool 简单工作池
|
||
type SimpleWorkerPool struct {
|
||
workerCount int
|
||
taskQueue chan func()
|
||
quit chan bool
|
||
wg sync.WaitGroup
|
||
}
|
||
|
||
func NewSimpleWorkerPool(workerCount int) *SimpleWorkerPool {
|
||
return &SimpleWorkerPool{
|
||
workerCount: workerCount,
|
||
taskQueue: make(chan func()),
|
||
quit: make(chan bool),
|
||
}
|
||
}
|
||
|
||
func (p *SimpleWorkerPool) Start() {
|
||
for i := 0; i < p.workerCount; i++ {
|
||
p.wg.Add(1)
|
||
go p.worker(i + 1)
|
||
}
|
||
}
|
||
|
||
func (p *SimpleWorkerPool) Submit(task func()) {
|
||
p.taskQueue <- task
|
||
}
|
||
|
||
func (p *SimpleWorkerPool) Stop() {
|
||
close(p.taskQueue)
|
||
p.wg.Wait()
|
||
}
|
||
|
||
func (p *SimpleWorkerPool) worker(id int) {
|
||
defer p.wg.Done()
|
||
for task := range p.taskQueue {
|
||
task()
|
||
}
|
||
}
|
||
|
||
// BufferedWorkerPool 缓冲工作池
|
||
type BufferedWorkerPool struct {
|
||
workerCount int
|
||
taskQueue chan func()
|
||
quit chan bool
|
||
wg sync.WaitGroup
|
||
}
|
||
|
||
func NewBufferedWorkerPool(workerCount, bufferSize int) *BufferedWorkerPool {
|
||
return &BufferedWorkerPool{
|
||
workerCount: workerCount,
|
||
taskQueue: make(chan func(), bufferSize),
|
||
quit: make(chan bool),
|
||
}
|
||
}
|
||
|
||
func (p *BufferedWorkerPool) Start() {
|
||
for i := 0; i < p.workerCount; i++ {
|
||
p.wg.Add(1)
|
||
go p.worker(i + 1)
|
||
}
|
||
}
|
||
|
||
func (p *BufferedWorkerPool) TrySubmit(task func()) bool {
|
||
select {
|
||
case p.taskQueue <- task:
|
||
return true
|
||
default:
|
||
return false
|
||
}
|
||
}
|
||
|
||
func (p *BufferedWorkerPool) Stop() {
|
||
close(p.taskQueue)
|
||
p.wg.Wait()
|
||
}
|
||
|
||
func (p *BufferedWorkerPool) worker(id int) {
|
||
defer p.wg.Done()
|
||
for task := range p.taskQueue {
|
||
task()
|
||
}
|
||
}
|
||
|
||
// PriorityTask 优先级任务
|
||
type PriorityTask struct {
|
||
Priority int
|
||
Name string
|
||
}
|
||
|
||
// PriorityWorkerPool 优先级工作池
|
||
type PriorityWorkerPool struct {
|
||
workerCount int
|
||
taskQueue chan PriorityTask
|
||
wg sync.WaitGroup
|
||
}
|
||
|
||
func NewPriorityWorkerPool(workerCount int) *PriorityWorkerPool {
|
||
return &PriorityWorkerPool{
|
||
workerCount: workerCount,
|
||
taskQueue: make(chan PriorityTask, 100),
|
||
}
|
||
}
|
||
|
||
func (p *PriorityWorkerPool) Start() {
|
||
for i := 0; i < p.workerCount; i++ {
|
||
p.wg.Add(1)
|
||
go p.worker(i + 1)
|
||
}
|
||
}
|
||
|
||
func (p *PriorityWorkerPool) Submit(task PriorityTask) {
|
||
p.taskQueue <- task
|
||
}
|
||
|
||
func (p *PriorityWorkerPool) Stop() {
|
||
close(p.taskQueue)
|
||
p.wg.Wait()
|
||
}
|
||
|
||
func (p *PriorityWorkerPool) worker(id int) {
|
||
defer p.wg.Done()
|
||
for task := range p.taskQueue {
|
||
fmt.Printf(" 工作者 %d 执行 %s (优先级: %d)\\n",
|
||
id, task.Name, task.Priority)
|
||
time.Sleep(100 * time.Millisecond)
|
||
}
|
||
}
|
||
|
||
// DynamicWorkerPool 动态工作池
|
||
type DynamicWorkerPool struct {
|
||
minWorkers int
|
||
maxWorkers int
|
||
currentWorkers int
|
||
taskQueue chan func()
|
||
workerQueue chan chan func()
|
||
quit chan bool
|
||
wg sync.WaitGroup
|
||
mu sync.RWMutex
|
||
}
|
||
|
||
func NewDynamicWorkerPool(minWorkers, maxWorkers int) *DynamicWorkerPool {
|
||
return &DynamicWorkerPool{
|
||
minWorkers: minWorkers,
|
||
maxWorkers: maxWorkers,
|
||
taskQueue: make(chan func()),
|
||
workerQueue: make(chan chan func(), maxWorkers),
|
||
quit: make(chan bool),
|
||
}
|
||
}
|
||
|
||
func (p *DynamicWorkerPool) Start() {
|
||
// 启动最小数量的工作者
|
||
for i := 0; i < p.minWorkers; i++ {
|
||
p.addWorker()
|
||
}
|
||
|
||
// 启动调度器
|
||
go p.dispatcher()
|
||
}
|
||
|
||
func (p *DynamicWorkerPool) Submit(task func()) {
|
||
p.taskQueue <- task
|
||
}
|
||
|
||
func (p *DynamicWorkerPool) WorkerCount() int {
|
||
p.mu.RLock()
|
||
defer p.mu.RUnlock()
|
||
return p.currentWorkers
|
||
}
|
||
|
||
func (p *DynamicWorkerPool) Stop() {
|
||
close(p.quit)
|
||
p.wg.Wait()
|
||
}
|
||
|
||
func (p *DynamicWorkerPool) addWorker() {
|
||
p.mu.Lock()
|
||
defer p.mu.Unlock()
|
||
|
||
if p.currentWorkers < p.maxWorkers {
|
||
p.currentWorkers++
|
||
p.wg.Add(1)
|
||
go p.worker(p.currentWorkers)
|
||
}
|
||
}
|
||
|
||
func (p *DynamicWorkerPool) worker(id int) {
|
||
defer p.wg.Done()
|
||
defer func() {
|
||
p.mu.Lock()
|
||
p.currentWorkers--
|
||
p.mu.Unlock()
|
||
}()
|
||
|
||
taskChan := make(chan func())
|
||
|
||
for {
|
||
// 注册工作者
|
||
p.workerQueue <- taskChan
|
||
|
||
select {
|
||
case task := <-taskChan:
|
||
task()
|
||
case <-p.quit:
|
||
return
|
||
}
|
||
}
|
||
}
|
||
|
||
func (p *DynamicWorkerPool) dispatcher() {
|
||
for {
|
||
select {
|
||
case task := <-p.taskQueue:
|
||
select {
|
||
case workerTaskChan := <-p.workerQueue:
|
||
workerTaskChan <- task
|
||
default:
|
||
// 没有可用工作者,尝试添加新工作者
|
||
p.addWorker()
|
||
workerTaskChan := <-p.workerQueue
|
||
workerTaskChan <- task
|
||
}
|
||
case <-p.quit:
|
||
return
|
||
}
|
||
}
|
||
}
|
||
|
||
// AdaptiveWorkerPool 自适应工作池
|
||
type AdaptiveWorkerPool struct {
|
||
workers map[int]*AdaptiveWorker
|
||
taskQueue chan func()
|
||
workerQueue chan chan func()
|
||
quit chan bool
|
||
wg sync.WaitGroup
|
||
mu sync.RWMutex
|
||
nextID int
|
||
}
|
||
|
||
type AdaptiveWorker struct {
|
||
id int
|
||
taskChan chan func()
|
||
quit chan bool
|
||
pool *AdaptiveWorkerPool
|
||
}
|
||
|
||
func NewAdaptiveWorkerPool() *AdaptiveWorkerPool {
|
||
return &AdaptiveWorkerPool{
|
||
workers: make(map[int]*AdaptiveWorker),
|
||
taskQueue: make(chan func()),
|
||
workerQueue: make(chan chan func(), 100),
|
||
quit: make(chan bool),
|
||
}
|
||
}
|
||
|
||
func (p *AdaptiveWorkerPool) Start() {
|
||
// 启动初始工作者
|
||
p.addWorker()
|
||
go p.dispatcher()
|
||
go p.monitor()
|
||
}
|
||
|
||
func (p *AdaptiveWorkerPool) Submit(task func()) {
|
||
p.taskQueue <- task
|
||
}
|
||
|
||
func (p *AdaptiveWorkerPool) WorkerCount() int {
|
||
p.mu.RLock()
|
||
defer p.mu.RUnlock()
|
||
return len(p.workers)
|
||
}
|
||
|
||
func (p *AdaptiveWorkerPool) Stop() {
|
||
close(p.quit)
|
||
p.wg.Wait()
|
||
}
|
||
|
||
func (p *AdaptiveWorkerPool) addWorker() {
|
||
p.mu.Lock()
|
||
defer p.mu.Unlock()
|
||
|
||
p.nextID++
|
||
worker := &AdaptiveWorker{
|
||
id: p.nextID,
|
||
taskChan: make(chan func()),
|
||
quit: make(chan bool),
|
||
pool: p,
|
||
}
|
||
|
||
p.workers[worker.id] = worker
|
||
p.wg.Add(1)
|
||
go worker.run()
|
||
}
|
||
|
||
func (p *AdaptiveWorkerPool) removeWorker(id int) {
|
||
p.mu.Lock()
|
||
defer p.mu.Unlock()
|
||
|
||
if worker, exists := p.workers[id]; exists {
|
||
close(worker.quit)
|
||
delete(p.workers, id)
|
||
}
|
||
}
|
||
|
||
func (p *AdaptiveWorkerPool) dispatcher() {
|
||
for {
|
||
select {
|
||
case task := <-p.taskQueue:
|
||
select {
|
||
case workerTaskChan := <-p.workerQueue:
|
||
workerTaskChan <- task
|
||
default:
|
||
// 需要更多工作者
|
||
p.addWorker()
|
||
workerTaskChan := <-p.workerQueue
|
||
workerTaskChan <- task
|
||
}
|
||
case <-p.quit:
|
||
return
|
||
}
|
||
}
|
||
}
|
||
|
||
func (p *AdaptiveWorkerPool) monitor() {
|
||
ticker := time.NewTicker(1 * time.Second)
|
||
defer ticker.Stop()
|
||
|
||
for {
|
||
select {
|
||
case <-ticker.C:
|
||
// 简单的自适应逻辑:如果队列为空且工作者多于1个,减少工作者
|
||
if len(p.taskQueue) == 0 && len(p.workers) > 1 {
|
||
p.mu.RLock()
|
||
var workerID int
|
||
for id := range p.workers {
|
||
workerID = id
|
||
break
|
||
}
|
||
p.mu.RUnlock()
|
||
p.removeWorker(workerID)
|
||
}
|
||
case <-p.quit:
|
||
return
|
||
}
|
||
}
|
||
}
|
||
|
||
func (w *AdaptiveWorker) run() {
|
||
defer w.pool.wg.Done()
|
||
|
||
for {
|
||
// 注册到工作者队列
|
||
w.pool.workerQueue <- w.taskChan
|
||
|
||
select {
|
||
case task := <-w.taskChan:
|
||
task()
|
||
case <-w.quit:
|
||
return
|
||
case <-w.pool.quit:
|
||
return
|
||
}
|
||
}
|
||
}
|
||
|
||
// ContextTask 上下文任务
|
||
type ContextTask struct {
|
||
ID int
|
||
Fn func(context.Context) error
|
||
}
|
||
|
||
// ContextWorkerPool 上下文工作池
|
||
type ContextWorkerPool struct {
|
||
workerCount int
|
||
taskQueue chan ContextTask
|
||
wg sync.WaitGroup
|
||
}
|
||
|
||
func NewContextWorkerPool(workerCount int) *ContextWorkerPool {
|
||
return &ContextWorkerPool{
|
||
workerCount: workerCount,
|
||
taskQueue: make(chan ContextTask),
|
||
}
|
||
}
|
||
|
||
func (p *ContextWorkerPool) Start(ctx context.Context) {
|
||
for i := 0; i < p.workerCount; i++ {
|
||
p.wg.Add(1)
|
||
go p.worker(ctx, i+1)
|
||
}
|
||
}
|
||
|
||
func (p *ContextWorkerPool) Submit(task ContextTask) {
|
||
p.taskQueue <- task
|
||
}
|
||
|
||
func (p *ContextWorkerPool) worker(ctx context.Context, id int) {
|
||
defer p.wg.Done()
|
||
|
||
for {
|
||
select {
|
||
case task := <-p.taskQueue:
|
||
task.Fn(ctx)
|
||
case <-ctx.Done():
|
||
return
|
||
}
|
||
}
|
||
}
|
||
|
||
// ErrorTask 错误任务
|
||
type ErrorTask struct {
|
||
ID int
|
||
ShouldFail bool
|
||
Data string
|
||
}
|
||
|
||
// ErrorStats 错误统计
|
||
type ErrorStats struct {
|
||
Success int
|
||
Failed int
|
||
Retried int
|
||
}
|
||
|
||
// ErrorHandlingWorkerPool 错误处理工作池
|
||
type ErrorHandlingWorkerPool struct {
|
||
workerCount int
|
||
taskQueue chan ErrorTask
|
||
wg sync.WaitGroup
|
||
stats ErrorStats
|
||
statsMu sync.Mutex
|
||
}
|
||
|
||
func NewErrorHandlingWorkerPool(workerCount int) *ErrorHandlingWorkerPool {
|
||
return &ErrorHandlingWorkerPool{
|
||
workerCount: workerCount,
|
||
taskQueue: make(chan ErrorTask),
|
||
}
|
||
}
|
||
|
||
func (p *ErrorHandlingWorkerPool) Start() {
|
||
for i := 0; i < p.workerCount; i++ {
|
||
p.wg.Add(1)
|
||
go p.worker(i + 1)
|
||
}
|
||
}
|
||
|
||
func (p *ErrorHandlingWorkerPool) Submit(task ErrorTask) {
|
||
p.taskQueue <- task
|
||
}
|
||
|
||
func (p *ErrorHandlingWorkerPool) GetStats() ErrorStats {
|
||
p.statsMu.Lock()
|
||
defer p.statsMu.Unlock()
|
||
return p.stats
|
||
}
|
||
|
||
func (p *ErrorHandlingWorkerPool) Stop() {
|
||
close(p.taskQueue)
|
||
p.wg.Wait()
|
||
}
|
||
|
||
func (p *ErrorHandlingWorkerPool) worker(id int) {
|
||
defer p.wg.Done()
|
||
|
||
for task := range p.taskQueue {
|
||
err := p.processTask(task)
|
||
|
||
p.statsMu.Lock()
|
||
if err != nil {
|
||
p.stats.Failed++
|
||
fmt.Printf(" 工作者 %d 任务 %d 失败: %v\\n", id, task.ID, err)
|
||
} else {
|
||
p.stats.Success++
|
||
fmt.Printf(" 工作者 %d 任务 %d 成功\\n", id, task.ID)
|
||
}
|
||
p.statsMu.Unlock()
|
||
}
|
||
}
|
||
|
||
func (p *ErrorHandlingWorkerPool) processTask(task ErrorTask) error {
|
||
time.Sleep(50 * time.Millisecond)
|
||
|
||
if task.ShouldFail {
|
||
return fmt.Errorf("任务 %d 模拟失败", task.ID)
|
||
}
|
||
|
||
return nil
|
||
}
|
||
|
||
// RetryTask 重试任务
|
||
type RetryTask struct {
|
||
ID int
|
||
MaxRetries int
|
||
Data string
|
||
attempts int
|
||
}
|
||
|
||
// RetryWorkerPool 重试工作池
|
||
type RetryWorkerPool struct {
|
||
workerCount int
|
||
maxRetries int
|
||
taskQueue chan RetryTask
|
||
retryQueue chan RetryTask
|
||
wg sync.WaitGroup
|
||
}
|
||
|
||
func NewRetryWorkerPool(workerCount, maxRetries int) *RetryWorkerPool {
|
||
return &RetryWorkerPool{
|
||
workerCount: workerCount,
|
||
maxRetries: maxRetries,
|
||
taskQueue: make(chan RetryTask),
|
||
retryQueue: make(chan RetryTask, 100),
|
||
}
|
||
}
|
||
|
||
func (p *RetryWorkerPool) Start() {
|
||
for i := 0; i < p.workerCount; i++ {
|
||
p.wg.Add(1)
|
||
go p.worker(i + 1)
|
||
}
|
||
|
||
// 启动重试处理器
|
||
p.wg.Add(1)
|
||
go p.retryHandler()
|
||
}
|
||
|
||
func (p *RetryWorkerPool) Submit(task RetryTask) {
|
||
p.taskQueue <- task
|
||
}
|
||
|
||
func (p *RetryWorkerPool) Stop() {
|
||
close(p.taskQueue)
|
||
close(p.retryQueue)
|
||
p.wg.Wait()
|
||
}
|
||
|
||
func (p *RetryWorkerPool) worker(id int) {
|
||
defer p.wg.Done()
|
||
|
||
for task := range p.taskQueue {
|
||
task.attempts++
|
||
|
||
// 模拟任务执行(30%成功率)
|
||
success := rand.Float32() < 0.3
|
||
|
||
if success {
|
||
fmt.Printf(" 工作者 %d 任务 %d 成功 (尝试 %d 次)\\n",
|
||
id, task.ID, task.attempts)
|
||
} else if task.attempts < task.MaxRetries {
|
||
fmt.Printf(" 工作者 %d 任务 %d 失败,准备重试 (尝试 %d/%d)\\n",
|
||
id, task.ID, task.attempts, task.MaxRetries)
|
||
p.retryQueue <- task
|
||
} else {
|
||
fmt.Printf(" 工作者 %d 任务 %d 最终失败 (尝试 %d 次)\\n",
|
||
id, task.ID, task.attempts)
|
||
}
|
||
}
|
||
}
|
||
|
||
func (p *RetryWorkerPool) retryHandler() {
|
||
defer p.wg.Done()
|
||
|
||
for task := range p.retryQueue {
|
||
// 延迟重试
|
||
time.Sleep(100 * time.Millisecond)
|
||
p.taskQueue <- task
|
||
}
|
||
}
|
||
|
||
// MonitoredTask 监控任务
|
||
type MonitoredTask struct {
|
||
ID int
|
||
Fn func()
|
||
}
|
||
|
||
// WorkerPoolMetrics 工作池指标
|
||
type WorkerPoolMetrics struct {
|
||
ActiveWorkers int
|
||
QueueSize int
|
||
TotalTasks int
|
||
CompletedTasks int
|
||
AvgProcessTime float64
|
||
}
|
||
|
||
// MonitoredWorkerPool 监控工作池
|
||
type MonitoredWorkerPool struct {
|
||
workerCount int
|
||
taskQueue chan MonitoredTask
|
||
wg sync.WaitGroup
|
||
metrics WorkerPoolMetrics
|
||
metricsMu sync.RWMutex
|
||
processTimes []time.Duration
|
||
activeWorkers int
|
||
}
|
||
|
||
func NewMonitoredWorkerPool(workerCount int) *MonitoredWorkerPool {
|
||
return &MonitoredWorkerPool{
|
||
workerCount: workerCount,
|
||
taskQueue: make(chan MonitoredTask, 100),
|
||
processTimes: make([]time.Duration, 0),
|
||
}
|
||
}
|
||
|
||
func (p *MonitoredWorkerPool) Start() {
|
||
for i := 0; i < p.workerCount; i++ {
|
||
p.wg.Add(1)
|
||
go p.worker(i + 1)
|
||
}
|
||
}
|
||
|
||
func (p *MonitoredWorkerPool) Submit(task MonitoredTask) {
|
||
p.metricsMu.Lock()
|
||
p.metrics.TotalTasks++
|
||
p.metricsMu.Unlock()
|
||
|
||
p.taskQueue <- task
|
||
}
|
||
|
||
func (p *MonitoredWorkerPool) GetMetrics() WorkerPoolMetrics {
|
||
p.metricsMu.RLock()
|
||
defer p.metricsMu.RUnlock()
|
||
|
||
metrics := p.metrics
|
||
metrics.QueueSize = len(p.taskQueue)
|
||
metrics.ActiveWorkers = p.activeWorkers
|
||
|
||
// 计算平均处理时间
|
||
if len(p.processTimes) > 0 {
|
||
var total time.Duration
|
||
for _, t := range p.processTimes {
|
||
total += t
|
||
}
|
||
metrics.AvgProcessTime = float64(total.Nanoseconds()) / float64(len(p.processTimes)) / 1e6
|
||
}
|
||
|
||
return metrics
|
||
}
|
||
|
||
func (p *MonitoredWorkerPool) Stop() {
|
||
close(p.taskQueue)
|
||
p.wg.Wait()
|
||
}
|
||
|
||
func (p *MonitoredWorkerPool) worker(id int) {
|
||
defer p.wg.Done()
|
||
|
||
for task := range p.taskQueue {
|
||
p.metricsMu.Lock()
|
||
p.activeWorkers++
|
||
p.metricsMu.Unlock()
|
||
|
||
start := time.Now()
|
||
task.Fn()
|
||
duration := time.Since(start)
|
||
|
||
p.metricsMu.Lock()
|
||
p.activeWorkers--
|
||
p.metrics.CompletedTasks++
|
||
p.processTimes = append(p.processTimes, duration)
|
||
|
||
// 保持最近100个处理时间
|
||
if len(p.processTimes) > 100 {
|
||
p.processTimes = p.processTimes[1:]
|
||
}
|
||
p.metricsMu.Unlock()
|
||
}
|
||
}
|
||
|
||
// HTTPRequest HTTP请求
|
||
type HTTPRequest struct {
|
||
ID int
|
||
Method string
|
||
URL string
|
||
Body string
|
||
}
|
||
|
||
// WebServerPool Web服务器池
|
||
type WebServerPool struct {
|
||
workerCount int
|
||
requestChan chan HTTPRequest
|
||
wg sync.WaitGroup
|
||
}
|
||
|
||
func NewWebServerPool(workerCount int) *WebServerPool {
|
||
return &WebServerPool{
|
||
workerCount: workerCount,
|
||
requestChan: make(chan HTTPRequest, 100),
|
||
}
|
||
}
|
||
|
||
func (p *WebServerPool) Start() {
|
||
for i := 0; i < p.workerCount; i++ {
|
||
p.wg.Add(1)
|
||
go p.worker(i + 1)
|
||
}
|
||
}
|
||
|
||
func (p *WebServerPool) HandleRequest(req HTTPRequest) {
|
||
p.requestChan <- req
|
||
}
|
||
|
||
func (p *WebServerPool) Stop() {
|
||
close(p.requestChan)
|
||
p.wg.Wait()
|
||
}
|
||
|
||
func (p *WebServerPool) worker(id int) {
|
||
defer p.wg.Done()
|
||
|
||
for req := range p.requestChan {
|
||
// 模拟请求处理
|
||
processingTime := time.Duration(rand.Intn(100)) * time.Millisecond
|
||
time.Sleep(processingTime)
|
||
|
||
fmt.Printf(" 工作者 %d 处理请求 %d: %s %s (耗时: %v)\\n",
|
||
id, req.ID, req.Method, req.URL, processingTime)
|
||
}
|
||
}
|
||
|
||
// ImageTask 图片任务
|
||
type ImageTask struct {
|
||
ID int
|
||
Filename string
|
||
Operations []string
|
||
}
|
||
|
||
// ImageProcessingPool 图片处理池
|
||
type ImageProcessingPool struct {
|
||
workerCount int
|
||
taskChan chan ImageTask
|
||
wg sync.WaitGroup
|
||
}
|
||
|
||
func NewImageProcessingPool(workerCount int) *ImageProcessingPool {
|
||
return &ImageProcessingPool{
|
||
workerCount: workerCount,
|
||
taskChan: make(chan ImageTask, 50),
|
||
}
|
||
}
|
||
|
||
func (p *ImageProcessingPool) Start() {
|
||
for i := 0; i < p.workerCount; i++ {
|
||
p.wg.Add(1)
|
||
go p.worker(i + 1)
|
||
}
|
||
}
|
||
|
||
func (p *ImageProcessingPool) ProcessImage(task ImageTask) {
|
||
p.taskChan <- task
|
||
}
|
||
|
||
func (p *ImageProcessingPool) Stop() {
|
||
close(p.taskChan)
|
||
p.wg.Wait()
|
||
}
|
||
|
||
func (p *ImageProcessingPool) worker(id int) {
|
||
defer p.wg.Done()
|
||
|
||
for task := range p.taskChan {
|
||
fmt.Printf(" 工作者 %d 处理图片 %s:\\n", id, task.Filename)
|
||
|
||
for _, op := range task.Operations {
|
||
// 模拟图片操作
|
||
time.Sleep(50 * time.Millisecond)
|
||
fmt.Printf(" - 执行操作: %s\\n", op)
|
||
}
|
||
|
||
fmt.Printf(" 图片 %s 处理完成\\n", task.Filename)
|
||
}
|
||
}
|
||
|
||
// DataItem 数据项
|
||
type DataItem struct {
|
||
ID int
|
||
Content string
|
||
Type string
|
||
}
|
||
|
||
// DataPipeline 数据处理管道
|
||
type DataPipeline struct {
|
||
readWorkers int
|
||
processWorkers int
|
||
writeWorkers int
|
||
|
||
inputChan chan DataItem
|
||
processedChan chan DataItem
|
||
outputChan chan DataItem
|
||
|
||
wg sync.WaitGroup
|
||
}
|
||
|
||
func NewDataPipeline(readWorkers, processWorkers, writeWorkers int) *DataPipeline {
|
||
return &DataPipeline{
|
||
readWorkers: readWorkers,
|
||
processWorkers: processWorkers,
|
||
writeWorkers: writeWorkers,
|
||
inputChan: make(chan DataItem, 20),
|
||
processedChan: make(chan DataItem, 20),
|
||
outputChan: make(chan DataItem, 20),
|
||
}
|
||
}
|
||
|
||
func (p *DataPipeline) Start() {
|
||
// 启动读取工作者
|
||
for i := 0; i < p.readWorkers; i++ {
|
||
p.wg.Add(1)
|
||
go p.readWorker(i + 1)
|
||
}
|
||
|
||
// 启动处理工作者
|
||
for i := 0; i < p.processWorkers; i++ {
|
||
p.wg.Add(1)
|
||
go p.processWorker(i + 1)
|
||
}
|
||
|
||
// 启动写入工作者
|
||
for i := 0; i < p.writeWorkers; i++ {
|
||
p.wg.Add(1)
|
||
go p.writeWorker(i + 1)
|
||
}
|
||
}
|
||
|
||
func (p *DataPipeline) ProcessData(item DataItem) {
|
||
p.inputChan <- item
|
||
}
|
||
|
||
func (p *DataPipeline) Stop() {
|
||
close(p.inputChan)
|
||
p.wg.Wait()
|
||
}
|
||
|
||
func (p *DataPipeline) readWorker(id int) {
|
||
defer p.wg.Done()
|
||
|
||
for item := range p.inputChan {
|
||
// 模拟读取处理
|
||
time.Sleep(30 * time.Millisecond)
|
||
fmt.Printf(" 读取工作者 %d 读取数据 %d\\n", id, item.ID)
|
||
p.processedChan <- item
|
||
}
|
||
|
||
if id == 1 { // 只让第一个工作者关闭下一阶段
|
||
close(p.processedChan)
|
||
}
|
||
}
|
||
|
||
func (p *DataPipeline) processWorker(id int) {
|
||
defer p.wg.Done()
|
||
|
||
for item := range p.processedChan {
|
||
// 模拟数据处理
|
||
time.Sleep(50 * time.Millisecond)
|
||
item.Content = fmt.Sprintf("处理后的%s", item.Content)
|
||
fmt.Printf(" 处理工作者 %d 处理数据 %d\\n", id, item.ID)
|
||
p.outputChan <- item
|
||
}
|
||
|
||
if id == 1 { // 只让第一个工作者关闭下一阶段
|
||
close(p.outputChan)
|
||
}
|
||
}
|
||
|
||
func (p *DataPipeline) writeWorker(id int) {
|
||
defer p.wg.Done()
|
||
|
||
for item := range p.outputChan {
|
||
// 模拟写入处理
|
||
time.Sleep(20 * time.Millisecond)
|
||
fmt.Printf(" 写入工作者 %d 写入数据 %d: %s\\n", id, item.ID, item.Content)
|
||
}
|
||
}
|
||
|
||
// BatchItem 批量项
|
||
type BatchItem struct {
|
||
ID int
|
||
Data string
|
||
}
|
||
|
||
// BatchWorkerPool 批量工作池
|
||
type BatchWorkerPool struct {
|
||
workerCount int
|
||
batchSize int
|
||
itemChan chan BatchItem
|
||
wg sync.WaitGroup
|
||
}
|
||
|
||
func NewBatchWorkerPool(workerCount, batchSize int) *BatchWorkerPool {
|
||
return &BatchWorkerPool{
|
||
workerCount: workerCount,
|
||
batchSize: batchSize,
|
||
itemChan: make(chan BatchItem, 100),
|
||
}
|
||
}
|
||
|
||
func (p *BatchWorkerPool) Start() {
|
||
for i := 0; i < p.workerCount; i++ {
|
||
p.wg.Add(1)
|
||
go p.worker(i + 1)
|
||
}
|
||
}
|
||
|
||
func (p *BatchWorkerPool) Submit(item BatchItem) {
|
||
p.itemChan <- item
|
||
}
|
||
|
||
func (p *BatchWorkerPool) Stop() {
|
||
close(p.itemChan)
|
||
p.wg.Wait()
|
||
}
|
||
|
||
func (p *BatchWorkerPool) worker(id int) {
|
||
defer p.wg.Done()
|
||
|
||
batch := make([]BatchItem, 0, p.batchSize)
|
||
|
||
for item := range p.itemChan {
|
||
batch = append(batch, item)
|
||
|
||
if len(batch) >= p.batchSize {
|
||
p.processBatch(id, batch)
|
||
batch = batch[:0] // 重置批次
|
||
}
|
||
}
|
||
|
||
// 处理剩余的项目
|
||
if len(batch) > 0 {
|
||
p.processBatch(id, batch)
|
||
}
|
||
}
|
||
|
||
func (p *BatchWorkerPool) processBatch(workerID int, batch []BatchItem) {
|
||
fmt.Printf(" 工作者 %d 处理批次 (大小: %d):\\n", workerID, len(batch))
|
||
|
||
for _, item := range batch {
|
||
fmt.Printf(" - 处理项目 %d: %s\\n", item.ID, item.Data)
|
||
}
|
||
|
||
// 模拟批量处理时间
|
||
time.Sleep(100 * time.Millisecond)
|
||
fmt.Printf(" 工作者 %d 批次处理完成\\n", workerID)
|
||
}
|
||
|
||
/*
|
||
运行这个程序:
|
||
go run 05-worker-pools.go
|
||
|
||
学习要点:
|
||
1. 工作池模式是控制并发的重要技术
|
||
2. 可以根据需求实现不同类型的工作池
|
||
3. 工作池提供了更好的资源控制和系统稳定性
|
||
4. 支持动态调整、错误处理、监控等高级特性
|
||
5. 在实际应用中有广泛的使用场景
|
||
|
||
工作池的优势:
|
||
1. 控制并发数量,避免资源耗尽
|
||
2. 重用 goroutine,减少创建销毁开销
|
||
3. 提供任务队列,平滑处理突发负载
|
||
4. 支持优雅关闭和错误处理
|
||
5. 便于监控和调优
|
||
|
||
工作池类型:
|
||
1. 固定大小工作池:预先创建固定数量的工作者
|
||
2. 缓冲工作池:带有任务队列缓冲
|
||
3. 动态工作池:根据负载动态调整工作者数量
|
||
4. 优先级工作池:支持任务优先级
|
||
5. 上下文工作池:支持取消和超时
|
||
|
||
高级特性:
|
||
1. 错误处理:捕获和处理任务错误
|
||
2. 重试机制:自动重试失败的任务
|
||
3. 监控指标:实时监控工作池状态
|
||
4. 优雅关闭:安全地停止工作池
|
||
5. 负载均衡:合理分配任务
|
||
|
||
实际应用:
|
||
1. Web 服务器:处理 HTTP 请求
|
||
2. 图片处理:批量处理图片
|
||
3. 数据处理:ETL 数据管道
|
||
4. 批量任务:批量处理业务逻辑
|
||
5. 消息队列:处理消息
|
||
|
||
设计考虑:
|
||
1. 工作者数量:根据 CPU 核心数和任务类型
|
||
2. 队列大小:平衡内存使用和响应时间
|
||
3. 任务粒度:避免任务过大或过小
|
||
4. 错误策略:决定如何处理失败任务
|
||
5. 监控指标:选择合适的监控维度
|
||
|
||
性能优化:
|
||
1. 合理设置工作者数量
|
||
2. 使用缓冲队列减少阻塞
|
||
3. 批量处理提高效率
|
||
4. 避免频繁创建销毁
|
||
5. 监控和调优关键指标
|
||
|
||
最佳实践:
|
||
1. 根据任务特性选择合适的工作池类型
|
||
2. 实现优雅关闭机制
|
||
3. 添加监控和日志
|
||
4. 处理 panic 和错误
|
||
5. 进行性能测试和调优
|
||
*/
|