下面的函数给出了一套在线上用过、相对 OK 的参数配置:
func NewSaramaConfig() *sarama.Config {
// NewConfig 设置了一些默认值
c := sarama.NewConfig()
c.Net.DialTimeout = 5 * time.Second
c.Net.ReadTimeout = 5 * time.Second
c.Net.WriteTimeout = 5 * time.Second
c.Net.KeepAlive = 5 * time.Second
// 刷新 metadata 的频率,默认为 10 分钟。见 Metadata.RefreshFrequency。
// 不要获取整个集群的 metadata。一些集群有好几千 topic,获取完整 metadata 会很耗内存(观察到 Metadata 占了 30M 内存)
c.Metadata.Full = false
c.Producer.Timeout = 5 * time.Second
c.Consumer.Return.Errors = true
c.Consumer.Offsets.Initial = sarama.OffsetNewest
c.Consumer.Offsets.AutoCommit.Enable = true
c.Consumer.Offsets.Retention = 7 * 24 * time.Hour
// 当需要处理的数据量大时,假如 Sarama 的 channel 缓冲区被填满(大小由 c.ChannelBufferSize 指定),Sarama 会在等待
// c.Consumer.MaxProcessingTime 后暂断开该 partition 与 broker 的连接:
// consumer/broker/2 abandoned subscription to shopee_inhouse_test__dwd_account_tab/0 because consuming was taking too long
// 这里设置一个比较大的时间,以避免频繁与 broker 断开连接而影响消费速度。同时外部应该设置合理的 ChannelBufferSize。
c.Consumer.MaxProcessingTime = 10 * time.Second
// 处理 Source Config
c.Net.SASL.Enable = sc.SASL.Enable
c.Net.SASL.Version = sc.SASL.Version
c.Net.SASL.User = sc.SASL.User
c.Net.SASL.Password = sc.SASL.Password
return c
}