Golang: Language: Concurrency: Buffered Channel and Worker Pool

26th December 2020 at 9:48pm

Buffered channel 带有缓冲区。

基础

定义时需要指定大小:

// Unbuffered
ch := make(chan type)

// Buffered
ch := make(chan type, capacity)

Unbuffered channel 在收发时会阻塞;Unbuffered 则仅在发时缓冲区满、收时缓冲区空情况下会阻塞,其他情况下不会。

收取顺序是先进先出:

func main() {  
    ch := make(chan string, 2)
    ch <- "naveen"
    ch <- "paul"
    fmt.Println(<- ch)
    fmt.Println(<- ch)
}
naveen  
paul

cap() 可以查容量,len() 查当前 channel 中对象个数:

func main() {  
    ch := make(chan string, 3)
    ch <- "naveen"
    ch <- "paul"
    fmt.Println("capacity is", cap(ch))
    fmt.Println("length is", len(ch))
    fmt.Println("read value", <-ch)
    fmt.Println("new length is", len(ch))
}
capacity is 3  
length is 2  
read value naveen  
new length is 1

死锁

可能引起死锁:

func main() {  
    ch := make(chan string, 2)
    ch <- "naveen"
    ch <- "paul"
    ch <- "steve"
    fmt.Println(<-ch)
    fmt.Println(<-ch)
}
fatal error: all goroutines are asleep - deadlock!

goroutine 1 [chan send]:  
main.main()  
    /tmp/sandbox274756028/main.go:11 +0x100

当代码运行到 ch <- "steve" 时,main goroutine 因为 block 转为 sleep 状态,但是 Go 发现当前并没有其他 goroutine 可运行,因此程序无法再继续运行下去,抛出异常。

WaitGroup

WaitGroup 被用于等待一组 goroutine 完成。它通过维护一个计数实现。使用例子:

func process(i int, wg *sync.WaitGroup) {  
    fmt.Println("started Goroutine ", i)
    time.Sleep(2 * time.Second)
    fmt.Printf("Goroutine %d ended\n", i)
    wg.Done()
}

func main() {  
    no := 3
    var wg sync.WaitGroup
    for i := 0; i < no; i++ {
        wg.Add(1)
        go process(i, &wg)
    }
    wg.Wait()
    fmt.Println("All go routines finished executing")
}
started Goroutine  2  
started Goroutine  0  
started Goroutine  1  
Goroutine 0 ended  
Goroutine 2 ended  
Goroutine 1 ended  
All go routines finished executing  

其中:

  • wg.Add(1) 使得 WaitGroup 维护的计数加一
  • wg.Done() 使计数减一
  • wg.Wait() 会 block 直到计数为 0

Worker Pool

用 buffered channel 可以实现 worker pool。原理很简单:

  • 搞两个 buffered channel,一个作为 任务队列,一个作为 结果队列
  • 起多个 go routine 作为 worker,从任务队列取任务,将运算结果写入结果队列

代码不做描述,有需要的时候写一个就好了。