C++ 消息队列客户端:Kafka, RabbitMQ, ZeroMQ 的 C++ API 使用

好的,咱们今天来聊聊C++消息队列客户端,重点是Kafka、RabbitMQ和ZeroMQ这三个家伙的C++ API使用。别怕,咱们不搞学院派,争取用最接地气的方式,让你听完就能上手。

开场白:消息队列是个啥?

想象一下,你是一家奶茶店的老板,顾客点了单,你得通知后厨做奶茶,还得通知吧台准备吸管和杯子。传统的做法是,你扯着嗓子喊:“珍珠奶茶一杯!珍珠奶茶一杯!吸管准备!杯子准备!” 累不累?

消息队列就像一个中间人,顾客把订单(消息)发给它,它再负责把订单分发给后厨和吧台。这样,你就不用扯着嗓子喊了,专注收钱就行!

在软件世界里,消息队列也是干类似的事儿,解耦各个模块,提高系统的可靠性和扩展性。

第一部分:Kafka – 高吞吐量的王者

Kafka,江湖人称“卡夫卡”,是一个分布式的、分区的、可复制的日志提交系统。听起来很唬人,但其实它就是个高性能的消息队列。它擅长处理海量数据,比如日志收集、流式处理等等。

1. Kafka C++客户端的选择:librdkafka

Kafka官方并没有提供C++客户端,但业界普遍使用librdkafka这个库。它是一个高性能的、可靠的 Kafka C/C++ 客户端。

2. 安装librdkafka

这个步骤因操作系统而异,咱们简单说一下:

  • Linux (Debian/Ubuntu): sudo apt-get install librdkafka-dev
  • Linux (RedHat/CentOS): sudo yum install librdkafka-devel
  • macOS (Homebrew): brew install librdkafka

3. Kafka C++代码示例:生产者

#include <iostream>
#include <string>
#include <librdkafka/rdkafka.h>

int main() {
    // 1. 配置
    rd_kafka_conf_t *conf = rd_kafka_conf_new();
    rd_kafka_conf_set(conf, "bootstrap.servers", "localhost:9092", nullptr, 0); // Kafka broker 地址

    // 2. 创建生产者
    rd_kafka_t *producer = rd_kafka_new(RD_KAFKA_PRODUCER, conf, nullptr, 0);
    if (!producer) {
        std::cerr << "Failed to create producer: " << rd_kafka_err2str(rd_kafka_last_error()) << std::endl;
        return 1;
    }

    // 3. 发送消息
    std::string topic_name = "my_topic";
    std::string message = "Hello, Kafka!";

    rd_kafka_resp_err_t err = rd_kafka_producev(
        producer,
        RD_KAFKA_V_TOPIC(topic_name.c_str()),
        RD_KAFKA_V_MSGFLAGS(RD_KAFKA_MSG_F_COPY),
        RD_KAFKA_V_VALUE(message.c_str(), message.size()),
        RD_KAFKA_V_KEY("key", 3), // optional key
        RD_KAFKA_V_END);

    if (err) {
        std::cerr << "Failed to produce message: " << rd_kafka_err2str(err) << std::endl;
    } else {
        std::cout << "Produced message to topic " << topic_name << std::endl;
    }

    // 4. 等待消息发送完成
    rd_kafka_flush(producer, 10000); // 等待最多10秒

    // 5. 清理
    rd_kafka_destroy(producer);

    return 0;
}

代码解释:

  1. 配置: rd_kafka_conf_new() 创建配置对象,rd_kafka_conf_set() 设置配置项,比如 Kafka broker 的地址。
  2. 创建生产者: rd_kafka_new() 创建 Kafka 生产者实例。
  3. 发送消息: rd_kafka_producev() 发送消息,需要指定 topic、消息内容、key (可选) 等。RD_KAFKA_MSG_F_COPY 表示 librdkafka 会复制消息内容,避免消息内容被修改。
  4. 等待消息发送完成: rd_kafka_flush() 确保所有消息都发送到 Kafka broker。
  5. 清理: rd_kafka_destroy() 释放资源。

4. Kafka C++代码示例:消费者

#include <iostream>
#include <string>
#include <librdkafka/rdkafka.h>

