Kafka: Concept: Producer

31st August 2021 at 7:27pm

Producer 的职责是把 record 发往 broker。

Record

一条 record 需要指定 topic。可以指定 partition 及 key:

  • 如果没有指定 partition,默认会按 key 做 hash 来确定 partition
  • 如果连 key 也没有,看下文了解会被发到什么 partition

发往同个 partition 的纪录,会被攒成一批次(batch)一起发过去。

发送成功后,Kafka 会返回该 record 的 topic、partition 及 partition 中的 offset。

几种发送消息的方法

Kafka 的 Java 库提供这些方式

  • Fire-and-forget:
    • 发送完消息,不关心成功失败就返回控制权给调用方
    • 调用方无法知道消息是否成功发送
    • 失败时,Kafka 的库仍然会重试,但有可能重试也失败
    • Go 的库 shopify/sarama 在 AsyncProducer 中支持这种模式
  • Synchronous send:同步发送。Java 库中 send() 返回一个 Future 对象;Future.get() 会阻塞直至返回成功失败
  • Asynchronous send:异步发送。Java 库中 send() 带有一个 callback 参数

两种错误

  • 可重试的,比如:
    • 跟 broker 连接断开;重连即可
    • broker 返回 no leader;等待新 leader 选出。
  • 不可重试的,比如:
    • 超过了重试次数后,仍然未解决的错误
    • "message size too large"

Producer 可以配置成自动重试。

Acks

可以给 producer 配置 acks 参数,来表示分区 leader 在什么情况下对写请求作出回应。

acks=0不等 leader 回应;client 不知道有没有写成功
acks=1leader 在落自己 log 成功后回应;如果此时 leader 挂掉,有可能丢数据
acks=allleader 在自己及所有 follower 落盘成功后返回。除非所有 replicas 挂掉,否则不会丢数据

不带 key(non-keyed)的 record 被发往什么 partition?

如果 record 有 key,默认的 Partitioner 会采用 key 的 hash 来确定分区。根据 batch.sizelinger.ms 参数,这些 record 会攒成一批次发送给 broker。

如果 record 不带 key:

  • 在 Kafka 2.4 版本之前,每条 record 会组成一个单 record 批次,以 round-robin 形式发往不同的 broker
  • 从 Kafka 2.4 版本开始,这些 record 会组成一个多 record 的批次,随机地发往一个 broker;发送完后,新的几条不带 key record,又会被组成一个批次,随机发往一个 broker。这个特性被称为 sticky partitioner。提升了读写性能

参考文档: