python Kafka 支持大消息

创建日期: 2024-09-04 10:24 | 作者: 风波 | 浏览次数: 16 | 分类: Kafka

1. server 端支持的消息大小设置

使用 docker 镜像 bitnami/kafka:3.2 创建 kafka 服务时,设置环境变量 KAFKA_CFG_MESSAGE_MAX_BYTES 即可设置服务端的 message.max.bytes 值。

设置换环境变量最后会被写入到配置文件 /opt/bitnami/kafka/config/server.properties 中。服务在启动时使用的配置文件就是这个。

通过容器内的脚本文件 /opt/bitnami/scripts/kafka-env.sh 可以看到这个镜像支持的环境变量列表。

2. producer 支持的消息大小设置

KafkaProducer 文档:https://kafka-python.readthedocs.io/en/master/apidoc/KafkaProducer.html

创建 KafkaProducer 的时候使用 max_request_size 参数指定发送的消息体的最大值。

import json
from kafka import KafkaProducer

producer = KafkaProducer(
            bootstrap_servers='127.0.0.1:9092',
            value_serializer=lambda v: json.dumps(v).encode('utf-8'),
            max_request_size=104857600 # 100MB
        )

3. consumer 端支持的消息大小设置

kafka-python Consomuer 文档:https://kafka-python.readthedocs.io/en/master/apidoc/KafkaConsumer.html

创建 consumer 时使用 max_partition_fetch_bytes 参数指定可以接收的最大消息的大小,默认值是 1048576(也就是1MB)

from kafka import KafkaConsumer

bootstrap_servers = "127.0.0.1:9092"
# 创建消费者
consumer = KafkaConsumer(
        group_id='test-001',
        bootstrap_servers=bootstrap_servers,
        auto_offset_reset='earliest',
        max_partition_fetch_bytes=104857600) # 设置可以接收的最大消息体大小为100MB
16 浏览
10 爬虫
0 评论