flyfei

并发模型(CSP理论、Goroutine轻量级线程)

1. 语法讲解

并发与并行概念

CSP理论(Communicating Sequential Processes):

2. 应用场景

Goroutine适用场景

传统多线程 vs Goroutine

特性 传统线程 Goroutine
创建成本 1-2MB 2KB
创建速度
调度方式 操作系统调度 Go运行时调度
上下文切换 完整线程切换 用户态轻量切换

3. 编程实例

并发网络请求模拟

package main

import (
    "fmt"
    "math/rand"
    "sync"
    "time"
)

// 模拟网络请求
func fetchData(url string, wg *sync.WaitGroup) {
    defer wg.Done() // 完成时通知WaitGroup
    
    // 模拟网络延迟
    delay := time.Duration(rand.Intn(1000)) * time.Millisecond
    time.Sleep(delay)
    
    fmt.Printf("从 %s 获取数据完成,耗时 %v\n", url, delay)
}

func main() {
    fmt.Println("=== 串行请求 ===")
    start := time.Now()
    
    // 串行执行
    fetchData("https://api.example.com/users", nil)
    fetchData("https://api.example.com/products", nil)
    fetchData("https://api.example.com/orders", nil)
    
    fmt.Printf("串行总耗时: %v\n\n", time.Since(start))
    
    fmt.Println("=== 并发请求 ===")
    start = time.Now()
    
    // 使用WaitGroup等待所有Goroutine完成
    var wg sync.WaitGroup
    urls := []string{
        "https://api.example.com/users",
        "https://api.example.com/products", 
        "https://api.example.com/orders",
        "https://api.example.com/comments",
        "https://api.example.com/reviews",
    }
    
    // 并发执行
    for _, url := range urls {
        wg.Add(1) // 计数器+1
        go fetchData(url, &wg)
    }
    
    wg.Wait() // 等待所有Goroutine完成
    fmt.Printf("并发总耗时: %v\n", time.Since(start))
}

4. 其他用法

Goroutine控制与最佳实践

package main

import (
    "context"
    "fmt"
    "runtime"
    "time"
	"sync"
)

// 1. 使用context控制Goroutine生命周期
func worker(ctx context.Context, id int) {
    for {
        select {
        case <-ctx.Done():
            fmt.Printf("Worker %d: 收到停止信号\n", id)
            return
        case <-time.After(500 * time.Millisecond):
            fmt.Printf("Worker %d: 正在工作...\n", id)
        }
    }
}

// 2. 限制并发数量的Goroutine池
func limitedWorkerPool(tasks []string, maxConcurrency int) {
    semaphore := make(chan struct{}, maxConcurrency) // 信号量
    var wg sync.WaitGroup
    
    for i, task := range tasks {
        wg.Add(1)
        semaphore <- struct{}{} // 获取信号量
        
        go func(taskNum int, taskDesc string) {
            defer wg.Done()
            defer func() { <-semaphore }() // 释放信号量
            
            fmt.Printf("任务 %d: 开始处理 %s\n", taskNum, taskDesc)
            time.Sleep(time.Second)
            fmt.Printf("任务 %d: 完成处理 %s\n", taskNum, taskDesc)
        }(i, task)
    }
    
    wg.Wait()
    fmt.Println("所有任务完成!")
}

// 3. 获取Goroutine信息
func showGoroutineInfo() {
    fmt.Printf("当前Goroutine数量: %d\n", runtime.NumGoroutine())
    fmt.Printf("CPU核心数: %d\n", runtime.NumCPU())
    
    // 设置最大并行度
    runtime.GOMAXPROCS(4)
}

func main() {
    fmt.Println("=== Goroutine控制示例 ===")
    
    // 示例1: 使用context控制
    fmt.Println("\n1. Context控制:")
    ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
    defer cancel()
    
    go worker(ctx, 1)
    go worker(ctx, 2)
    
    time.Sleep(4 * time.Second)
    
    // 示例2: 限制并发数
    fmt.Println("\n2. 限制并发数:")
    tasks := []string{
        "数据导入", "报表生成", "缓存预热", 
        "数据备份", "日志归档", "用户通知",
    }
    limitedWorkerPool(tasks, 2)
    
    // 示例3: 系统信息
    fmt.Println("\n3. 系统信息:")
    showGoroutineInfo()
}

5. 课时总结