Producer 的职责是把 record 发往 broker。
Record
一条 record 需要指定 topic。可以指定 partition 及 key:
- 如果没有指定 partition,默认会按 key 做 hash 来确定 partition
- 如果连 key 也没有,看下文了解会被发到什么 partition
发往同个 partition 的纪录,会被攒成一批次(batch)一起发过去。
发送成功后,Kafka 会返回该 record 的 topic、partition 及 partition 中的 offset。
几种发送消息的方法
Kafka 的 Java 库提供这些方式
- Fire-and-forget:
- 发送完消息,不关心成功失败就返回控制权给调用方
- 调用方无法知道消息是否成功发送
- 失败时,Kafka 的库仍然会重试,但有可能重试也失败
- Go 的库 shopify/sarama 在
AsyncProducer
中支持这种模式
- Synchronous send:同步发送。Java 库中
send()
返回一个 Future 对象;Future.get()
会阻塞直至返回成功失败 - Asynchronous send:异步发送。Java 库中
send()
带有一个 callback 参数
两种错误
- 可重试的,比如:
- 跟 broker 连接断开;重连即可
- broker 返回 no leader;等待新 leader 选出。
- 不可重试的,比如:
- 超过了重试次数后,仍然未解决的错误
- "message size too large"
Producer 可以配置成自动重试。
Acks
可以给 producer 配置 acks 参数,来表示分区 leader 在什么情况下对写请求作出回应。
acks=0 | 不等 leader 回应;client 不知道有没有写成功 |
---|---|
acks=1 | leader 在落自己 log 成功后回应;如果此时 leader 挂掉,有可能丢数据 |
acks=all | leader 在自己及所有 follower 落盘成功后返回。除非所有 replicas 挂掉,否则不会丢数据 |
不带 key(non-keyed)的 record 被发往什么 partition?
如果 record 有 key,默认的 Partitioner
会采用 key 的 hash 来确定分区。根据 batch.size
及 linger.ms
参数,这些 record 会攒成一批次发送给 broker。
如果 record 不带 key:
- 在 Kafka 2.4 版本之前,每条 record 会组成一个单 record 批次,以 round-robin 形式发往不同的 broker
- 从 Kafka 2.4 版本开始,这些 record 会组成一个多 record 的批次,随机地发往一个 broker;发送完后,新的几条不带 key record,又会被组成一个批次,随机发往一个 broker。这个特性被称为 sticky partitioner。提升了读写性能
参考文档:
- Confluent 官方文章