int main() {
    // 1. 配置
    rd_kafka_conf_t *conf = rd_kafka_conf_new();
    rd_kafka_conf_set(conf, "bootstrap.servers", "localhost:9092", nullptr, 0);
    rd_kafka_conf_set(conf, "group.id", "my_group", nullptr, 0); // 消费者组 ID

    // 自动提交 offset
    rd_kafka_conf_set(conf, "enable.auto.commit", "true", nullptr, 0);
    rd_kafka_conf_set(conf, "auto.offset.reset", "earliest", nullptr, 0); // 如果 offset 不存在,从最早的消息开始消费

    // 2. 创建消费者
    rd_kafka_t *consumer = rd_kafka_new(RD_KAFKA_CONSUMER, conf, nullptr, 0);
    if (!consumer) {
        std::cerr << "Failed to create consumer: " << rd_kafka_err2str(rd_kafka_last_error()) << std::endl;
        return 1;
    }

    // 3. 订阅 topic
    std::string topic_name = "my_topic";
    rd_kafka_topic_partition_list_t *subscription = rd_kafka_topic_partition_list_new(1);
    rd_kafka_topic_partition_list_add(subscription, topic_name.c_str(), RD_KAFKA_PARTITION_UA); // RD_KAFKA_PARTITION_UA 表示自动分配分区

    rd_kafka_resp_err_t err = rd_kafka_subscribe(consumer, subscription);
    if (err) {
        std::cerr << "Failed to subscribe to topic: " << rd_kafka_err2str(err) << std::endl;
        rd_kafka_topic_partition_list_destroy(subscription);
        rd_kafka_destroy(consumer);
        return 1;
    }

    rd_kafka_topic_partition_list_destroy(subscription);

    // 4. 消费消息
    while (true) {
        rd_kafka_message_t *message = rd_kafka_consumer_poll(consumer, 1000); // 等待最多 1 秒

        if (!message) {
            continue; // 没有消息,继续循环
        }

        if (message->err) {
            std::cerr << "Error consuming message: " << rd_kafka_message_errstr(message) << std::endl;
            rd_kafka_message_destroy(message);
            continue;
        }

        std::cout << "Received message: " << static_cast<const char *>(message->payload) << std::endl;

        rd_kafka_message_destroy(message);
    }

    // 5. 清理
    rd_kafka_unsubscribe(consumer);
    rd_kafka_destroy(consumer);

    return 0;
}

代码解释:

  1. 配置: 除了 broker 地址,还需要设置 group.id,用于标识消费者组。auto.offset.reset 用于指定在没有 offset 的情况下从哪里开始消费。
  2. 创建消费者: rd_kafka_new() 创建 Kafka 消费者实例。
  3. 订阅 topic: rd_kafka_subscribe() 订阅指定的 topic。
  4. 消费消息: rd_kafka_consumer_poll() 轮询消息,如果没有消息,会阻塞指定的时间。rd_kafka_message_t 包含了消息的内容、topic、partition、offset 等信息。
  5. 清理: rd_kafka_unsubscribe() 取消订阅,rd_kafka_destroy() 释放资源。

Kafka 总结:

  • 优点: 高吞吐量、可扩展性强、可靠性高。
  • 缺点: 配置复杂,学习曲线陡峭。
  • 适用场景: 日志收集、流式处理、大数据分析。

第二部分:RabbitMQ – 灵活的消息路由专家

RabbitMQ 是一个实现了 AMQP(Advanced Message Queuing Protocol)协议的消息队列。它最大的特点是灵活的消息路由,可以根据不同的规则将消息分发到不同的队列。

1. RabbitMQ C++客户端的选择:RabbitMQ C++ Client

RabbitMQ 官方提供了 C++ 客户端,可以方便地与 RabbitMQ 服务器进行交互。

2. 安装 RabbitMQ C++ Client

这个库依赖 boostrabbitmq-c,所以要先安装这两个依赖:

  • Linux (Debian/Ubuntu): sudo apt-get install libboost-dev librabbitmq-dev
  • Linux (RedHat/CentOS): sudo yum install boost-devel rabbitmq-c-devel
  • macOS (Homebrew): brew install boost rabbitmq-c

然后,下载 RabbitMQ C++ Client 的源码,编译安装:

git clone https://github.com/rabbitmq/rabbitmq-cpp.git
cd rabbitmq-cpp
mkdir build
cd build
cmake ..
make
sudo make install

