Kafka: Library: Sarama: Consumer API

9th February 2022 at 2:58pm

Consumer API 比较简单。去除错误处理后的代码如下:

consumer, err := NewConsumer([]string{"localhost:9092"}, NewTestConfig())
partitionConsumer, err := consumer.ConsumePartition("my_topic", 0, OffsetNewest)

signals := make(chan os.Signal, 1)
signal.Notify(signals, os.Interrupt)

consumed := 0
ConsumerLoop:
for {
    select {
    case msg := <-partitionConsumer.Messages():
        log.Printf("Consumed message offset %d\n", msg.Offset)
        consumed++
    case <-signals:
        break ConsumerLoop
    }
}

log.Printf("Consumed: %d\n", consumed)

其中 Message 的结构是:

type ConsumerMessage struct {
    Headers        []*RecordHeader // only set if kafka is version 0.11+
    Timestamp      time.Time       // only set if kafka is version 0.10+, inner message timestamp
    BlockTimestamp time.Time       // only set if kafka is version 0.10+, outer (compressed) block timestamp

    Key, Value []byte
    Topic      string
    Partition  int32
    Offset     int64
}

需要注意的是:

  • 虽然一个 consumer 可以消费多个分区,但是在这套 API 中你需要调用多次 ConsumePartition 来获得某个分区的 PartitionConsumer 以消费消息。多个不同分区的 PartitionConsumer 可以同时运行
  • 你需要注意 defer close 掉 Consumer 和其下的所有 PartitionConsumer
  • 消息被 ConsumerMessage 结构包装。它的值是个 []byte