This post describes tests of several failure scenarios, to work out how errors should be handled when using the Sarama library. The tests were run against Kafka 2.8.x and Sarama 1.28.0.
The test code uses the ConsumerGroup API:
- Consume() is wrapped in a for loop
- a separate loop listens on group.Errors()
```go
go func() {
	for {
		// Consume blocks until an error occurs, a rebalance happens,
		// or ctx is canceled.
		err := k.group.Consume(ctx, []string{k.config.Topic}, &handler{&s})
		if err != nil {
			fmt.Printf("Consume failed: %v\n", err)
			break
		}
		if ctx.Err() != nil {
			// ctx was canceled: stop for good.
			break
		}
		// nil error with a live ctx (e.g. after a rebalance):
		// loop around and call Consume again.
	}
}()

for {
	select {
	case m := <-stream.Messages:
		fmt.Println(string(m.Value))
	case e := <-stream.Errors:
		fmt.Println(e)
	}
}
```

Broker connection lost
For the tests, the broker ran in a virtual machine and its address was mapped to local port 9092 through an SSH tunnel; "disconnecting the broker" therefore meant canceling the port forward.
Observed behavior:
- After 5 reconnection attempts (Config.Admin.Retry.Max), Sarama stops retrying
- The Errors channel produced, in order:
  - sarama.ConsumerError ×3 wrapping EOF: "kafka: error while consuming data-sync-test-1/0: EOF"
  - a net.OpError: dial tcp failed
  - a sarama.ConsumerError wrapping a net.OpError, again dial tcp failed
- During this, k.group.Consume() first returns nil once; calling k.group.Consume() again returns a net.OpError indicating the broker cannot be reached
Topic gains a new partition
The topic started with 3 partitions. While the consumer group was running, the partition count was changed to 4 with:

```shell
docker-compose exec broker kafka-topics \
    --bootstrap-server broker:9092 --topic data-sync-test-1 \
    --alter --partitions 4
```

Observed behavior:
- Sarama only notices the new partition after the next metadata refresh; the refresh interval is set by Config.Metadata.RefreshFrequency and defaults to 10 minutes
- After the metadata refresh, k.group.Consume() returns nil, and the log shows the subscriptions to the existing partitions being closed
- When the for loop calls k.group.Consume() again, subscriptions are established to all partitions, including the new one
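If ten minutes is too slow for picking up new partitions, the refresh interval can be lowered when building the config. A minimal sketch, assuming the v1 Sarama import path:

```go
import (
	"time"

	"github.com/Shopify/sarama"
)

func newConfig() *sarama.Config {
	cfg := sarama.NewConfig()
	// Default is 10 * time.Minute; lowering it means new partitions are
	// noticed sooner, at the cost of more frequent metadata requests.
	cfg.Metadata.RefreshFrequency = time.Minute
	return cfg
}
```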
Logs
[Kafka]2021/10/13 10:56:50 Initializing new client
[Kafka]2021/10/13 10:56:50 client/metadata fetching metadata for all topics from broker localhost:9092
[Kafka]2021/10/13 10:56:50 Connected to broker at localhost:9092 (unregistered)
[Kafka]2021/10/13 10:56:51 client/brokers registered new broker #1 at localhost:9092
[Kafka]2021/10/13 10:56:51 Successfully initialized new client
[Kafka]2021/10/13 10:56:51 client/metadata fetching metadata for [data-sync-test-1] from broker localhost:9092
[Kafka]2021/10/13 10:56:51 client/coordinator requesting coordinator for consumergroup job-11 from localhost:9092
[Kafka]2021/10/13 10:56:51 client/coordinator coordinator for consumergroup job-11 is #1 (localhost:9092)
[Kafka]2021/10/13 10:56:51 Connected to broker at localhost:9092 (registered as #1)
[Kafka]2021/10/13 10:56:51 client/coordinator requesting coordinator for consumergroup job-11 from localhost:9092
[Kafka]2021/10/13 10:56:51 client/coordinator coordinator for consumergroup job-11 is #1 (localhost:9092)
[Kafka]2021/10/13 10:56:51 consumer/broker/1 added subscription to data-sync-test-1/0
[Kafka]2021/10/13 10:56:51 consumer/broker/1 added subscription to data-sync-test-1/1
[Kafka]2021/10/13 10:56:52 consumer/broker/1 added subscription to data-sync-test-1/4
[Kafka]2021/10/13 10:56:52 consumer/broker/1 added subscription to data-sync-test-1/3
[Kafka]2021/10/13 10:56:52 consumer/broker/1 added subscription to data-sync-test-1/2
[Kafka]2021/10/13 10:57:01 client/metadata fetching metadata for all topics from broker localhost:9092
[Kafka]2021/10/13 10:57:01 consumer/broker/1 closed dead subscription to data-sync-test-1/3
[Kafka]2021/10/13 10:57:01 consumer/broker/1 closed dead subscription to data-sync-test-1/2
[Kafka]2021/10/13 10:57:01 consumer/broker/1 closed dead subscription to data-sync-test-1/0
[Kafka]2021/10/13 10:57:01 consumer/broker/1 closed dead subscription to data-sync-test-1/1
[Kafka]2021/10/13 10:57:01 consumer/broker/1 closed dead subscription to data-sync-test-1/4
Rebalanced or context canceled
[Kafka]2021/10/13 10:57:01 client/metadata fetching metadata for [data-sync-test-1] from broker localhost:9092
[Kafka]2021/10/13 10:57:01 client/coordinator requesting coordinator for consumergroup job-11 from localhost:9092
[Kafka]2021/10/13 10:57:02 client/coordinator coordinator for consumergroup job-11 is #1 (localhost:9092)
[Kafka]2021/10/13 10:57:02 consumer/broker/1 added subscription to data-sync-test-1/4
[Kafka]2021/10/13 10:57:02 consumer/broker/1 added subscription to data-sync-test-1/0
[Kafka]2021/10/13 10:57:02 consumer/broker/1 added subscription to data-sync-test-1/3
[Kafka]2021/10/13 10:57:02 consumer/broker/1 added subscription to data-sync-test-1/5
[Kafka]2021/10/13 10:57:02 consumer/broker/1 added subscription to data-sync-test-1/1
[Kafka]2021/10/13 10:57:02 consumer/broker/1 added subscription to data-sync-test-1/2
A rebalance occurs
With one consumer already running, a second consumer joins the group.
The old consumer's k.group.Consume() returns a nil error.
Context cancel
After canceling the ctx passed to k.group.Consume(ctx, ...), Consume still returns a nil error, which makes this case hard to distinguish from a rebalance.
Summary
With Sarama, error handling should look like this:
- The errors on ConsumerGroup.Errors are a mixed bag, and some of them disappear after a retry; logging them is usually enough, and there is no need to build logic on top of them
- When ConsumerGroup.Consume() returns an error, it is generally an unrecoverable one (e.g. the broker cannot be reached), though the condition may still clear on its own over time