在某个类对象的生命周期快结束时,有一个常见的逻辑是,将其管理的 channel 的数据消耗掉,并关闭 channel。Sarama 有一个例子(虽然没有关掉 channel):
func (pc *PartitionConsumer) Close() error {
var (
closeErr error
wg sync.WaitGroup
)
wg.Add(1)
go func() {
defer wg.Done()
errs := make(sarama.ConsumerErrors, 0)
for err := range pc.errors {
errs = append(errs, err)
}
if len(errs) > 0 {
closeErr = errs
}
}()
wg.Add(1)
go func() {
defer wg.Done()
for range pc.messages {
// drain
}
}()
wg.Wait()
return closeErr
}