好的,咱们今天来聊聊C++消息队列客户端,重点是Kafka、RabbitMQ和ZeroMQ这三个家伙的C++ API使用。别怕,咱们不搞学院派,争取用最接地气的方式,让你听完就能上手。
开场白:消息队列是个啥?
想象一下,你是一家奶茶店的老板,顾客点了单,你得通知后厨做奶茶,还得通知吧台准备吸管和杯子。传统的做法是,你扯着嗓子喊:“珍珠奶茶一杯!珍珠奶茶一杯!吸管准备!杯子准备!” 累不累?
消息队列就像一个中间人,顾客把订单(消息)发给它,它再负责把订单分发给后厨和吧台。这样,你就不用扯着嗓子喊了,专注收钱就行!
在软件世界里,消息队列也是干类似的事儿,解耦各个模块,提高系统的可靠性和扩展性。
第一部分:Kafka – 高吞吐量的王者
Kafka,江湖人称“卡夫卡”,是一个分布式的、分区的、可复制的日志提交系统。听起来很唬人,但其实它就是个高性能的消息队列。它擅长处理海量数据,比如日志收集、流式处理等等。
1. Kafka C++客户端的选择:librdkafka
Kafka官方并没有提供C++客户端,但业界普遍使用librdkafka
这个库。它是一个高性能的、可靠的 Kafka C/C++ 客户端。
2. 安装librdkafka
这个步骤因操作系统而异,咱们简单说一下:
- Linux (Debian/Ubuntu):
sudo apt-get install librdkafka-dev
- Linux (RedHat/CentOS):
sudo yum install librdkafka-devel
- macOS (Homebrew):
brew install librdkafka
3. Kafka C++代码示例:生产者
#include <iostream>
#include <string>
#include <librdkafka/rdkafka.h>
int main() {
// 1. 配置
rd_kafka_conf_t *conf = rd_kafka_conf_new();
rd_kafka_conf_set(conf, "bootstrap.servers", "localhost:9092", nullptr, 0); // Kafka broker 地址
// 2. 创建生产者
rd_kafka_t *producer = rd_kafka_new(RD_KAFKA_PRODUCER, conf, nullptr, 0);
if (!producer) {
std::cerr << "Failed to create producer: " << rd_kafka_err2str(rd_kafka_last_error()) << std::endl;
return 1;
}
// 3. 发送消息
std::string topic_name = "my_topic";
std::string message = "Hello, Kafka!";
rd_kafka_resp_err_t err = rd_kafka_producev(
producer,
RD_KAFKA_V_TOPIC(topic_name.c_str()),
RD_KAFKA_V_MSGFLAGS(RD_KAFKA_MSG_F_COPY),
RD_KAFKA_V_VALUE(message.c_str(), message.size()),
RD_KAFKA_V_KEY("key", 3), // optional key
RD_KAFKA_V_END);
if (err) {
std::cerr << "Failed to produce message: " << rd_kafka_err2str(err) << std::endl;
} else {
std::cout << "Produced message to topic " << topic_name << std::endl;
}
// 4. 等待消息发送完成
rd_kafka_flush(producer, 10000); // 等待最多10秒
// 5. 清理
rd_kafka_destroy(producer);
return 0;
}
代码解释:
- 配置:
rd_kafka_conf_new()
创建配置对象,rd_kafka_conf_set()
设置配置项,比如 Kafka broker 的地址。 - 创建生产者:
rd_kafka_new()
创建 Kafka 生产者实例。 - 发送消息:
rd_kafka_producev()
发送消息,需要指定 topic、消息内容、key (可选) 等。RD_KAFKA_MSG_F_COPY
表示 librdkafka 会复制消息内容,避免消息内容被修改。 - 等待消息发送完成:
rd_kafka_flush()
确保所有消息都发送到 Kafka broker。 - 清理:
rd_kafka_destroy()
释放资源。
4. Kafka C++代码示例:消费者
#include <iostream>
#include <string>
#include <librdkafka/rdkafka.h>
int main() {
// 1. 配置
rd_kafka_conf_t *conf = rd_kafka_conf_new();
rd_kafka_conf_set(conf, "bootstrap.servers", "localhost:9092", nullptr, 0);
rd_kafka_conf_set(conf, "group.id", "my_group", nullptr, 0); // 消费者组 ID
// 自动提交 offset
rd_kafka_conf_set(conf, "enable.auto.commit", "true", nullptr, 0);
rd_kafka_conf_set(conf, "auto.offset.reset", "earliest", nullptr, 0); // 如果 offset 不存在,从最早的消息开始消费
// 2. 创建消费者
rd_kafka_t *consumer = rd_kafka_new(RD_KAFKA_CONSUMER, conf, nullptr, 0);
if (!consumer) {
std::cerr << "Failed to create consumer: " << rd_kafka_err2str(rd_kafka_last_error()) << std::endl;
return 1;
}
// 3. 订阅 topic
std::string topic_name = "my_topic";
rd_kafka_topic_partition_list_t *subscription = rd_kafka_topic_partition_list_new(1);
rd_kafka_topic_partition_list_add(subscription, topic_name.c_str(), RD_KAFKA_PARTITION_UA); // RD_KAFKA_PARTITION_UA 表示自动分配分区
rd_kafka_resp_err_t err = rd_kafka_subscribe(consumer, subscription);
if (err) {
std::cerr << "Failed to subscribe to topic: " << rd_kafka_err2str(err) << std::endl;
rd_kafka_topic_partition_list_destroy(subscription);
rd_kafka_destroy(consumer);
return 1;
}
rd_kafka_topic_partition_list_destroy(subscription);
// 4. 消费消息
while (true) {
rd_kafka_message_t *message = rd_kafka_consumer_poll(consumer, 1000); // 等待最多 1 秒
if (!message) {
continue; // 没有消息,继续循环
}
if (message->err) {
std::cerr << "Error consuming message: " << rd_kafka_message_errstr(message) << std::endl;
rd_kafka_message_destroy(message);
continue;
}
std::cout << "Received message: " << static_cast<const char *>(message->payload) << std::endl;
rd_kafka_message_destroy(message);
}
// 5. 清理
rd_kafka_unsubscribe(consumer);
rd_kafka_destroy(consumer);
return 0;
}
代码解释:
- 配置: 除了 broker 地址,还需要设置
group.id
,用于标识消费者组。auto.offset.reset
用于指定在没有 offset 的情况下从哪里开始消费。 - 创建消费者:
rd_kafka_new()
创建 Kafka 消费者实例。 - 订阅 topic:
rd_kafka_subscribe()
订阅指定的 topic。 - 消费消息:
rd_kafka_consumer_poll()
轮询消息,如果没有消息,会阻塞指定的时间。rd_kafka_message_t
包含了消息的内容、topic、partition、offset 等信息。 - 清理:
rd_kafka_unsubscribe()
取消订阅,rd_kafka_destroy()
释放资源。
Kafka 总结:
- 优点: 高吞吐量、可扩展性强、可靠性高。
- 缺点: 配置复杂,学习曲线陡峭。
- 适用场景: 日志收集、流式处理、大数据分析。
第二部分:RabbitMQ – 灵活的消息路由专家
RabbitMQ 是一个实现了 AMQP(Advanced Message Queuing Protocol)协议的消息队列。它最大的特点是灵活的消息路由,可以根据不同的规则将消息分发到不同的队列。
1. RabbitMQ C++客户端的选择:RabbitMQ C++ Client
RabbitMQ 官方提供了 C++ 客户端,可以方便地与 RabbitMQ 服务器进行交互。
2. 安装 RabbitMQ C++ Client
这个库依赖 boost
和 rabbitmq-c
,所以要先安装这两个依赖:
- Linux (Debian/Ubuntu):
sudo apt-get install libboost-dev librabbitmq-dev
- Linux (RedHat/CentOS):
sudo yum install boost-devel rabbitmq-c-devel
- macOS (Homebrew):
brew install boost rabbitmq-c
然后,下载 RabbitMQ C++ Client 的源码,编译安装:
git clone https://github.com/rabbitmq/rabbitmq-cpp.git
cd rabbitmq-cpp
mkdir build
cd build
cmake ..
make
sudo make install
3. RabbitMQ C++代码示例:生产者
#include <iostream>
#include <string>
#include <amqp_tcp_socket.h>
#include <amqp.h>
#include <amqp_framing.h>
int main() {
// 1. 连接 RabbitMQ 服务器
amqp_connection_state_t conn = amqp_new_connection();
amqp_socket_t *socket = amqp_tcp_socket_new(conn);
int status = amqp_tcp_socket_open(socket, "localhost", 5672);
if (status) {
std::cerr << "Failed to open TCP socket" << std::endl;
return 1;
}
amqp_rpc_reply_t login_result = amqp_login(conn, "/", 0, 131072, 0, AMQP_SASL_METHOD_PLAIN, "guest", "guest");
if (login_result.reply_type != AMQP_RESPONSE_NORMAL) {
std::cerr << "Failed to login" << std::endl;
return 1;
}
amqp_channel_open(conn, 1);
amqp_rpc_reply_t channel_result = amqp_get_rpc_reply(conn);
if (channel_result.reply_type != AMQP_RESPONSE_NORMAL) {
std::cerr << "Failed to open channel" << std::endl;
return 1;
}
// 2. 声明 exchange 和 queue
const char *exchange_name = "my_exchange";
const char *queue_name = "my_queue";
const char *routing_key = "my_routing_key";
amqp_exchange_declare(conn, 1, amqp_cstring_bytes(exchange_name), amqp_cstring_bytes("direct"), 0, 0, 0, 0, AMQP_EMPTY_TABLE);
amqp_get_rpc_reply(conn);
amqp_queue_declare(conn, 1, amqp_cstring_bytes(queue_name), 0, 0, 0, 1, AMQP_EMPTY_TABLE);
amqp_get_rpc_reply(conn);
amqp_queue_bind(conn, 1, amqp_cstring_bytes(queue_name), amqp_cstring_bytes(exchange_name), amqp_cstring_bytes(routing_key), AMQP_EMPTY_TABLE);
amqp_get_rpc_reply(conn);
// 3. 发送消息
std::string message = "Hello, RabbitMQ!";
amqp_basic_properties_t props;
props._flags = AMQP_BASIC_CONTENT_TYPE_FLAG | AMQP_BASIC_DELIVERY_MODE_FLAG;
props.content_type = amqp_cstring_bytes("text/plain");
props.delivery_mode = 2; // persistent delivery mode
amqp_basic_publish(conn, 1, amqp_cstring_bytes(exchange_name), amqp_cstring_bytes(routing_key), 0, 0, &props, amqp_cstring_bytes(message.c_str()));
amqp_get_rpc_reply(conn);
std::cout << "Sent message to exchange " << exchange_name << std::endl;
// 4. 关闭连接
amqp_channel_close(conn, 1, AMQP_REPLY_SUCCESS);
amqp_connection_close(conn, AMQP_REPLY_SUCCESS);
amqp_destroy_connection(conn);
return 0;
}
代码解释:
- 连接 RabbitMQ 服务器: 使用
amqp_new_connection()
创建连接,amqp_tcp_socket_new()
创建 socket,amqp_tcp_socket_open()
打开 socket,amqp_login()
登录 RabbitMQ 服务器。amqp_channel_open()
打开 channel,RabbitMQ 的所有操作都在 channel 上进行。 - 声明 exchange 和 queue:
amqp_exchange_declare()
声明 exchange,amqp_queue_declare()
声明 queue,amqp_queue_bind()
将 queue 绑定到 exchange,并指定 routing key。 - 发送消息:
amqp_basic_publish()
发送消息,需要指定 exchange、routing key、消息属性 (content type, delivery mode) 和消息内容。 - 关闭连接:
amqp_channel_close()
关闭 channel,amqp_connection_close()
关闭连接,amqp_destroy_connection()
释放资源。
4. RabbitMQ C++代码示例:消费者
#include <iostream>
#include <string>
#include <amqp_tcp_socket.h>
#include <amqp.h>
#include <amqp_framing.h>
int main() {
// 1. 连接 RabbitMQ 服务器 (与生产者相同)
amqp_connection_state_t conn = amqp_new_connection();
amqp_socket_t *socket = amqp_tcp_socket_new(conn);
int status = amqp_tcp_socket_open(socket, "localhost", 5672);
if (status) {
std::cerr << "Failed to open TCP socket" << std::endl;
return 1;
}
amqp_rpc_reply_t login_result = amqp_login(conn, "/", 0, 131072, 0, AMQP_SASL_METHOD_PLAIN, "guest", "guest");
if (login_result.reply_type != AMQP_RESPONSE_NORMAL) {
std::cerr << "Failed to login" << std::endl;
return 1;
}
amqp_channel_open(conn, 1);
amqp_rpc_reply_t channel_result = amqp_get_rpc_reply(conn);
if (channel_result.reply_type != AMQP_RESPONSE_NORMAL) {
std::cerr << "Failed to open channel" << std::endl;
return 1;
}
// 2. 声明 queue (如果不存在)
const char *queue_name = "my_queue";
amqp_queue_declare(conn, 1, amqp_cstring_bytes(queue_name), 0, 0, 0, 1, AMQP_EMPTY_TABLE);
amqp_get_rpc_reply(conn);
// 3. 消费消息
amqp_basic_consume(conn, 1, amqp_cstring_bytes(queue_name), amqp_cstring_bytes(""), 0, 1, 0, AMQP_EMPTY_TABLE);
amqp_get_rpc_reply(conn);
while (true) {
amqp_envelope_t envelope;
amqp_rpc_reply_t result = amqp_basic_get_delivery(conn, 1, &envelope, 0);
if (result.reply_type == AMQP_RESPONSE_NORMAL) {
std::cout << "Received message: " << static_cast<char *>(envelope.message.body.bytes) << std::endl;
// 确认消息
amqp_basic_ack(conn, 1, envelope.delivery_tag, 0);
amqp_destroy_envelope(&envelope);
} else if (result.reply_type == AMQP_RESPONSE_EMPTY) {
// 没有消息,休眠一段时间
usleep(100000); // 100ms
} else {
std::cerr << "Error consuming message" << std::endl;
break;
}
}
// 4. 关闭连接 (与生产者相同)
amqp_channel_close(conn, 1, AMQP_REPLY_SUCCESS);
amqp_connection_close(conn, AMQP_REPLY_SUCCESS);
amqp_destroy_connection(conn);
return 0;
}
代码解释:
- 连接 RabbitMQ 服务器: 步骤与生产者相同。
- 声明 queue: 确保 queue 存在,否则无法消费消息。
- 消费消息:
amqp_basic_consume()
开始消费消息,需要指定 queue、consumer tag (通常为空)、no_local、no_ack、exclusive、arguments。amqp_basic_get_delivery()
获取消息。amqp_basic_ack()
确认消息,告诉 RabbitMQ 消息已经被成功处理,可以从 queue 中删除。 - 关闭连接: 步骤与生产者相同。
RabbitMQ 总结:
- 优点: 灵活的消息路由、支持多种消息协议、管理界面友好。
- 缺点: 性能不如 Kafka,配置相对复杂。
- 适用场景: 企业应用集成、任务队列、异步处理。
第三部分:ZeroMQ – 嵌入式的消息传递利器
ZeroMQ(简称 ZMQ)是一个高性能的异步消息传递库,它不是一个独立的消息队列服务器,而是一个嵌入式的网络通信库。你可以把它想象成一个 socket 的升级版,提供了更高级的消息模式,比如发布/订阅、请求/应答等等。
1. ZeroMQ C++ API
ZeroMQ 提供了 C++ API,可以直接在 C++ 代码中使用。
2. 安装 ZeroMQ
- Linux (Debian/Ubuntu):
sudo apt-get install libzmq3-dev
- Linux (RedHat/CentOS):
sudo yum install zeromq-devel
- macOS (Homebrew):
brew install zeromq
3. ZeroMQ C++代码示例:发布者 (Publisher)
#include <iostream>
#include <string>
#include <zmq.hpp>
int main() {
// 1. 创建 ZeroMQ 上下文
zmq::context_t context(1);
// 2. 创建发布者 socket
zmq::socket_t publisher(context, ZMQ_PUB);
// 3. 绑定到地址
publisher.bind("tcp://*:5555");
// 4. 发布消息
int message_id = 0;
while (true) {
std::string message = "Message " + std::to_string(message_id++);
// 发送消息
zmq::message_t zmq_message(message.size());
memcpy(zmq_message.data(), message.c_str(), message.size());
publisher.send(zmq_message, zmq::send_flags::none);
std::cout << "Sent: " << message << std::endl;
sleep(1); // 每秒发送一条消息
}
return 0;
}
代码解释:
- 创建 ZeroMQ 上下文:
zmq::context_t
是 ZeroMQ 的核心对象,用于管理所有的 socket。 - 创建发布者 socket:
zmq::socket_t
是 ZeroMQ 的 socket 对象,ZMQ_PUB
表示发布者类型。 - 绑定到地址:
publisher.bind()
将 socket 绑定到指定的地址,tcp://*:5555
表示监听所有 IP 地址的 5555 端口。 - 发布消息: 创建
zmq::message_t
对象,将消息内容复制到zmq::message_t
对象中,publisher.send()
发送消息。
4. ZeroMQ C++代码示例:订阅者 (Subscriber)
#include <iostream>
#include <string>
#include <zmq.hpp>
int main() {
// 1. 创建 ZeroMQ 上下文
zmq::context_t context(1);
// 2. 创建订阅者 socket
zmq::socket_t subscriber(context, ZMQ_SUB);
// 3. 连接到发布者
subscriber.connect("tcp://localhost:5555");
// 4. 订阅所有消息 (空字符串)
subscriber.setsockopt(ZMQ_SUBSCRIBE, "", 0);
// 5. 接收消息
while (true) {
zmq::message_t message;
subscriber.recv(message, zmq::recv_flags::none);
std::string received_message(static_cast<char*>(message.data()), message.size());
std::cout << "Received: " << received_message << std::endl;
}
return 0;
}
代码解释:
- 创建 ZeroMQ 上下文: 与发布者相同。
- 创建订阅者 socket:
ZMQ_SUB
表示订阅者类型。 - 连接到发布者:
subscriber.connect()
连接到发布者的地址。 - 订阅所有消息:
subscriber.setsockopt(ZMQ_SUBSCRIBE, "", 0)
订阅所有消息,如果需要订阅特定主题的消息,可以将空字符串替换为主题名称。 - 接收消息:
subscriber.recv()
接收消息,并将消息内容复制到zmq::message_t
对象中。
ZeroMQ 总结:
- 优点: 高性能、灵活、支持多种消息模式、嵌入式。
- 缺点: 不是一个独立的消息队列服务器,需要自己管理消息的存储和可靠性。
- 适用场景: 分布式系统内部的通信、实时数据传输、高性能网络应用。
总结对比:
特性 | Kafka | RabbitMQ | ZeroMQ |
---|---|---|---|
架构 | 分布式日志提交系统 | 消息队列服务器 | 嵌入式网络通信库 |
协议 | 自定义协议 | AMQP | 自定义协议 |
吞吐量 | 非常高 | 中等 | 非常高 |
消息路由 | 基于 Topic 和 Partition | 灵活,支持多种 Exchange 类型 | 需要自己实现 |
可靠性 | 高,支持数据复制 | 高,支持消息持久化 | 需要自己实现 |
易用性 | 复杂,学习曲线陡峭 | 相对简单,管理界面友好 | 简单,但需要理解其工作原理 |
适用场景 | 日志收集、流式处理、大数据分析 | 企业应用集成、任务队列、异步处理 | 分布式系统内部的通信、实时数据传输、高性能网络应用 |
C++ 客户端 | librdkafka | RabbitMQ C++ Client | ZeroMQ C++ API |
结束语:
希望通过今天的讲解,你对 Kafka、RabbitMQ 和 ZeroMQ 的 C++ API 使用有了更清晰的认识。选择哪个消息队列,取决于你的具体需求。如果需要处理海量数据,选择 Kafka;如果需要灵活的消息路由,选择 RabbitMQ;如果需要在分布式系统内部进行高性能的通信,选择 ZeroMQ。
记住,实践是检验真理的唯一标准! 多写代码,多踩坑,才能真正掌握这些工具。 祝你编程愉快!