Kafka命令行工具 kcat或kafkacat
简介
kcat, 又称为 kafkacat,是一款非 JVM 的 Kafka 消息生产和消费的命令行工具,适用于 Kafka 0.8 及以上版本。
github地址:https://github.com/edenhill/kcat
生产者模式:kcat 从标准输入读取数据,-D 指定分割符,把数据发送到Kafka 集群,-b 指定集群地址, -t 指定topic名称, -p 指定分区
消费者模式:kcat 从指定的 topic 和 partition 消费数据,并打印到标准输出。
kcat 还支持打印 Kafka 集群的状态,topics,pairtions 信息。
kafkacat has been renamed to kcat to adhere to the Apache Software Foundation's (ASF) trademark policies.
kafkacat 是基于kafka C语言的librdkafka库的 kafka客户端,不依赖java,小巧轻便,支持主流系统。
在高版本的debain、Ubuntu下可以直接apt-get install kafkacat,maxos下可以用brew install kafkacat
也可以自己编译安装。
安装方式
Ubuntu 环境
12sudo apt-get updatesudo apt-get install kafkacatMacOS 环境
1brew install kcat从源码编译安装
12345678yum install -y librdkafka-develwget https://github.com/edenhill/kcat/archive/refs/tags/1.7.0.tar.gz -O kafkacat_1.7.0.tar.gztar -zxvf kafkacat_1.7.0.tar.gzcd kcat-1.7.0/./configuremake && make installkcat --help使用
Ubuntu 中使用 kafkacat, 在 MacOS 中使用 kcat, 如果是源码方式安装也是使用 kcat
生产者
常用参数:
-P: 说明以生产者模式运行
-b : Kafka集群地址,端口为9092时可省略端口
-t: topic名称
-p: 分区编号
-z: 压缩格式,如snappy
示例
读取 /var/log/syslog 文件的数据,数据写到“syslog” 主题
1 | tail -f /var/log/syslog | kafkacat -b mybroker -t syslog -P |
消费者
常用参数:
-P: 说明以消费者模式运行
-b : Kafka集群地址,端口为9092时可省略端口
-t: topic名称
-p: 分区编号
-J: 以 Json 格式打印数据
示例:
消费 syslog 所有数据
1 | kafkacat -C -b mybroker -t syslog |
消费 syslog 最近2000条数据
1 | kafkacat -C -b mybroker -t syslog -p 0 -o -2000 -e |
消费 mytopic 指定时间戳的数据
1 | kafkacat -b mybroker -C -t mytopic -o s@1568276612443 -o e@1568276617901 |
消费多个 topic
1 | kafkacat -b mybroker -G mygroup topic1 topic2 |
Kafka信息查询
查询给定时间戳对应的 offset
1 | kafkacat -b mybroker -Q -t mytopic:3:2389238523 -t mytopic2:0:18921841 |
打印 Kafka 集群所有元数据信息
1 | kafkacat -L -b mybroker |
以 Json 格式打印 Kafka 集群所有元数据信息
1 | kafkacat -b mybroker -L -J | jq . |
示例
1 2 3 4 5 6 7 8 9 | -- 生产者 /usr/pgsql-13/bin/pg_recvlogical -h 172.17.0.4 -p 5432 -U postgres -d postgres -S slot1 --start -f - | /usr/local/bin/kcat -b 127.0.0.1:9092 -t testdb_topic -- 消费者 -- 消费 first 主题中的数据 /usr/local/kafka/bin/kafka-console-consumer.sh --topic testdb_topic --bootstrap-server 127.0.0.1:9092 -- 把主题中所有的数据都读取出来(包括历史数据) /usr/local/kafka/bin/kafka-console-consumer.sh --topic testdb_topic --bootstrap-server 127.0.0.1:9092 --from-beginning |