使用 librdkafka 库。
1. 安装 librdkafka
apt install librdkafka-dev -y
2. 发送消息
参考官方代码例子:https://github.com/confluentinc/librdkafka/blob/master/examples/producer.cpp
/// Delivery-report callback that logs the final outcome of each produced
/// message: an error log on permanent failure, a debug log on success.
///
/// An instance is registered on the producer configuration under "dr_cb";
/// librdkafka then calls dr_cb() once per message while the application is
/// serving callbacks (RdKafka::Producer::poll() / flush()).
class KafkaDeliveryReportCb : public RdKafka::DeliveryReportCb {
public:
    /// @param message The message whose delivery succeeded or permanently failed.
    void dr_cb(RdKafka::Message &message) override {
        /* If message.err() is non-zero the message delivery failed permanently
         * for the message. */
        if (message.err()) {
            SEARCHER_ERROR("Message delivery failed, topic: %s, error:%s",
                           message.topic_name().data(), message.errstr().data());
            return;
        }

        // offset() is a 64-bit integer; cast and print as long long so the
        // format specifier is correct on platforms where long is only 32 bits.
        SEARCHER_DEBUG("Message delivered to topic: %s [%d] at offset: %lld",
                       message.topic_name().data(), message.partition(),
                       static_cast<long long>(message.offset()));
    }
};
struct KafkaProducer {
std::string brokers;
std::string topic;
std::shared_ptr<RdKafka::Producer> producer;
std::shared_ptr<KafkaDeliveryReportCb> dcb;
KafkaProducer(std::string brokers, std::string topic) {
this->brokers = brokers;
this->topic = topic;
dcb = std::make_shared<KafkaDeliveryReportCb>();
std::string errstr;
std::shared_ptr<RdKafka::Conf> cf(RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL));
if(cf->set("bootstrap.servers", brokers, errstr) != RdKafka::Conf::CONF_OK) {
SEARCHER_ERROR("set kafka bootstrap.servers failed, servers: %s", brokers.data());
return;
}
if (cf->set("dr_cb", dcb.get(), errstr) != RdKafka::Conf::CONF_OK) {
SEARCHER_WARNING("register delivery callback failed, error: %s", errstr.data());
} else {
SEARCHER_DEBUG("register delivery callback success");
}
auto pd = RdKafka::Producer::create(cf.get(), errstr);
if(pd == nullptr) {
SEARCHER_ERROR("create kafka producer failed, error: %s", errstr.data());
return;
}
producer.reset(pd);
SEARCHER_DEBUG("create kafka producer success, brokers: %s", brokers.data());
}
bool send(const std::string& msg) {
RdKafka::ErrorCode err = producer->produce(
/* Topic name */
topic,
/* Any Partition: the builtin partitioner will be
* used to assign the message to a topic based
* on the message key, or random partition if
* the key is not set. */
RdKafka::Topic::PARTITION_UA,
/* Make a copy of the value */
RdKafka::Producer::RK_MSG_COPY /* Copy payload */,
/* Value */
const_cast<char *>(msg.data()), msg.size(),
/* Key */
NULL, 0,
/* Timestamp (defaults to current time) */
0,
/* Message headers, if any */
NULL,
/* Per-message opaque value passed to
* delivery report */
NULL);
if(err != RdKafka::ERR_NO_ERROR) {
SEARCHER_ERROR("kafka producer send message failed, brokers: %s, topic: %s", brokers.data(), topic.data());
return false;
}
producer->poll(0);
SEARCHER_DEBUG("kafkaproducer send message success, brokers: %s, topic: %s", brokers.data(), topic.data());
return true;
}
};
3. 编译选项
- g++:需要增加 -lrdkafka++ 选项
- cmake:TARGET_LINK_LIBRARIES(project_name rdkafka++)