Consumer Group API 相对来讲复杂很多。
需要注意的是,一个 consumer group 会为其 被分配的所有分区 各起一个协程来执行 ConsumerGroupHandler.ConsumeClaim()
。除此之外,Setup
/ CleanUp
是分区无关的,仅会在 group 开始消费及停止时被执行一次。
另外一个细节是,当某个 topic x partition 对应的 ConsumeClaim()
函数退出时(无论返回的 error 是不是 nil),都会使得整个 consumer group 停止(表现为 group.Consume()
这个 blocking 函数返回)。我在实现获取 topic 数据的接口时遇到一个场景:仅需要部分分区的消息,其他分区需要被忽略;此时仍然需要保证其他分区的 ConsumeClaim()
不退出,不然会使你需要的分区的 ConsumeClaim()
也被关闭(体现为 claim.Message()
channel 被关闭,拿不到新数据)。