1. Server-side message size setting
When creating a Kafka service from the bitnami/kafka:3.2 Docker image, set the environment variable KAFKA_CFG_MESSAGE_MAX_BYTES to configure the broker-side message.max.bytes value.
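For example, in a docker-compose service definition (the service name and the 100MB value are illustrative):

```yaml
services:
  kafka:
    image: bitnami/kafka:3.2
    environment:
      # ends up in server.properties as message.max.bytes=104857600
      - KAFKA_CFG_MESSAGE_MAX_BYTES=104857600  # 100MB
```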
Environment variables set this way are written into the configuration file /opt/bitnami/kafka/config/server.properties, which is the file the service reads at startup.
The script /opt/bitnami/scripts/kafka-env.sh inside the container shows the list of environment variables this image supports:
- KAFKA_AUTO_CREATE_TOPICS_ENABLE
 - KAFKA_BASE_DIR
 - KAFKA_BROKER_PASSWORD
 - KAFKA_BROKER_USER
 - KAFKA_CERTIFICATE_PASSWORD
 - KAFKA_CERTS_DIR
 - KAFKA_CFG_ (prefix shared by several other environment variables)
 - KAFKA_CFG_ADVERTISED_LISTENERS
 - KAFKA_CFG_AUTO_CREATE_TOPICS_ENABLE
 - KAFKA_CFG_BROKER_ID
 - KAFKA_CFG_CONTROLLER_LISTENER_NAMES
 - KAFKA_CFG_CONTROLLER_QUORUM_VOTERS
 - KAFKA_CFG_INTER_BROKER_LISTENER_NAME
 - KAFKA_CFG_LISTENERS
 - KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP
 - KAFKA_CFG_LOG_DIRS
 - KAFKA_CFG_LOG_SEGMENT_BYTES
 - KAFKA_CFG_MAX_PARTITION_FETCH_BYTES (maximum bytes a consumer fetches per partition)
 - KAFKA_CFG_MAX_REQUEST_SIZE (maximum bytes of a message the producer sends)
 - KAFKA_CFG_MESSAGE_MAX_BYTES (maximum message bytes the server accepts)
 - KAFKA_CFG_PORT
 - KAFKA_CFG_PROCESS_ROLES
 - KAFKA_CFG_SASL_ENABLED_MECHANISMS
 - KAFKA_CFG_SASL_MECHANISM_INTER_BROKER_PROTOCOL
 - KAFKA_CFG_ZOOKEEPER_CLIENTCNXNSOCKET
 - KAFKA_CFG_ZOOKEEPER_CONNECT
 - KAFKA_CFG_ZOOKEEPER_CONNECTION_TIMEOUT_MS
 - KAFKA_CLIENT_PASSWORDS
 - KAFKA_CLIENT_USERS
 - KAFKA_CONF_DIR
 - KAFKA_CONF_FILE
 - KAFKA_DAEMON_GROUP
 - KAFKA_DAEMON_USER
 - KAFKA_DATA_DIR
 - KAFKA_ENABLE_KRAFT
 - KAFKA_EXTRA_FLAGS
 - KAFKA_HEAP_OPTS
 - KAFKA_HOME
 - KAFKA_INITSCRIPTS_DIR
 - KAFKA_INTER_BROKER_PASSWORD
 - KAFKA_INTER_BROKER_USER
 - KAFKA_KRAFT_CLUSTER_ID
 - KAFKA_LOGS_DIRS
 - KAFKA_LOG_DIR
 - KAFKA_MAX_MESSAGE_BYTES
 - KAFKA_MOUNTED_CONF_DIR
 - KAFKA_OPTS
 - KAFKA_OWNERSHIP_USER
 - KAFKA_PID_FILE
 - KAFKA_PORT_NUMBER
 - KAFKA_SEGMENT_BYTES
 - KAFKA_TLS_ (prefix shared by several other environment variables)
 - KAFKA_TLS_CLIENT_AUTH
 - KAFKA_TLS_TRUSTSTORE_FILE
 - KAFKA_TLS_TYPE
 - KAFKA_VOLUME_DIR
 - KAFKA_ZOOKEEPER_CONNECT_TIMEOUT_MS
 - KAFKA_ZOOKEEPER_PASSWORD
 - KAFKA_ZOOKEEPER_PROTOCOL
 - KAFKA_ZOOKEEPER_TLS_ (prefix shared by several other environment variables)
 - KAFKA_ZOOKEEPER_TLS_KEYSTORE_PASSWORD
 - KAFKA_ZOOKEEPER_TLS_TRUSTSTORE_FILE
 - KAFKA_ZOOKEEPER_TLS_TRUSTSTORE_PASSWORD
 - KAFKA_ZOOKEEPER_TLS_TYPE
 - KAFKA_ZOOKEEPER_TLS_VERIFY_HOSTNAME
 - KAFKA_ZOOKEEPER_USER
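The KAFKA_CFG_* names follow a simple convention: strip the prefix, lowercase the rest, and replace underscores with dots to get the server.properties key. A small sketch of that mapping (the helper function is ours, not part of the image):

```python
def cfg_env_to_property(env_name: str) -> str:
    """Map a KAFKA_CFG_* environment variable to its
    server.properties key: drop the prefix, lowercase,
    and turn underscores into dots."""
    prefix = "KAFKA_CFG_"
    if not env_name.startswith(prefix):
        raise ValueError(f"{env_name!r} does not start with {prefix}")
    return env_name[len(prefix):].lower().replace("_", ".")

print(cfg_env_to_property("KAFKA_CFG_MESSAGE_MAX_BYTES"))  # message.max.bytes
print(cfg_env_to_property("KAFKA_CFG_LOG_SEGMENT_BYTES"))  # log.segment.bytes
```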
 
2. Producer-side message size setting
KafkaProducer documentation: https://kafka-python.readthedocs.io/en/master/apidoc/KafkaProducer.html
When creating a KafkaProducer, use the max_request_size parameter to set the maximum size of a message the producer will send.
import json

from kafka import KafkaProducer

producer = KafkaProducer(
    bootstrap_servers='127.0.0.1:9092',
    value_serializer=lambda v: json.dumps(v).encode('utf-8'),
    max_request_size=104857600,  # 100MB
)
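If a serialized message exceeds max_request_size, kafka-python raises MessageSizeTooLargeError from send() before anything reaches the broker. One way to sanity-check a payload's wire size beforehand (the helper function is illustrative, not part of kafka-python):

```python
import json

def serialized_size(value) -> int:
    """Size in bytes of the payload after the same JSON + UTF-8
    serialization the producer's value_serializer applies."""
    return len(json.dumps(value).encode('utf-8'))

payload = {"data": "x" * 1024}  # hypothetical ~1KB message
print(serialized_size(payload) <= 104857600)  # True: fits under 100MB
```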
3. Consumer-side message size setting
kafka-python KafkaConsumer documentation: https://kafka-python.readthedocs.io/en/master/apidoc/KafkaConsumer.html
When creating a consumer, use the max_partition_fetch_bytes parameter to set the maximum amount of data returned per partition, which bounds the largest message the consumer can receive; the default is 1048576 (1MB).
from kafka import KafkaConsumer

bootstrap_servers = "127.0.0.1:9092"

# Create the consumer
consumer = KafkaConsumer(
    group_id='test-001',
    bootstrap_servers=bootstrap_servers,
    auto_offset_reset='earliest',
    max_partition_fetch_bytes=104857600,  # accept messages up to 100MB
)
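For a large message to make it end to end, the three limits have to line up: the producer's max_request_size, the broker's message.max.bytes, and the consumer's max_partition_fetch_bytes must each be at least the message size. A simplified sketch of that check (the function is ours, and it ignores batch/protocol overhead):

```python
def limits_allow(message_bytes: int,
                 producer_max_request_size: int,
                 broker_message_max_bytes: int,
                 consumer_max_partition_fetch_bytes: int) -> bool:
    """True if a message of the given size passes every limit on the
    producer -> broker -> consumer path."""
    return (message_bytes <= producer_max_request_size
            and message_bytes <= broker_message_max_bytes
            and message_bytes <= consumer_max_partition_fetch_bytes)

MB = 1048576
# A 50MB message with all three limits raised to 100MB:
print(limits_allow(50 * MB, 100 * MB, 100 * MB, 100 * MB))  # True
# The same message, but the broker left at its 1MB default:
print(limits_allow(50 * MB, 100 * MB, 1 * MB, 100 * MB))    # False
```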