1184 lines
26 KiB
Go
1184 lines
26 KiB
Go
/*
|
||
04-sync-package.go - Go 语言 Sync 包详解
|
||
|
||
学习目标:
|
||
1. 理解 sync 包的作用和重要性
|
||
2. 掌握 Mutex 和 RWMutex 的使用
|
||
3. 学会使用 WaitGroup 进行同步
|
||
4. 了解 Once 的使用场景
|
||
5. 掌握 Cond 条件变量的用法
|
||
|
||
知识点:
|
||
- sync.Mutex 互斥锁
|
||
- sync.RWMutex 读写锁
|
||
- sync.WaitGroup 等待组
|
||
- sync.Once 单次执行
|
||
- sync.Cond 条件变量
|
||
- sync.Pool 对象池
|
||
- 原子操作 sync/atomic
|
||
*/
|
||
|
||
package main
|
||
|
||
import (
|
||
"fmt"
|
||
"math"
|
||
"math/rand"
|
||
"strings"
|
||
"sync"
|
||
"sync/atomic"
|
||
"time"
|
||
"unsafe"
|
||
)
|
||
|
||
func main() {
|
||
fmt.Println("=== Go 语言 Sync 包详解 ===\\n")
|
||
|
||
// 演示 Mutex 互斥锁
|
||
demonstrateMutex()
|
||
|
||
// 演示 RWMutex 读写锁
|
||
demonstrateRWMutex()
|
||
|
||
// 演示 WaitGroup 等待组
|
||
demonstrateWaitGroup()
|
||
|
||
// 演示 Once 单次执行
|
||
demonstrateOnce()
|
||
|
||
// 演示 Cond 条件变量
|
||
demonstrateCond()
|
||
|
||
// 演示 Pool 对象池
|
||
demonstratePool()
|
||
|
||
// 演示原子操作
|
||
demonstrateAtomic()
|
||
}
|
||
|
||
// demonstrateMutex 演示 Mutex 互斥锁
|
||
func demonstrateMutex() {
|
||
fmt.Println("1. Mutex 互斥锁:")
|
||
|
||
// Mutex 的基本概念
|
||
fmt.Printf(" Mutex 的基本概念:\\n")
|
||
fmt.Printf(" - 互斥锁,同时只允许一个 goroutine 访问共享资源\\n")
|
||
fmt.Printf(" - 用于保护临界区,防止竞态条件\\n")
|
||
fmt.Printf(" - Lock() 获取锁,Unlock() 释放锁\\n")
|
||
fmt.Printf(" - 零值可以直接使用\\n")
|
||
fmt.Printf(" - 不可重入,同一 goroutine 不能重复加锁\\n")
|
||
|
||
// 不使用锁的竞态条件示例
|
||
fmt.Printf(" 不使用锁的竞态条件示例:\\n")
|
||
|
||
var unsafeCounter int
|
||
var wg sync.WaitGroup
|
||
|
||
// 启动多个 goroutine 增加计数器
|
||
for i := 0; i < 10; i++ {
|
||
wg.Add(1)
|
||
go func(id int) {
|
||
defer wg.Done()
|
||
for j := 0; j < 1000; j++ {
|
||
unsafeCounter++ // 竞态条件
|
||
}
|
||
}(i)
|
||
}
|
||
|
||
wg.Wait()
|
||
fmt.Printf(" 不安全计数器最终值: %d (期望: 10000)\\n", unsafeCounter)
|
||
|
||
// 使用 Mutex 保护共享资源
|
||
fmt.Printf(" 使用 Mutex 保护共享资源:\\n")
|
||
|
||
safeCounter := &SafeCounter{}
|
||
|
||
// 启动多个 goroutine 安全地增加计数器
|
||
for i := 0; i < 10; i++ {
|
||
wg.Add(1)
|
||
go func(id int) {
|
||
defer wg.Done()
|
||
for j := 0; j < 1000; j++ {
|
||
safeCounter.Increment()
|
||
}
|
||
}(i)
|
||
}
|
||
|
||
wg.Wait()
|
||
fmt.Printf(" 安全计数器最终值: %d\\n", safeCounter.Value())
|
||
|
||
// 银行账户转账示例
|
||
fmt.Printf(" 银行账户转账示例:\\n")
|
||
|
||
account1 := NewBankAccount("Alice", 1000)
|
||
account2 := NewBankAccount("Bob", 500)
|
||
|
||
fmt.Printf(" 转账前: %s\\n", account1.String())
|
||
fmt.Printf(" 转账前: %s\\n", account2.String())
|
||
|
||
// 并发转账
|
||
for i := 0; i < 5; i++ {
|
||
wg.Add(2)
|
||
|
||
go func() {
|
||
defer wg.Done()
|
||
Transfer(account1, account2, 50)
|
||
}()
|
||
|
||
go func() {
|
||
defer wg.Done()
|
||
Transfer(account2, account1, 30)
|
||
}()
|
||
}
|
||
|
||
wg.Wait()
|
||
|
||
fmt.Printf(" 转账后: %s\\n", account1.String())
|
||
fmt.Printf(" 转账后: %s\\n", account2.String())
|
||
fmt.Printf(" 总金额: %.2f (应该保持不变)\\n",
|
||
account1.Balance()+account2.Balance())
|
||
|
||
fmt.Println()
|
||
}
|
||
|
||
// demonstrateRWMutex 演示 RWMutex 读写锁
|
||
func demonstrateRWMutex() {
|
||
fmt.Println("2. RWMutex 读写锁:")
|
||
|
||
// RWMutex 的基本概念
|
||
fmt.Printf(" RWMutex 的基本概念:\\n")
|
||
fmt.Printf(" - 读写锁,允许多个读者或一个写者\\n")
|
||
fmt.Printf(" - RLock()/RUnlock() 用于读操作\\n")
|
||
fmt.Printf(" - Lock()/Unlock() 用于写操作\\n")
|
||
fmt.Printf(" - 读操作可以并发,写操作互斥\\n")
|
||
fmt.Printf(" - 适用于读多写少的场景\\n")
|
||
|
||
// 配置管理示例
|
||
fmt.Printf(" 配置管理示例:\\n")
|
||
|
||
config := NewConfig()
|
||
|
||
// 设置初始配置
|
||
config.Set("database_url", "localhost:5432")
|
||
config.Set("max_connections", "100")
|
||
config.Set("timeout", "30s")
|
||
|
||
var wg sync.WaitGroup
|
||
|
||
// 启动多个读者
|
||
for i := 0; i < 10; i++ {
|
||
wg.Add(1)
|
||
go func(id int) {
|
||
defer wg.Done()
|
||
for j := 0; j < 5; j++ {
|
||
url := config.Get("database_url")
|
||
fmt.Printf(" 读者 %d: database_url = %s\\n", id, url)
|
||
time.Sleep(10 * time.Millisecond)
|
||
}
|
||
}(i)
|
||
}
|
||
|
||
// 启动少量写者
|
||
for i := 0; i < 2; i++ {
|
||
wg.Add(1)
|
||
go func(id int) {
|
||
defer wg.Done()
|
||
time.Sleep(50 * time.Millisecond)
|
||
newURL := fmt.Sprintf("server%d:5432", id)
|
||
config.Set("database_url", newURL)
|
||
fmt.Printf(" 写者 %d: 更新 database_url = %s\\n", id, newURL)
|
||
}(i)
|
||
}
|
||
|
||
wg.Wait()
|
||
|
||
// 缓存系统示例
|
||
fmt.Printf(" 缓存系统示例:\\n")
|
||
|
||
cache := NewCache()
|
||
|
||
// 预填充缓存
|
||
cache.Set("user:1", "Alice")
|
||
cache.Set("user:2", "Bob")
|
||
cache.Set("user:3", "Charlie")
|
||
|
||
// 并发读写测试
|
||
for i := 0; i < 5; i++ {
|
||
wg.Add(2)
|
||
|
||
// 读操作
|
||
go func(id int) {
|
||
defer wg.Done()
|
||
key := fmt.Sprintf("user:%d", (id%3)+1)
|
||
if value, ok := cache.Get(key); ok {
|
||
fmt.Printf(" 缓存读取: %s = %s\\n", key, value)
|
||
}
|
||
}(i)
|
||
|
||
// 写操作
|
||
go func(id int) {
|
||
defer wg.Done()
|
||
key := fmt.Sprintf("user:%d", id+4)
|
||
value := fmt.Sprintf("User%d", id+4)
|
||
cache.Set(key, value)
|
||
fmt.Printf(" 缓存写入: %s = %s\\n", key, value)
|
||
}(i)
|
||
}
|
||
|
||
wg.Wait()
|
||
|
||
fmt.Printf(" 缓存大小: %d\\n", cache.Size())
|
||
|
||
fmt.Println()
|
||
}
|
||
|
||
// demonstrateWaitGroup 演示 WaitGroup 等待组
|
||
func demonstrateWaitGroup() {
|
||
fmt.Println("3. WaitGroup 等待组:")
|
||
|
||
// WaitGroup 的基本概念
|
||
fmt.Printf(" WaitGroup 的基本概念:\\n")
|
||
fmt.Printf(" - 用于等待一组 goroutine 完成\\n")
|
||
fmt.Printf(" - Add(n) 增加等待计数\\n")
|
||
fmt.Printf(" - Done() 减少等待计数\\n")
|
||
fmt.Printf(" - Wait() 阻塞直到计数为零\\n")
|
||
fmt.Printf(" - 零值可以直接使用\\n")
|
||
|
||
// 基本使用示例
|
||
fmt.Printf(" 基本使用示例:\\n")
|
||
|
||
var wg sync.WaitGroup
|
||
|
||
// 启动多个工作 goroutine
|
||
for i := 1; i <= 5; i++ {
|
||
wg.Add(1)
|
||
go func(id int) {
|
||
defer wg.Done()
|
||
fmt.Printf(" 工作者 %d 开始工作\\n", id)
|
||
time.Sleep(time.Duration(rand.Intn(200)) * time.Millisecond)
|
||
fmt.Printf(" 工作者 %d 完成工作\\n", id)
|
||
}(i)
|
||
}
|
||
|
||
fmt.Printf(" 等待所有工作者完成...\\n")
|
||
wg.Wait()
|
||
fmt.Printf(" 所有工作者已完成\\n")
|
||
|
||
// 批量处理示例
|
||
fmt.Printf(" 批量处理示例:\\n")
|
||
|
||
urls := []string{
|
||
"https://api.example.com/users",
|
||
"https://api.example.com/orders",
|
||
"https://api.example.com/products",
|
||
"https://api.example.com/categories",
|
||
"https://api.example.com/reviews",
|
||
}
|
||
|
||
results := make([]string, len(urls))
|
||
|
||
for i, url := range urls {
|
||
wg.Add(1)
|
||
go func(index int, endpoint string) {
|
||
defer wg.Done()
|
||
result := fetchData(endpoint)
|
||
results[index] = result
|
||
}(i, url)
|
||
}
|
||
|
||
wg.Wait()
|
||
|
||
fmt.Printf(" 批量处理结果:\\n")
|
||
for i, result := range results {
|
||
fmt.Printf(" %s: %s\\n", urls[i], result)
|
||
}
|
||
|
||
// 分阶段处理示例
|
||
fmt.Printf(" 分阶段处理示例:\\n")
|
||
|
||
// 第一阶段:数据准备
|
||
fmt.Printf(" 第一阶段:数据准备\\n")
|
||
dataChannels := make([]chan int, 3)
|
||
for i := range dataChannels {
|
||
dataChannels[i] = make(chan int, 10)
|
||
wg.Add(1)
|
||
go func(id int, ch chan int) {
|
||
defer wg.Done()
|
||
defer close(ch)
|
||
for j := 1; j <= 5; j++ {
|
||
ch <- id*10 + j
|
||
time.Sleep(20 * time.Millisecond)
|
||
}
|
||
fmt.Printf(" 数据源 %d 准备完成\\n", id)
|
||
}(i, dataChannels[i])
|
||
}
|
||
|
||
wg.Wait()
|
||
fmt.Printf(" 第一阶段完成\\n")
|
||
|
||
// 第二阶段:数据处理
|
||
fmt.Printf(" 第二阶段:数据处理\\n")
|
||
processedData := make([][]int, len(dataChannels))
|
||
|
||
for i, ch := range dataChannels {
|
||
wg.Add(1)
|
||
go func(id int, dataCh chan int) {
|
||
defer wg.Done()
|
||
var processed []int
|
||
for data := range dataCh {
|
||
processed = append(processed, data*2)
|
||
}
|
||
processedData[id] = processed
|
||
fmt.Printf(" 处理器 %d 完成\\n", id)
|
||
}(i, ch)
|
||
}
|
||
|
||
wg.Wait()
|
||
fmt.Printf(" 第二阶段完成\\n")
|
||
|
||
// 输出最终结果
|
||
for i, data := range processedData {
|
||
fmt.Printf(" 处理器 %d 结果: %v\\n", i, data)
|
||
}
|
||
|
||
fmt.Println()
|
||
}
|
||
|
||
// demonstrateOnce 演示 Once 单次执行
|
||
func demonstrateOnce() {
|
||
fmt.Println("4. Once 单次执行:")
|
||
|
||
// Once 的基本概念
|
||
fmt.Printf(" Once 的基本概念:\\n")
|
||
fmt.Printf(" - 确保函数只执行一次\\n")
|
||
fmt.Printf(" - 线程安全的单例模式实现\\n")
|
||
fmt.Printf(" - Do(func()) 方法执行函数\\n")
|
||
fmt.Printf(" - 常用于初始化操作\\n")
|
||
fmt.Printf(" - 零值可以直接使用\\n")
|
||
|
||
// 基本使用示例
|
||
fmt.Printf(" 基本使用示例:\\n")
|
||
|
||
var once sync.Once
|
||
var wg sync.WaitGroup
|
||
|
||
// 启动多个 goroutine 尝试执行初始化
|
||
for i := 1; i <= 5; i++ {
|
||
wg.Add(1)
|
||
go func(id int) {
|
||
defer wg.Done()
|
||
once.Do(func() {
|
||
fmt.Printf(" 初始化操作执行 (来自 goroutine %d)\\n", id)
|
||
time.Sleep(100 * time.Millisecond)
|
||
fmt.Printf(" 初始化操作完成\\n")
|
||
})
|
||
fmt.Printf(" Goroutine %d 继续执行\\n", id)
|
||
}(i)
|
||
}
|
||
|
||
wg.Wait()
|
||
|
||
// 单例模式示例
|
||
fmt.Printf(" 单例模式示例:\\n")
|
||
|
||
// 并发获取单例实例
|
||
for i := 1; i <= 5; i++ {
|
||
wg.Add(1)
|
||
go func(id int) {
|
||
defer wg.Done()
|
||
instance := GetDatabaseInstance()
|
||
fmt.Printf(" Goroutine %d 获取实例: %p\\n", id, instance)
|
||
}(i)
|
||
}
|
||
|
||
wg.Wait()
|
||
|
||
// 配置加载示例
|
||
fmt.Printf(" 配置加载示例:\\n")
|
||
|
||
configLoader := &ConfigLoader{}
|
||
|
||
// 多个 goroutine 同时加载配置
|
||
for i := 1; i <= 3; i++ {
|
||
wg.Add(1)
|
||
go func(id int) {
|
||
defer wg.Done()
|
||
config := configLoader.GetConfig()
|
||
fmt.Printf(" Goroutine %d 获取配置: %v\\n", id, config)
|
||
}(i)
|
||
}
|
||
|
||
wg.Wait()
|
||
|
||
// 资源初始化示例
|
||
fmt.Printf(" 资源初始化示例:\\n")
|
||
|
||
resource := &ExpensiveResource{}
|
||
|
||
// 多个 goroutine 使用资源
|
||
for i := 1; i <= 4; i++ {
|
||
wg.Add(1)
|
||
go func(id int) {
|
||
defer wg.Done()
|
||
result := resource.Process(fmt.Sprintf("数据-%d", id))
|
||
fmt.Printf(" Goroutine %d 处理结果: %s\\n", id, result)
|
||
}(i)
|
||
}
|
||
|
||
wg.Wait()
|
||
|
||
fmt.Println()
|
||
}
|
||
|
||
// demonstrateCond 演示 Cond 条件变量
|
||
func demonstrateCond() {
|
||
fmt.Println("5. Cond 条件变量:")
|
||
|
||
// Cond 的基本概念
|
||
fmt.Printf(" Cond 的基本概念:\\n")
|
||
fmt.Printf(" - 条件变量,用于等待或通知条件变化\\n")
|
||
fmt.Printf(" - 必须与 Mutex 或 RWMutex 配合使用\\n")
|
||
fmt.Printf(" - Wait() 等待条件满足\\n")
|
||
fmt.Printf(" - Signal() 唤醒一个等待者\\n")
|
||
fmt.Printf(" - Broadcast() 唤醒所有等待者\\n")
|
||
|
||
// 生产者-消费者示例
|
||
fmt.Printf(" 生产者-消费者示例:\\n")
|
||
|
||
buffer := &Buffer{
|
||
items: make([]int, 0, 5),
|
||
cond: sync.NewCond(&sync.Mutex{}),
|
||
}
|
||
|
||
var wg sync.WaitGroup
|
||
|
||
// 启动消费者
|
||
for i := 1; i <= 2; i++ {
|
||
wg.Add(1)
|
||
go func(id int) {
|
||
defer wg.Done()
|
||
for j := 0; j < 5; j++ {
|
||
item := buffer.Consume()
|
||
fmt.Printf(" 消费者 %d 消费: %d\\n", id, item)
|
||
time.Sleep(150 * time.Millisecond)
|
||
}
|
||
}(i)
|
||
}
|
||
|
||
// 启动生产者
|
||
for i := 1; i <= 2; i++ {
|
||
wg.Add(1)
|
||
go func(id int) {
|
||
defer wg.Done()
|
||
for j := 1; j <= 5; j++ {
|
||
item := id*10 + j
|
||
buffer.Produce(item)
|
||
fmt.Printf(" 生产者 %d 生产: %d\\n", id, item)
|
||
time.Sleep(100 * time.Millisecond)
|
||
}
|
||
}(i)
|
||
}
|
||
|
||
wg.Wait()
|
||
|
||
// 任务队列示例
|
||
fmt.Printf(" 任务队列示例:\\n")
|
||
|
||
taskQueue := &TaskQueue{
|
||
tasks: make([]Task, 0),
|
||
cond: sync.NewCond(&sync.Mutex{}),
|
||
}
|
||
|
||
// 启动工作者
|
||
for i := 1; i <= 3; i++ {
|
||
wg.Add(1)
|
||
go func(id int) {
|
||
defer wg.Done()
|
||
for j := 0; j < 3; j++ {
|
||
task := taskQueue.GetTask()
|
||
fmt.Printf(" 工作者 %d 执行任务: %s\\n", id, task.Name)
|
||
time.Sleep(100 * time.Millisecond)
|
||
}
|
||
}(i)
|
||
}
|
||
|
||
// 添加任务
|
||
go func() {
|
||
tasks := []Task{
|
||
{Name: "任务A"}, {Name: "任务B"}, {Name: "任务C"},
|
||
{Name: "任务D"}, {Name: "任务E"}, {Name: "任务F"},
|
||
{Name: "任务G"}, {Name: "任务H"}, {Name: "任务I"},
|
||
}
|
||
|
||
for _, task := range tasks {
|
||
taskQueue.AddTask(task)
|
||
time.Sleep(50 * time.Millisecond)
|
||
}
|
||
}()
|
||
|
||
wg.Wait()
|
||
|
||
fmt.Println()
|
||
}
|
||
|
||
// demonstratePool 演示 Pool 对象池
|
||
func demonstratePool() {
|
||
fmt.Println("6. Pool 对象池:")
|
||
|
||
// Pool 的基本概念
|
||
fmt.Printf(" Pool 的基本概念:\\n")
|
||
fmt.Printf(" - 对象池,用于复用对象减少 GC 压力\\n")
|
||
fmt.Printf(" - Get() 获取对象,Put() 归还对象\\n")
|
||
fmt.Printf(" - New 字段定义创建新对象的函数\\n")
|
||
fmt.Printf(" - 线程安全,可以并发使用\\n")
|
||
fmt.Printf(" - 适用于频繁创建销毁的对象\\n")
|
||
|
||
// 基本使用示例
|
||
fmt.Printf(" 基本使用示例:\\n")
|
||
|
||
// 创建字符串构建器池
|
||
builderPool := &sync.Pool{
|
||
New: func() interface{} {
|
||
fmt.Printf(" 创建新的 StringBuilder\\n")
|
||
return &StringBuilder{data: make([]string, 0, 10)}
|
||
},
|
||
}
|
||
|
||
var wg sync.WaitGroup
|
||
|
||
// 并发使用对象池
|
||
for i := 1; i <= 5; i++ {
|
||
wg.Add(1)
|
||
go func(id int) {
|
||
defer wg.Done()
|
||
|
||
// 从池中获取对象
|
||
builder := builderPool.Get().(*StringBuilder)
|
||
|
||
// 使用对象
|
||
builder.Append(fmt.Sprintf("Goroutine-%d", id))
|
||
builder.Append("处理数据")
|
||
result := builder.String()
|
||
|
||
fmt.Printf(" Goroutine %d 结果: %s\\n", id, result)
|
||
|
||
// 重置对象状态
|
||
builder.Reset()
|
||
|
||
// 归还对象到池中
|
||
builderPool.Put(builder)
|
||
}(i)
|
||
}
|
||
|
||
wg.Wait()
|
||
|
||
// 缓冲区池示例
|
||
fmt.Printf(" 缓冲区池示例:\\n")
|
||
|
||
bufferPool := &sync.Pool{
|
||
New: func() interface{} {
|
||
fmt.Printf(" 创建新的缓冲区\\n")
|
||
return make([]byte, 1024)
|
||
},
|
||
}
|
||
|
||
// 模拟网络请求处理
|
||
for i := 1; i <= 3; i++ {
|
||
wg.Add(1)
|
||
go func(id int) {
|
||
defer wg.Done()
|
||
|
||
// 获取缓冲区
|
||
buffer := bufferPool.Get().([]byte)
|
||
|
||
// 模拟数据处理
|
||
data := fmt.Sprintf("请求 %d 的数据", id)
|
||
copy(buffer, data)
|
||
|
||
fmt.Printf(" 处理请求 %d: %s\\n", id, string(buffer[:len(data)]))
|
||
|
||
// 清理缓冲区
|
||
for i := range buffer {
|
||
buffer[i] = 0
|
||
}
|
||
|
||
// 归还缓冲区
|
||
bufferPool.Put(buffer)
|
||
}(i)
|
||
}
|
||
|
||
wg.Wait()
|
||
|
||
// 连接池示例
|
||
fmt.Printf(" 连接池示例:\\n")
|
||
|
||
connPool := &ConnectionPool{
|
||
pool: &sync.Pool{
|
||
New: func() interface{} {
|
||
fmt.Printf(" 创建新的数据库连接\\n")
|
||
return &DBConn{ID: rand.Intn(1000)}
|
||
},
|
||
},
|
||
}
|
||
|
||
// 并发使用连接
|
||
for i := 1; i <= 4; i++ {
|
||
wg.Add(1)
|
||
go func(id int) {
|
||
defer wg.Done()
|
||
|
||
conn := connPool.GetConnection()
|
||
fmt.Printf(" Goroutine %d 使用连接 %d\\n", id, conn.ID)
|
||
|
||
// 模拟数据库操作
|
||
time.Sleep(100 * time.Millisecond)
|
||
|
||
connPool.PutConnection(conn)
|
||
}(i)
|
||
}
|
||
|
||
wg.Wait()
|
||
|
||
fmt.Println()
|
||
}
|
||
|
||
// demonstrateAtomic 演示原子操作
|
||
func demonstrateAtomic() {
|
||
fmt.Println("7. 原子操作:")
|
||
|
||
// 原子操作的基本概念
|
||
fmt.Printf(" 原子操作的基本概念:\\n")
|
||
fmt.Printf(" - 不可分割的操作,要么全部完成要么全部不完成\\n")
|
||
fmt.Printf(" - 无需加锁即可保证并发安全\\n")
|
||
fmt.Printf(" - 性能比互斥锁更好\\n")
|
||
fmt.Printf(" - 适用于简单的数值操作\\n")
|
||
fmt.Printf(" - sync/atomic 包提供原子操作函数\\n")
|
||
|
||
// 原子计数器示例
|
||
fmt.Printf(" 原子计数器示例:\\n")
|
||
|
||
var atomicCounter int64
|
||
var wg sync.WaitGroup
|
||
|
||
// 启动多个 goroutine 原子地增加计数器
|
||
for i := 0; i < 10; i++ {
|
||
wg.Add(1)
|
||
go func(id int) {
|
||
defer wg.Done()
|
||
for j := 0; j < 1000; j++ {
|
||
atomic.AddInt64(&atomicCounter, 1)
|
||
}
|
||
}(i)
|
||
}
|
||
|
||
wg.Wait()
|
||
fmt.Printf(" 原子计数器最终值: %d\\n", atomic.LoadInt64(&atomicCounter))
|
||
|
||
// 原子操作类型示例
|
||
fmt.Printf(" 原子操作类型示例:\\n")
|
||
|
||
var (
|
||
intVal int64 = 100
|
||
uintVal uint64 = 200
|
||
floatVal uint64 // 存储 float64 的位表示
|
||
boolVal int32 // 存储 bool 值
|
||
stringPtr unsafe.Pointer
|
||
)
|
||
|
||
// 整数原子操作
|
||
atomic.AddInt64(&intVal, 50)
|
||
fmt.Printf(" 整数加法: %d\\n", atomic.LoadInt64(&intVal))
|
||
|
||
old := atomic.SwapInt64(&intVal, 300)
|
||
fmt.Printf(" 整数交换: 旧值=%d, 新值=%d\\n", old, atomic.LoadInt64(&intVal))
|
||
|
||
// 比较并交换
|
||
swapped := atomic.CompareAndSwapInt64(&intVal, 300, 400)
|
||
fmt.Printf(" 比较并交换: 成功=%t, 当前值=%d\\n", swapped, atomic.LoadInt64(&intVal))
|
||
|
||
// 无符号整数操作
|
||
atomic.AddUint64(&uintVal, 100)
|
||
fmt.Printf(" 无符号整数: %d\\n", atomic.LoadUint64(&uintVal))
|
||
|
||
// 浮点数操作(通过位操作)
|
||
f := 3.14159
|
||
atomic.StoreUint64(&floatVal, math.Float64bits(f))
|
||
loadedFloat := math.Float64frombits(atomic.LoadUint64(&floatVal))
|
||
fmt.Printf(" 浮点数: %.5f\\n", loadedFloat)
|
||
|
||
// 布尔值操作
|
||
atomic.StoreInt32(&boolVal, 1) // true
|
||
fmt.Printf(" 布尔值: %t\\n", atomic.LoadInt32(&boolVal) != 0)
|
||
|
||
// 指针操作
|
||
str1 := "Hello"
|
||
str2 := "World"
|
||
atomic.StorePointer(&stringPtr, unsafe.Pointer(&str1))
|
||
loadedStr := (*string)(atomic.LoadPointer(&stringPtr))
|
||
fmt.Printf(" 指针字符串: %s\\n", *loadedStr)
|
||
|
||
atomic.StorePointer(&stringPtr, unsafe.Pointer(&str2))
|
||
loadedStr = (*string)(atomic.LoadPointer(&stringPtr))
|
||
fmt.Printf(" 指针字符串: %s\\n", *loadedStr)
|
||
|
||
// 原子值示例
|
||
fmt.Printf(" 原子值示例:\\n")
|
||
|
||
var atomicValue atomic.Value
|
||
|
||
// 存储不同类型的值
|
||
atomicValue.Store("字符串值")
|
||
fmt.Printf(" 原子值: %v\\n", atomicValue.Load())
|
||
|
||
atomicValue.Store(42)
|
||
fmt.Printf(" 原子值: %v\\n", atomicValue.Load())
|
||
|
||
atomicValue.Store(map[string]int{"key": 100})
|
||
fmt.Printf(" 原子值: %v\\n", atomicValue.Load())
|
||
|
||
// 性能对比示例
|
||
fmt.Printf(" 性能对比示例:\\n")
|
||
|
||
// 使用原子操作的计数器
|
||
atomicStats := &AtomicStats{}
|
||
start := time.Now()
|
||
|
||
for i := 0; i < 10; i++ {
|
||
wg.Add(1)
|
||
go func() {
|
||
defer wg.Done()
|
||
for j := 0; j < 10000; j++ {
|
||
atomicStats.Increment()
|
||
}
|
||
}()
|
||
}
|
||
wg.Wait()
|
||
|
||
atomicDuration := time.Since(start)
|
||
fmt.Printf(" 原子操作耗时: %v, 结果: %d\\n",
|
||
atomicDuration, atomicStats.Value())
|
||
|
||
// 使用互斥锁的计数器
|
||
mutexStats := &MutexStats{}
|
||
start = time.Now()
|
||
|
||
for i := 0; i < 10; i++ {
|
||
wg.Add(1)
|
||
go func() {
|
||
defer wg.Done()
|
||
for j := 0; j < 10000; j++ {
|
||
mutexStats.Increment()
|
||
}
|
||
}()
|
||
}
|
||
wg.Wait()
|
||
|
||
mutexDuration := time.Since(start)
|
||
fmt.Printf(" 互斥锁耗时: %v, 结果: %d\\n",
|
||
mutexDuration, mutexStats.Value())
|
||
|
||
fmt.Printf(" 性能提升: %.2fx\\n",
|
||
float64(mutexDuration)/float64(atomicDuration))
|
||
|
||
fmt.Println()
|
||
}
|
||
|
||
// ========== 类型定义和辅助函数 ==========
|
||
|
||
// SafeCounter 线程安全的计数器
|
||
type SafeCounter struct {
|
||
mu sync.Mutex
|
||
value int
|
||
}
|
||
|
||
func (c *SafeCounter) Increment() {
|
||
c.mu.Lock()
|
||
defer c.mu.Unlock()
|
||
c.value++
|
||
}
|
||
|
||
func (c *SafeCounter) Value() int {
|
||
c.mu.Lock()
|
||
defer c.mu.Unlock()
|
||
return c.value
|
||
}
|
||
|
||
// BankAccount 银行账户
|
||
type BankAccount struct {
|
||
mu sync.Mutex
|
||
name string
|
||
balance float64
|
||
}
|
||
|
||
func NewBankAccount(name string, balance float64) *BankAccount {
|
||
return &BankAccount{
|
||
name: name,
|
||
balance: balance,
|
||
}
|
||
}
|
||
|
||
func (a *BankAccount) Deposit(amount float64) {
|
||
a.mu.Lock()
|
||
defer a.mu.Unlock()
|
||
a.balance += amount
|
||
}
|
||
|
||
func (a *BankAccount) Withdraw(amount float64) bool {
|
||
a.mu.Lock()
|
||
defer a.mu.Unlock()
|
||
if a.balance >= amount {
|
||
a.balance -= amount
|
||
return true
|
||
}
|
||
return false
|
||
}
|
||
|
||
func (a *BankAccount) Balance() float64 {
|
||
a.mu.Lock()
|
||
defer a.mu.Unlock()
|
||
return a.balance
|
||
}
|
||
|
||
func (a *BankAccount) String() string {
|
||
a.mu.Lock()
|
||
defer a.mu.Unlock()
|
||
return fmt.Sprintf("%s: $%.2f", a.name, a.balance)
|
||
}
|
||
|
||
// Transfer 转账函数
|
||
func Transfer(from, to *BankAccount, amount float64) {
|
||
// 避免死锁:总是按照相同的顺序获取锁
|
||
if from == to {
|
||
return
|
||
}
|
||
|
||
// 使用地址比较确保锁的顺序
|
||
if uintptr(unsafe.Pointer(from)) < uintptr(unsafe.Pointer(to)) {
|
||
from.mu.Lock()
|
||
defer from.mu.Unlock()
|
||
to.mu.Lock()
|
||
defer to.mu.Unlock()
|
||
} else {
|
||
to.mu.Lock()
|
||
defer to.mu.Unlock()
|
||
from.mu.Lock()
|
||
defer from.mu.Unlock()
|
||
}
|
||
|
||
if from.balance >= amount {
|
||
from.balance -= amount
|
||
to.balance += amount
|
||
}
|
||
}
|
||
|
||
// Config 配置管理器
|
||
type Config struct {
|
||
mu sync.RWMutex
|
||
data map[string]string
|
||
}
|
||
|
||
func NewConfig() *Config {
|
||
return &Config{
|
||
data: make(map[string]string),
|
||
}
|
||
}
|
||
|
||
func (c *Config) Set(key, value string) {
|
||
c.mu.Lock()
|
||
defer c.mu.Unlock()
|
||
c.data[key] = value
|
||
}
|
||
|
||
func (c *Config) Get(key string) string {
|
||
c.mu.RLock()
|
||
defer c.mu.RUnlock()
|
||
return c.data[key]
|
||
}
|
||
|
||
// Cache 缓存系统
|
||
type Cache struct {
|
||
mu sync.RWMutex
|
||
data map[string]string
|
||
}
|
||
|
||
func NewCache() *Cache {
|
||
return &Cache{
|
||
data: make(map[string]string),
|
||
}
|
||
}
|
||
|
||
func (c *Cache) Set(key, value string) {
|
||
c.mu.Lock()
|
||
defer c.mu.Unlock()
|
||
c.data[key] = value
|
||
}
|
||
|
||
func (c *Cache) Get(key string) (string, bool) {
|
||
c.mu.RLock()
|
||
defer c.mu.RUnlock()
|
||
value, ok := c.data[key]
|
||
return value, ok
|
||
}
|
||
|
||
func (c *Cache) Size() int {
|
||
c.mu.RLock()
|
||
defer c.mu.RUnlock()
|
||
return len(c.data)
|
||
}
|
||
|
||
// fetchData 模拟数据获取
|
||
func fetchData(url string) string {
|
||
time.Sleep(time.Duration(rand.Intn(100)) * time.Millisecond)
|
||
return fmt.Sprintf("数据来自 %s", url)
|
||
}
|
||
|
||
// 单例模式相关
|
||
var (
|
||
dbInstance *Database
|
||
dbOnce sync.Once
|
||
)
|
||
|
||
type Database struct {
|
||
connectionString string
|
||
}
|
||
|
||
func GetDatabaseInstance() *Database {
|
||
dbOnce.Do(func() {
|
||
fmt.Printf(" 创建数据库实例\\n")
|
||
time.Sleep(100 * time.Millisecond) // 模拟初始化时间
|
||
dbInstance = &Database{
|
||
connectionString: "localhost:5432",
|
||
}
|
||
})
|
||
return dbInstance
|
||
}
|
||
|
||
// ConfigLoader 配置加载器
|
||
type ConfigLoader struct {
|
||
once sync.Once
|
||
config map[string]string
|
||
}
|
||
|
||
func (cl *ConfigLoader) GetConfig() map[string]string {
|
||
cl.once.Do(func() {
|
||
fmt.Printf(" 加载配置文件\\n")
|
||
time.Sleep(50 * time.Millisecond)
|
||
cl.config = map[string]string{
|
||
"database_url": "localhost:5432",
|
||
"redis_url": "localhost:6379",
|
||
"log_level": "info",
|
||
}
|
||
})
|
||
return cl.config
|
||
}
|
||
|
||
// ExpensiveResource 昂贵资源
|
||
type ExpensiveResource struct {
|
||
once sync.Once
|
||
initialized bool
|
||
data []string
|
||
}
|
||
|
||
func (er *ExpensiveResource) init() {
|
||
fmt.Printf(" 初始化昂贵资源\\n")
|
||
time.Sleep(200 * time.Millisecond)
|
||
er.data = []string{"资源1", "资源2", "资源3"}
|
||
er.initialized = true
|
||
}
|
||
|
||
func (er *ExpensiveResource) Process(input string) string {
|
||
er.once.Do(er.init)
|
||
return fmt.Sprintf("处理 %s 使用 %v", input, er.data)
|
||
}
|
||
|
||
// Buffer 缓冲区(生产者-消费者)
|
||
type Buffer struct {
|
||
mu sync.Mutex
|
||
cond *sync.Cond
|
||
items []int
|
||
size int
|
||
}
|
||
|
||
func (b *Buffer) Produce(item int) {
|
||
b.cond.L.Lock()
|
||
defer b.cond.L.Unlock()
|
||
|
||
// 等待缓冲区有空间
|
||
for len(b.items) >= 5 {
|
||
b.cond.Wait()
|
||
}
|
||
|
||
b.items = append(b.items, item)
|
||
b.cond.Signal() // 通知消费者
|
||
}
|
||
|
||
func (b *Buffer) Consume() int {
|
||
b.cond.L.Lock()
|
||
defer b.cond.L.Unlock()
|
||
|
||
// 等待缓冲区有数据
|
||
for len(b.items) == 0 {
|
||
b.cond.Wait()
|
||
}
|
||
|
||
item := b.items[0]
|
||
b.items = b.items[1:]
|
||
b.cond.Signal() // 通知生产者
|
||
return item
|
||
}
|
||
|
||
// Task 任务
|
||
type Task struct {
|
||
Name string
|
||
}
|
||
|
||
// TaskQueue 任务队列
|
||
type TaskQueue struct {
|
||
mu sync.Mutex
|
||
cond *sync.Cond
|
||
tasks []Task
|
||
}
|
||
|
||
func (tq *TaskQueue) AddTask(task Task) {
|
||
tq.cond.L.Lock()
|
||
defer tq.cond.L.Unlock()
|
||
|
||
tq.tasks = append(tq.tasks, task)
|
||
tq.cond.Signal() // 通知等待的工作者
|
||
}
|
||
|
||
func (tq *TaskQueue) GetTask() Task {
|
||
tq.cond.L.Lock()
|
||
defer tq.cond.L.Unlock()
|
||
|
||
// 等待任务可用
|
||
for len(tq.tasks) == 0 {
|
||
tq.cond.Wait()
|
||
}
|
||
|
||
task := tq.tasks[0]
|
||
tq.tasks = tq.tasks[1:]
|
||
return task
|
||
}
|
||
|
||
// StringBuilder 字符串构建器
|
||
type StringBuilder struct {
|
||
data []string
|
||
}
|
||
|
||
func (sb *StringBuilder) Append(s string) {
|
||
sb.data = append(sb.data, s)
|
||
}
|
||
|
||
func (sb *StringBuilder) String() string {
|
||
return strings.Join(sb.data, " ")
|
||
}
|
||
|
||
func (sb *StringBuilder) Reset() {
|
||
sb.data = sb.data[:0]
|
||
}
|
||
|
||
// ConnectionPool 连接池
|
||
type ConnectionPool struct {
|
||
pool *sync.Pool
|
||
}
|
||
|
||
type DBConn struct {
|
||
ID int
|
||
}
|
||
|
||
func (cp *ConnectionPool) GetConnection() *DBConn {
|
||
return cp.pool.Get().(*DBConn)
|
||
}
|
||
|
||
func (cp *ConnectionPool) PutConnection(conn *DBConn) {
|
||
cp.pool.Put(conn)
|
||
}
|
||
|
||
// AtomicStats 原子统计
|
||
type AtomicStats struct {
|
||
counter int64
|
||
}
|
||
|
||
func (as *AtomicStats) Increment() {
|
||
atomic.AddInt64(&as.counter, 1)
|
||
}
|
||
|
||
func (as *AtomicStats) Value() int64 {
|
||
return atomic.LoadInt64(&as.counter)
|
||
}
|
||
|
||
// MutexStats 互斥锁统计
|
||
type MutexStats struct {
|
||
mu sync.Mutex
|
||
counter int64
|
||
}
|
||
|
||
func (ms *MutexStats) Increment() {
|
||
ms.mu.Lock()
|
||
defer ms.mu.Unlock()
|
||
ms.counter++
|
||
}
|
||
|
||
func (ms *MutexStats) Value() int64 {
|
||
ms.mu.Lock()
|
||
defer ms.mu.Unlock()
|
||
return ms.counter
|
||
}
|
||
|
||
/*
|
||
运行这个程序:
|
||
go run 04-sync-package.go
|
||
|
||
学习要点:
|
||
1. sync 包提供了多种同步原语
|
||
2. Mutex 用于互斥访问,RWMutex 适用于读多写少
|
||
3. WaitGroup 用于等待一组 goroutine 完成
|
||
4. Once 确保函数只执行一次
|
||
5. Cond 用于条件等待和通知
|
||
|
||
Sync 包组件:
|
||
1. Mutex:互斥锁,同时只允许一个 goroutine 访问
|
||
2. RWMutex:读写锁,允许多个读者或一个写者
|
||
3. WaitGroup:等待组,等待一组 goroutine 完成
|
||
4. Once:单次执行,确保函数只执行一次
|
||
5. Cond:条件变量,用于等待条件变化
|
||
6. Pool:对象池,复用对象减少 GC 压力
|
||
|
||
使用场景:
|
||
1. Mutex:保护共享资源,防止竞态条件
|
||
2. RWMutex:读多写少的场景,如配置管理、缓存
|
||
3. WaitGroup:批量处理、分阶段处理
|
||
4. Once:单例模式、资源初始化
|
||
5. Cond:生产者-消费者、任务队列
|
||
6. Pool:频繁创建销毁的对象
|
||
|
||
原子操作:
|
||
1. 无锁操作,性能更好
|
||
2. 适用于简单的数值操作
|
||
3. 支持整数、指针、布尔值等类型
|
||
4. 提供加法、交换、比较并交换等操作
|
||
5. atomic.Value 支持任意类型
|
||
|
||
最佳实践:
|
||
1. 优先使用 channel 进行通信
|
||
2. 必要时使用 sync 包的同步原语
|
||
3. 避免死锁,注意锁的顺序
|
||
4. 使用 defer 确保锁被释放
|
||
5. 合理选择同步机制
|
||
|
||
性能考虑:
|
||
1. 原子操作 > RWMutex > Mutex > Channel
|
||
2. 读写锁适用于读多写少
|
||
3. 对象池可以减少 GC 压力
|
||
4. 避免锁竞争和锁粒度过大
|
||
5. 合理使用缓冲区
|
||
|
||
注意事项:
|
||
1. 不要复制包含锁的结构体
|
||
2. 避免在持有锁时调用可能阻塞的操作
|
||
3. 注意死锁的产生条件
|
||
4. Cond 必须与锁配合使用
|
||
5. Pool 中的对象可能被 GC 回收
|
||
*/
|