Kafka: CLI

 8th November 2022 at 6:34pm

列出 group 对各 topic X partition 的 offset

kafka-consumer-groups --bootstrap-server localhost:9092 --group <group_name> --describe

修改 group X topic X partition 到特定的 offset

kafka-consumer-groups --bootstrap-server localhost:9092 --group <group_name> --topic <topic_name>[:<partition number>] --reset-offsets --to-offset <Long: offset> --execute

该 group 不存在时会新建 group。

假如该集群需要认证才可访问,可以新建一个 properties 文件,并在 CLI 选项上附上:

--consumer.config consumer.properties

properties 文件示例:

security.protocol=SASL_PLAINTEXT
sasl.mechanism=PLAIN
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \
  username="your-username" \
  password="your-password";

下载消息

下载消息可以用 Kafka 官方的 CLI,也可以试试用 kaf。Kaf 是纯 Go 程序,部署的成本低一些。但是它的功能相对弱一些。

kafka-console-consumer \
    --bootstrap-server localhost:9092 \
    --topic <topic> --from-beginning --max-messages 200000 \
    --property print.timestamp=true --property print.key=true --property print.offset=true \
    --property print.partition=true --property print.headers=true --property print.value=true > msg.txt

配套 Python 程序,转成 CSV:

import csv
import dataclasses
import json


@dataclasses.dataclass
class Message:
    kafka_create_time: str
    kafka_partition: int
    kafka_offset: int
    kafka_key: str
    kafka_content: dict


def parse_row(row):
    create_time = row[0].split(":")[-1]
    partition = int(row[1].split(":")[-1])
    offset = int(row[2].split(":")[-1])

    headers = {}
    for header in row[3].split(","):
        parts = header.split(":")
        headers[parts[0]] = parts[1]

    key = row[4]
    content = row[5]

    return Message(create_time, partition, offset, key, content)


def main():
    msgs = []
    with open("msg.txt", newline="") as csvfile:
        reader = csv.reader(csvfile, delimiter="\t")
        for row in reader:
            msg = parse_row(row)
            msgs.append(msg)

    with open("msg.csv", mode="w") as csvfile:
        writer = csv.writer(csvfile)

        writer.writerow(f.name for f in dataclasses.fields(Message))
        for msg in msgs:
            writer.writerow(dataclasses.astuple(msg))


if __name__ == '__main__':
    main()

之后可以再使用 wireservice/csvkit 将数据导入 DB 做分析。

删除 consumer group

只能针对所有 topic 删除,无法针对单个。

kafka-consumer-groups --bootstrap-server localhost:9092 --delete --group <group_name>