如果遇到 kafka 报错,可以参考文档《python 3.12.0 之后使用 kafka 报错 kafka.vendor.six.moves》
kafka-python 文档:https://kafka-python.readthedocs.io/en/master/apidoc/KafkaAdminClient.html
from loguru import logger
import sys
from kafka import KafkaAdminClient, KafkaConsumer
class PartitionStatus:
    """Plain record holding an offset for one partition of one topic."""

    def __init__(self, topic, partition, offset):
        """Store the given values unchanged.

        Args:
            topic: Topic name.
            partition: Partition identifier within the topic.
            offset: Offset value associated with that partition.
        """
        self.topic = topic
        self.partition = partition
        self.offset = offset

    def __repr__(self):
        """Return a constructor-like representation for logs and debugging."""
        return (
            f"{type(self).__name__}(topic={self.topic!r}, "
            f"partition={self.partition!r}, offset={self.offset!r})"
        )
class KafkaPeeper:
    """Read-only helper for listing topics, consumer groups and group offsets.

    Holds one ``KafkaAdminClient`` and one ``KafkaConsumer`` built from the
    same keyword configuration. Call :meth:`close` (or use the instance as a
    context manager) to release both clients.
    """

    def __init__(self, **configs):
        """Create the admin client and the consumer.

        Args:
            **configs: Keyword options forwarded unchanged to both
                ``KafkaAdminClient`` and ``KafkaConsumer``.
        """
        self.adminer = KafkaAdminClient(**configs)
        try:
            self.consumer = KafkaConsumer(**configs)
        except Exception:
            # Do not leak the already-created admin client when the consumer
            # cannot be constructed.
            self.adminer.close()
            raise

    def __enter__(self):
        """Return the instance itself so it can be used in a ``with`` block."""
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        """Close both clients when leaving the ``with`` block."""
        self.close()

    def close(self):
        """Close the admin client and the consumer."""
        try:
            self.adminer.close()
        finally:
            # Still close the consumer if closing the admin client raised.
            self.consumer.close()

    @staticmethod
    def _as_filter(names):
        """Normalize a filter argument to a set, or ``None`` for "no filter"."""
        if not names:
            return None
        if isinstance(names, str):
            # A bare string means one name, not a collection of characters.
            return {names}
        return set(names)

    def list_topics(self):
        """Return the topic names reported by the consumer, sorted ascending."""
        return sorted(self.consumer.topics())

    def list_groups(self):
        """Return the first element (the group name) of every listed consumer group."""
        return [group[0] for group in self.adminer.list_consumer_groups()]

    def list_group_status(self, checked_topics=None, checked_groups=None):
        """Collect the offsets of consumer groups, grouped by topic.

        Args:
            checked_topics: Topic names to keep. ``None`` or empty keeps all
                topics. A single string is treated as one topic name.
            checked_groups: Group names to inspect. ``None`` or empty inspects
                all groups. A single string is treated as one group name.

        Returns:
            dict: Maps each kept topic name to a list of dicts with the keys
            ``"topic"``, ``"partition"``, ``"offset"`` and ``"group"``. Each
            list is ordered by group name, then by partition.
        """
        logger.debug(f"topics filter: {checked_topics}")
        logger.debug(f"groups filter: {checked_groups}")

        topic_filter = self._as_filter(checked_topics)
        group_filter = self._as_filter(checked_groups)

        status = {}
        keep_topics = set()
        drop_topics = set()
        group_topics = set()

        for group_name in self.list_groups():
            if group_filter is not None and group_name not in group_filter:
                logger.debug(f"ignore offset of group: {group_name}")
                continue

            group_offsets = self.adminer.list_consumer_group_offsets(group_name)
            for tp, committed in group_offsets.items():
                # Recorded before topic filtering so the debug output shows
                # every topic/group pair that was seen.
                group_topics.add(tp.topic + "\t" + group_name)
                if topic_filter is not None and tp.topic not in topic_filter:
                    logger.debug(f"ignore offset of topic: {tp.topic}")
                    drop_topics.add(tp.topic)
                    continue

                keep_topics.add(tp.topic)
                logger.debug(f"get offset of topic: {tp.topic}")
                status.setdefault(tp.topic, []).append({
                    "topic": tp.topic,
                    "partition": tp.partition,
                    "offset": committed.offset,
                    "group": group_name,
                })

        # Sort once per topic instead of after every append; a tuple key
        # orders by group name and then by partition value.
        for entries in status.values():
            entries.sort(key=lambda entry: (entry["group"], entry["partition"]))

        logger.debug("keep_topics: \n\t{}".format('\n\t'.join(sorted(keep_topics))))
        logger.debug("drop_topics: \n\t{}".format('\n\t'.join(sorted(drop_topics))))
        logger.debug("group_topics: \n\t{}".format('\n\t'.join(sorted(group_topics))))
        return status
使用
# Usage example: build a peeper with 10-second request and API-version-probe
# timeouts, then fetch the group names, the topic names, and the group
# offsets restricted by the two filters.
client = KafkaPeeper(
    bootstrap_servers=bootstrap_servers,
    request_timeout_ms=10 * 1000,
    api_version_auto_timeout_ms=10 * 1000,
)
groups = client.list_groups()
topics = client.list_topics()
offsets = client.list_group_status(
    checked_topics=checked_topics,
    checked_groups=checked_groups,
)
`checked_topics` 和 `checked_groups` 为过滤条件,避免将所有 topic 的所有 partition 的所有 group 的 offset 都返回;不传(或传空)时不做过滤。
参考: - https://stackoverflow.com/questions/35432326/how-to-get-latest-offset-for-a-partition-for-a-kafka-topic/35442428
from kafka import SimpleClient
from kafka.protocol.offset import OffsetRequest, OffsetResetStrategy
from kafka.common import OffsetRequestPayload
# Print one offset per partition of `topic` using the legacy client API.
# NOTE(review): SimpleClient belongs to the old kafka-python "Simple*" API and
# appears to be unavailable in kafka-python 2.x -- confirm against the
# installed version; KafkaConsumer.end_offsets() is presumably the modern
# replacement.
client = SimpleClient(brokers)
partitions = client.topic_partitions[topic]
# Presumably -1 asks for the latest offset and 1 limits the reply to a single
# offset per partition -- TODO confirm against the OffsetRequestPayload docs.
offset_requests = [OffsetRequestPayload(topic, p, -1, 1) for p in partitions]
offsets_responses = client.send_offset_request(offset_requests)
for r in offsets_responses:
    # print() call form: the original Python 2 print statement is a syntax
    # error on Python 3.
    print("partition = %s, offset = %s" % (r.partition, r.offsets[0]))