3. RabbitMQ C++代码示例:生产者

#include <iostream>
#include <string>
#include <amqp_tcp_socket.h>
#include <amqp.h>
#include <amqp_framing.h>

int main() {
    // 1. 连接 RabbitMQ 服务器
    amqp_connection_state_t conn = amqp_new_connection();
    amqp_socket_t *socket = amqp_tcp_socket_new(conn);

    int status = amqp_tcp_socket_open(socket, "localhost", 5672);
    if (status) {
        std::cerr << "Failed to open TCP socket" << std::endl;
        return 1;
    }

    amqp_rpc_reply_t login_result = amqp_login(conn, "/", 0, 131072, 0, AMQP_SASL_METHOD_PLAIN, "guest", "guest");
    if (login_result.reply_type != AMQP_RESPONSE_NORMAL) {
        std::cerr << "Failed to login" << std::endl;
        return 1;
    }

    amqp_channel_open(conn, 1);
    amqp_rpc_reply_t channel_result = amqp_get_rpc_reply(conn);
    if (channel_result.reply_type != AMQP_RESPONSE_NORMAL) {
        std::cerr << "Failed to open channel" << std::endl;
        return 1;
    }

    // 2. 声明 exchange 和 queue
    const char *exchange_name = "my_exchange";
    const char *queue_name = "my_queue";
    const char *routing_key = "my_routing_key";

    amqp_exchange_declare(conn, 1, amqp_cstring_bytes(exchange_name), amqp_cstring_bytes("direct"), 0, 0, 0, 0, AMQP_EMPTY_TABLE);
    amqp_get_rpc_reply(conn);

    amqp_queue_declare(conn, 1, amqp_cstring_bytes(queue_name), 0, 0, 0, 1, AMQP_EMPTY_TABLE);
    amqp_get_rpc_reply(conn);

    amqp_queue_bind(conn, 1, amqp_cstring_bytes(queue_name), amqp_cstring_bytes(exchange_name), amqp_cstring_bytes(routing_key), AMQP_EMPTY_TABLE);
    amqp_get_rpc_reply(conn);

    // 3. 发送消息
    std::string message = "Hello, RabbitMQ!";
    amqp_basic_properties_t props;
    props._flags = AMQP_BASIC_CONTENT_TYPE_FLAG | AMQP_BASIC_DELIVERY_MODE_FLAG;
    props.content_type = amqp_cstring_bytes("text/plain");
    props.delivery_mode = 2; // persistent delivery mode

    amqp_basic_publish(conn, 1, amqp_cstring_bytes(exchange_name), amqp_cstring_bytes(routing_key), 0, 0, &props, amqp_cstring_bytes(message.c_str()));
    amqp_get_rpc_reply(conn);

    std::cout << "Sent message to exchange " << exchange_name << std::endl;

    // 4. 关闭连接
    amqp_channel_close(conn, 1, AMQP_REPLY_SUCCESS);
    amqp_connection_close(conn, AMQP_REPLY_SUCCESS);
    amqp_destroy_connection(conn);

    return 0;
}

代码解释:

  1. 连接 RabbitMQ 服务器: 使用 amqp_new_connection() 创建连接,amqp_tcp_socket_new() 创建 socket,amqp_tcp_socket_open() 打开 socket,amqp_login() 登录 RabbitMQ 服务器。amqp_channel_open() 打开 channel,RabbitMQ 的所有操作都在 channel 上进行。
  2. 声明 exchange 和 queue: amqp_exchange_declare() 声明 exchange,amqp_queue_declare() 声明 queue,amqp_queue_bind() 将 queue 绑定到 exchange,并指定 routing key。
  3. 发送消息: amqp_basic_publish() 发送消息,需要指定 exchange、routing key、消息属性 (content type, delivery mode) 和消息内容。
  4. 关闭连接: amqp_channel_close() 关闭 channel,amqp_connection_close() 关闭连接,amqp_destroy_connection() 释放资源。

4. RabbitMQ C++代码示例:消费者

#include <iostream>
#include <string>
#include <amqp_tcp_socket.h>
#include <amqp.h>
#include <amqp_framing.h>

