Kafka: Library: Sarama: Config

 20th May 2022 at 4:29pm

下面的函数给出了一套在线上用过、相对 OK 的参数配置:

func NewSaramaConfig() *sarama.Config {
	// NewConfig 设置了一些默认值
	c := sarama.NewConfig()

	c.Net.DialTimeout = 5 * time.Second
	c.Net.ReadTimeout = 5 * time.Second
	c.Net.WriteTimeout = 5 * time.Second
	c.Net.KeepAlive = 5 * time.Second

	// 刷新 metadata 的频率,默认为 10 分钟。见 Metadata.RefreshFrequency。
	// 不要获取整个集群的 metadata。一些集群有好几千 topic,获取完整 metadata 会很耗内存(观察到 Metadata 占了 30M 内存)
	c.Metadata.Full = false

	c.Producer.Timeout = 5 * time.Second

	c.Consumer.Return.Errors = true
	c.Consumer.Offsets.Initial = sarama.OffsetNewest
	c.Consumer.Offsets.AutoCommit.Enable = true
	c.Consumer.Offsets.Retention = 7 * 24 * time.Hour

	// 当需要处理的数据量大时,假如 Sarama 的 channel 缓冲区被填满(大小由 c.ChannelBufferSize 指定),Sarama 会在等待
	// c.Consumer.MaxProcessingTime 后暂断开该 partition 与 broker 的连接:
	// consumer/broker/2 abandoned subscription to shopee_inhouse_test__dwd_account_tab/0 because consuming was taking too long
	// 这里设置一个比较大的时间,以避免频繁与 broker 断开连接而影响消费速度。同时外部应该设置合理的 ChannelBufferSize。
	c.Consumer.MaxProcessingTime = 10 * time.Second

	// 处理 Source Config
	c.Net.SASL.Enable = sc.SASL.Enable
	c.Net.SASL.Version = sc.SASL.Version
	c.Net.SASL.User = sc.SASL.User
	c.Net.SASL.Password = sc.SASL.Password

	return c
}