参考:https://www.baeldung.com/ops/kafka-list-topics
1. 创建 topic
/opt/bitnami/kafka/bin/kafka-topics.sh --create --topic users.registrations --replication-factor 1 --partitions 2 --zookeeper localhost:2181
/opt/bitnami/kafka/bin/kafka-topics.sh --create --topic users.verfications --replication-factor 1 --partitions 2 --zookeeper localhost:2181
2. 列出所有 topic
/opt/bitnami/kafka/bin/kafka-topics.sh --bootstrap-server=localhost:9092 --list
# 或者
/opt/bitnami/kafka/bin/kafka-topics.sh --list --zookeeper localhost:2181
3. 查看 topic 细节
/opt/bitnami/kafka/bin/kafka-topics.sh --bootstrap-server=localhost:9092 --describe --topic users.registrations
users.registrations
是 topic
4. 消费 topic 的消息
# 消费从0开始的所有消息
/opt/bitnami/kafka/bin/kafka-console-consumer.sh --bootstrap-server=localhost:9092 --from-beginning --topic topic_name
# 只消费最新的消息
/opt/bitnami/kafka/bin/kafka-console-consumer.sh --bootstrap-server=localhost:9092 --topic topic_name
5. 删除 topic
/opt/bitnami/kafka/bin/kafka-topics.sh --delete --bootstrap-server=localhost:9092 --topic topic_name
6. 查看一个 topic 下最后的 n 条消息
参考:https://devpress.csdn.net/bigdata/6304af6bc67703293080d8c2.html 先获取 topic 的最大 offset 和分区信息
bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list localhost:9092 --topic mytopic
获取最大 offset 存储变量。因为会有 \r
字符,所以需要使用 grep
过滤一遍。
maxoffset=$(bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list localhost:9092 --topic mytopic | head -n 1 | awk -F ':' '{print $(NF)}' | grep -Eio '[0-9]+')
再从指定的 offset 读取消息
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic mytopic --offset 10 --partition 0
7. 查看指定 topic 的 padition 数量
./bin/kafka-topics.sh --describe --bootstrap-server "172.18.96.119:9092" --topic TOPIC_NAME