int main() {
    // 1. 连接 RabbitMQ 服务器 (与生产者相同)
    amqp_connection_state_t conn = amqp_new_connection();
    amqp_socket_t *socket = amqp_tcp_socket_new(conn);

    int status = amqp_tcp_socket_open(socket, "localhost", 5672);
    if (status) {
        std::cerr << "Failed to open TCP socket" << std::endl;
        return 1;
    }

    amqp_rpc_reply_t login_result = amqp_login(conn, "/", 0, 131072, 0, AMQP_SASL_METHOD_PLAIN, "guest", "guest");
    if (login_result.reply_type != AMQP_RESPONSE_NORMAL) {
        std::cerr << "Failed to login" << std::endl;
        return 1;
    }

    amqp_channel_open(conn, 1);
    amqp_rpc_reply_t channel_result = amqp_get_rpc_reply(conn);
    if (channel_result.reply_type != AMQP_RESPONSE_NORMAL) {
        std::cerr << "Failed to open channel" << std::endl;
        return 1;
    }

    // 2. 声明 queue (如果不存在)
    const char *queue_name = "my_queue";
    amqp_queue_declare(conn, 1, amqp_cstring_bytes(queue_name), 0, 0, 0, 1, AMQP_EMPTY_TABLE);
    amqp_get_rpc_reply(conn);

    // 3. 消费消息
    amqp_basic_consume(conn, 1, amqp_cstring_bytes(queue_name), amqp_cstring_bytes(""), 0, 1, 0, AMQP_EMPTY_TABLE);
    amqp_get_rpc_reply(conn);

    while (true) {
        amqp_envelope_t envelope;
        amqp_rpc_reply_t result = amqp_basic_get_delivery(conn, 1, &envelope, 0);

        if (result.reply_type == AMQP_RESPONSE_NORMAL) {
            std::cout << "Received message: " << static_cast<char *>(envelope.message.body.bytes) << std::endl;

            // 确认消息
            amqp_basic_ack(conn, 1, envelope.delivery_tag, 0);

            amqp_destroy_envelope(&envelope);
        } else if (result.reply_type == AMQP_RESPONSE_EMPTY) {
            // 没有消息,休眠一段时间
            usleep(100000); // 100ms
        } else {
            std::cerr << "Error consuming message" << std::endl;
            break;
        }
    }

    // 4. 关闭连接 (与生产者相同)
    amqp_channel_close(conn, 1, AMQP_REPLY_SUCCESS);
    amqp_connection_close(conn, AMQP_REPLY_SUCCESS);
    amqp_destroy_connection(conn);

    return 0;
}

代码解释:

  1. 连接 RabbitMQ 服务器: 步骤与生产者相同。
  2. 声明 queue: 确保 queue 存在,否则无法消费消息。
  3. 消费消息: amqp_basic_consume() 开始消费消息,需要指定 queue、consumer tag (通常为空)、no_local、no_ack、exclusive、arguments。amqp_basic_get_delivery() 获取消息。amqp_basic_ack() 确认消息,告诉 RabbitMQ 消息已经被成功处理,可以从 queue 中删除。
  4. 关闭连接: 步骤与生产者相同。

RabbitMQ 总结:

  • 优点: 灵活的消息路由、支持多种消息协议、管理界面友好。
  • 缺点: 性能不如 Kafka,配置相对复杂。
  • 适用场景: 企业应用集成、任务队列、异步处理。

第三部分:ZeroMQ – 嵌入式的消息传递利器

ZeroMQ(简称 ZMQ)是一个高性能的异步消息传递库,它不是一个独立的消息队列服务器,而是一个嵌入式的网络通信库。你可以把它想象成一个 socket 的升级版,提供了更高级的消息模式,比如发布/订阅、请求/应答等等。

1. ZeroMQ C++ API

ZeroMQ 提供了 C++ API,可以直接在 C++ 代码中使用。

2. 安装 ZeroMQ

  • Linux (Debian/Ubuntu): sudo apt-get install libzmq3-dev
  • Linux (RedHat/CentOS): sudo yum install zeromq-devel
  • macOS (Homebrew): brew install zeromq

3. ZeroMQ C++代码示例:发布者 (Publisher)

#include <iostream>
#include <string>
#include <zmq.hpp>

