1. server 端支持的消息大小设置
使用 docker 镜像 bitnami/kafka:3.2
创建 kafka 服务时,设置环境变量 KAFKA_CFG_MESSAGE_MAX_BYTES
即可设置服务端的 message.max.bytes
值。
设置换环境变量最后会被写入到配置文件 /opt/bitnami/kafka/config/server.properties
中。服务在启动时使用的配置文件就是这个。
通过容器内的脚本文件 /opt/bitnami/scripts/kafka-env.sh
可以看到这个镜像支持的环境变量列表。
- KAFKA_AUTO_CREATE_TOPICS_ENABLE
- KAFKA_BASE_DIR
- KAFKA_BROKER_PASSWORD
- KAFKA_BROKER_USER
- KAFKA_CERTIFICATE_PASSWORD
- KAFKA_CERTS_DIR
KAFKA_CFG_
- 一些其它环境变量的开头- KAFKA_CFG_ADVERTISED_LISTENERS
- KAFKA_CFG_AUTO_CREATE_TOPICS_ENABLE
- KAFKA_CFG_BROKER_ID
- KAFKA_CFG_CONTROLLER_LISTENER_NAMES
- KAFKA_CFG_CONTROLLER_QUORUM_VOTERS
- KAFKA_CFG_INTER_BROKER_LISTENER_NAME
- KAFKA_CFG_LISTENERS
- KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP
- KAFKA_CFG_LOG_DIRS
- KAFKA_CFG_LOG_SEGMENT_BYTES
KAFKA_CFG_MAX_PARTITION_FETCH_BYTES
- consumer 获取消息的最大字节数KAFKA_CFG_MAX_REQUEST_SIZE
- producer 发送消息的最大字节数KAFKA_CFG_MESSAGE_MAX_BYTES
- server 接收的最大消息字节数- KAFKA_CFG_PORT
- KAFKA_CFG_PROCESS_ROLES
- KAFKA_CFG_SASL_ENABLED_MECHANISMS
- KAFKA_CFG_SASL_MECHANISM_INTER_BROKER_PROTOCOL
- KAFKA_CFG_ZOOKEEPER_CLIENTCNXNSOCKET
- KAFKA_CFG_ZOOKEEPER_CONNECT
- KAFKA_CFG_ZOOKEEPER_CONNECTION_TIMEOUT_MS
- KAFKA_CLIENT_PASSWORDS
- KAFKA_CLIENT_USERS
- KAFKA_CONF_DIR
- KAFKA_CONF_FILE
- KAFKA_DAEMON_GROUP
- KAFKA_DAEMON_USER
- KAFKA_DATA_DIR
- KAFKA_ENABLE_KRAFT
- KAFKA_EXTRA_FLAGS
- KAFKA_HEAP_OPTS
- KAFKA_HOME
- KAFKA_INITSCRIPTS_DIR
- KAFKA_INTER_BROKER_PASSWORD
- KAFKA_INTER_BROKER_USER
- KAFKA_KRAFT_CLUSTER_ID
- KAFKA_LOGS_DIRS
- KAFKA_LOG_DIR
- KAFKA_MAX_MESSAGE_BYTES
- KAFKA_MOUNTED_CONF_DIR
- KAFKA_OPTS
- KAFKA_OWNERSHIP_USER
- KAFKA_PID_FILE
- KAFKA_PORT_NUMBER
- KAFKA_SEGMENT_BYTES
KAFKA_TLS_
- KAFKA_TLS_CLIENT_AUTH
- KAFKA_TLS_TRUSTSTORE_FILE
- KAFKA_TLS_TYPE
- KAFKA_VOLUME_DIR
- KAFKA_ZOOKEEPER_CONNECT_TIMEOUT_MS
- KAFKA_ZOOKEEPER_PASSWORD
- KAFKA_ZOOKEEPER_PROTOCOL
KAFKA_ZOOKEEPER_TLS_
- KAFKA_ZOOKEEPER_TLS_KEYSTORE_PASSWORD
- KAFKA_ZOOKEEPER_TLS_TRUSTSTORE_FILE
- KAFKA_ZOOKEEPER_TLS_TRUSTSTORE_PASSWORD
- KAFKA_ZOOKEEPER_TLS_TYPE
- KAFKA_ZOOKEEPER_TLS_VERIFY_HOSTNAME
- KAFKA_ZOOKEEPER_USER
2. producer 支持的消息大小设置
KafkaProducer 文档:https://kafka-python.readthedocs.io/en/master/apidoc/KafkaProducer.html
创建 KafkaProducer 的时候使用 max_request_size
参数指定发送的消息体的最大值。
import json
from kafka import KafkaProducer
producer = KafkaProducer(
bootstrap_servers='127.0.0.1:9092',
value_serializer=lambda v: json.dumps(v).encode('utf-8'),
max_request_size=104857600 # 100MB
)
3. consumer 端支持的消息大小设置
kafka-python Consomuer
文档:https://kafka-python.readthedocs.io/en/master/apidoc/KafkaConsumer.html
创建 consumer 时使用 max_partition_fetch_bytes
参数指定可以接收的最大消息的大小,默认值是 1048576
(也就是1MB)
from kafka import KafkaConsumer
bootstrap_servers = "127.0.0.1:9092"
# 创建消费者
consumer = KafkaConsumer(
group_id='test-001',
bootstrap_servers=bootstrap_servers,
auto_offset_reset='earliest',
max_partition_fetch_bytes=104857600) # 设置可以接收的最大消息体大小为100MB