Consumer API 比较简单。去除错误处理后的代码如下:
consumer, err := NewConsumer([]string{"localhost:9092"}, NewTestConfig())
partitionConsumer, err := consumer.ConsumePartition("my_topic", 0, OffsetNewest)
signals := make(chan os.Signal, 1)
signal.Notify(signals, os.Interrupt)
consumed := 0
ConsumerLoop:
for {
select {
case msg := <-partitionConsumer.Messages():
log.Printf("Consumed message offset %d\n", msg.Offset)
consumed++
case <-signals:
break ConsumerLoop
}
}
log.Printf("Consumed: %d\n", consumed)
其中 Message 的结构是:
type ConsumerMessage struct {
Headers []*RecordHeader // only set if kafka is version 0.11+
Timestamp time.Time // only set if kafka is version 0.10+, inner message timestamp
BlockTimestamp time.Time // only set if kafka is version 0.10+, outer (compressed) block timestamp
Key, Value []byte
Topic string
Partition int32
Offset int64
}
需要注意的是:
- 虽然一个 consumer 可以消费多个分区,但是在这套 API 中你需要调用多次 ConsumePartition 来获得某个分区的 PartitionConsumer 以消费消息。多个不同分区的 PartitionConsumer 可以同时运行
- 你需要注意 defer close 掉 Consumer 和其下的所有 PartitionConsumer
- 消息被 ConsumerMessage 结构包装。它的值是个
[]byte