int main() {
    // 1. 创建 ZeroMQ 上下文
    zmq::context_t context(1);

    // 2. 创建发布者 socket
    zmq::socket_t publisher(context, ZMQ_PUB);

    // 3. 绑定到地址
    publisher.bind("tcp://*:5555");

    // 4. 发布消息
    int message_id = 0;
    while (true) {
        std::string message = "Message " + std::to_string(message_id++);

        // 发送消息
        zmq::message_t zmq_message(message.size());
        memcpy(zmq_message.data(), message.c_str(), message.size());
        publisher.send(zmq_message, zmq::send_flags::none);

        std::cout << "Sent: " << message << std::endl;

        sleep(1); // 每秒发送一条消息
    }

    return 0;
}

代码解释:

  1. 创建 ZeroMQ 上下文: zmq::context_t 是 ZeroMQ 的核心对象,用于管理所有的 socket。
  2. 创建发布者 socket: zmq::socket_t 是 ZeroMQ 的 socket 对象,ZMQ_PUB 表示发布者类型。
  3. 绑定到地址: publisher.bind() 将 socket 绑定到指定的地址,tcp://*:5555 表示监听所有 IP 地址的 5555 端口。
  4. 发布消息: 创建 zmq::message_t 对象,将消息内容复制到 zmq::message_t 对象中,publisher.send() 发送消息。

4. ZeroMQ C++代码示例:订阅者 (Subscriber)

#include <iostream>
#include <string>
#include <zmq.hpp>

int main() {
    // 1. 创建 ZeroMQ 上下文
    zmq::context_t context(1);

    // 2. 创建订阅者 socket
    zmq::socket_t subscriber(context, ZMQ_SUB);

    // 3. 连接到发布者
    subscriber.connect("tcp://localhost:5555");

    // 4. 订阅所有消息 (空字符串)
    subscriber.setsockopt(ZMQ_SUBSCRIBE, "", 0);

    // 5. 接收消息
    while (true) {
        zmq::message_t message;
        subscriber.recv(message, zmq::recv_flags::none);

        std::string received_message(static_cast<char*>(message.data()), message.size());
        std::cout << "Received: " << received_message << std::endl;
    }

    return 0;
}

代码解释:

  1. 创建 ZeroMQ 上下文: 与发布者相同。
  2. 创建订阅者 socket: ZMQ_SUB 表示订阅者类型。
  3. 连接到发布者: subscriber.connect() 连接到发布者的地址。
  4. 订阅所有消息: subscriber.setsockopt(ZMQ_SUBSCRIBE, "", 0) 订阅所有消息,如果需要订阅特定主题的消息,可以将空字符串替换为主题名称。
  5. 接收消息: subscriber.recv() 接收消息,并将消息内容复制到 zmq::message_t 对象中。

ZeroMQ 总结:

  • 优点: 高性能、灵活、支持多种消息模式、嵌入式。
  • 缺点: 不是一个独立的消息队列服务器,需要自己管理消息的存储和可靠性。
  • 适用场景: 分布式系统内部的通信、实时数据传输、高性能网络应用。

总结对比:

特性 Kafka RabbitMQ ZeroMQ
架构 分布式日志提交系统 消息队列服务器 嵌入式网络通信库
协议 自定义协议 AMQP 自定义协议
吞吐量 非常高 中等 非常高
消息路由 基于 Topic 和 Partition 灵活,支持多种 Exchange 类型 需要自己实现
可靠性 高,支持数据复制 高,支持消息持久化 需要自己实现
易用性 复杂,学习曲线陡峭 相对简单,管理界面友好 简单,但需要理解其工作原理
适用场景 日志收集、流式处理、大数据分析 企业应用集成、任务队列、异步处理 分布式系统内部的通信、实时数据传输、高性能网络应用
C++ 客户端 librdkafka RabbitMQ C++ Client ZeroMQ C++ API

结束语:

希望通过今天的讲解,你对 Kafka、RabbitMQ 和 ZeroMQ 的 C++ API 使用有了更清晰的认识。选择哪个消息队列,取决于你的具体需求。如果需要处理海量数据,选择 Kafka;如果需要灵活的消息路由,选择 RabbitMQ;如果需要在分布式系统内部进行高性能的通信,选择 ZeroMQ。

记住,实践是检验真理的唯一标准! 多写代码,多踩坑,才能真正掌握这些工具。 祝你编程愉快!

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注