Files
golang/golang-learning/06-concurrency/04-sync-package.go
2025-08-24 11:24:52 +08:00

1184 lines
26 KiB
Go
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

/*
04-sync-package.go - Go 语言 Sync 包详解
学习目标:
1. 理解 sync 包的作用和重要性
2. 掌握 Mutex 和 RWMutex 的使用
3. 学会使用 WaitGroup 进行同步
4. 了解 Once 的使用场景
5. 掌握 Cond 条件变量的用法
知识点:
- sync.Mutex 互斥锁
- sync.RWMutex 读写锁
- sync.WaitGroup 等待组
- sync.Once 单次执行
- sync.Cond 条件变量
- sync.Pool 对象池
- 原子操作 sync/atomic
*/
package main
import (
"fmt"
"math"
"math/rand"
"strings"
"sync"
"sync/atomic"
"time"
"unsafe"
)
func main() {
fmt.Println("=== Go 语言 Sync 包详解 ===\\n")
// 演示 Mutex 互斥锁
demonstrateMutex()
// 演示 RWMutex 读写锁
demonstrateRWMutex()
// 演示 WaitGroup 等待组
demonstrateWaitGroup()
// 演示 Once 单次执行
demonstrateOnce()
// 演示 Cond 条件变量
demonstrateCond()
// 演示 Pool 对象池
demonstratePool()
// 演示原子操作
demonstrateAtomic()
}
// demonstrateMutex 演示 Mutex 互斥锁
func demonstrateMutex() {
fmt.Println("1. Mutex 互斥锁:")
// Mutex 的基本概念
fmt.Printf(" Mutex 的基本概念:\\n")
fmt.Printf(" - 互斥锁,同时只允许一个 goroutine 访问共享资源\\n")
fmt.Printf(" - 用于保护临界区,防止竞态条件\\n")
fmt.Printf(" - Lock() 获取锁Unlock() 释放锁\\n")
fmt.Printf(" - 零值可以直接使用\\n")
fmt.Printf(" - 不可重入,同一 goroutine 不能重复加锁\\n")
// 不使用锁的竞态条件示例
fmt.Printf(" 不使用锁的竞态条件示例:\\n")
var unsafeCounter int
var wg sync.WaitGroup
// 启动多个 goroutine 增加计数器
for i := 0; i < 10; i++ {
wg.Add(1)
go func(id int) {
defer wg.Done()
for j := 0; j < 1000; j++ {
unsafeCounter++ // 竞态条件
}
}(i)
}
wg.Wait()
fmt.Printf(" 不安全计数器最终值: %d (期望: 10000)\\n", unsafeCounter)
// 使用 Mutex 保护共享资源
fmt.Printf(" 使用 Mutex 保护共享资源:\\n")
safeCounter := &SafeCounter{}
// 启动多个 goroutine 安全地增加计数器
for i := 0; i < 10; i++ {
wg.Add(1)
go func(id int) {
defer wg.Done()
for j := 0; j < 1000; j++ {
safeCounter.Increment()
}
}(i)
}
wg.Wait()
fmt.Printf(" 安全计数器最终值: %d\\n", safeCounter.Value())
// 银行账户转账示例
fmt.Printf(" 银行账户转账示例:\\n")
account1 := NewBankAccount("Alice", 1000)
account2 := NewBankAccount("Bob", 500)
fmt.Printf(" 转账前: %s\\n", account1.String())
fmt.Printf(" 转账前: %s\\n", account2.String())
// 并发转账
for i := 0; i < 5; i++ {
wg.Add(2)
go func() {
defer wg.Done()
Transfer(account1, account2, 50)
}()
go func() {
defer wg.Done()
Transfer(account2, account1, 30)
}()
}
wg.Wait()
fmt.Printf(" 转账后: %s\\n", account1.String())
fmt.Printf(" 转账后: %s\\n", account2.String())
fmt.Printf(" 总金额: %.2f (应该保持不变)\\n",
account1.Balance()+account2.Balance())
fmt.Println()
}
// demonstrateRWMutex 演示 RWMutex 读写锁
func demonstrateRWMutex() {
fmt.Println("2. RWMutex 读写锁:")
// RWMutex 的基本概念
fmt.Printf(" RWMutex 的基本概念:\\n")
fmt.Printf(" - 读写锁,允许多个读者或一个写者\\n")
fmt.Printf(" - RLock()/RUnlock() 用于读操作\\n")
fmt.Printf(" - Lock()/Unlock() 用于写操作\\n")
fmt.Printf(" - 读操作可以并发,写操作互斥\\n")
fmt.Printf(" - 适用于读多写少的场景\\n")
// 配置管理示例
fmt.Printf(" 配置管理示例:\\n")
config := NewConfig()
// 设置初始配置
config.Set("database_url", "localhost:5432")
config.Set("max_connections", "100")
config.Set("timeout", "30s")
var wg sync.WaitGroup
// 启动多个读者
for i := 0; i < 10; i++ {
wg.Add(1)
go func(id int) {
defer wg.Done()
for j := 0; j < 5; j++ {
url := config.Get("database_url")
fmt.Printf(" 读者 %d: database_url = %s\\n", id, url)
time.Sleep(10 * time.Millisecond)
}
}(i)
}
// 启动少量写者
for i := 0; i < 2; i++ {
wg.Add(1)
go func(id int) {
defer wg.Done()
time.Sleep(50 * time.Millisecond)
newURL := fmt.Sprintf("server%d:5432", id)
config.Set("database_url", newURL)
fmt.Printf(" 写者 %d: 更新 database_url = %s\\n", id, newURL)
}(i)
}
wg.Wait()
// 缓存系统示例
fmt.Printf(" 缓存系统示例:\\n")
cache := NewCache()
// 预填充缓存
cache.Set("user:1", "Alice")
cache.Set("user:2", "Bob")
cache.Set("user:3", "Charlie")
// 并发读写测试
for i := 0; i < 5; i++ {
wg.Add(2)
// 读操作
go func(id int) {
defer wg.Done()
key := fmt.Sprintf("user:%d", (id%3)+1)
if value, ok := cache.Get(key); ok {
fmt.Printf(" 缓存读取: %s = %s\\n", key, value)
}
}(i)
// 写操作
go func(id int) {
defer wg.Done()
key := fmt.Sprintf("user:%d", id+4)
value := fmt.Sprintf("User%d", id+4)
cache.Set(key, value)
fmt.Printf(" 缓存写入: %s = %s\\n", key, value)
}(i)
}
wg.Wait()
fmt.Printf(" 缓存大小: %d\\n", cache.Size())
fmt.Println()
}
// demonstrateWaitGroup 演示 WaitGroup 等待组
func demonstrateWaitGroup() {
fmt.Println("3. WaitGroup 等待组:")
// WaitGroup 的基本概念
fmt.Printf(" WaitGroup 的基本概念:\\n")
fmt.Printf(" - 用于等待一组 goroutine 完成\\n")
fmt.Printf(" - Add(n) 增加等待计数\\n")
fmt.Printf(" - Done() 减少等待计数\\n")
fmt.Printf(" - Wait() 阻塞直到计数为零\\n")
fmt.Printf(" - 零值可以直接使用\\n")
// 基本使用示例
fmt.Printf(" 基本使用示例:\\n")
var wg sync.WaitGroup
// 启动多个工作 goroutine
for i := 1; i <= 5; i++ {
wg.Add(1)
go func(id int) {
defer wg.Done()
fmt.Printf(" 工作者 %d 开始工作\\n", id)
time.Sleep(time.Duration(rand.Intn(200)) * time.Millisecond)
fmt.Printf(" 工作者 %d 完成工作\\n", id)
}(i)
}
fmt.Printf(" 等待所有工作者完成...\\n")
wg.Wait()
fmt.Printf(" 所有工作者已完成\\n")
// 批量处理示例
fmt.Printf(" 批量处理示例:\\n")
urls := []string{
"https://api.example.com/users",
"https://api.example.com/orders",
"https://api.example.com/products",
"https://api.example.com/categories",
"https://api.example.com/reviews",
}
results := make([]string, len(urls))
for i, url := range urls {
wg.Add(1)
go func(index int, endpoint string) {
defer wg.Done()
result := fetchData(endpoint)
results[index] = result
}(i, url)
}
wg.Wait()
fmt.Printf(" 批量处理结果:\\n")
for i, result := range results {
fmt.Printf(" %s: %s\\n", urls[i], result)
}
// 分阶段处理示例
fmt.Printf(" 分阶段处理示例:\\n")
// 第一阶段:数据准备
fmt.Printf(" 第一阶段:数据准备\\n")
dataChannels := make([]chan int, 3)
for i := range dataChannels {
dataChannels[i] = make(chan int, 10)
wg.Add(1)
go func(id int, ch chan int) {
defer wg.Done()
defer close(ch)
for j := 1; j <= 5; j++ {
ch <- id*10 + j
time.Sleep(20 * time.Millisecond)
}
fmt.Printf(" 数据源 %d 准备完成\\n", id)
}(i, dataChannels[i])
}
wg.Wait()
fmt.Printf(" 第一阶段完成\\n")
// 第二阶段:数据处理
fmt.Printf(" 第二阶段:数据处理\\n")
processedData := make([][]int, len(dataChannels))
for i, ch := range dataChannels {
wg.Add(1)
go func(id int, dataCh chan int) {
defer wg.Done()
var processed []int
for data := range dataCh {
processed = append(processed, data*2)
}
processedData[id] = processed
fmt.Printf(" 处理器 %d 完成\\n", id)
}(i, ch)
}
wg.Wait()
fmt.Printf(" 第二阶段完成\\n")
// 输出最终结果
for i, data := range processedData {
fmt.Printf(" 处理器 %d 结果: %v\\n", i, data)
}
fmt.Println()
}
// demonstrateOnce 演示 Once 单次执行
func demonstrateOnce() {
fmt.Println("4. Once 单次执行:")
// Once 的基本概念
fmt.Printf(" Once 的基本概念:\\n")
fmt.Printf(" - 确保函数只执行一次\\n")
fmt.Printf(" - 线程安全的单例模式实现\\n")
fmt.Printf(" - Do(func()) 方法执行函数\\n")
fmt.Printf(" - 常用于初始化操作\\n")
fmt.Printf(" - 零值可以直接使用\\n")
// 基本使用示例
fmt.Printf(" 基本使用示例:\\n")
var once sync.Once
var wg sync.WaitGroup
// 启动多个 goroutine 尝试执行初始化
for i := 1; i <= 5; i++ {
wg.Add(1)
go func(id int) {
defer wg.Done()
once.Do(func() {
fmt.Printf(" 初始化操作执行 (来自 goroutine %d)\\n", id)
time.Sleep(100 * time.Millisecond)
fmt.Printf(" 初始化操作完成\\n")
})
fmt.Printf(" Goroutine %d 继续执行\\n", id)
}(i)
}
wg.Wait()
// 单例模式示例
fmt.Printf(" 单例模式示例:\\n")
// 并发获取单例实例
for i := 1; i <= 5; i++ {
wg.Add(1)
go func(id int) {
defer wg.Done()
instance := GetDatabaseInstance()
fmt.Printf(" Goroutine %d 获取实例: %p\\n", id, instance)
}(i)
}
wg.Wait()
// 配置加载示例
fmt.Printf(" 配置加载示例:\\n")
configLoader := &ConfigLoader{}
// 多个 goroutine 同时加载配置
for i := 1; i <= 3; i++ {
wg.Add(1)
go func(id int) {
defer wg.Done()
config := configLoader.GetConfig()
fmt.Printf(" Goroutine %d 获取配置: %v\\n", id, config)
}(i)
}
wg.Wait()
// 资源初始化示例
fmt.Printf(" 资源初始化示例:\\n")
resource := &ExpensiveResource{}
// 多个 goroutine 使用资源
for i := 1; i <= 4; i++ {
wg.Add(1)
go func(id int) {
defer wg.Done()
result := resource.Process(fmt.Sprintf("数据-%d", id))
fmt.Printf(" Goroutine %d 处理结果: %s\\n", id, result)
}(i)
}
wg.Wait()
fmt.Println()
}
// demonstrateCond 演示 Cond 条件变量
func demonstrateCond() {
fmt.Println("5. Cond 条件变量:")
// Cond 的基本概念
fmt.Printf(" Cond 的基本概念:\\n")
fmt.Printf(" - 条件变量,用于等待或通知条件变化\\n")
fmt.Printf(" - 必须与 Mutex 或 RWMutex 配合使用\\n")
fmt.Printf(" - Wait() 等待条件满足\\n")
fmt.Printf(" - Signal() 唤醒一个等待者\\n")
fmt.Printf(" - Broadcast() 唤醒所有等待者\\n")
// 生产者-消费者示例
fmt.Printf(" 生产者-消费者示例:\\n")
buffer := &Buffer{
items: make([]int, 0, 5),
cond: sync.NewCond(&sync.Mutex{}),
}
var wg sync.WaitGroup
// 启动消费者
for i := 1; i <= 2; i++ {
wg.Add(1)
go func(id int) {
defer wg.Done()
for j := 0; j < 5; j++ {
item := buffer.Consume()
fmt.Printf(" 消费者 %d 消费: %d\\n", id, item)
time.Sleep(150 * time.Millisecond)
}
}(i)
}
// 启动生产者
for i := 1; i <= 2; i++ {
wg.Add(1)
go func(id int) {
defer wg.Done()
for j := 1; j <= 5; j++ {
item := id*10 + j
buffer.Produce(item)
fmt.Printf(" 生产者 %d 生产: %d\\n", id, item)
time.Sleep(100 * time.Millisecond)
}
}(i)
}
wg.Wait()
// 任务队列示例
fmt.Printf(" 任务队列示例:\\n")
taskQueue := &TaskQueue{
tasks: make([]Task, 0),
cond: sync.NewCond(&sync.Mutex{}),
}
// 启动工作者
for i := 1; i <= 3; i++ {
wg.Add(1)
go func(id int) {
defer wg.Done()
for j := 0; j < 3; j++ {
task := taskQueue.GetTask()
fmt.Printf(" 工作者 %d 执行任务: %s\\n", id, task.Name)
time.Sleep(100 * time.Millisecond)
}
}(i)
}
// 添加任务
go func() {
tasks := []Task{
{Name: "任务A"}, {Name: "任务B"}, {Name: "任务C"},
{Name: "任务D"}, {Name: "任务E"}, {Name: "任务F"},
{Name: "任务G"}, {Name: "任务H"}, {Name: "任务I"},
}
for _, task := range tasks {
taskQueue.AddTask(task)
time.Sleep(50 * time.Millisecond)
}
}()
wg.Wait()
fmt.Println()
}
// demonstratePool 演示 Pool 对象池
func demonstratePool() {
fmt.Println("6. Pool 对象池:")
// Pool 的基本概念
fmt.Printf(" Pool 的基本概念:\\n")
fmt.Printf(" - 对象池,用于复用对象减少 GC 压力\\n")
fmt.Printf(" - Get() 获取对象Put() 归还对象\\n")
fmt.Printf(" - New 字段定义创建新对象的函数\\n")
fmt.Printf(" - 线程安全,可以并发使用\\n")
fmt.Printf(" - 适用于频繁创建销毁的对象\\n")
// 基本使用示例
fmt.Printf(" 基本使用示例:\\n")
// 创建字符串构建器池
builderPool := &sync.Pool{
New: func() interface{} {
fmt.Printf(" 创建新的 StringBuilder\\n")
return &StringBuilder{data: make([]string, 0, 10)}
},
}
var wg sync.WaitGroup
// 并发使用对象池
for i := 1; i <= 5; i++ {
wg.Add(1)
go func(id int) {
defer wg.Done()
// 从池中获取对象
builder := builderPool.Get().(*StringBuilder)
// 使用对象
builder.Append(fmt.Sprintf("Goroutine-%d", id))
builder.Append("处理数据")
result := builder.String()
fmt.Printf(" Goroutine %d 结果: %s\\n", id, result)
// 重置对象状态
builder.Reset()
// 归还对象到池中
builderPool.Put(builder)
}(i)
}
wg.Wait()
// 缓冲区池示例
fmt.Printf(" 缓冲区池示例:\\n")
bufferPool := &sync.Pool{
New: func() interface{} {
fmt.Printf(" 创建新的缓冲区\\n")
return make([]byte, 1024)
},
}
// 模拟网络请求处理
for i := 1; i <= 3; i++ {
wg.Add(1)
go func(id int) {
defer wg.Done()
// 获取缓冲区
buffer := bufferPool.Get().([]byte)
// 模拟数据处理
data := fmt.Sprintf("请求 %d 的数据", id)
copy(buffer, data)
fmt.Printf(" 处理请求 %d: %s\\n", id, string(buffer[:len(data)]))
// 清理缓冲区
for i := range buffer {
buffer[i] = 0
}
// 归还缓冲区
bufferPool.Put(buffer)
}(i)
}
wg.Wait()
// 连接池示例
fmt.Printf(" 连接池示例:\\n")
connPool := &ConnectionPool{
pool: &sync.Pool{
New: func() interface{} {
fmt.Printf(" 创建新的数据库连接\\n")
return &DBConn{ID: rand.Intn(1000)}
},
},
}
// 并发使用连接
for i := 1; i <= 4; i++ {
wg.Add(1)
go func(id int) {
defer wg.Done()
conn := connPool.GetConnection()
fmt.Printf(" Goroutine %d 使用连接 %d\\n", id, conn.ID)
// 模拟数据库操作
time.Sleep(100 * time.Millisecond)
connPool.PutConnection(conn)
}(i)
}
wg.Wait()
fmt.Println()
}
// demonstrateAtomic 演示原子操作
func demonstrateAtomic() {
fmt.Println("7. 原子操作:")
// 原子操作的基本概念
fmt.Printf(" 原子操作的基本概念:\\n")
fmt.Printf(" - 不可分割的操作,要么全部完成要么全部不完成\\n")
fmt.Printf(" - 无需加锁即可保证并发安全\\n")
fmt.Printf(" - 性能比互斥锁更好\\n")
fmt.Printf(" - 适用于简单的数值操作\\n")
fmt.Printf(" - sync/atomic 包提供原子操作函数\\n")
// 原子计数器示例
fmt.Printf(" 原子计数器示例:\\n")
var atomicCounter int64
var wg sync.WaitGroup
// 启动多个 goroutine 原子地增加计数器
for i := 0; i < 10; i++ {
wg.Add(1)
go func(id int) {
defer wg.Done()
for j := 0; j < 1000; j++ {
atomic.AddInt64(&atomicCounter, 1)
}
}(i)
}
wg.Wait()
fmt.Printf(" 原子计数器最终值: %d\\n", atomic.LoadInt64(&atomicCounter))
// 原子操作类型示例
fmt.Printf(" 原子操作类型示例:\\n")
var (
intVal int64 = 100
uintVal uint64 = 200
floatVal uint64 // 存储 float64 的位表示
boolVal int32 // 存储 bool 值
stringPtr unsafe.Pointer
)
// 整数原子操作
atomic.AddInt64(&intVal, 50)
fmt.Printf(" 整数加法: %d\\n", atomic.LoadInt64(&intVal))
old := atomic.SwapInt64(&intVal, 300)
fmt.Printf(" 整数交换: 旧值=%d, 新值=%d\\n", old, atomic.LoadInt64(&intVal))
// 比较并交换
swapped := atomic.CompareAndSwapInt64(&intVal, 300, 400)
fmt.Printf(" 比较并交换: 成功=%t, 当前值=%d\\n", swapped, atomic.LoadInt64(&intVal))
// 无符号整数操作
atomic.AddUint64(&uintVal, 100)
fmt.Printf(" 无符号整数: %d\\n", atomic.LoadUint64(&uintVal))
// 浮点数操作(通过位操作)
f := 3.14159
atomic.StoreUint64(&floatVal, math.Float64bits(f))
loadedFloat := math.Float64frombits(atomic.LoadUint64(&floatVal))
fmt.Printf(" 浮点数: %.5f\\n", loadedFloat)
// 布尔值操作
atomic.StoreInt32(&boolVal, 1) // true
fmt.Printf(" 布尔值: %t\\n", atomic.LoadInt32(&boolVal) != 0)
// 指针操作
str1 := "Hello"
str2 := "World"
atomic.StorePointer(&stringPtr, unsafe.Pointer(&str1))
loadedStr := (*string)(atomic.LoadPointer(&stringPtr))
fmt.Printf(" 指针字符串: %s\\n", *loadedStr)
atomic.StorePointer(&stringPtr, unsafe.Pointer(&str2))
loadedStr = (*string)(atomic.LoadPointer(&stringPtr))
fmt.Printf(" 指针字符串: %s\\n", *loadedStr)
// 原子值示例
fmt.Printf(" 原子值示例:\\n")
var atomicValue atomic.Value
// 存储不同类型的值
atomicValue.Store("字符串值")
fmt.Printf(" 原子值: %v\\n", atomicValue.Load())
atomicValue.Store(42)
fmt.Printf(" 原子值: %v\\n", atomicValue.Load())
atomicValue.Store(map[string]int{"key": 100})
fmt.Printf(" 原子值: %v\\n", atomicValue.Load())
// 性能对比示例
fmt.Printf(" 性能对比示例:\\n")
// 使用原子操作的计数器
atomicStats := &AtomicStats{}
start := time.Now()
for i := 0; i < 10; i++ {
wg.Add(1)
go func() {
defer wg.Done()
for j := 0; j < 10000; j++ {
atomicStats.Increment()
}
}()
}
wg.Wait()
atomicDuration := time.Since(start)
fmt.Printf(" 原子操作耗时: %v, 结果: %d\\n",
atomicDuration, atomicStats.Value())
// 使用互斥锁的计数器
mutexStats := &MutexStats{}
start = time.Now()
for i := 0; i < 10; i++ {
wg.Add(1)
go func() {
defer wg.Done()
for j := 0; j < 10000; j++ {
mutexStats.Increment()
}
}()
}
wg.Wait()
mutexDuration := time.Since(start)
fmt.Printf(" 互斥锁耗时: %v, 结果: %d\\n",
mutexDuration, mutexStats.Value())
fmt.Printf(" 性能提升: %.2fx\\n",
float64(mutexDuration)/float64(atomicDuration))
fmt.Println()
}
// ========== 类型定义和辅助函数 ==========
// SafeCounter 线程安全的计数器
type SafeCounter struct {
mu sync.Mutex
value int
}
func (c *SafeCounter) Increment() {
c.mu.Lock()
defer c.mu.Unlock()
c.value++
}
func (c *SafeCounter) Value() int {
c.mu.Lock()
defer c.mu.Unlock()
return c.value
}
// BankAccount 银行账户
type BankAccount struct {
mu sync.Mutex
name string
balance float64
}
func NewBankAccount(name string, balance float64) *BankAccount {
return &BankAccount{
name: name,
balance: balance,
}
}
func (a *BankAccount) Deposit(amount float64) {
a.mu.Lock()
defer a.mu.Unlock()
a.balance += amount
}
func (a *BankAccount) Withdraw(amount float64) bool {
a.mu.Lock()
defer a.mu.Unlock()
if a.balance >= amount {
a.balance -= amount
return true
}
return false
}
func (a *BankAccount) Balance() float64 {
a.mu.Lock()
defer a.mu.Unlock()
return a.balance
}
func (a *BankAccount) String() string {
a.mu.Lock()
defer a.mu.Unlock()
return fmt.Sprintf("%s: $%.2f", a.name, a.balance)
}
// Transfer 转账函数
func Transfer(from, to *BankAccount, amount float64) {
// 避免死锁:总是按照相同的顺序获取锁
if from == to {
return
}
// 使用地址比较确保锁的顺序
if uintptr(unsafe.Pointer(from)) < uintptr(unsafe.Pointer(to)) {
from.mu.Lock()
defer from.mu.Unlock()
to.mu.Lock()
defer to.mu.Unlock()
} else {
to.mu.Lock()
defer to.mu.Unlock()
from.mu.Lock()
defer from.mu.Unlock()
}
if from.balance >= amount {
from.balance -= amount
to.balance += amount
}
}
// Config 配置管理器
type Config struct {
mu sync.RWMutex
data map[string]string
}
func NewConfig() *Config {
return &Config{
data: make(map[string]string),
}
}
func (c *Config) Set(key, value string) {
c.mu.Lock()
defer c.mu.Unlock()
c.data[key] = value
}
func (c *Config) Get(key string) string {
c.mu.RLock()
defer c.mu.RUnlock()
return c.data[key]
}
// Cache 缓存系统
type Cache struct {
mu sync.RWMutex
data map[string]string
}
func NewCache() *Cache {
return &Cache{
data: make(map[string]string),
}
}
func (c *Cache) Set(key, value string) {
c.mu.Lock()
defer c.mu.Unlock()
c.data[key] = value
}
func (c *Cache) Get(key string) (string, bool) {
c.mu.RLock()
defer c.mu.RUnlock()
value, ok := c.data[key]
return value, ok
}
func (c *Cache) Size() int {
c.mu.RLock()
defer c.mu.RUnlock()
return len(c.data)
}
// fetchData 模拟数据获取
func fetchData(url string) string {
time.Sleep(time.Duration(rand.Intn(100)) * time.Millisecond)
return fmt.Sprintf("数据来自 %s", url)
}
// 单例模式相关
var (
dbInstance *Database
dbOnce sync.Once
)
type Database struct {
connectionString string
}
func GetDatabaseInstance() *Database {
dbOnce.Do(func() {
fmt.Printf(" 创建数据库实例\\n")
time.Sleep(100 * time.Millisecond) // 模拟初始化时间
dbInstance = &Database{
connectionString: "localhost:5432",
}
})
return dbInstance
}
// ConfigLoader 配置加载器
type ConfigLoader struct {
once sync.Once
config map[string]string
}
func (cl *ConfigLoader) GetConfig() map[string]string {
cl.once.Do(func() {
fmt.Printf(" 加载配置文件\\n")
time.Sleep(50 * time.Millisecond)
cl.config = map[string]string{
"database_url": "localhost:5432",
"redis_url": "localhost:6379",
"log_level": "info",
}
})
return cl.config
}
// ExpensiveResource 昂贵资源
type ExpensiveResource struct {
once sync.Once
initialized bool
data []string
}
func (er *ExpensiveResource) init() {
fmt.Printf(" 初始化昂贵资源\\n")
time.Sleep(200 * time.Millisecond)
er.data = []string{"资源1", "资源2", "资源3"}
er.initialized = true
}
func (er *ExpensiveResource) Process(input string) string {
er.once.Do(er.init)
return fmt.Sprintf("处理 %s 使用 %v", input, er.data)
}
// Buffer 缓冲区(生产者-消费者)
type Buffer struct {
mu sync.Mutex
cond *sync.Cond
items []int
size int
}
func (b *Buffer) Produce(item int) {
b.cond.L.Lock()
defer b.cond.L.Unlock()
// 等待缓冲区有空间
for len(b.items) >= 5 {
b.cond.Wait()
}
b.items = append(b.items, item)
b.cond.Signal() // 通知消费者
}
func (b *Buffer) Consume() int {
b.cond.L.Lock()
defer b.cond.L.Unlock()
// 等待缓冲区有数据
for len(b.items) == 0 {
b.cond.Wait()
}
item := b.items[0]
b.items = b.items[1:]
b.cond.Signal() // 通知生产者
return item
}
// Task 任务
type Task struct {
Name string
}
// TaskQueue 任务队列
type TaskQueue struct {
mu sync.Mutex
cond *sync.Cond
tasks []Task
}
func (tq *TaskQueue) AddTask(task Task) {
tq.cond.L.Lock()
defer tq.cond.L.Unlock()
tq.tasks = append(tq.tasks, task)
tq.cond.Signal() // 通知等待的工作者
}
func (tq *TaskQueue) GetTask() Task {
tq.cond.L.Lock()
defer tq.cond.L.Unlock()
// 等待任务可用
for len(tq.tasks) == 0 {
tq.cond.Wait()
}
task := tq.tasks[0]
tq.tasks = tq.tasks[1:]
return task
}
// StringBuilder 字符串构建器
type StringBuilder struct {
data []string
}
func (sb *StringBuilder) Append(s string) {
sb.data = append(sb.data, s)
}
func (sb *StringBuilder) String() string {
return strings.Join(sb.data, " ")
}
func (sb *StringBuilder) Reset() {
sb.data = sb.data[:0]
}
// ConnectionPool 连接池
type ConnectionPool struct {
pool *sync.Pool
}
type DBConn struct {
ID int
}
func (cp *ConnectionPool) GetConnection() *DBConn {
return cp.pool.Get().(*DBConn)
}
func (cp *ConnectionPool) PutConnection(conn *DBConn) {
cp.pool.Put(conn)
}
// AtomicStats 原子统计
type AtomicStats struct {
counter int64
}
func (as *AtomicStats) Increment() {
atomic.AddInt64(&as.counter, 1)
}
func (as *AtomicStats) Value() int64 {
return atomic.LoadInt64(&as.counter)
}
// MutexStats 互斥锁统计
type MutexStats struct {
mu sync.Mutex
counter int64
}
func (ms *MutexStats) Increment() {
ms.mu.Lock()
defer ms.mu.Unlock()
ms.counter++
}
func (ms *MutexStats) Value() int64 {
ms.mu.Lock()
defer ms.mu.Unlock()
return ms.counter
}
/*
运行这个程序:
go run 04-sync-package.go
学习要点:
1. sync 包提供了多种同步原语
2. Mutex 用于互斥访问RWMutex 适用于读多写少
3. WaitGroup 用于等待一组 goroutine 完成
4. Once 确保函数只执行一次
5. Cond 用于条件等待和通知
Sync 包组件:
1. Mutex互斥锁同时只允许一个 goroutine 访问
2. RWMutex读写锁允许多个读者或一个写者
3. WaitGroup等待组等待一组 goroutine 完成
4. Once单次执行确保函数只执行一次
5. Cond条件变量用于等待条件变化
6. Pool对象池复用对象减少 GC 压力
使用场景:
1. Mutex保护共享资源防止竞态条件
2. RWMutex读多写少的场景如配置管理、缓存
3. WaitGroup批量处理、分阶段处理
4. Once单例模式、资源初始化
5. Cond生产者-消费者、任务队列
6. Pool频繁创建销毁的对象
原子操作:
1. 无锁操作,性能更好
2. 适用于简单的数值操作
3. 支持整数、指针、布尔值等类型
4. 提供加法、交换、比较并交换等操作
5. atomic.Value 支持任意类型
最佳实践:
1. 优先使用 channel 进行通信
2. 必要时使用 sync 包的同步原语
3. 避免死锁,注意锁的顺序
4. 使用 defer 确保锁被释放
5. 合理选择同步机制
性能考虑:
1. 原子操作 > RWMutex > Mutex > Channel
2. 读写锁适用于读多写少
3. 对象池可以减少 GC 压力
4. 避免锁竞争和锁粒度过大
5. 合理使用缓冲区
注意事项:
1. 不要复制包含锁的结构体
2. 避免在持有锁时调用可能阻塞的操作
3. 注意死锁的产生条件
4. Cond 必须与锁配合使用
5. Pool 中的对象可能被 GC 回收
*/