列出 group 对各 topic X partition 的 offset
kafka-consumer-groups --bootstrap-server localhost:9092 --group <group_name> --describe
修改 group X topic X partition 到特定的 offset
kafka-consumer-groups --bootstrap-server localhost:9092 --group <group_name> --topic <topic_name>[:<partition number>] --reset-offsets --to-offset <Long: offset> --execute
该 group 不存在时会新建 group。
假如该集群需要认证才可访问,可以新建一个 properties 文件,并在 CLI 选项上附上:
--consumer.config consumer.properties
properties 文件示例:
security.protocol=SASL_PLAINTEXT
sasl.mechanism=PLAIN
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \
username="your-username" \
password="your-password";
下载消息
下载消息可以用 Kafka 官方的 CLI,也可以试试用 kaf。Kaf 是纯 Go 程序,部署的成本低一些。但是它的功能相对弱一些。
kafka-console-consumer \
--bootstrap-server localhost:9092 \
--topic <topic> --from-beginning --max-messages 200000 \
--property print.timestamp=true --property print.key=true --property print.offset=true \
--property print.partition=true --property print.headers=true --property print.value=true > msg.txt
配套 Python 程序,转成 CSV:
import csv
import dataclasses
import json
@dataclasses.dataclass
class Message:
kafka_create_time: str
kafka_partition: int
kafka_offset: int
kafka_key: str
kafka_content: dict
def parse_row(row):
create_time = row[0].split(":")[-1]
partition = int(row[1].split(":")[-1])
offset = int(row[2].split(":")[-1])
headers = {}
for header in row[3].split(","):
parts = header.split(":")
headers[parts[0]] = parts[1]
key = row[4]
content = row[5]
return Message(create_time, partition, offset, key, content)
def main():
msgs = []
with open("msg.txt", newline="") as csvfile:
reader = csv.reader(csvfile, delimiter="\t")
for row in reader:
msg = parse_row(row)
msgs.append(msg)
with open("msg.csv", mode="w") as csvfile:
writer = csv.writer(csvfile)
writer.writerow(f.name for f in dataclasses.fields(Message))
for msg in msgs:
writer.writerow(dataclasses.astuple(msg))
if __name__ == '__main__':
main()
之后可以再使用 wireservice/csvkit 将数据导入 DB 做分析。
删除 consumer group
只能针对所有 topic 删除,无法针对单个。
kafka-consumer-groups --bootstrap-server localhost:9092 --delete --group <group_name>