References:
- kafka-python documentation: https://kafka-python.readthedocs.io/en/master/index.html
- KafkaConsumer: https://kafka-python.readthedocs.io/en/master/apidoc/KafkaConsumer.html
- KafkaProducer: https://kafka-python.readthedocs.io/en/master/apidoc/KafkaProducer.html
- KafkaAdminClient: https://kafka-python.readthedocs.io/en/master/apidoc/KafkaAdminClient.html
- KafkaClient: https://kafka-python.readthedocs.io/en/master/apidoc/KafkaClient.html
- BrokerConnection: https://kafka-python.readthedocs.io/en/master/apidoc/BrokerConnection.html
- ClusterMetadata: https://kafka-python.readthedocs.io/en/master/apidoc/ClusterMetadata.html
1. List all topics
from kafka import KafkaConsumer

bootstrap_servers = "localhost:9092"
topic = 'hello-world'

consumer = KafkaConsumer(
    bootstrap_servers=bootstrap_servers,
    group_id='debug-reader',
    auto_offset_reset='earliest',  # 'latest' or 'earliest'
    consumer_timeout_ms=1000)
consumer.subscribe([topic])  # pass topic names as a list
topics = consumer.topics()   # set of all topic names visible to this client
print(topics)
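The set returned by `topics()` usually also contains Kafka's internal topics such as `__consumer_offsets`. The sketch below shows one way to filter those out and to make sure the consumer is closed afterwards. The helper names `user_topics` and `fetch_topics` are hypothetical, and treating every name that starts with `__` as internal is an assumption, not a rule enforced by kafka-python.

```python
def user_topics(topic_names):
    """Drop topics whose name starts with '__' (assumed internal) and sort the rest."""
    return sorted(name for name in topic_names if not name.startswith("__"))


def fetch_topics(bootstrap_servers="localhost:9092"):
    """Connect to the cluster, list topics, and always close the consumer."""
    from kafka import KafkaConsumer  # lazy import; requires kafka-python

    consumer = KafkaConsumer(bootstrap_servers=bootstrap_servers)
    try:
        return user_topics(consumer.topics())
    finally:
        consumer.close()
```

With a broker running on `localhost:9092`, calling `fetch_topics()` should return the user-visible topic names as a sorted list.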
2. List all partitions of a topic
from kafka import KafkaProducer

bootstrap_servers = "localhost:9092"
topic = 'hello-world'

prd = KafkaProducer(bootstrap_servers=bootstrap_servers)
# partitions_for() returns the set of partition ids known for the topic
partitions = prd.partitions_for(topic)
print(partitions)
prd.close()
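The same lookup is also available on the consumer side through `KafkaConsumer.partitions_for_topic()`, which can return `None` when the topic is unknown. The following is a minimal sketch that normalises the result to a sorted list. The helper names `sorted_partitions` and `fetch_partitions` are hypothetical and only serve the illustration.

```python
def sorted_partitions(lookup, topic):
    """Call a partitions lookup and return a sorted list ([] if the topic is unknown)."""
    partitions = lookup(topic)
    return sorted(partitions) if partitions else []


def fetch_partitions(topic, bootstrap_servers="localhost:9092"):
    """Look up partition ids for a topic through a short-lived consumer."""
    from kafka import KafkaConsumer  # lazy import; requires kafka-python

    consumer = KafkaConsumer(bootstrap_servers=bootstrap_servers)
    try:
        return sorted_partitions(consumer.partitions_for_topic, topic)
    finally:
        consumer.close()
```

Passing the lookup as a callable keeps the normalisation logic independent of the client, so it could equally be used with `prd.partitions_for`.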
3. Get information on all brokers
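import logging
from kafka.cluster import ClusterMetadata

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

bootstrap_servers = "localhost:9092"
# ClusterMetadata does no network I/O by itself; until a client updates it,
# brokers() only reports the bootstrap servers configured here.
meta = ClusterMetadata(bootstrap_servers=bootstrap_servers)
brokers = meta.brokers()
for b in brokers:
    logger.info(b)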
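A `ClusterMetadata` object created on its own is never refreshed from the network, so it is likely to list only the bootstrap servers. One possible way to see the brokers the cluster actually reports is to let a `KafkaClient` request a metadata update first. This is a sketch under that assumption: the helper names `broker_addresses` and `fetch_brokers` are hypothetical, and the import path `kafka.client_async` is used because the top-level `KafkaClient` name has pointed at different classes across kafka-python versions.

```python
def broker_addresses(brokers):
    """Render BrokerMetadata-like tuples as 'nodeId@host:port', ordered by node id."""
    ordered = sorted(brokers, key=lambda b: b.nodeId)
    return ["%s@%s:%s" % (b.nodeId, b.host, b.port) for b in ordered]


def fetch_brokers(bootstrap_servers="localhost:9092"):
    """Refresh cluster metadata through a KafkaClient, then list the brokers."""
    from kafka.client_async import KafkaClient  # lazy import; requires kafka-python

    client = KafkaClient(bootstrap_servers=bootstrap_servers)
    try:
        # Request fresh metadata and poll until that request completes.
        client.poll(future=client.cluster.request_update())
        return broker_addresses(client.cluster.brokers())
    finally:
        client.close()
```

`client.cluster` is the client's own `ClusterMetadata` instance, so `brokers()` is the same call as above, only made after the metadata has been refreshed.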