flyfei

Channel(管道)

1. 语法讲解

Channel是Go语言中各个并发执行体间的通信机制,是类型相关的管道,用于在goroutine之间传递数据和同步执行。

不要通过共享内存来通信,而应通过通信来共享内存

// 创建channel
ch1 := make(chan int)        // 无缓冲channel 
ch2 := make(chan int, 10)   // 有缓冲channel,容量10

// 操作
ch1 <- 42    // 发送数据到channel
value := <-ch1 // 从channel接收数据
close(ch1)   // 关闭channel

// 特殊用法
value, ok := <-ch1 // 检查channel是否关闭

缓冲区区别

select多路复用:同时等待多个channel操作。

2. 应用场景

3. 编程实例

电商订单处理系统

package main

import (
    "fmt"
    "math/rand"
    "time"
)

type Order struct {
    ID        int
    UserID    string
    Amount    float64
    Status    string
    CreatedAt time.Time
}

func orderProducer(orderChan chan<- Order, numOrders int) {
    for i := 1; i <= numOrders; i++ {
        order := Order{
            ID:        i,
            UserID:    fmt.Sprintf("user%d", rand.Intn(100)),
            Amount:    rand.Float64() * 1000,
            Status:    "pending",
            CreatedAt: time.Now(),
        }
        orderChan <- order
        fmt.Printf("生成订单: ID=%d, 用户=%s, 金额=¥%.2f\n", 
            order.ID, order.UserID, order.Amount)
        time.Sleep(time.Millisecond * 100) // 模拟生成间隔
    }
    close(orderChan)
}

func orderProcessor(orderChan <-chan Order, resultChan chan<- Order) {
    for order := range orderChan {
        // 模拟订单处理逻辑
        processingTime := time.Duration(rand.Intn(300)) * time.Millisecond
        time.Sleep(processingTime)
        
        // 更新订单状态
        if order.Amount > 500 {
            order.Status = "verified" // 大额订单需要验证
        } else {
            order.Status = "completed"
        }
        
        resultChan <- order
    }
	time.Sleep(time.Second * 1)
	close(resultChan)
}

func orderResultCollector(resultChan <-chan Order, done chan<- bool) {
    processedCount := 0
    for order := range resultChan {
        processedCount++
        fmt.Printf("处理完成: 订单ID=%d, 状态=%s, 金额=¥%.2f\n", 
            order.ID, order.Status, order.Amount)
    }
    fmt.Printf("所有订单处理完成! 总计: %d 个订单\n", processedCount)
    done <- true
}

func main() {
    rand.Seed(time.Now().UnixNano())
    
    // 创建管道
    orderChan := make(chan Order, 10)
    resultChan := make(chan Order, 10)
    done := make(chan bool)
    
    // 启动订单生成器
    go orderProducer(orderChan, 20)
    
    // 启动多个订单处理器(工人)
    for i := 1; i <= 3; i++ {
        go orderProcessor(orderChan, resultChan)
    }
    
    // 启动结果收集器
    go orderResultCollector(resultChan, done)
    
    // 等待所有处理完成
    <-done
}

4. 其他用法

package main

import (
    "fmt"
    "time"
)

func main() {
    // 1. select多路复用
    ch1 := make(chan string)
    ch2 := make(chan string)
    
    go func() {
        time.Sleep(1 * time.Second)
        ch1 <- "来自ch1"
    }()
    
    go func() {
        time.Sleep(2 * time.Second)
        ch2 <- "来自ch2"
    }()
    
    for i := 0; i < 2; i++ {
        select {
        case msg1 := <-ch1:
            fmt.Println("收到:", msg1)
        case msg2 := <-ch2:
            fmt.Println("收到:", msg2)
        case <-time.After(3 * time.Second): // 超时控制
            fmt.Println("超时!")
            return
        }
    }
    
    // 2. 定时器与Ticker
    ticker := time.NewTicker(500 * time.Millisecond)
    done := make(chan bool)
    
    go func() {
        for {
            select {
            case <-done:
                return
            case t := <-ticker.C:
                fmt.Println("定时触发 at", t.Format("15:04:05"))
            }
        }
    }()
    
    time.Sleep(2 * time.Second)
    ticker.Stop()
    done <- true
    fmt.Println("Ticker停止")
    
    // 3. 工作池模式
    jobs := make(chan int, 100)
    results := make(chan int, 100)
    
    // 启动3个worker
    for w := 1; w <= 3; w++ {
        go worker(w, jobs, results)
    }
    
    // 发送5个任务
    for j := 1; j <= 5; j++ {
        jobs <- j
    }
    close(jobs)
    
    // 收集结果
    for i:=0; i<=5; i++ {
		value := <-results
        fmt.Printf("Worker 处理结果 %d\n", value)
    }
}

func worker(id int, jobs <-chan int, results chan<- int) {
    for j := range jobs {
        fmt.Printf("Worker %d 处理任务 %d\n", id, j)
        time.Sleep(time.Second)
        results <- j * 2
    }
}

5. 课时总结