Kafka: Library: Sarama: Error Handling

 6th January 2022 at 2:15pm

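This post describes tests of several failure scenarios, to work out how errors should be handled when using the Sarama library. The tests were run against Kafka 2.8.x and Sarama 1.28.0.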

The test code uses the ConsumerGroup API:

  • Consume() is wrapped in a for loop
  • group.Errors() is read in a loop
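go func() {
    for {
        err := k.group.Consume(ctx, []string{k.config.Topic}, &handler{&s})
        if err != nil {
            fmt.Printf("Consume failed: %v\n", err)
            break
        }
        // A nil return means either a rebalance or a canceled ctx.
        fmt.Println("Rebalanced or context canceled")
        if ctx.Err() != nil {
            // ctx is done: stop normally.
            break
        }
        // Otherwise loop and call Consume() again to rejoin the group.
    }
}()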
for {  
   select {  
   case m := <-stream.Messages:  
      fmt.Println(string(m.Value))  
   case e := <-stream.Errors:  
      fmt.Println(e)  
   }  
}

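Broker disconnected

The test used an SSH tunnel that mapped the address of a broker running in a virtual machine to local port 9092. Disconnecting the broker meant tearing down the port forward.

Observed behavior: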

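  • After 5 reconnection attempts, Sarama stopped retrying (5 matches the default of Config.Admin.Retry.Max, though that setting belongs to the ClusterAdmin namespace, so this attribution is unconfirmed)
  • The following showed up on Errors, in order:
    1. sarama.ConsumerError x3 wrapping EOF: “kafka: error while consuming data-sync-test-1/0: EOF”
    2. net.OpError: dial tcp failed
    3. sarama.ConsumerError wrapping a net.OpError, again dial tcp failed
  • During this process k.group.Consume() first returned nil once; when k.group.Consume() was called again, it returned a net.OpError indicating that the broker could not be reached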

Partition added to the topic

The topic started with 3 partitions. While the consumer group was running, the partition count was changed to 4 with this command:

docker-compose exec broker kafka-topics \
    --bootstrap-server broker:9092 --topic data-sync-test-1 \
    --alter --partitions 4

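Observed behavior:

  • Sarama only notices the new partition after its next metadata refresh; the refresh interval is set by Config.Metadata.RefreshFrequency and defaults to 10 minutes
  • After the metadata refresh, k.group.Consume() returns nil, and the log shows the subscriptions to the existing partitions being closed
  • Once the for loop calls k.group.Consume() again, subscriptions are established for all partitions, including the new one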
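
If waiting up to 10 minutes for a new partition is too long, the refresh interval can be lowered when building the config. This is a configuration sketch only: the one-minute value is an arbitrary example, newConfig is an illustrative name, and the import path is the one assumed for Sarama 1.28.0.

```go
package main

import (
	"time"

	"github.com/Shopify/sarama"
)

// newConfig returns a Sarama config that refreshes metadata every minute
// instead of the default 10 minutes. The value is only an example.
func newConfig() *sarama.Config {
	cfg := sarama.NewConfig()
	cfg.Metadata.RefreshFrequency = time.Minute
	return cfg
}
```

A shorter interval means more metadata requests to the broker, so the value is a trade-off rather than a recommendation.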

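Log

Note that this log comes from a run in which the topic grew from 5 to 6 partitions (data-sync-test-1/0 through /5), not from 3 to 4.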

[Kafka]2021/10/13 10:56:50 Initializing new client
[Kafka]2021/10/13 10:56:50 client/metadata fetching metadata for all topics from broker localhost:9092
[Kafka]2021/10/13 10:56:50 Connected to broker at localhost:9092 (unregistered)
[Kafka]2021/10/13 10:56:51 client/brokers registered new broker #1 at localhost:9092
[Kafka]2021/10/13 10:56:51 Successfully initialized new client
[Kafka]2021/10/13 10:56:51 client/metadata fetching metadata for [data-sync-test-1] from broker localhost:9092
[Kafka]2021/10/13 10:56:51 client/coordinator requesting coordinator for consumergroup job-11 from localhost:9092
[Kafka]2021/10/13 10:56:51 client/coordinator coordinator for consumergroup job-11 is #1 (localhost:9092)
[Kafka]2021/10/13 10:56:51 Connected to broker at localhost:9092 (registered as #1)
[Kafka]2021/10/13 10:56:51 client/coordinator requesting coordinator for consumergroup job-11 from localhost:9092
[Kafka]2021/10/13 10:56:51 client/coordinator coordinator for consumergroup job-11 is #1 (localhost:9092)
[Kafka]2021/10/13 10:56:51 consumer/broker/1 added subscription to data-sync-test-1/0
[Kafka]2021/10/13 10:56:51 consumer/broker/1 added subscription to data-sync-test-1/1
[Kafka]2021/10/13 10:56:52 consumer/broker/1 added subscription to data-sync-test-1/4
[Kafka]2021/10/13 10:56:52 consumer/broker/1 added subscription to data-sync-test-1/3
[Kafka]2021/10/13 10:56:52 consumer/broker/1 added subscription to data-sync-test-1/2
[Kafka]2021/10/13 10:57:01 client/metadata fetching metadata for all topics from broker localhost:9092
[Kafka]2021/10/13 10:57:01 consumer/broker/1 closed dead subscription to data-sync-test-1/3
[Kafka]2021/10/13 10:57:01 consumer/broker/1 closed dead subscription to data-sync-test-1/2
[Kafka]2021/10/13 10:57:01 consumer/broker/1 closed dead subscription to data-sync-test-1/0
[Kafka]2021/10/13 10:57:01 consumer/broker/1 closed dead subscription to data-sync-test-1/1
[Kafka]2021/10/13 10:57:01 consumer/broker/1 closed dead subscription to data-sync-test-1/4
Rebalanced or context canceled
[Kafka]2021/10/13 10:57:01 client/metadata fetching metadata for [data-sync-test-1] from broker localhost:9092
[Kafka]2021/10/13 10:57:01 client/coordinator requesting coordinator for consumergroup job-11 from localhost:9092
[Kafka]2021/10/13 10:57:02 client/coordinator coordinator for consumergroup job-11 is #1 (localhost:9092)
[Kafka]2021/10/13 10:57:02 consumer/broker/1 added subscription to data-sync-test-1/4
[Kafka]2021/10/13 10:57:02 consumer/broker/1 added subscription to data-sync-test-1/0
[Kafka]2021/10/13 10:57:02 consumer/broker/1 added subscription to data-sync-test-1/3
[Kafka]2021/10/13 10:57:02 consumer/broker/1 added subscription to data-sync-test-1/5
[Kafka]2021/10/13 10:57:02 consumer/broker/1 added subscription to data-sync-test-1/1
[Kafka]2021/10/13 10:57:02 consumer/broker/1 added subscription to data-sync-test-1/2

Rebalance occurs

With one consumer already running, a second consumer joined the group.

The old consumer's k.group.Consume() returned a nil error.

Context cancel

After the ctx passed to k.group.Consume(ctx, ...) is canceled, Consume still returns a nil error. This makes the case hard to tell apart from a rebalance.

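Summary

For the Sarama library, error handling should work as follows:

  • The errors on ConsumerGroup.Errors are a mixed bag, and some of them disappear after a retry; logging them is enough, and there is no need to build logic on top of them
  • If ConsumerGroup.Consume() returns an error, it is generally one that Sarama cannot recover from by itself (for example, the broker is unreachable). That said, the error may go away as time passes.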