Kafka: Library: Sarama: Consumer API

 9th February 2022 at 2:58pm

Consumer API 比较简单。去除错误处理后的代码如下:

consumer, err := NewConsumer([]string{"localhost:9092"}, NewTestConfig())
partitionConsumer, err := consumer.ConsumePartition("my_topic", 0, OffsetNewest)

signals := make(chan os.Signal, 1)
signal.Notify(signals, os.Interrupt)

consumed := 0
ConsumerLoop:
for {
	select {
	case msg := <-partitionConsumer.Messages():
		log.Printf("Consumed message offset %d\n", msg.Offset)
		consumed++
	case <-signals:
		break ConsumerLoop
	}
}

log.Printf("Consumed: %d\n", consumed)

其中 Message 的结构是:

type ConsumerMessage struct {
	Headers        []*RecordHeader // only set if kafka is version 0.11+
	Timestamp      time.Time       // only set if kafka is version 0.10+, inner message timestamp
	BlockTimestamp time.Time       // only set if kafka is version 0.10+, outer (compressed) block timestamp

	Key, Value []byte
	Topic      string
	Partition  int32
	Offset     int64
}

需要注意的是:

  • 虽然一个 consumer 可以消费多个分区,但是在这套 API 中你需要调用多次 ConsumePartition 来获得某个分区的 PartitionConsumer 以消费消息。多个不同分区的 PartitionConsumer 可以同时运行
  • 你需要注意 defer close 掉 Consumer 和其下的所有 PartitionConsumer
  • 消息被 ConsumerMessage 结构包装。它的值是个 []byte