参考:https://pypi.org/project/kafka-python/
安装 kafka-python
pip install kafka-python
消费者
文档:https://kafka-python.readthedocs.io/en/master/apidoc/KafkaConsumer.html
from kafka import KafkaConsumer
# 创建消费者
consumer = KafkaConsumer('my_favorite_topic', consumer_timeout_ms=3000) # poll 消息的时候超时时间设置为 3秒
for msg in consumer:
print (msg)
指定 group
consumer = KafkaConsumer('my_favorite_topic', group_id='my_favorite_group', auto_offset_reset='earliest')
- auto_offset_reset 可选项:earliest、latest
手动分配 partition
from kafka import TopicPartition
consumer = KafkaConsumer(bootstrap_servers='localhost:1234')
consumer.assign([TopicPartition('foobar', 2)])
msg = next(consumer)
设置反序列化器
consumer = KafkaConsumer(value_deserializer=msgpack.loads)
consumer.subscribe(['msgpackfoo'])
for msg in consumer:
assert isinstance(msg.value, dict)
获取消息头(Kafka 从 0.11 版本开始支持消息头 headers;msg.headers 是由 (key, value) 元组组成的列表,其中 value 为 bytes)
for msg in consumer:
print (msg.headers)
获取 metrics 信息(类似性能监控之类的指标)
metrics = consumer.metrics()
2. 每次都从最开始消费
参考:https://github.com/dpkp/kafka-python/issues/601
from kafka import KafkaConsumer
consumer = KafkaConsumer(
bootstrap_servers='localhost:9092',
group_id="consumers_group" # <- solution
)
consumer.subscribe(['test-topic-1', 'test-topic-2'])
consumer.poll(max_records=1, update_offsets=False) # <- solution
consumer.seek_to_beginning()
生产者
from kafka import KafkaProducer
producer = KafkaProducer(bootstrap_servers='localhost:1234')
for _ in range(100):
producer.send('foobar', b'some_message_bytes')
同步发送
future = producer.send('foobar', b'another_message')
result = future.get(timeout=60)
阻塞直到所有待发送的消息至少已经发送到网络上(注意:这并不保证消息投递成功,主要在通过 linger_ms 配置了内部批量发送时才有用)
producer.flush()
指定消息的 key(生产者会对 key 做 hash 来决定消息写入哪个 partition)
producer.send('foobar', key=b'foo', value=b'bar')
序列化消息
import json
producer = KafkaProducer(value_serializer=lambda v: json.dumps(v).encode('utf-8'))
producer.send('fizzbuzz', {'foo': 'bar'})
序列化 key
producer = KafkaProducer(key_serializer=str.encode)
producer.send('flipflap', key='ping', value=b'1234')
压缩消息
producer = KafkaProducer(compression_type='gzip')
for i in range(1000):
producer.send('foobar', b'msg %d' % i)
producer.flush() # 等待消息发送完成
添加消息头
producer.send('foobar', value=b'c29tZSB2YWx1ZQ==', headers=[('content-encoding', b'base64')])
获取生产者的 metrics
metrics = producer.metrics()