Monitoring Kafka Service Status with Python

Created: 2025-01-17 19:42 | Author: 风波 | Category: Kafka

If kafka-python raises an error on import, see the post "kafka.vendor.six.moves error when using kafka-python on Python 3.12.0 and later".
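A commonly suggested workaround (an assumption here, not taken from that post) is to install the standalone six package and register its moves module under the name kafka-python's vendored copy expects before importing kafka; switching to the maintained kafka-python-ng fork is another option.

# Workaround sketch for Python 3.12+ (assumes `pip install six`):
# pre-register six.moves under the module name kafka-python's vendored six expects.
import sys
import six

sys.modules["kafka.vendor.six.moves"] = six.moves

from kafka import KafkaConsumer  # this import now succeeds on Python 3.12+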

kafka-python documentation: https://kafka-python.readthedocs.io/en/master/apidoc/KafkaAdminClient.html

from loguru import logger

from kafka import KafkaAdminClient, KafkaConsumer


# Simple value object for one partition's committed offset
# (kept from the original post; not referenced by KafkaPeeper below).
class PartitionStatus:
    def __init__(self, topic, partition, offset):
        self.topic = topic
        self.partition = partition
        self.offset = offset


class KafkaPeeper:
    def __init__(self, **configs):
        # One admin client (consumer groups and committed offsets) and one
        # consumer (topic metadata), both built from the same connection configs.
        self.adminer = KafkaAdminClient(**configs)
        self.consumer = KafkaConsumer(**configs)


    def list_topics(self):
        # KafkaConsumer.topics() returns a set of topic names.
        topics = self.consumer.topics()
        return sorted(topics)


    def list_groups(self):
        # list_consumer_groups() returns (group_id, protocol_type) tuples.
        groups = self.adminer.list_consumer_groups()
        return [g[0] for g in groups]


    def list_group_status(self, checked_topics=None, checked_groups=None):
        # Returns {topic: [{"topic", "partition", "offset", "group"}, ...]},
        # optionally filtered by topic name and/or consumer-group name.
        logger.debug(f"topics filter: {checked_topics}")
        logger.debug(f"groups filter: {checked_groups}")
        status = dict()
        groups = self.adminer.list_consumer_groups()
        keep_topics = set()
        drop_topics = set()
        group_topics = set()
        for g in groups:
            group_name = g[0]
            if checked_groups and group_name not in checked_groups:
                logger.debug(f"ignore offset of group: {group_name}")
                continue
            # {TopicPartition: OffsetAndMetadata} for this consumer group
            of = self.adminer.list_consumer_group_offsets(group_name)
            for k, v in of.items():
                group_topics.add(k.topic + "\t" + group_name)
                if checked_topics and k.topic not in checked_topics:
                    logger.debug(f"ignore offset of topic: {k.topic}")
                    drop_topics.add(k.topic)
                    continue
                keep_topics.add(k.topic)
                logger.debug(f"get    offset of topic: {k.topic}")
                poofs = status.get(k.topic, list())
                poofs.append({
                    "topic": k.topic,
                    "partition": k.partition,
                    "offset": v.offset,
                    "group": group_name,
                    })
                status[k.topic] = sorted(poofs, key=lambda x: (x["group"], x["partition"]))  # group name, then partition number
        logger.debug("keep_topics: \n\t{}".format('\n\t'.join(keep_topics)))
        logger.debug("drop_topics: \n\t{}".format('\n\t'.join(drop_topics)))
        logger.debug("group_topics: \n\t{}".format('\n\t'.join(sorted(list(group_topics)))))

        return status

Usage

# bootstrap_servers, checked_topics and checked_groups are supplied by the caller,
# e.g. bootstrap_servers = ["localhost:9092"].
client = KafkaPeeper(bootstrap_servers=bootstrap_servers, request_timeout_ms=10 * 1000, api_version_auto_timeout_ms=10 * 1000)

groups = client.list_groups()
topics = client.list_topics()

offsets = client.list_group_status(checked_topics=checked_topics, checked_groups=checked_groups)
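The returned dict maps each topic to a list of per-group, per-partition committed offsets (the field names come from list_group_status above); a minimal sketch of dumping it:

# Print committed offsets per topic / consumer group / partition.
for topic, entries in offsets.items():
    for e in entries:
        print(f"{topic}  group={e['group']}  partition={e['partition']}  offset={e['offset']}")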

Reference: https://stackoverflow.com/questions/35432326/how-to-get-latest-offset-for-a-partition-for-a-kafka-topic/35442428

# Legacy snippet from the Stack Overflow answer above. SimpleClient and the
# OffsetRequestPayload structs were removed in kafka-python 2.0, so this only
# runs on old releases; `brokers` and `topic` are supplied by the caller.
from kafka import SimpleClient
from kafka.common import OffsetRequestPayload

client = SimpleClient(brokers)

# -1 asks for the latest offset of each partition, 1 limits the reply to one offset.
partitions = client.topic_partitions[topic]
offset_requests = [OffsetRequestPayload(topic, p, -1, 1) for p in partitions.keys()]

offsets_responses = client.send_offset_request(offset_requests)

for r in offsets_responses:
    print("partition = %s, offset = %s" % (r.partition, r.offsets[0]))