Kafka: Library: Sarama

12th October 2021 at 3:22pm

两篇 帖子 提到了 Sarama 在 consumer rebalance 时的不稳定性。出于稳定性考虑,confluent-kafka-go 应该是更好的选择;但是它使用了 CGO,使得编译比较慢。

配置

Samara 提供了详细的 Config 结构。编程实现时要设置好这个结构,来实现稳定可靠的读取和写入。

Consumer

消费消息有两套 API:consumer 及 consumer group。

Consumer API

Consumer API 比较简单。去除错误处理后的代码如下:

consumer, err := NewConsumer([]string{"localhost:9092"}, NewTestConfig())
partitionConsumer, err := consumer.ConsumePartition("my_topic", 0, OffsetNewest)

signals := make(chan os.Signal, 1)
signal.Notify(signals, os.Interrupt)

consumed := 0
ConsumerLoop:
for {
    select {
    case msg := <-partitionConsumer.Messages():
        log.Printf("Consumed message offset %d\n", msg.Offset)
        consumed++
    case <-signals:
        break ConsumerLoop
    }
}

log.Printf("Consumed: %d\n", consumed)

其中 Message 的结构是:

type ConsumerMessage struct {
    Headers        []*RecordHeader // only set if kafka is version 0.11+
    Timestamp      time.Time       // only set if kafka is version 0.10+, inner message timestamp
    BlockTimestamp time.Time       // only set if kafka is version 0.10+, outer (compressed) block timestamp

    Key, Value []byte
    Topic      string
    Partition  int32
    Offset     int64
}

需要注意的是:

  • 虽然一个 consumer 可以消费多个分区,但是在这套 API 中你需要调用多次 ConsumePartition 来获得某个分区的 PartitionConsumer 以消费消息。多个不同分区的 PartitionConsumer 可以同时运行
  • 你需要注意 defer close 掉 Consumer 和其下的所有 PartitionConsumer
  • 消息被 ConsumerMessage 结构包装。它的值是个 []byte

Consumer Group API

Consumer Group API 相对来讲复杂很多。