Kafka FetchRequest Implementation
Kafka clients pull messages from brokers through the FetchRequest API (see the official FetchRequest documentation for the wire-level details).
When a consumer group is set up with Sarama against a multi-partition topic, the client establishes per-partition subscriptions with different brokers in the cluster, for example:
consumer/broker/1 added subscription to inhouse_cs_send_msg_live_america/15
consumer/broker/3 added subscription to inhouse_cs_send_msg_live_america/19
consumer/broker/3 added subscription to inhouse_cs_send_msg_live_america/25
consumer/broker/2 added subscription to inhouse_cs_send_msg_live_america/30
consumer/broker/5 added subscription to inhouse_cs_send_msg_live_america/20
consumer/broker/4 added subscription to inhouse_cs_send_msg_live_america/16
consumer/broker/3 added subscription to inhouse_cs_send_msg_live_america/1
consumer/broker/2 added subscription to inhouse_cs_send_msg_live_america/6
consumer/broker/6 added subscription to inhouse_cs_send_msg_live_america/23
consumer/broker/2 added subscription to inhouse_cs_send_msg_live_america/24
consumer/broker/4 added subscription to inhouse_cs_send_msg_live_america/10
consumer/broker/6 added subscription to inhouse_cs_send_msg_live_america/17
As the log shows, a single broker may serve the client messages from several partitions.
A FetchRequest is sent by the client to a single broker and asks for messages from all of the partitions the client has subscribed to on that broker. In theory, there can therefore be at most N FetchRequests in flight in parallel, where N is the number of brokers the client holds subscriptions with. When a FetchRequest returns, the messages in the response are dispatched per partition into each partition goroutine's ConsumeClaim function.
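For reference, below is a minimal Sarama consumer-group sketch showing this per-partition dispatch from the application side. The broker address and group ID are placeholders, the topic name is reused from the log above, and the github.com/Shopify/sarama import path is assumed (newer releases live under github.com/IBM/sarama).

```go
package main

import (
	"context"
	"log"

	"github.com/Shopify/sarama"
)

// handler implements sarama.ConsumerGroupHandler. ConsumeClaim runs in one
// goroutine per claimed partition; each call only ever sees messages from that
// single partition, even though the underlying FetchRequest to the broker
// covers every partition subscribed on it.
type handler struct{}

func (handler) Setup(sarama.ConsumerGroupSession) error   { return nil }
func (handler) Cleanup(sarama.ConsumerGroupSession) error { return nil }

func (handler) ConsumeClaim(sess sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
	for msg := range claim.Messages() {
		log.Printf("partition=%d offset=%d bytes=%d", msg.Partition, msg.Offset, len(msg.Value))
		sess.MarkMessage(msg, "")
	}
	return nil
}

func main() {
	cfg := sarama.NewConfig()
	cfg.Version = sarama.V2_1_0_0 // consumer groups require Kafka >= 0.10.2

	// "broker-1:9092" and "example-group" are placeholders.
	group, err := sarama.NewConsumerGroup([]string{"broker-1:9092"}, "example-group", cfg)
	if err != nil {
		log.Fatal(err)
	}
	defer group.Close()

	for {
		err := group.Consume(context.Background(), []string{"inhouse_cs_send_msg_live_america"}, handler{})
		if err != nil {
			log.Fatal(err)
		}
	}
}
```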
Sarama Consumer Configuration
// Fetch is the namespace for controlling how many bytes are retrieved by any
// given request.
Fetch struct {
    // The minimum number of message bytes to fetch in a request - the broker
    // will wait until at least this many are available. The default is 1,
    // as 0 causes the consumer to spin when no messages are available.
    // Equivalent to the JVM's `fetch.min.bytes`.
    Min int32
    // The default number of message bytes to fetch from the broker in each
    // request (default 1MB). This should be larger than the majority of
    // your messages, or else the consumer will spend a lot of time
    // negotiating sizes and not actually consuming. Similar to the JVM's
    // `fetch.message.max.bytes`.
    Default int32
    // The maximum number of message bytes to fetch from the broker in a
    // single request. Messages larger than this will return
    // ErrMessageTooLarge and will not be consumable, so you must be sure
    // this is at least as large as your largest message. Defaults to 0
    // (no limit). Similar to the JVM's `fetch.message.max.bytes`. The
    // global `sarama.MaxResponseSize` still applies.
    Max int32
}
// The maximum amount of time the broker will wait for Consumer.Fetch.Min
// bytes to become available before it returns fewer than that anyways. The
// default is 250ms, since 0 causes the consumer to spin when no events are
// available. 100-500ms is a reasonable range for most cases. Kafka only
// supports precision up to milliseconds; nanoseconds will be truncated.
// Equivalent to the JVM's `fetch.wait.max.ms`.
MaxWaitTime time.Duration
// The maximum amount of time the consumer expects a message takes to
// process for the user. If writing to the Messages channel takes longer
// than this, that partition will stop fetching more messages until it
// can proceed again.
// Note that, since the Messages channel is buffered, the actual grace time is
// (MaxProcessingTime * ChannelBufferSize). Defaults to 100ms.
// If a message is not written to the Messages channel between two ticks
// of the expiryTicker then a timeout is detected.
// Using a ticker instead of a timer to detect timeouts should typically
// result in many fewer calls to Timer functions which may result in a
// significant performance improvement if many messages are being sent
// and timeouts are infrequent.
// The disadvantage of using a ticker instead of a timer is that
// timeouts will be less accurate. That is, the effective timeout could
// be between `MaxProcessingTime` and `2 * MaxProcessingTime`. For
// example, if `MaxProcessingTime` is 100ms then a delay of 180ms
// between two messages being sent may not be recognized as a timeout.
MaxProcessingTime time.Duration
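For orientation, the snippet below spells out where these options sit on sarama.Config, filled in with the library defaults quoted in the comments above; it is a reference sketch, not a recommended tuning.

```go
package main

import (
	"time"

	"github.com/Shopify/sarama"
)

func defaultFetchConfig() *sarama.Config {
	cfg := sarama.NewConfig()

	cfg.Consumer.Fetch.Min = 1               // becomes FetchRequest.MinBytes
	cfg.Consumer.Fetch.Default = 1024 * 1024 // per-partition fetch size, 1 MB
	cfg.Consumer.Fetch.Max = 0               // 0 = no client-side cap

	cfg.Consumer.MaxWaitTime = 250 * time.Millisecond       // broker-side wait for Fetch.Min
	cfg.Consumer.MaxProcessingTime = 100 * time.Millisecond // per-message grace on the client

	cfg.ChannelBufferSize = 256 // buffered messages per partition channel

	// Package-level limit that is sent as FetchRequest.MaxBytes (100 MB default).
	sarama.MaxResponseSize = 100 * 1024 * 1024

	return cfg
}

func main() {
	_ = defaultFetchConfig()
}
```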
The Sarama options related to FetchRequest are listed below; their documentation does not fully match the implementation (a sketch of the client-side behavior follows the table):
| Option | Meaning | Doc vs. implementation |
| --- | --- | --- |
| Fetch.Min | Sent as FetchRequest.MinBytes: the server waits (together with MaxWaitTime) until this many bytes are available before responding. | Accurate |
| Fetch.Default | The number of bytes to fetch per partition in a FetchRequest; if it is smaller than a single message, Sarama progressively raises it toward Fetch.Max so the message can still be returned in full. | Inaccurate |
| Fetch.Max | The maximum number of bytes to fetch per partition; this value is never sent to the server in the FetchRequest, it only caps the client-side behavior described in the previous row. | Inaccurate |
| sarama.MaxResponseSize | Sent as FetchRequest.MaxBytes: the server returns at most this many bytes in one response, protecting the client from excessive memory use. The default is 100 MB. | Accurate |
| MaxWaitTime | See the doc comment above. | Accurate |
| MaxProcessingTime | See the doc comment above. | Accurate |
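To make the Fetch.Default / Fetch.Max rows concrete, here is a simplified sketch of the client-side escalation described in the table. The function name and the exact doubling step are illustrative assumptions, not Sarama's own code; the key point is that this size is tracked per partition and never sent to the broker.

```go
package main

import (
	"errors"
	"fmt"
)

// nextFetchSize sketches how the client can grow its per-partition fetch size
// when a response comes back truncated (a message did not fit): start from
// Fetch.Default, enlarge on each truncated response, and stop at Fetch.Max
// (0 meaning "unlimited"). This value stays inside the client; only MinBytes,
// MaxWaitTime, and the global MaxBytes travel to the broker.
func nextFetchSize(current, defaultSize, maxSize int32, truncated bool) (int32, error) {
	if current == 0 {
		current = defaultSize
	}
	if !truncated {
		return current, nil // the batch fit, keep the current size
	}
	if maxSize > 0 && current >= maxSize {
		// Already at the cap and the message still does not fit:
		// this is where ErrMessageTooLarge would be surfaced.
		return current, errors.New("message larger than Fetch.Max")
	}
	next := current * 2 // the doubling step is an assumption for illustration
	if maxSize > 0 && next > maxSize {
		next = maxSize
	}
	return next, nil
}

func main() {
	size := int32(0)
	for i := 0; i < 3; i++ {
		var err error
		size, err = nextFetchSize(size, 1024*1024, 8*1024*1024, true)
		fmt.Println(size, err)
	}
}
```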
Data Volume Estimation and Deriving the Sarama Configuration
In a sample of 200,000 records drawn from BR, each message (header plus body) averages roughly 600 bytes.
So 1 MB holds roughly 1,700 messages (1,000,000 / 600 ≈ 1,700).
Suppose the goal is:
- a message stays in the Kafka cluster for at most time t before it is delivered
- each batch contains at least N messages
Then (a configuration sketch in Go follows the list):
- MaxWaitTime should be t.
- Fetch.Default should be greater than 600 * N.
  - This way the goal holds even when only a single partition subscription exists between the topic and a broker (for example, a single-partition topic).
- ChannelBufferSize, the upper bound on the number of messages the client buffers per partition, should be at least 2N.
- MaxProcessingTime should be at least the latency of delivery's call to the access batch interface.
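Putting the derivation above into code: a sketch that assumes the 600-byte average from the BR sample; the helper name, the placeholder arguments in main, and the 2x headroom on Fetch.Default are illustrative assumptions.

```go
package main

import (
	"fmt"
	"time"

	"github.com/Shopify/sarama"
)

const avgMsgBytes = 600 // average message size measured on the BR sample

// consumerConfigFor derives the settings listed above from the two goals:
// a message waits at most maxLatency (t) in the cluster, and a batch holds at
// least batchSize (N) messages. deliveryCallLatency is the latency of
// delivery's call to the access batch interface.
func consumerConfigFor(maxLatency time.Duration, batchSize int, deliveryCallLatency time.Duration) *sarama.Config {
	cfg := sarama.NewConfig()

	// MaxWaitTime = t.
	cfg.Consumer.MaxWaitTime = maxLatency

	// Fetch.Default > 600 * N, so that even a single partition subscription on
	// a broker (e.g. a single-partition topic) can carry a full batch; the 2x
	// headroom is an arbitrary illustrative margin.
	cfg.Consumer.Fetch.Default = int32(2 * avgMsgBytes * batchSize)

	// ChannelBufferSize >= 2N buffered messages per partition.
	cfg.ChannelBufferSize = 2 * batchSize

	// MaxProcessingTime >= the downstream batch-call latency.
	cfg.Consumer.MaxProcessingTime = deliveryCallLatency

	return cfg
}

func main() {
	// Placeholder numbers: t = 500ms, N = 1000, downstream latency = 200ms.
	cfg := consumerConfigFor(500*time.Millisecond, 1000, 200*time.Millisecond)
	fmt.Printf("MaxWaitTime=%v Fetch.Default=%d ChannelBufferSize=%d MaxProcessingTime=%v\n",
		cfg.Consumer.MaxWaitTime, cfg.Consumer.Fetch.Default,
		cfg.ChannelBufferSize, cfg.Consumer.MaxProcessingTime)
}
```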