Channel是Go语言中各个并发执行体间的通信机制,是类型相关的管道,用于在goroutine之间传递数据和同步执行。
不要通过共享内存来通信,而应通过通信来共享内存
// 创建channel
ch1 := make(chan int) // 无缓冲channel
ch2 := make(chan int, 10) // 有缓冲channel,容量10
// 操作
ch1 <- 42 // 发送数据到channel
value := <-ch1 // 从channel接收数据
close(ch1) // 关闭channel
// 特殊用法
value, ok := <-ch1 // 检查channel是否关闭
缓冲区区别:
select多路复用:同时等待多个channel操作。
电商订单处理系统:
package main
import (
"fmt"
"math/rand"
"time"
)
type Order struct {
ID int
UserID string
Amount float64
Status string
CreatedAt time.Time
}
func orderProducer(orderChan chan<- Order, numOrders int) {
for i := 1; i <= numOrders; i++ {
order := Order{
ID: i,
UserID: fmt.Sprintf("user%d", rand.Intn(100)),
Amount: rand.Float64() * 1000,
Status: "pending",
CreatedAt: time.Now(),
}
orderChan <- order
fmt.Printf("生成订单: ID=%d, 用户=%s, 金额=¥%.2f\n",
order.ID, order.UserID, order.Amount)
time.Sleep(time.Millisecond * 100) // 模拟生成间隔
}
close(orderChan)
}
func orderProcessor(orderChan <-chan Order, resultChan chan<- Order) {
for order := range orderChan {
// 模拟订单处理逻辑
processingTime := time.Duration(rand.Intn(300)) * time.Millisecond
time.Sleep(processingTime)
// 更新订单状态
if order.Amount > 500 {
order.Status = "verified" // 大额订单需要验证
} else {
order.Status = "completed"
}
resultChan <- order
}
time.Sleep(time.Second * 1)
close(resultChan)
}
func orderResultCollector(resultChan <-chan Order, done chan<- bool) {
processedCount := 0
for order := range resultChan {
processedCount++
fmt.Printf("处理完成: 订单ID=%d, 状态=%s, 金额=¥%.2f\n",
order.ID, order.Status, order.Amount)
}
fmt.Printf("所有订单处理完成! 总计: %d 个订单\n", processedCount)
done <- true
}
func main() {
rand.Seed(time.Now().UnixNano())
// 创建管道
orderChan := make(chan Order, 10)
resultChan := make(chan Order, 10)
done := make(chan bool)
// 启动订单生成器
go orderProducer(orderChan, 20)
// 启动多个订单处理器(工人)
for i := 1; i <= 3; i++ {
go orderProcessor(orderChan, resultChan)
}
// 启动结果收集器
go orderResultCollector(resultChan, done)
// 等待所有处理完成
<-done
}
package main
import (
"fmt"
"time"
)
func main() {
// 1. select多路复用
ch1 := make(chan string)
ch2 := make(chan string)
go func() {
time.Sleep(1 * time.Second)
ch1 <- "来自ch1"
}()
go func() {
time.Sleep(2 * time.Second)
ch2 <- "来自ch2"
}()
for i := 0; i < 2; i++ {
select {
case msg1 := <-ch1:
fmt.Println("收到:", msg1)
case msg2 := <-ch2:
fmt.Println("收到:", msg2)
case <-time.After(3 * time.Second): // 超时控制
fmt.Println("超时!")
return
}
}
// 2. 定时器与Ticker
ticker := time.NewTicker(500 * time.Millisecond)
done := make(chan bool)
go func() {
for {
select {
case <-done:
return
case t := <-ticker.C:
fmt.Println("定时触发 at", t.Format("15:04:05"))
}
}
}()
time.Sleep(2 * time.Second)
ticker.Stop()
done <- true
fmt.Println("Ticker停止")
// 3. 工作池模式
jobs := make(chan int, 100)
results := make(chan int, 100)
// 启动3个worker
for w := 1; w <= 3; w++ {
go worker(w, jobs, results)
}
// 发送5个任务
for j := 1; j <= 5; j++ {
jobs <- j
}
close(jobs)
// 收集结果
for i:=0; i<=5; i++ {
value := <-results
fmt.Printf("Worker 处理结果 %d\n", value)
}
}
func worker(id int, jobs <-chan int, results chan<- int) {
for j := range jobs {
fmt.Printf("Worker %d 处理任务 %d\n", id, j)
time.Sleep(time.Second)
results <- j * 2
}
}