c++ Kafka producer 发送消息

创建日期: 2024-12-19 14:31 | 作者: 风波 | 浏览次数: 20 | 分类: Kafka

使用 librdkafka 库。

1. 安装 librdkafka

apt install librdkafka-dev -y

2. 发送消息

参考官方代码例子:https://github.com/confluentinc/librdkafka/blob/master/examples/producer.cpp

class KafkaDeliveryReportCb : public RdKafka::DeliveryReportCb {
public:
    void dr_cb(RdKafka::Message &message) {
        /* If message.err() is non-zero the message delivery failed permanently
         * for the message. */
        if (message.err()) {
            SEARCHER_ERROR("Message delivery failed, topic: %s, error:%s", message.topic_name().data(), message.errstr().data());
        } else {
            SEARCHER_DEBUG("Message delivered to topic: %s [%d] at offset: %ld",
                    message.topic_name().data(), message.partition(), message.offset());
        }
    }
};

struct KafkaProducer {
    std::string brokers;
    std::string topic;
    std::shared_ptr<RdKafka::Producer> producer;
    std::shared_ptr<KafkaDeliveryReportCb> dcb;

    KafkaProducer(std::string brokers, std::string topic) {
        this->brokers = brokers;
        this->topic = topic;

        dcb = std::make_shared<KafkaDeliveryReportCb>();

        std::string errstr;
        std::shared_ptr<RdKafka::Conf> cf(RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL));
        if(cf->set("bootstrap.servers", brokers, errstr) != RdKafka::Conf::CONF_OK) {
            SEARCHER_ERROR("set kafka bootstrap.servers failed, servers: %s", brokers.data());
            return;
        }

        if (cf->set("dr_cb", dcb.get(), errstr) != RdKafka::Conf::CONF_OK) {
            SEARCHER_WARNING("register delivery callback failed, error: %s", errstr.data());
        } else {
            SEARCHER_DEBUG("register delivery callback success");
        }
        auto pd = RdKafka::Producer::create(cf.get(), errstr);
        if(pd == nullptr) {
            SEARCHER_ERROR("create kafka producer failed, error: %s", errstr.data());
            return;
        }
        producer.reset(pd);
        SEARCHER_DEBUG("create kafka producer success, brokers: %s", brokers.data());
    }

    bool send(const std::string& msg) {
        RdKafka::ErrorCode err = producer->produce(
                /* Topic name */
                topic,
                /* Any Partition: the builtin partitioner will be
                 * used to assign the message to a topic based
                 * on the message key, or random partition if
                 * the key is not set. */
                RdKafka::Topic::PARTITION_UA,
                /* Make a copy of the value */
                RdKafka::Producer::RK_MSG_COPY /* Copy payload */,
                /* Value */
                const_cast<char *>(msg.data()), msg.size(),
                /* Key */
                NULL, 0,
                /* Timestamp (defaults to current time) */
                0,
                /* Message headers, if any */
                NULL,
                /* Per-message opaque value passed to
                 * delivery report */
                NULL);
        if(err != RdKafka::ERR_NO_ERROR) {
            SEARCHER_ERROR("kafka producer send message failed, brokers: %s, topic: %s", brokers.data(), topic.data());
            return false;
        }
        producer->poll(0);
        SEARCHER_DEBUG("kafkaproducer send message success, brokers: %s, topic: %s", brokers.data(), topic.data());

        return true;
    }
};

3. 编译选项

20 浏览
19 爬虫
0 评论