This post describes a few failure-scenario tests, run to figure out how errors should be handled when using the Sarama library. The tests were done against Kafka 2.8.x with Sarama 1.28.0.
The test code uses the ConsumerGroup API:
- Consume() is wrapped in a for loop
- group.Errors() is listened on in a separate loop
go func() {
    for {
        err := k.group.Consume(ctx, []string{k.config.Topic}, &handler{&s})
        if err != nil {
            fmt.Printf("Consume failed: %v\n", err)
            break
        }
        if ctx.Err() != nil {
            // ctx is Done: stop normally
            break
        }
        // Consume returned nil with a live ctx (e.g. a rebalance): loop and call it again
    }
}()
for {
    select {
    case m := <-stream.Messages:
        fmt.Println(string(m.Value))
    case e := <-stream.Errors:
        fmt.Println(e)
    }
}
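Neither the handler nor the client config appears in the snippet above. For context, here is a minimal sketch of what they might look like; the stream fields, broker address and group ID are assumptions rather than the post's actual code. The one config detail that matters for these tests is Consumer.Return.Errors: without it, consume errors are only written to sarama.Logger and group.Errors() stays silent.

package main

import (
    "log"

    "github.com/Shopify/sarama"
)

// stream is a guess at the struct the snippet forwards messages into.
type stream struct {
    Messages chan *sarama.ConsumerMessage
    Errors   chan error
}

// handler implements sarama.ConsumerGroupHandler and just pushes every
// message into the stream.
type handler struct{ s *stream }

func (h *handler) Setup(sarama.ConsumerGroupSession) error   { return nil }
func (h *handler) Cleanup(sarama.ConsumerGroupSession) error { return nil }

func (h *handler) ConsumeClaim(sess sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
    for msg := range claim.Messages() {
        h.s.Messages <- msg
        sess.MarkMessage(msg, "")
    }
    return nil
}

func newGroup() sarama.ConsumerGroup {
    cfg := sarama.NewConfig()
    cfg.Version = sarama.V2_8_0_0
    // Without this, consume errors never reach group.Errors().
    cfg.Consumer.Return.Errors = true

    group, err := sarama.NewConsumerGroup([]string{"localhost:9092"}, "job-11", cfg)
    if err != nil {
        log.Fatal(err)
    }
    return group
}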
Broker connection lost
For these tests the broker runs in a virtual machine and is mapped to local port 9092 through an SSH tunnel; "disconnecting the broker" simply means cancelling that port forward.
Observed behavior:
- After 5 reconnection attempts (Config.Admin.Retry.Max), Sarama gives up retrying
- The Errors channel produced, in order (the inspection sketch after this list shows one way to tell these apart):
  - sarama.ConsumerError x3 wrapping EOF: "kafka: error while consuming data-sync-test-1/0: EOF"
  - a net.OpError, dial tcp failed
  - a sarama.ConsumerError wrapping a net.OpError, again dial tcp failed
- During this, k.group.Consume() first returns nil once; when the for loop calls k.group.Consume() again, it returns a net.OpError indicating the broker cannot be reached
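As a rough illustration, the errors coming out of group.Errors() can be inspected like this. This is a sketch living alongside the config/handler sketch above (it additionally imports the standard log and net packages), not part of the original test code, and the exact wrapping may vary between Sarama versions:

// drainErrors logs everything that arrives on group.Errors().
// The two concrete types below are the ones observed in this test.
func drainErrors(group sarama.ConsumerGroup) {
    for err := range group.Errors() {
        switch e := err.(type) {
        case *sarama.ConsumerError:
            // e.g. "kafka: error while consuming data-sync-test-1/0: EOF",
            // or a ConsumerError wrapping a net.OpError (dial tcp failed)
            log.Printf("consumer error on %s/%d: %v", e.Topic, e.Partition, e.Err)
        case *net.OpError:
            // a dial failure surfaced directly on the channel
            log.Printf("network error: %v", e)
        default:
            log.Printf("kafka error: %v", err)
        }
    }
}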
A new partition is added to the topic
The topic starts with 3 partitions. While the consumer group is running, the partition count is changed to 4 with this command:
docker-compose exec broker kafka-topics \
--bootstrap-server broker:9092 --topic data-sync-test-1 \
--alter --partitions 4
Observed behavior:
- Sarama only notices the new partition after the next metadata refresh; the refresh interval is set by Config.Metadata.RefreshFrequency and defaults to 10 minutes (see the config sketch after this list)
- After the metadata refresh, k.group.Consume() returns nil, and the logs show the subscriptions to the existing partitions being closed
- When the for loop calls k.group.Consume() again, subscriptions are set up for all partitions, including the new one
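If a 10-minute delay is too long, the refresh interval can be shortened when building the config. A minimal sketch (uses the standard time package); the 1-minute value is only an illustration, not something used in this test:

// Poll broker metadata more often so newly added partitions are noticed
// sooner. The Sarama default is 10 * time.Minute.
cfg := sarama.NewConfig()
cfg.Metadata.RefreshFrequency = 1 * time.Minute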
Logs
[Kafka]2021/10/13 10:56:50 Initializing new client
[Kafka]2021/10/13 10:56:50 client/metadata fetching metadata for all topics from broker localhost:9092
[Kafka]2021/10/13 10:56:50 Connected to broker at localhost:9092 (unregistered)
[Kafka]2021/10/13 10:56:51 client/brokers registered new broker #1 at localhost:9092
[Kafka]2021/10/13 10:56:51 Successfully initialized new client
[Kafka]2021/10/13 10:56:51 client/metadata fetching metadata for [data-sync-test-1] from broker localhost:9092
[Kafka]2021/10/13 10:56:51 client/coordinator requesting coordinator for consumergroup job-11 from localhost:9092
[Kafka]2021/10/13 10:56:51 client/coordinator coordinator for consumergroup job-11 is #1 (localhost:9092)
[Kafka]2021/10/13 10:56:51 Connected to broker at localhost:9092 (registered as #1)
[Kafka]2021/10/13 10:56:51 client/coordinator requesting coordinator for consumergroup job-11 from localhost:9092
[Kafka]2021/10/13 10:56:51 client/coordinator coordinator for consumergroup job-11 is #1 (localhost:9092)
[Kafka]2021/10/13 10:56:51 consumer/broker/1 added subscription to data-sync-test-1/0
[Kafka]2021/10/13 10:56:51 consumer/broker/1 added subscription to data-sync-test-1/1
[Kafka]2021/10/13 10:56:52 consumer/broker/1 added subscription to data-sync-test-1/4
[Kafka]2021/10/13 10:56:52 consumer/broker/1 added subscription to data-sync-test-1/3
[Kafka]2021/10/13 10:56:52 consumer/broker/1 added subscription to data-sync-test-1/2
[Kafka]2021/10/13 10:57:01 client/metadata fetching metadata for all topics from broker localhost:9092
[Kafka]2021/10/13 10:57:01 consumer/broker/1 closed dead subscription to data-sync-test-1/3
[Kafka]2021/10/13 10:57:01 consumer/broker/1 closed dead subscription to data-sync-test-1/2
[Kafka]2021/10/13 10:57:01 consumer/broker/1 closed dead subscription to data-sync-test-1/0
[Kafka]2021/10/13 10:57:01 consumer/broker/1 closed dead subscription to data-sync-test-1/1
[Kafka]2021/10/13 10:57:01 consumer/broker/1 closed dead subscription to data-sync-test-1/4
Rebalanced or context canceled
[Kafka]2021/10/13 10:57:01 client/metadata fetching metadata for [data-sync-test-1] from broker localhost:9092
[Kafka]2021/10/13 10:57:01 client/coordinator requesting coordinator for consumergroup job-11 from localhost:9092
[Kafka]2021/10/13 10:57:02 client/coordinator coordinator for consumergroup job-11 is #1 (localhost:9092)
[Kafka]2021/10/13 10:57:02 consumer/broker/1 added subscription to data-sync-test-1/4
[Kafka]2021/10/13 10:57:02 consumer/broker/1 added subscription to data-sync-test-1/0
[Kafka]2021/10/13 10:57:02 consumer/broker/1 added subscription to data-sync-test-1/3
[Kafka]2021/10/13 10:57:02 consumer/broker/1 added subscription to data-sync-test-1/5
[Kafka]2021/10/13 10:57:02 consumer/broker/1 added subscription to data-sync-test-1/1
[Kafka]2021/10/13 10:57:02 consumer/broker/1 added subscription to data-sync-test-1/2
Rebalance
With one consumer already running, a second consumer joins the group. The old consumer's k.group.Consume() returns a nil error.
Context cancel
After cancelling the ctx passed to k.group.Consume(ctx, ...), Consume still returns a nil error. This makes a cancelled context hard to distinguish from a rebalance.
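The only way to tell the two apart, then, is to look at the context itself after Consume returns, which is what the loop at the top of this post does. A small sketch of the observation, reusing the hypothetical newGroup/drainErrors/handler/stream pieces above (the 3-second delay is arbitrary):

group := newGroup()
defer group.Close()
go drainErrors(group)

s := &stream{Messages: make(chan *sarama.ConsumerMessage), Errors: make(chan error)}
go func() {
    // discard messages so the handler never blocks
    for range s.Messages {
    }
}()

ctx, cancel := context.WithCancel(context.Background())
go func() {
    time.Sleep(3 * time.Second) // arbitrary delay
    cancel()                    // simulate our own shutdown
}()

err := group.Consume(ctx, []string{"data-sync-test-1"}, &handler{s})
// On Sarama 1.28.0 err is nil here, exactly as after a rebalance,
// so the caller has to consult ctx.Err() to know which case it was.
fmt.Println(err, ctx.Err())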
Summary
For the Sarama library, error handling therefore boils down to the following (a wiring sketch follows the list):
- The errors coming out of ConsumerGroup.Errors are noisy, and some of them go away after a retry; logging them is enough, and there is no need to build any logic on top of them
- If ConsumerGroup.Consume() returns an error, it is usually unrecoverable (for example, the broker is unreachable), although the condition may of course clear up over time
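Pulled together, the error handling this post ends up recommending looks roughly like the sketch below, again reusing the hypothetical newGroup/drainErrors/handler/stream pieces from earlier. Whether a failed Consume should crash the process or page someone is an application decision; the point is only that retrying it blindly is not useful.

func run(ctx context.Context, topic string) error {
    group := newGroup()
    defer group.Close()

    // Rule 1: errors on Errors() are merely logged, never acted on.
    go drainErrors(group)

    s := &stream{Messages: make(chan *sarama.ConsumerMessage), Errors: make(chan error)}
    go func() {
        // (left running for simplicity; real processing goes here)
        for m := range s.Messages {
            log.Println(string(m.Value))
        }
    }()

    for {
        // Rule 2: a non-nil error from Consume is treated as fatal.
        if err := group.Consume(ctx, []string{topic}, &handler{s}); err != nil {
            return err
        }
        if ctx.Err() != nil {
            return nil // our own shutdown
        }
        // nil error with a live ctx: rebalance or metadata change, go again
    }
}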