Kafka: Library: Sarama: Fetch Messages

 24th May 2022 at 9:00pm

Kafka FetchRequest implementation

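A Kafka client pulls messages from a broker through the FetchRequest API.

FetchRequest documentation:

When you create a consumer group with Sarama against a multi-partition topic, the client sets up per-partition subscriptions with different brokers in the cluster, for example: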

consumer/broker/1 added subscription to inhouse_cs_send_msg_live_america/15
consumer/broker/3 added subscription to inhouse_cs_send_msg_live_america/19
consumer/broker/3 added subscription to inhouse_cs_send_msg_live_america/25
consumer/broker/2 added subscription to inhouse_cs_send_msg_live_america/30
consumer/broker/5 added subscription to inhouse_cs_send_msg_live_america/20
consumer/broker/4 added subscription to inhouse_cs_send_msg_live_america/16
consumer/broker/3 added subscription to inhouse_cs_send_msg_live_america/1
consumer/broker/2 added subscription to inhouse_cs_send_msg_live_america/6
consumer/broker/6 added subscription to inhouse_cs_send_msg_live_america/23
consumer/broker/2 added subscription to inhouse_cs_send_msg_live_america/24
consumer/broker/4 added subscription to inhouse_cs_send_msg_live_america/10
consumer/broker/6 added subscription to inhouse_cs_send_msg_live_america/17

As the log shows, a single broker may serve messages for several partitions to the client.

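Each FetchRequest goes from the client to one broker and asks that broker for messages from all the partitions subscribed through it. In theory, therefore, at most N FetchRequests run in parallel, where N is the number of brokers the client holds subscriptions with.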
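To make the "one FetchRequest per broker" bound concrete, here is a minimal sketch that groups the subscription log lines above by broker and counts the brokers. The helper names (`subscriptionsByBroker`, `maxParallelFetches`, `sampleLines`) are made up for this note and are not part of Sarama; the parser only assumes the log shape shown above.

```go
package main

import (
	"fmt"
	"sort"
	"strconv"
	"strings"
)

// sampleLines is the subscription log quoted in the text.
var sampleLines = []string{
	"consumer/broker/1 added subscription to inhouse_cs_send_msg_live_america/15",
	"consumer/broker/3 added subscription to inhouse_cs_send_msg_live_america/19",
	"consumer/broker/3 added subscription to inhouse_cs_send_msg_live_america/25",
	"consumer/broker/2 added subscription to inhouse_cs_send_msg_live_america/30",
	"consumer/broker/5 added subscription to inhouse_cs_send_msg_live_america/20",
	"consumer/broker/4 added subscription to inhouse_cs_send_msg_live_america/16",
	"consumer/broker/3 added subscription to inhouse_cs_send_msg_live_america/1",
	"consumer/broker/2 added subscription to inhouse_cs_send_msg_live_america/6",
	"consumer/broker/6 added subscription to inhouse_cs_send_msg_live_america/23",
	"consumer/broker/2 added subscription to inhouse_cs_send_msg_live_america/24",
	"consumer/broker/4 added subscription to inhouse_cs_send_msg_live_america/10",
	"consumer/broker/6 added subscription to inhouse_cs_send_msg_live_america/17",
}

// subscriptionsByBroker parses lines of the form
// "consumer/broker/<id> added subscription to <topic>/<partition>"
// and returns the partitions subscribed through each broker.
// Lines that do not match this shape are skipped.
func subscriptionsByBroker(lines []string) map[int][]int {
	byBroker := make(map[int][]int)
	for _, line := range lines {
		fields := strings.Fields(line)
		if len(fields) != 5 {
			continue
		}
		broker, err := strconv.Atoi(strings.TrimPrefix(fields[0], "consumer/broker/"))
		if err != nil {
			continue
		}
		slash := strings.LastIndex(fields[4], "/")
		if slash < 0 {
			continue
		}
		partition, err := strconv.Atoi(fields[4][slash+1:])
		if err != nil {
			continue
		}
		byBroker[broker] = append(byBroker[broker], partition)
	}
	return byBroker
}

// maxParallelFetches is the upper bound N from the text:
// one in-flight FetchRequest per broker that holds a subscription.
func maxParallelFetches(byBroker map[int][]int) int {
	return len(byBroker)
}

func main() {
	byBroker := subscriptionsByBroker(sampleLines)
	brokers := make([]int, 0, len(byBroker))
	for b := range byBroker {
		brokers = append(brokers, b)
	}
	sort.Ints(brokers)
	for _, b := range brokers {
		fmt.Printf("broker %d serves partitions %v\n", b, byBroker[b])
	}
	fmt.Println("max parallel FetchRequests:", maxParallelFetches(byBroker))
}
```

For the twelve lines above, the subscriptions spread over six brokers, so at most six FetchRequests would be in flight at once.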

Once a FetchRequest returns, the messages in the response are dispatched by partition to the ConsumeClaim function running in each partition's goroutine.
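The fan-out can be pictured with a small stdlib-only model. This is not Sarama code: `fetchResponse` and `dispatch` are hypothetical names, and the goroutine per partition merely stands in for a ConsumeClaim call draining its own channel.

```go
package main

import (
	"fmt"
	"sync"
)

// fetchResponse is a hypothetical stand-in for one broker's reply:
// message payloads grouped by partition number.
type fetchResponse map[int32][]string

// dispatch hands each partition's messages to that partition's own goroutine
// (the role ConsumeClaim plays in a consumer group) and returns how many
// messages each partition's handler consumed.
func dispatch(resp fetchResponse) map[int32]int {
	var (
		mu       sync.Mutex
		wg       sync.WaitGroup
		consumed = make(map[int32]int)
	)
	for partition, msgs := range resp {
		// One buffered channel per partition, filled from the response.
		ch := make(chan string, len(msgs))
		for _, m := range msgs {
			ch <- m
		}
		close(ch)

		wg.Add(1)
		go func(p int32, in <-chan string) {
			defer wg.Done()
			n := 0
			for range in { // drain this partition's messages
				n++
			}
			mu.Lock()
			consumed[p] = n
			mu.Unlock()
		}(partition, ch)
	}
	wg.Wait()
	return consumed
}

func main() {
	// A single response from one broker covering three partitions.
	resp := fetchResponse{
		1:  {"m1", "m2"},
		19: {"m3"},
		25: {"m4", "m5", "m6"},
	}
	fmt.Println(dispatch(resp))
}
```

The point of the model is only that one response carries several partitions, while consumption happens independently per partition.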

Sarama Consumer configuration

// Fetch is the namespace for controlling how many bytes are retrieved by any
// given request.
Fetch struct {
    // The minimum number of message bytes to fetch in a request - the broker
    // will wait until at least this many are available. The default is 1,
    // as 0 causes the consumer to spin when no messages are available.
    // Equivalent to the JVM's `fetch.min.bytes`.
    Min int32
    // The default number of message bytes to fetch from the broker in each
    // request (default 1MB). This should be larger than the majority of
    // your messages, or else the consumer will spend a lot of time
    // negotiating sizes and not actually consuming. Similar to the JVM's
    // `fetch.message.max.bytes`.
    Default int32
    // The maximum number of message bytes to fetch from the broker in a
    // single request. Messages larger than this will return
    // ErrMessageTooLarge and will not be consumable, so you must be sure
    // this is at least as large as your largest message. Defaults to 0
    // (no limit). Similar to the JVM's `fetch.message.max.bytes`. The
    // global `sarama.MaxResponseSize` still applies.
    Max int32
}
// The maximum amount of time the broker will wait for Consumer.Fetch.Min
// bytes to become available before it returns fewer than that anyways. The
// default is 250ms, since 0 causes the consumer to spin when no events are
// available. 100-500ms is a reasonable range for most cases. Kafka only
// supports precision up to milliseconds; nanoseconds will be truncated.
// Equivalent to the JVM's `fetch.wait.max.ms`.
MaxWaitTime time.Duration

// The maximum amount of time the consumer expects a message takes to
// process for the user. If writing to the Messages channel takes longer
// than this, that partition will stop fetching more messages until it
// can proceed again.
// Note that, since the Messages channel is buffered, the actual grace time is
// (MaxProcessingTime * ChannelBufferSize). Defaults to 100ms.
// If a message is not written to the Messages channel between two ticks
// of the expiryTicker then a timeout is detected.
// Using a ticker instead of a timer to detect timeouts should typically
// result in many fewer calls to Timer functions which may result in a
// significant performance improvement if many messages are being sent
// and timeouts are infrequent.
// The disadvantage of using a ticker instead of a timer is that
// timeouts will be less accurate. That is, the effective timeout could
// be between `MaxProcessingTime` and `2 * MaxProcessingTime`. For
// example, if `MaxProcessingTime` is 100ms then a delay of 180ms
// between two messages being sent may not be recognized as a timeout.
MaxProcessingTime time.Duration
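The ticker rule in the MaxProcessingTime comment is easier to see with numbers. The sketch below is a simplified arithmetic model of that rule, not Sarama's implementation: it assumes the ticker fires at exact multiples of the period and that a stalled write is reported only once two ticks have fired during the stall. The function names are made up for this note.

```go
package main

import (
	"fmt"
	"time"
)

// ticksDuring counts ticker firings in the window (start, end] for a ticker
// that fires at period, 2*period, 3*period, ... (integer division on durations).
func ticksDuring(start, end, period time.Duration) int {
	return int(end/period - start/period)
}

// timeoutDetected models the rule from the comment: a write that stays blocked
// across two ticks counts as a timeout; a single tick is not enough.
func timeoutDetected(start, end, period time.Duration) bool {
	return ticksDuring(start, end, period) >= 2
}

func main() {
	const maxProcessingTime = 100 * time.Millisecond

	// Both stalls last 180ms; only their alignment with the ticker differs.
	early := timeoutDetected(10*time.Millisecond, 190*time.Millisecond, maxProcessingTime)
	late := timeoutDetected(90*time.Millisecond, 270*time.Millisecond, maxProcessingTime)

	fmt.Println("stall 10ms..190ms detected:", early) // one tick (100ms): false
	fmt.Println("stall 90ms..270ms detected:", late)  // two ticks (100ms, 200ms): true
}
```

Under this model the same 180ms stall is missed in one alignment and caught in the other, which matches the comment's range of MaxProcessingTime to 2 * MaxProcessingTime.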

The documentation for Sarama's FetchRequest-related settings does not fully match the implementation:

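| Setting | Meaning | Documentation accuracy |
|---|---|---|
| Fetch.Min | Sent as the FetchRequest.MinBytes parameter. The server waits (subject to MaxWaitTime) until this much data is available before it responds. | Accurate |
| Fetch.Default | The amount of data a FetchRequest should pull per partition. If this value is smaller than a single message, Sarama progressively raises it, up to Fetch.Max, so that the message can be returned in full. | Inaccurate |
| Fetch.Max | The maximum amount of data a FetchRequest should pull per partition. This setting is not sent to the server in the FetchRequest; it only governs the client library's own behavior together with Fetch.Default. | Inaccurate |
| sarama.MaxResponseSize | Sent as the FetchRequest.MaxBytes parameter. The server returns at most this much data, which keeps the client from using too much memory. The default is 100 MB. | Accurate |
| MaxWaitTime | See the documentation. | Accurate |
| MaxProcessingTime | See the documentation. | Accurate |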
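The interplay between Fetch.Default and Fetch.Max described above can be sketched as a small function. The text only says the per-partition size is raised progressively; the doubling step below is an assumption of this sketch, and `nextFetchSize` is a hypothetical name rather than a Sarama API.

```go
package main

import (
	"fmt"
	"math"
)

// nextFetchSize returns the per-partition fetch size to try after a response
// held only a truncated message. fetchMax == 0 means "no limit".
// ok is false when the size already sits at fetchMax and cannot grow, which is
// the situation the Fetch.Max comment ties to ErrMessageTooLarge.
func nextFetchSize(current, fetchMax int32) (next int32, ok bool) {
	if fetchMax > 0 && current >= fetchMax {
		return current, false
	}
	if current > math.MaxInt32/2 {
		next = math.MaxInt32 // avoid int32 overflow
	} else {
		next = current * 2 // assumption: growth by doubling
	}
	if fetchMax > 0 && next > fetchMax {
		next = fetchMax
	}
	return next, true
}

func main() {
	// Hypothetical numbers: Fetch.Default = 1024, Fetch.Max = 10000,
	// and a single message that needs 5000 bytes.
	size, fetchMax, needed := int32(1024), int32(10000), int32(5000)
	for size < needed {
		next, ok := nextFetchSize(size, fetchMax)
		if !ok {
			fmt.Println("message does not fit within Fetch.Max")
			return
		}
		fmt.Printf("fetch size %d too small, retrying with %d\n", size, next)
		size = next
	}
	fmt.Println("message fits at fetch size", size)
}
```

Each retry costs a round trip to the broker, which is why the Fetch.Default comment recommends a value larger than most messages.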

Data volume calculation and deriving the Sarama configuration

In a sample of 200,000 messages from BR, each message (header and body included) averages about 600 bytes.

1 MB therefore holds roughly 1,700 messages.
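The estimate is a single integer division. A minimal check, with `messagesPerBuffer` as a made-up helper name:

```go
package main

import "fmt"

// messagesPerBuffer returns how many whole messages of avgMsgBytes fit in bufBytes.
func messagesPerBuffer(bufBytes, avgMsgBytes int) int {
	return bufBytes / avgMsgBytes
}

func main() {
	const avgMsgBytes = 600 // sampled average from the text

	fmt.Println(messagesPerBuffer(1<<20, avgMsgBytes))     // 1 MiB: 1747
	fmt.Println(messagesPerBuffer(1_000_000, avgMsgBytes)) // 1 MB (decimal): 1666
}
```

Whichever reading of "1 MB" is meant, the count lands near 1,700.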

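Suppose the goals are:

  • A message stays in the Kafka cluster for at most time t before it is delivered
  • Each batch contains at least N messages

Then:

  • MaxWaitTime should be t
  • Fetch.Default should be greater than 600 * N
    • This way the goals are met even when the topic has only one subscription with a broker (for example, a single-partition topic)
  • ChannelBufferSize, the upper bound on how many messages the client can buffer per partition, should be at least 2N
  • MaxProcessingTime should be at least the latency of the delivery service's request to the access batch API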
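The rules above translate directly into a small calculation. In the sketch below, `configBounds` is a plain struct that borrows Sarama's setting names for readability; it is not sarama.Config, and the input values in `main` (200ms, 500 messages, 50ms) are hypothetical.

```go
package main

import (
	"fmt"
	"time"
)

// configBounds holds the values derived in the text. Field names follow the
// Sarama settings they apply to; this is an illustration, not sarama.Config.
type configBounds struct {
	MaxWaitTime          time.Duration // set to exactly t
	FetchDefaultAbove    int           // Fetch.Default should be greater than this
	ChannelBufferSizeMin int           // at least 2N
	MaxProcessingTimeMin time.Duration // at least the batch API latency
}

// deriveBounds applies the four rules: t is the longest a message may wait,
// n the minimum batch size, avgMsgBytes the average message size, and
// batchLatency the latency of the downstream batch call.
func deriveBounds(t time.Duration, n, avgMsgBytes int, batchLatency time.Duration) configBounds {
	return configBounds{
		MaxWaitTime:          t,
		FetchDefaultAbove:    avgMsgBytes * n,
		ChannelBufferSizeMin: 2 * n,
		MaxProcessingTimeMin: batchLatency,
	}
}

func main() {
	// Hypothetical targets: deliver within 200ms, batches of at least 500
	// messages of about 600 bytes, and a 50ms batch API call.
	b := deriveBounds(200*time.Millisecond, 500, 600, 50*time.Millisecond)
	fmt.Printf("%+v\n", b)
}
```

Since 600 bytes is only an average, leaving some headroom above `FetchDefaultAbove` seems prudent; how much depends on the message size distribution, which the sample above does not describe.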