Using kafka-python

Created: 2022-07-07 18:10 | Author: 风波 | Views: 24 | Category: Kafka

Reference: https://pypi.org/project/kafka-python/

Installing kafka-python

pip install kafka-python

Consumer

Documentation: https://kafka-python.readthedocs.io/en/master/apidoc/KafkaConsumer.html

from kafka import KafkaConsumer
# Create a consumer
consumer = KafkaConsumer('my_favorite_topic', consumer_timeout_ms=3000)  # stop iterating if no message arrives within 3 seconds
for msg in consumer:
    print(msg)
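
Each item yielded by the loop above is a record object with fields such as topic, partition, offset, key and value. As a rough sketch, a small formatter can make the output easier to scan. The `Record` namedtuple and the `format_record` helper are hypothetical stand-ins so the snippet runs without a broker; with a real consumer you would pass `msg` straight in.

```python
from collections import namedtuple

# Hypothetical stand-in for the records yielded by KafkaConsumer;
# only the fields used below are modelled.
Record = namedtuple("Record", ["topic", "partition", "offset", "key", "value"])


def format_record(msg):
    """Return a one-line summary: topic[partition]@offset key=... value=..."""
    return "{}[{}]@{} key={!r} value={!r}".format(
        msg.topic, msg.partition, msg.offset, msg.key, msg.value
    )


sample = Record("my_favorite_topic", 0, 42, None, b"hello")
summary = format_record(sample)
print(summary)
```

Inside the consumer loop this would simply be `print(format_record(msg))`.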

Specifying a consumer group

consumer = KafkaConsumer('my_favorite_topic', group_id='my_favorite_group', auto_offset_reset='earliest')

Manually assigning a partition

from kafka import TopicPartition
consumer = KafkaConsumer(bootstrap_servers='localhost:1234')
consumer.assign([TopicPartition('foobar', 2)])
msg = next(consumer)

Setting a deserializer

import msgpack  # third-party package: pip install msgpack
consumer = KafkaConsumer(value_deserializer=msgpack.loads)
consumer.subscribe(['msgpackfoo'])
for msg in consumer:
    assert isinstance(msg.value, dict)
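
`value_deserializer` takes any callable that accepts the raw value and returns an object. Here is a sketch that uses JSON instead of msgpack, assuming the payloads are UTF-8 encoded JSON. `safe_json_loads` is a hypothetical helper name, and returning None for missing or malformed payloads is just one possible policy, not something the library does for you.

```python
import json


def safe_json_loads(raw):
    """Decode UTF-8 JSON bytes; return None for missing or malformed payloads."""
    if raw is None:  # e.g. a record that carries no value
        return None
    try:
        return json.loads(raw.decode("utf-8"))
    except (UnicodeDecodeError, ValueError):
        # json.JSONDecodeError is a subclass of ValueError
        return None


decoded = safe_json_loads(b'{"foo": "bar"}')
broken = safe_json_loads(b"not json")
```

It could then be passed as `KafkaConsumer(value_deserializer=safe_json_loads)`.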

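Reading message headers (Kafka records have supported optional headers since version 0.11; in kafka-python each header is a (key, value) tuple where the key is a str and the value is bytes)

for msg in consumer:
    print(msg.headers)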
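
Since headers arrive as a list of (key, value) tuples, it can be handy to turn them into a dict. A minimal sketch, assuming the header values are UTF-8 text; `headers_to_dict` is a hypothetical helper and the `trace-id` header is made up for illustration.

```python
def headers_to_dict(headers):
    """Turn [(key, bytes_value), ...] into {key: str_value}; later keys win."""
    result = {}
    for key, value in headers or []:
        # Header values may be None; only decode real bytes.
        result[key] = value.decode("utf-8") if value is not None else None
    return result


parsed = headers_to_dict([("content-encoding", b"base64"), ("trace-id", b"abc123")])
print(parsed)
```

Note that Kafka allows the same header key to appear more than once; this sketch keeps only the last one.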

Getting metrics (useful for performance monitoring and the like)

metrics = consumer.metrics()
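
As far as I can tell, `metrics()` returns a nested dict shaped like group -> metric name -> value. Under that assumption, a small helper can flatten it into lines for logging. `flatten_metrics` is a hypothetical helper, and the sample dict is invented; the real group and metric names depend on the client version.

```python
def flatten_metrics(metrics):
    """Flatten {group: {name: value}} into sorted 'group.name=value' strings."""
    lines = []
    for group in sorted(metrics):
        for name in sorted(metrics[group]):
            lines.append("{}.{}={}".format(group, name, metrics[group][name]))
    return lines


# Invented sample data, only here so the snippet runs without a broker.
sample_metrics = {
    "consumer-metrics": {"connection-count": 1.0},
    "consumer-fetch-manager-metrics": {"records-consumed-rate": 12.5},
}
flat = flatten_metrics(sample_metrics)
for line in flat:
    print(line)
```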

Consuming from the beginning every time

Reference: https://github.com/dpkp/kafka-python/issues/601

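from kafka import KafkaConsumer
consumer = KafkaConsumer(
    bootstrap_servers='localhost:9092',
    group_id="consumers_group"  # <- solution
)
consumer.subscribe(['test-topic-1', 'test-topic-2'])
# Poll once so that partitions get assigned; update_offsets=False leaves the consumer position untouched
consumer.poll(max_records=1, update_offsets=False)  # <- solution
consumer.seek_to_beginning()  # rewind all assigned partitions to the earliest offset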

Producer

from kafka import KafkaProducer
producer = KafkaProducer(bootstrap_servers='localhost:1234')
for _ in range(100):
    producer.send('foobar', b'some_message_bytes')

Synchronous send

future = producer.send('foobar', b'another_message')
result = future.get(timeout=60)

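Pushing pending messages onto the network (flush() blocks until buffered messages have been sent, but this alone does not guarantee delivery or success)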

producer.flush()

Sending with a key (the key is hashed to choose the partition)

producer.send('foobar', key=b'foo', value=b'bar')
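
The point of a key is that records with the same key end up on the same partition, because the partition is derived from a hash of the key. The sketch below only illustrates that idea: it uses `zlib.crc32` as a stand-in hash, which is an assumption for the demo. The library's default partitioner uses a different hash function, so these partition numbers will not match a real cluster. `pick_partition` is a hypothetical helper.

```python
import zlib


def pick_partition(key, num_partitions):
    """Map a bytes key to a partition index in [0, num_partitions).

    Uses CRC32 purely for illustration; NOT the hash kafka-python uses.
    """
    return zlib.crc32(key) % num_partitions


first = pick_partition(b"foo", 6)
second = pick_partition(b"foo", 6)
print(first == second)  # the same key always maps to the same partition
```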

Serializing messages

import json
producer = KafkaProducer(value_serializer=lambda v: json.dumps(v).encode('utf-8'))
producer.send('fizzbuzz', {'foo': 'bar'})

Serializing keys

producer = KafkaProducer(key_serializer=str.encode)
producer.send('flipflap', key='ping', value=b'1234')

Compressing messages

producer = KafkaProducer(compression_type='gzip')
for i in range(1000):
    producer.send('foobar', b'msg %d' % i)

producer.flush()  # block until all pending messages have been sent
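
To get a feel for why compression pays off on repetitive messages like these, here is a standalone sketch that gzips the same 1000 payloads with the standard library. It only illustrates the size difference; the producer compresses batches in its own wire format, so the real numbers will differ.

```python
import gzip

# The same 1000 payloads as in the producer loop, joined into one blob.
batch = b"".join(b"msg %d" % i for i in range(1000))
compressed = gzip.compress(batch)
restored = gzip.decompress(compressed)

print(len(batch), len(compressed))
```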

Adding message headers

producer.send('foobar', value=b'c29tZSB2YWx1ZQ==', headers=[('content-encoding', b'base64')])
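
The value in that example is the base64 encoding of `some value`, and the header tells the consumer how to decode it. A sketch of both sides, assuming the consumer honours a `content-encoding` header this way; that is an application-level convention, not something Kafka enforces. `build_send_kwargs` and `decode_value` are hypothetical helpers.

```python
import base64


def build_send_kwargs(payload):
    """Base64-encode the payload and attach a header describing the encoding."""
    return {
        "value": base64.b64encode(payload),
        "headers": [("content-encoding", b"base64")],
    }


def decode_value(value, headers):
    """Reverse build_send_kwargs: decode only if the header says base64."""
    encodings = [v for k, v in headers if k == "content-encoding"]
    if b"base64" in encodings:
        return base64.b64decode(value)
    return value


kwargs = build_send_kwargs(b"some value")
roundtrip = decode_value(kwargs["value"], kwargs["headers"])
print(kwargs["value"], roundtrip)
```

On the producer side this would be used as `producer.send('foobar', **kwargs)`, and on the consumer side as `decode_value(msg.value, msg.headers)`.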

Getting producer metrics

metrics = producer.metrics()