Files
golang/golang-learning/06-concurrency/01-goroutines.go
2025-08-24 11:24:52 +08:00

739 lines
16 KiB
Go
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

/*
01-goroutines.go - Go 语言 Goroutines 详解
学习目标:
1. 理解 goroutine 的概念和特性
2. 掌握 goroutine 的创建和使用
3. 学会 goroutine 的同步和通信
4. 了解 goroutine 的调度机制
5. 掌握 goroutine 的最佳实践
知识点:
- goroutine 的基本概念
- go 关键字的使用
- goroutine 与线程的区别
- goroutine 的生命周期
- goroutine 泄漏的预防
- 并发安全的考虑
*/
package main
import (
"fmt"
"runtime"
"sync"
"time"
)
func main() {
fmt.Println("=== Go 语言 Goroutines 详解 ===\n")
// 演示基本的 goroutine
demonstrateBasicGoroutines()
// 演示 goroutine 的并发执行
demonstrateConcurrentExecution()
// 演示 goroutine 的同步
demonstrateGoroutineSynchronization()
// 演示 goroutine 的通信
demonstrateGoroutineCommunication()
// 演示 goroutine 池
demonstrateGoroutinePool()
// 演示 goroutine 的生命周期管理
demonstrateGoroutineLifecycle()
// 演示 goroutine 的最佳实践
demonstrateBestPractices()
}
// demonstrateBasicGoroutines 演示基本的 goroutine
func demonstrateBasicGoroutines() {
fmt.Println("1. 基本的 Goroutine:")
// goroutine 的基本概念
fmt.Printf(" Goroutine 的基本概念:\n")
fmt.Printf(" - 轻量级线程,由 Go 运行时管理\n")
fmt.Printf(" - 使用 go 关键字启动\n")
fmt.Printf(" - 栈大小动态增长,初始只有 2KB\n")
fmt.Printf(" - 由 Go 调度器调度,不是操作系统线程\n")
fmt.Printf(" - 可以创建数百万个 goroutine\n")
// 基本 goroutine 示例
fmt.Printf(" 基本 goroutine 示例:\n")
// 普通函数调用
fmt.Printf(" 普通函数调用:\n")
sayHello("World")
sayHello("Go")
// 使用 goroutine
fmt.Printf(" 使用 goroutine:\n")
go sayHello("Goroutine 1")
go sayHello("Goroutine 2")
go sayHello("Goroutine 3")
// 等待 goroutine 完成
time.Sleep(100 * time.Millisecond)
// 匿名函数 goroutine
fmt.Printf(" 匿名函数 goroutine:\n")
for i := 1; i <= 3; i++ {
go func(id int) {
fmt.Printf(" 匿名 goroutine %d 执行\n", id)
}(i)
}
time.Sleep(100 * time.Millisecond)
// 闭包 goroutine
fmt.Printf(" 闭包 goroutine:\n")
message := "Hello from closure"
go func() {
fmt.Printf(" %s\n", message)
}()
time.Sleep(100 * time.Millisecond)
fmt.Println()
}
// demonstrateConcurrentExecution 演示 goroutine 的并发执行
func demonstrateConcurrentExecution() {
fmt.Println("2. Goroutine 的并发执行:")
// 并发执行任务
fmt.Printf(" 并发执行任务:\n")
start := time.Now()
// 串行执行
fmt.Printf(" 串行执行:\n")
serialStart := time.Now()
task("任务1", 200*time.Millisecond)
task("任务2", 200*time.Millisecond)
task("任务3", 200*time.Millisecond)
serialDuration := time.Since(serialStart)
fmt.Printf(" 串行执行耗时: %v\n", serialDuration)
// 并发执行
fmt.Printf(" 并发执行:\n")
concurrentStart := time.Now()
var wg sync.WaitGroup
wg.Add(3)
go func() {
defer wg.Done()
task("并发任务1", 200*time.Millisecond)
}()
go func() {
defer wg.Done()
task("并发任务2", 200*time.Millisecond)
}()
go func() {
defer wg.Done()
task("并发任务3", 200*time.Millisecond)
}()
wg.Wait()
concurrentDuration := time.Since(concurrentStart)
fmt.Printf(" 并发执行耗时: %v\n", concurrentDuration)
// CPU 密集型任务
fmt.Printf(" CPU 密集型任务:\n")
numCPU := runtime.NumCPU()
fmt.Printf(" CPU 核心数: %d\n", numCPU)
// 设置使用的 CPU 核心数
runtime.GOMAXPROCS(numCPU)
cpuStart := time.Now()
var cpuWg sync.WaitGroup
for i := 0; i < numCPU; i++ {
cpuWg.Add(1)
go func(id int) {
defer cpuWg.Done()
result := fibonacci(35)
fmt.Printf(" Goroutine %d: fibonacci(35) = %d\n", id, result)
}(i)
}
cpuWg.Wait()
cpuDuration := time.Since(cpuStart)
fmt.Printf(" CPU 密集型任务耗时: %v\n", cpuDuration)
totalDuration := time.Since(start)
fmt.Printf(" 总耗时: %v\n", totalDuration)
fmt.Println()
}
// demonstrateGoroutineSynchronization 演示 goroutine 的同步
func demonstrateGoroutineSynchronization() {
fmt.Println("3. Goroutine 的同步:")
// 使用 WaitGroup 同步
fmt.Printf(" 使用 WaitGroup 同步:\n")
var wg sync.WaitGroup
for i := 1; i <= 5; i++ {
wg.Add(1)
go func(id int) {
defer wg.Done()
fmt.Printf(" Worker %d 开始工作\n", id)
time.Sleep(time.Duration(id*100) * time.Millisecond)
fmt.Printf(" Worker %d 完成工作\n", id)
}(i)
}
fmt.Printf(" 等待所有 worker 完成...\n")
wg.Wait()
fmt.Printf(" 所有 worker 已完成\n")
// 使用 Mutex 保护共享资源
fmt.Printf(" 使用 Mutex 保护共享资源:\n")
var counter Counter
var counterWg sync.WaitGroup
// 启动多个 goroutine 增加计数器
for i := 0; i < 10; i++ {
counterWg.Add(1)
go func(id int) {
defer counterWg.Done()
for j := 0; j < 1000; j++ {
counter.Increment()
}
fmt.Printf(" Goroutine %d 完成\n", id)
}(i)
}
counterWg.Wait()
fmt.Printf(" 最终计数器值: %d\n", counter.Value())
// 使用 Once 确保只执行一次
fmt.Printf(" 使用 Once 确保只执行一次:\n")
var once sync.Once
var onceWg sync.WaitGroup
for i := 0; i < 5; i++ {
onceWg.Add(1)
go func(id int) {
defer onceWg.Done()
once.Do(func() {
fmt.Printf(" 初始化操作只执行一次 (来自 goroutine %d)\n", id)
})
fmt.Printf(" Goroutine %d 执行完毕\n", id)
}(i)
}
onceWg.Wait()
fmt.Println()
}
// demonstrateGoroutineCommunication 演示 goroutine 的通信
func demonstrateGoroutineCommunication() {
fmt.Println("4. Goroutine 的通信:")
// 使用 channel 通信
fmt.Printf(" 使用 channel 通信:\n")
// 简单的 channel 通信
ch := make(chan string)
go func() {
time.Sleep(100 * time.Millisecond)
ch <- "Hello from goroutine"
}()
message := <-ch
fmt.Printf(" 接收到消息: %s\n", message)
// 生产者-消费者模式
fmt.Printf(" 生产者-消费者模式:\n")
jobs := make(chan int, 5)
results := make(chan int, 5)
// 启动 3 个 worker
for w := 1; w <= 3; w++ {
go worker(w, jobs, results)
}
// 发送 5 个任务
for j := 1; j <= 5; j++ {
jobs <- j
}
close(jobs)
// 收集结果
for r := 1; r <= 5; r++ {
result := <-results
fmt.Printf(" 结果: %d\n", result)
}
// 扇出/扇入模式
fmt.Printf(" 扇出/扇入模式:\n")
input := make(chan int)
output1 := make(chan int)
output2 := make(chan int)
// 扇出:一个输入分发到多个处理器
go func() {
for i := 1; i <= 10; i++ {
input <- i
}
close(input)
}()
// 处理器 1
go func() {
for num := range input {
output1 <- num * 2
}
close(output1)
}()
// 处理器 2
go func() {
for num := range input {
output2 <- num * 3
}
close(output2)
}()
// 扇入:合并多个输出
merged := fanIn(output1, output2)
fmt.Printf(" 合并结果: ")
for result := range merged {
fmt.Printf("%d ", result)
}
fmt.Printf("\n")
fmt.Println()
}
// demonstrateGoroutinePool 演示 goroutine 池
func demonstrateGoroutinePool() {
fmt.Println("5. Goroutine 池:")
// 固定大小的 goroutine 池
fmt.Printf(" 固定大小的 goroutine 池:\n")
const numWorkers = 3
const numJobs = 10
jobs := make(chan Job, numJobs)
results := make(chan Result, numJobs)
// 启动 worker 池
for w := 1; w <= numWorkers; w++ {
go jobWorker(w, jobs, results)
}
// 发送任务
for j := 1; j <= numJobs; j++ {
jobs <- Job{ID: j, Data: fmt.Sprintf("任务数据 %d", j)}
}
close(jobs)
// 收集结果
for r := 1; r <= numJobs; r++ {
result := <-results
fmt.Printf(" %s\n", result.Message)
}
// 动态 goroutine 池
fmt.Printf(" 动态 goroutine 池:\n")
pool := NewWorkerPool(5, 20)
pool.Start()
// 提交任务
for i := 1; i <= 15; i++ {
taskID := i
pool.Submit(func() {
fmt.Printf(" 执行任务 %d\n", taskID)
time.Sleep(100 * time.Millisecond)
})
}
pool.Stop()
fmt.Println()
}
// demonstrateGoroutineLifecycle 演示 goroutine 的生命周期管理
func demonstrateGoroutineLifecycle() {
fmt.Println("6. Goroutine 的生命周期管理:")
// 优雅关闭
fmt.Printf(" 优雅关闭:\n")
done := make(chan bool)
quit := make(chan bool)
go func() {
for {
select {
case <-quit:
fmt.Printf(" 接收到退出信号,正在清理...\n")
time.Sleep(100 * time.Millisecond)
fmt.Printf(" 清理完成\n")
done <- true
return
default:
fmt.Printf(" 工作中...\n")
time.Sleep(200 * time.Millisecond)
}
}
}()
time.Sleep(500 * time.Millisecond)
fmt.Printf(" 发送退出信号\n")
quit <- true
<-done
fmt.Printf(" Goroutine 已优雅退出\n")
// 超时控制
fmt.Printf(" 超时控制:\n")
timeout := time.After(300 * time.Millisecond)
finished := make(chan bool)
go func() {
time.Sleep(500 * time.Millisecond) // 模拟长时间运行的任务
finished <- true
}()
select {
case <-finished:
fmt.Printf(" 任务完成\n")
case <-timeout:
fmt.Printf(" 任务超时\n")
}
// 错误处理
fmt.Printf(" 错误处理:\n")
errorCh := make(chan error, 1)
go func() {
defer func() {
if r := recover(); r != nil {
errorCh <- fmt.Errorf("goroutine panic: %v", r)
}
}()
// 模拟可能 panic 的操作
if time.Now().UnixNano()%2 == 0 {
panic("模拟 panic")
}
fmt.Printf(" 任务正常完成\n")
errorCh <- nil
}()
if err := <-errorCh; err != nil {
fmt.Printf(" 捕获到错误: %v\n", err)
}
fmt.Println()
}
// demonstrateBestPractices 演示 goroutine 的最佳实践
func demonstrateBestPractices() {
fmt.Println("7. Goroutine 的最佳实践:")
fmt.Printf(" 最佳实践原则:\n")
fmt.Printf(" 1. 避免 goroutine 泄漏\n")
fmt.Printf(" 2. 使用 context 进行取消和超时控制\n")
fmt.Printf(" 3. 合理控制 goroutine 数量\n")
fmt.Printf(" 4. 使用 channel 进行通信而不是共享内存\n")
fmt.Printf(" 5. 正确处理 panic 和错误\n")
// 避免 goroutine 泄漏
fmt.Printf(" 避免 goroutine 泄漏:\n")
// 错误示例:可能导致 goroutine 泄漏
fmt.Printf(" 错误示例 - 可能导致泄漏:\n")
leakyCh := make(chan int)
go func() {
// 这个 goroutine 可能永远阻塞
leakyCh <- 42
}()
// 如果不读取 channelgoroutine 会泄漏
// 正确示例:使用缓冲 channel 或确保读取
fmt.Printf(" 正确示例 - 避免泄漏:\n")
safeCh := make(chan int, 1) // 缓冲 channel
go func() {
safeCh <- 42
fmt.Printf(" 安全的 goroutine 完成\n")
}()
<-safeCh
// 使用 defer 清理资源
fmt.Printf(" 使用 defer 清理资源:\n")
var cleanupWg sync.WaitGroup
cleanupWg.Add(1)
go func() {
defer cleanupWg.Done()
defer fmt.Printf(" 资源已清理\n")
fmt.Printf(" 执行任务...\n")
time.Sleep(100 * time.Millisecond)
}()
cleanupWg.Wait()
// 监控 goroutine 数量
fmt.Printf(" 监控 goroutine 数量:\n")
fmt.Printf(" 当前 goroutine 数量: %d\n", runtime.NumGoroutine())
// 启动一些 goroutine
var monitorWg sync.WaitGroup
for i := 0; i < 5; i++ {
monitorWg.Add(1)
go func(id int) {
defer monitorWg.Done()
time.Sleep(100 * time.Millisecond)
}(i)
}
fmt.Printf(" 启动 5 个 goroutine 后: %d\n", runtime.NumGoroutine())
monitorWg.Wait()
fmt.Printf(" goroutine 完成后: %d\n", runtime.NumGoroutine())
fmt.Println()
}
// ========== 辅助函数和类型定义 ==========
// sayHello 简单的问候函数
func sayHello(name string) {
fmt.Printf(" Hello, %s!\n", name)
}
// task 模拟一个耗时任务
func task(name string, duration time.Duration) {
fmt.Printf(" %s 开始执行\n", name)
time.Sleep(duration)
fmt.Printf(" %s 执行完成\n", name)
}
// fibonacci 计算斐波那契数列CPU 密集型任务)
func fibonacci(n int) int {
if n <= 1 {
return n
}
return fibonacci(n-1) + fibonacci(n-2)
}
// Counter 线程安全的计数器
type Counter struct {
mu sync.Mutex
value int
}
func (c *Counter) Increment() {
c.mu.Lock()
defer c.mu.Unlock()
c.value++
}
func (c *Counter) Value() int {
c.mu.Lock()
defer c.mu.Unlock()
return c.value
}
// worker 工作函数
func worker(id int, jobs <-chan int, results chan<- int) {
for j := range jobs {
fmt.Printf(" Worker %d 开始处理任务 %d\n", id, j)
time.Sleep(100 * time.Millisecond)
results <- j * 2
}
}
// fanIn 扇入函数,合并多个 channel
func fanIn(input1, input2 <-chan int) <-chan int {
output := make(chan int)
go func() {
defer close(output)
for input1 != nil || input2 != nil {
select {
case val, ok := <-input1:
if !ok {
input1 = nil
} else {
output <- val
}
case val, ok := <-input2:
if !ok {
input2 = nil
} else {
output <- val
}
}
}
}()
return output
}
// Job 任务结构
type Job struct {
ID int
Data string
}
// Result 结果结构
type Result struct {
JobID int
Message string
}
// jobWorker 任务处理器
func jobWorker(id int, jobs <-chan Job, results chan<- Result) {
for job := range jobs {
fmt.Printf(" Worker %d 处理任务 %d\n", id, job.ID)
time.Sleep(100 * time.Millisecond)
results <- Result{
JobID: job.ID,
Message: fmt.Sprintf("Worker %d 完成任务 %d: %s", id, job.ID, job.Data),
}
}
}
// WorkerPool 动态工作池
type WorkerPool struct {
maxWorkers int
taskQueue chan func()
quit chan bool
wg sync.WaitGroup
}
// NewWorkerPool 创建新的工作池
func NewWorkerPool(maxWorkers, queueSize int) *WorkerPool {
return &WorkerPool{
maxWorkers: maxWorkers,
taskQueue: make(chan func(), queueSize),
quit: make(chan bool),
}
}
// Start 启动工作池
func (p *WorkerPool) Start() {
for i := 0; i < p.maxWorkers; i++ {
p.wg.Add(1)
go p.worker(i + 1)
}
}
// Submit 提交任务
func (p *WorkerPool) Submit(task func()) {
select {
case p.taskQueue <- task:
default:
fmt.Printf(" 任务队列已满,任务被丢弃\n")
}
}
// Stop 停止工作池
func (p *WorkerPool) Stop() {
close(p.taskQueue)
p.wg.Wait()
}
// worker 工作池的工作函数
func (p *WorkerPool) worker(id int) {
defer p.wg.Done()
for task := range p.taskQueue {
fmt.Printf(" Pool Worker %d 执行任务\n", id)
task()
}
fmt.Printf(" Pool Worker %d 退出\n", id)
}
/*
运行这个程序:
go run 01-goroutines.go
学习要点:
1. Goroutine 是 Go 语言的轻量级线程
2. 使用 go 关键字启动 goroutine
3. Goroutine 之间需要同步和通信机制
4. 使用 WaitGroup、Mutex、Once 等<><E7AD89><EFBFBD>步原语
5. 通过 channel 进行 goroutine 间通信
Goroutine 的特性:
1. 轻量级:初始栈大小只有 2KB
2. 动态增长:栈大小可以动态调整
3. 多路复用:多个 goroutine 可以在少数 OS 线程上运行
4. 协作式调度:由 Go 运行时调度
5. 高效通信:通过 channel 进行通信
同步机制:
1. WaitGroup等待一组 goroutine 完成
2. Mutex互斥锁保护共享资源
3. RWMutex读写锁允许多个读者
4. Once确保函数只执行一次
5. Cond条件变量用于等待条件
通信模式:
1. 简单通信:通过 channel 发送和接收数据
2. 生产者-消费者:使用缓冲 channel
3. 扇出/扇入:一对多和多对一的通信
4. 管道:链式处理数据
5. 工作池:固定数量的 worker 处理任务
最佳实践:
1. 避免 goroutine 泄漏
2. 使用 context 进行取消和超时控制
3. 合理控制 goroutine 数量
4. 使用 channel 进行通信而不是共享内存
5. 正确处理 panic 和错误
6. 使用 defer 清理资源
7. 监控 goroutine 数量
常见陷阱:
1. 忘记等待 goroutine 完成
2. 在循环中使用闭包时的变量捕获问题
3. 无缓冲 channel 导致的死锁
4. goroutine 泄漏
5. 竞态条件
性能考虑:
1. Goroutine 创建成本很低
2. 上下文切换开销小
3. 内存使用效率高
4. 适合 I/O 密集型任务
5. CPU 密集型任务需要考虑 GOMAXPROCS
注意事项:
1. main 函数退出时所有 goroutine 都会终止
2. goroutine 中的 panic 会导致整个程序崩溃
3. 共享变量需要同步保护
4. channel 的关闭只能由发送方执行
5. 避免在 goroutine 中使用全局变量
*/