C++ 与 ‘RDMA’ (远程直接内存访问):如何在零 CPU 参与的情况下实现跨机器的内存直接读写?

C++ 与 RDMA:零 CPU 参与的跨机器内存直接读写之道

在现代分布式系统和高性能计算领域,网络通信的瓶颈日益凸显。传统的TCP/IP协议栈虽然通用且可靠,但其多层软件处理、数据拷贝以及内核参与的开销,使得CPU在数据传输上消耗了大量宝贵资源,并引入了显著的延迟。为了打破这一瓶颈,远程直接内存访问(RDMA)技术应运而生。RDMA允许网络适配器(HCA)绕过操作系统内核,直接读写远程机器的内存,从而实现极低的延迟、高吞吐量以及最重要的——“零 CPU 参与”的数据传输。

本讲座将深入探讨C++环境下如何利用RDMA实现这一革命性的通信范式。我们将从RDMA的基本概念出发,逐步深入到其核心组件、编程模型,并通过详尽的C++代码示例,演示如何构建一个RDMA客户端和服务器,实现跨机器的直接内存读写。

1. RDMA 的核心理念与优势

1.1 传统网络通信的困境

在深入RDMA之前,我们先回顾一下传统网络通信(以TCP为例)的数据路径。当一个应用程序发送数据时,数据通常经历以下步骤:

  1. 用户空间到内核空间拷贝: 应用程序的数据首先从用户空间的缓冲区拷贝到内核空间的socket缓冲区。
  2. 协议栈处理: 数据在内核中经过TCP、IP等协议栈的多层处理,添加各种头部信息。
  3. DMA到网卡: 最终,数据通过DMA(直接内存访问)从内核缓冲区传输到网卡硬件。
  4. 物理传输: 网卡将数据发送到网络。
  5. 接收端反向处理: 接收端网卡接收数据,通过DMA传输到内核缓冲区,经过协议栈处理,最终拷贝到用户空间的应用程序缓冲区。

这个过程涉及多次数据拷贝(尤其是在发送和接收两端),以及操作系统内核大量的上下文切换和CPU指令执行。这些开销在高并发、大数据量传输的场景下,会严重限制系统的性能,并占用宝贵的CPU资源,使其无法专注于应用逻辑。

1.2 RDMA 的突破:内核旁路与零拷贝

RDMA技术的核心在于其“内核旁路”(Kernel Bypass)和“零拷贝”(Zero Copy)特性。它通过特殊的网络适配器——主机通道适配器(HCA,Host Channel Adapter),直接在应用程序的用户空间与远程机器的内存之间建立数据传输路径。

其工作原理可以概括为:

  1. 应用程序注册内存: 应用程序需要将用于RDMA操作的内存区域注册给HCA,使其知道这块内存可以被直接访问。
  2. HCA 直接 DMA: 一旦注册完成并建立连接,应用程序(或更准确地说,是HCA)可以直接发起对远程机器注册内存区域的读写请求。HCA通过PCIe总线直接访问本地机器的内存,并将数据通过网络发送到远程HCA。
  3. 远程 HCA 直接 DMA: 远程HCA接收到数据后,同样通过PCIe总线直接将其写入到远程机器已注册的内存区域,无需远程CPU或操作系统内核的介入。

这种机制带来了显著的优势:

  • 极低延迟: 绕过了协议栈和内核,减少了软件处理的开销,显著降低了端到端延迟。
  • 高吞吐量: 数据传输直接由硬件完成,能够充分利用网络带宽。
  • 零 CPU 参与(Zero CPU Involvement): CPU无需参与数据路径上的拷贝和协议处理,可以将更多资源用于应用逻辑,提高整体系统效率。
  • 零拷贝(Zero Copy): 数据无需在用户空间和内核空间之间进行多次拷贝,进一步提升效率。

1.3 RDMA 的应用场景

RDMA技术因其卓越的性能,广泛应用于以下领域:

  • 高性能计算(HPC): 集群通信,如MPI(Message Passing Interface)库的底层实现。
  • 大数据处理: 分布式文件系统(如Ceph、Lustre)、分布式数据库(如Cassandra、Redis)的节点间通信。
  • 金融交易系统: 对延迟极其敏感的高频交易平台。
  • AI/机器学习: 深度学习框架中模型参数和梯度的高速同步。
  • 存储系统: NVMe over Fabrics (NVMe-oF) 等分布式存储解决方案。

2. RDMA 核心组件与编程模型

RDMA编程主要通过一套名为 libibverbs 的用户空间库来实现。理解其核心组件是掌握RDMA编程的关键。

2.1 硬件基础:HCA (Host Channel Adapter)

HCA是RDMA网络的物理接口,类似于传统以太网卡,但具有RDMA处理能力。它负责执行DMA操作、管理内存注册、处理RDMA协议等。常见的HCA供应商包括Mellanox (现在是NVIDIA的一部分)、Intel等。

2.2 核心软件对象

libibverbs 库提供了一系列结构体和函数来抽象和操作HCA的功能。主要的对象包括:

  • ibv_context 代表一个物理HCA设备。是所有RDMA操作的起点。
  • ibv_pd (Protection Domain): 保护域。一个PD是资源(如内存区域、队列对)的逻辑分组。所有属于同一PD的资源可以相互访问。它提供了内存隔离和保护机制。
  • ibv_mr (Memory Region): 内存区域。应用程序需要通过 ibv_reg_mr() 函数将一块内存注册到HCA,使其可用于RDMA操作。注册后,HCA会获得这块内存的物理地址和访问权限。注册操作会返回一个 lkey (Local Key) 和 rkey (Remote Key),用于标识这块内存。
    • lkey:用于本地HCA访问该MR。
    • rkey:用于远程HCA访问该MR。当进行RDMA Read/Write操作时,发起方需要提供目标MR的 rkey
  • ibv_cq (Completion Queue): 完成队列。HCA完成一个RDMA操作后,会将一个完成事件(Completion Queue Entry, CQE)放入对应的CQ中。应用程序通过轮询(polling)或事件通知的方式从CQ中获取CQE,从而得知操作是否完成。
  • ibv_qp (Queue Pair): 队列对。QP是RDMA通信的端点,由一个发送队列(Send Queue, SQ)和一个接收队列(Receive Queue, RQ)组成。SQ用于提交发送或RDMA读写请求,RQ用于接收传入的数据(对于RDMA Send/Recv操作)。
  • ibv_wr (Work Request) / ibv_send_wr / ibv_recv_wr 工作请求。这些结构体描述了RDMA操作的具体细节,例如数据源、数据目标、操作类型(RDMA Read/Write/Send/Recv)、lkeyrkey 等。应用程序将WR提交到QP的SQ或RQ中。

2.3 RDMA 操作类型

RDMA支持多种操作类型,但我们主要关注“零 CPU 参与”的RDMA Read和RDMA Write:

  • RDMA Write: 本地HCA将本地内存的数据直接写入到远程HCA管理的远程内存区域。发起方是本地,目标是远程。
  • RDMA Read: 本地HCA从远程HCA管理的远程内存区域中直接读取数据到本地内存区域。发起方是本地,目标是远程。
  • RDMA Send/Receive (Channel Operations): 这种模式更接近传统的Send/Recv,但仍然通过HCA完成。发送方将数据放入自己的SQ,接收方需要预先在RQ中post一个接收请求。数据抵达后,HCA将数据放入接收方预设的缓冲区,并在接收方CQ中生成一个CQE。虽然也实现了零拷贝和内核旁路,但接收方CPU仍需预先post接收请求,所以并非完全的“零 CPU 参与”。通常用于控制消息或小数据包传输。
  • RDMA Atomic Operations: 包括Fetch-and-Add和Compare-and-Swap等原子操作,用于在远程内存上实现同步原语。

2.4 QP 状态机

一个Queue Pair在生命周期中会经历多个状态。理解这些状态及其转换是正确设置RDMA连接的关键。

状态 描述 转换方式
RESET 初始状态。QP刚被创建,但尚未分配资源。 ibv_create_qp()
INIT 初始化状态。QP已分配资源,并与PD、CQ关联。可以设置QP类型(如可靠连接RC),但不能发送/接收数据。 ibv_modify_qp() with IBV_QPS_INIT。需要设置qp_access_flags (读写权限)、port_num 等。
RTR Ready To Receive (准备接收)。QP已准备好接收来自远程对端的请求。对于可靠连接(RC),这意味着HCA已知道远程QP的详细信息(如QP号、LID/GID)。 ibv_modify_qp() with IBV_QPS_RTR。需要设置远程QP的qpn (Queue Pair Number)、psn (Packet Sequence Number)、lid (Local ID) 或 gid (Global ID, for RoCEv2/IB over IP)。
RTS Ready To Send (准备发送)。QP已准备好发起发送或RDMA读写请求。对于可靠连接(RC),这意味着本地HCA已完成与远程HCA的握手,可以开始数据传输。 ibv_modify_qp() with IBV_QPS_RTS。需要设置sq_psn (Send Queue Packet Sequence Number) 等。
SQD Send Queue Drain (发送队列排空)。QP不再接受新的发送请求,但会完成所有已提交的请求。 ibv_modify_qp() with IBV_QPS_SQD
SQE Send Queue Error (发送队列错误)。SQ中发生错误。 HCA检测到发送错误。
ERR Error (错误)。QP处于错误状态,无法进行任何操作。通常是致命错误。 任何严重错误(如远程QP不可达、内存访问违规)都可能导致QP进入此状态。
DRAINED Drain (排空)。QP已完成所有未决操作,并且不再接受新的请求。 ibv_modify_qp() with IBV_QPS_DRAINED
UNKNOWN 未知状态。通常在查询QP状态时发生错误或HCA处于异常状态时出现。

可靠连接(RC)类型的QP,其状态转换通常是:RESET -> INIT -> RTR -> RTS。其中,RTRRTS 状态的转换需要双方交换QP信息,这是建立RDMA连接的关键一步。

3. C++ RDMA 编程实践:零 CPU 读写示例

我们将构建一个简单的RDMA客户端和服务器程序。服务器会注册一块内存区域,并等待客户端的RDMA Read/Write请求。客户端将连接服务器,然后向服务器的内存区域写入数据,再从同一区域读取数据,验证RDMA的零 CPU 读写功能。

为了简化连接管理,我们将使用传统的TCP socket进行初始的QP信息交换(如QP号、MR的rkey和地址)。实际生产环境中,可以使用 librdmacm (RDMA Connection Manager) 库来自动化这一过程。

3.1 预备知识与环境设置

  • 硬件: 具备RDMA功能的网卡(如Mellanox ConnectX系列),并连接到支持RDMA的网络(InfiniBand交换机或支持RoCE的以太网交换机)。
  • 操作系统: Linux发行版(如Ubuntu, CentOS)。
  • RDMA驱动和库: 安装Mellanox OFED或RDMA-Core(ibverbs)库。
    • 在Ubuntu上:sudo apt update && sudo apt install libibverbs-dev librdmacm-dev
    • 在CentOS上:sudo yum install libibverbs-devel librdmacm-devel
  • 编译工具: G++。

3.2 辅助宏与结构体

为了更好的错误处理和代码可读性,我们定义一些辅助宏和结构体。

#include <iostream>
#include <vector>
#include <string>
#include <cstring>
#include <unistd.h>
#include <arpa/inet.h>
#include <netdb.h>
#include <errno.h>

// RDMA verbs headers
#include <infiniband/verbs.h>

// Helper for error checking
#define CHECK_NULL(ptr, msg) 
    do { 
        if (ptr == nullptr) { 
            std::cerr << "ERROR: " << msg << " failed (errno: " << errno << ", " << strerror(errno) << ")" << std::endl; 
            exit(EXIT_FAILURE); 
        } 
    } while (0)

#define CHECK_NON_ZERO(ret, msg) 
    do { 
        if (ret != 0) { 
            std::cerr << "ERROR: " << msg << " failed (errno: " << errno << ", " << strerror(errno) << ")" << std::endl; 
            exit(EXIT_FAILURE); 
        } 
    } while (0)

#define CHECK_NEGATIVE_ONE(ret, msg) 
    do { 
        if (ret == -1) { 
            std::cerr << "ERROR: " << msg << " failed (errno: " << errno << ", " << strerror(errno) << ")" << std::endl; 
            exit(EXIT_FAILURE); 
        } 
    } while (0)

// Maximum number of Work Requests (WRs) and Completion Queue Entries (CQEs)
const int MAX_WR = 1;
const int MAX_CQE = 1;

// Size of the memory buffer to be registered
const size_t BUFFER_SIZE = 4096; // 4KB

// TCP port for connection management
const int TCP_PORT = 18515;

// Structure to exchange QP and MR information
struct ConnectionInfo {
    uint32_t qp_num;
    uint32_t lid;       // Local ID, for InfiniBand. For RoCE, usually 0 or GID is used.
    uint64_t remote_addr; // Address of the registered memory region
    uint32_t rkey;      // RKey of the registered memory region
    // For RoCE, you might also need GID (Global ID)
    // union ibv_gid gid;
};

// Helper function to resolve hostname to IP address
std::string resolve_hostname(const std::string& hostname) {
    struct hostent *host_entry = gethostbyname(hostname.c_str());
    CHECK_NULL(host_entry, "gethostbyname");
    return inet_ntoa(*(struct in_addr*)host_entry->h_addr_list[0]);
}

// Function to print QP state for debugging
void print_qp_state(struct ibv_qp *qp) {
    struct ibv_qp_attr attr;
    struct ibv_qp_init_attr init_attr;
    CHECK_NON_ZERO(ibv_query_qp(qp, &attr, IBV_QP_STATE, &init_attr), "ibv_query_qp");
    std::cout << "QP State: " << attr.qp_state << std::endl;
}

3.3 RDMA 服务器端实现

服务器端主要负责初始化RDMA设备,注册内存,将QP转换为RTR/RTS状态,并等待客户端的连接和RDMA操作。

// Server.cpp
#include <iostream>
#include <vector>
#include <string>
#include <cstring>
#include <unistd.h>
#include <arpa/inet.h>
#include <netdb.h>
#include <errno.h>

#include <infiniband/verbs.h>

// ... (Auxiliary macros and ConnectionInfo struct from above) ...

class RdmaServer {
private:
    struct ibv_context *ctx = nullptr;
    struct ibv_pd *pd = nullptr;
    struct ibv_mr *mr = nullptr;
    struct ibv_cq *cq = nullptr;
    struct ibv_qp *qp = nullptr;
    char *buffer = nullptr; // The memory region to be registered

    int tcp_listen_sock = -1;
    int tcp_conn_sock = -1;

    ConnectionInfo local_info;
    ConnectionInfo remote_info;

public:
    RdmaServer() {
        buffer = new char[BUFFER_SIZE];
        std::memset(buffer, 0, BUFFER_SIZE); // Initialize buffer
        std::cout << "Server buffer allocated at " << (void*)buffer << std::endl;
    }

    ~RdmaServer() {
        if (qp) {
            ibv_destroy_qp(qp);
            std::cout << "QP destroyed." << std::endl;
        }
        if (cq) {
            ibv_destroy_cq(cq);
            std::cout << "CQ destroyed." << std::endl;
        }
        if (mr) {
            ibv_dereg_mr(mr);
            std::cout << "MR deregistered." << std::endl;
        }
        if (pd) {
            ibv_dealloc_pd(pd);
            std::cout << "PD deallocated." << std::endl;
        }
        if (ctx) {
            ibv_close_device(ctx);
            std::cout << "RDMA device closed." << std::endl;
        }
        if (tcp_conn_sock != -1) {
            close(tcp_conn_sock);
            std::cout << "TCP connection socket closed." << std::endl;
        }
        if (tcp_listen_sock != -1) {
            close(tcp_listen_sock);
            std::cout << "TCP listen socket closed." << std::endl;
        }
        if (buffer) {
            delete[] buffer;
            std::cout << "Server buffer deallocated." << std::endl;
        }
    }

    void setup_rdma_device() {
        struct ibv_device **dev_list;
        int num_devices;

        dev_list = ibv_get_device_list(&num_devices);
        CHECK_NULL(dev_list, "Failed to get IB devices list");
        if (num_devices == 0) {
            std::cerr << "ERROR: No RDMA devices found." << std::endl;
            exit(EXIT_FAILURE);
        }

        ctx = ibv_open_device(dev_list[0]);
        CHECK_NULL(ctx, "Failed to open RDMA device");
        std::cout << "RDMA device " << ibv_get_device_name(dev_list[0]) << " opened." << std::endl;
        ibv_free_device_list(dev_list);

        pd = ibv_alloc_pd(ctx);
        CHECK_NULL(pd, "Failed to allocate Protection Domain");
        std::cout << "Protection Domain allocated." << std::endl;

        // Register the memory buffer for RDMA operations
        mr = ibv_reg_mr(pd, buffer, BUFFER_SIZE,
                        IBV_ACCESS_LOCAL_WRITE |
                        IBV_ACCESS_REMOTE_READ |
                        IBV_ACCESS_REMOTE_WRITE);
        CHECK_NULL(mr, "Failed to register Memory Region");
        std::cout << "Memory Region registered. Buffer address: " << (void*)mr->addr
                  << ", length: " << mr->length << ", lkey: " << mr->lkey
                  << ", rkey: " << mr->rkey << std::endl;

        cq = ibv_create_cq(ctx, MAX_CQE, nullptr, nullptr, 0);
        CHECK_NULL(cq, "Failed to create Completion Queue");
        std::cout << "Completion Queue created." << std::endl;

        // QP initialization attributes
        struct ibv_qp_init_attr qp_init_attr;
        std::memset(&qp_init_attr, 0, sizeof(qp_init_attr));
        qp_init_attr.qp_type = IBV_QPT_RC; // Reliable Connected
        qp_init_attr.sq_size = MAX_WR;
        qp_init_attr.rq_size = MAX_WR;
        qp_init_attr.send_cq = cq;
        qp_init_attr.recv_cq = cq;
        qp_init_attr.cap.max_send_wr = MAX_WR;
        qp_init_attr.cap.max_recv_wr = MAX_WR;
        qp_init_attr.cap.max_send_sge = 1;
        qp_init_attr.cap.max_recv_sge = 1;

        qp = ibv_create_qp(pd, &qp_init_attr);
        CHECK_NULL(qp, "Failed to create Queue Pair");
        std::cout << "Queue Pair created. QP Num: " << qp->qp_num << std::endl;

        // Get local QP info for exchange
        local_info.qp_num = qp->qp_num;
        local_info.remote_addr = (uint64_t)mr->addr; // Address for remote access
        local_info.rkey = mr->rkey;

        // Get LID (Local ID)
        struct ibv_port_attr port_attr;
        CHECK_NON_ZERO(ibv_query_port(ctx, 1, &port_attr), "Failed to query port info");
        local_info.lid = port_attr.lid;
        std::cout << "Local LID: " << local_info.lid << std::endl;
    }

    void setup_tcp_connection() {
        tcp_listen_sock = socket(AF_INET, SOCK_STREAM, 0);
        CHECK_NEGATIVE_ONE(tcp_listen_sock, "Failed to create TCP listen socket");

        int optval = 1;
        CHECK_NEGATIVE_ONE(setsockopt(tcp_listen_sock, SOL_SOCKET, SO_REUSEADDR, &optval, sizeof(optval)), "setsockopt SO_REUSEADDR");

        struct sockaddr_in server_addr;
        std::memset(&server_addr, 0, sizeof(server_addr));
        server_addr.sin_family = AF_INET;
        server_addr.sin_addr.s_addr = INADDR_ANY;
        server_addr.sin_port = htons(TCP_PORT);

        CHECK_NEGATIVE_ONE(bind(tcp_listen_sock, (struct sockaddr*)&server_addr, sizeof(server_addr)), "Failed to bind TCP listen socket");
        CHECK_NEGATIVE_ONE(listen(tcp_listen_sock, 1), "Failed to listen on TCP socket");

        std::cout << "Waiting for client connection on TCP port " << TCP_PORT << "..." << std::endl;
        tcp_conn_sock = accept(tcp_listen_sock, nullptr, nullptr);
        CHECK_NEGATIVE_ONE(tcp_conn_sock, "Failed to accept client connection");
        std::cout << "Client connected via TCP." << std::endl;

        // Exchange connection info
        CHECK_NEGATIVE_ONE(send(tcp_conn_sock, &local_info, sizeof(local_info), 0), "Failed to send local info");
        std::cout << "Sent local RDMA info to client." << std::endl;

        CHECK_NEGATIVE_ONE(recv(tcp_conn_sock, &remote_info, sizeof(remote_info), 0), "Failed to receive remote info");
        std::cout << "Received remote RDMA info from client (QP Num: " << remote_info.qp_num
                  << ", LID: " << remote_info.lid << ", Addr: " << (void*)remote_info.remote_addr
                  << ", RKey: " << remote_info.rkey << ")." << std::endl;
    }

    void transition_qp_to_rtr() {
        struct ibv_qp_attr qp_attr;
        std::memset(&qp_attr, 0, sizeof(qp_attr));
        qp_attr.qp_state = IBV_QPS_INIT;
        qp_attr.qp_access_flags = IBV_ACCESS_REMOTE_READ | IBV_ACCESS_REMOTE_WRITE | IBV_ACCESS_LOCAL_WRITE;
        qp_attr.port_num = 1; // Default port 1

        CHECK_NON_ZERO(ibv_modify_qp(qp, &qp_attr,
                                     IBV_QP_STATE | IBV_QP_ACCESS_FLAGS | IBV_QP_PORT),
                       "Failed to modify QP to INIT state");
        std::cout << "QP transitioned to INIT state." << std::endl;
        print_qp_state(qp);

        // Transition QP to RTR (Ready To Receive)
        std::memset(&qp_attr, 0, sizeof(qp_attr));
        qp_attr.qp_state = IBV_QPS_RTR;
        qp_attr.path_mtu = IBV_MTU_1024; // Common MTU
        qp_attr.dest_qp_num = remote_info.qp_num;
        qp_attr.rq_psn = 0; // Packet Sequence Number
        qp_attr.max_dest_rd_atomic = 1; // Max outstanding RDMA Read/Atomic
        qp_attr.min_rnr_timer = 12; // Min RNR NAK timer (2.048ms)

        // For InfiniBand, using LID
        qp_attr.ah_attr.dlid = remote_info.lid;
        qp_attr.ah_attr.sl = 0; // Service Level
        qp_attr.ah_attr.src_path_bits = 0;
        qp_attr.ah_attr.port_num = 1; // Default port 1

        CHECK_NON_ZERO(ibv_modify_qp(qp, &qp_attr,
                                     IBV_QP_STATE | IBV_QP_PATH_MTU | IBV_QP_DEST_QP_NUM |
                                     IBV_QP_RQ_PSN | IBV_QP_MAX_DEST_RD_ATOMIC | IBV_QP_MIN_RNR_TIMER |
                                     IBV_QP_AH_ATTR),
                       "Failed to modify QP to RTR state");
        std::cout << "QP transitioned to RTR state." << std::endl;
        print_qp_state(qp);
    }

    void transition_qp_to_rts() {
        // Transition QP to RTS (Ready To Send)
        struct ibv_qp_attr qp_attr;
        std::memset(&qp_attr, 0, sizeof(qp_attr));
        qp_attr.qp_state = IBV_QPS_RTS;
        qp_attr.timeout = 14; // Default timeout (4.096ms)
        qp_attr.retry_cnt = 7; // Max number of retries
        qp_attr.rnr_retry = 7; // Max RNR retries
        qp_attr.sq_psn = 0; // Send Queue Packet Sequence Number
        qp_attr.max_rd_atomic = 1; // Max outstanding RDMA Read/Atomic

        CHECK_NON_ZERO(ibv_modify_qp(qp, &qp_attr,
                                     IBV_QP_STATE | IBV_QP_TIMEOUT | IBV_QP_RETRY_CNT |
                                     IBV_QP_RNR_RETRY | IBV_QP_SQ_PSN | IBV_QP_MAX_RD_ATOMIC),
                       "Failed to modify QP to RTS state");
        std::cout << "QP transitioned to RTS state." << std::endl;
        print_qp_state(qp);
    }

    void run() {
        setup_rdma_device();
        setup_tcp_connection();
        transition_qp_to_rtr(); // Server needs to be RTR first to receive client's RTS
        transition_qp_to_rts(); // Server can also initiate operations now

        std::cout << "Server is ready for RDMA operations. Waiting for client to perform read/write..." << std::endl;

        // In a real application, the server might post receives for control messages
        // or just wait for RDMA Read/Write completions if it initiates operations.
        // For this example, we just wait and then print buffer content.
        std::cout << "Waiting 10 seconds for client RDMA operations..." << std::endl;
        sleep(10); // Give client time to perform RDMA operations

        std::cout << "nServer Buffer Content after client RDMA operations:" << std::endl;
        std::cout << "-----------------------------------------------" << std::endl;
        std::cout << "Buffer (first 64 bytes): ";
        for (size_t i = 0; i < std::min((size_t)64, BUFFER_SIZE); ++i) {
            std::cout << buffer[i];
        }
        std::cout << std::endl;
        std::cout << "-----------------------------------------------" << std::endl;

        // Optionally, if the server was expected to post a receive for a 'done' message:
        // struct ibv_recv_wr rv_wr, *bad_rv_wr = nullptr;
        // struct ibv_sge rv_sge;
        // // ... setup rv_sge and rv_wr ...
        // CHECK_NON_ZERO(ibv_post_recv(qp, &rv_wr, &bad_rv_wr), "Failed to post receive WR");
        // // ... poll for completion ...

        // For this demo, we'll just exit after a delay.
    }
};

int main() {
    RdmaServer server;
    server.run();
    return 0;
}

编译服务器: g++ -std=c++11 Server.cpp -o server -libverbs -lrdmacm -pthread

3.4 RDMA 客户端实现

客户端负责初始化RDMA设备,注册内存,连接服务器,交换QP信息,然后将自己的QP转换为RTR/RTS状态,最后发起RDMA Write和RDMA Read操作。

// Client.cpp
#include <iostream>
#include <vector>
#include <string>
#include <cstring>
#include <unistd.h>
#include <arpa/inet.h>
#include <netdb.h>
#include <errno.h>

#include <infiniband/verbs.h>

// ... (Auxiliary macros and ConnectionInfo struct from above) ...

class RdmaClient {
private:
    struct ibv_context *ctx = nullptr;
    struct ibv_pd *pd = nullptr;
    struct ibv_mr *mr = nullptr;
    struct ibv_cq *cq = nullptr;
    struct ibv_qp *qp = nullptr;
    char *buffer = nullptr; // Local buffer for sending/receiving data

    int tcp_conn_sock = -1;

    ConnectionInfo local_info;
    ConnectionInfo remote_info;

    std::string server_ip;

public:
    RdmaClient(const std::string& ip) : server_ip(ip) {
        buffer = new char[BUFFER_SIZE];
        std::memset(buffer, 0, BUFFER_SIZE); // Initialize buffer
        std::cout << "Client buffer allocated at " << (void*)buffer << std::endl;
    }

    ~RdmaClient() {
        if (qp) {
            ibv_destroy_qp(qp);
            std::cout << "QP destroyed." << std::endl;
        }
        if (cq) {
            ibv_destroy_cq(cq);
            std::cout << "CQ destroyed." << std::endl;
        }
        if (mr) {
            ibv_dereg_mr(mr);
            std::cout << "MR deregistered." << std::endl;
        }
        if (pd) {
            ibv_dealloc_pd(pd);
            std::cout << "PD deallocated." << std::endl;
        }
        if (ctx) {
            ibv_close_device(ctx);
            std::cout << "RDMA device closed." << std::endl;
        }
        if (tcp_conn_sock != -1) {
            close(tcp_conn_sock);
            std::cout << "TCP connection socket closed." << std::endl;
        }
        if (buffer) {
            delete[] buffer;
            std::cout << "Client buffer deallocated." << std::endl;
        }
    }

    void setup_rdma_device() {
        struct ibv_device **dev_list;
        int num_devices;

        dev_list = ibv_get_device_list(&num_devices);
        CHECK_NULL(dev_list, "Failed to get IB devices list");
        if (num_devices == 0) {
            std::cerr << "ERROR: No RDMA devices found." << std::endl;
            exit(EXIT_FAILURE);
        }

        ctx = ibv_open_device(dev_list[0]);
        CHECK_NULL(ctx, "Failed to open RDMA device");
        std::cout << "RDMA device " << ibv_get_device_name(dev_list[0]) << " opened." << std::endl;
        ibv_free_device_list(dev_list);

        pd = ibv_alloc_pd(ctx);
        CHECK_NULL(pd, "Failed to allocate Protection Domain");
        std::cout << "Protection Domain allocated." << std::endl;

        // Register the memory buffer for RDMA operations
        mr = ibv_reg_mr(pd, buffer, BUFFER_SIZE,
                        IBV_ACCESS_LOCAL_WRITE |
                        IBV_ACCESS_REMOTE_READ |
                        IBV_ACCESS_REMOTE_WRITE);
        CHECK_NULL(mr, "Failed to register Memory Region");
        std::cout << "Memory Region registered. Buffer address: " << (void*)mr->addr
                  << ", length: " << mr->length << ", lkey: " << mr->lkey
                  << ", rkey: " << mr->rkey << std::endl;

        cq = ibv_create_cq(ctx, MAX_CQE, nullptr, nullptr, 0);
        CHECK_NULL(cq, "Failed to create Completion Queue");
        std::cout << "Completion Queue created." << std::endl;

        // QP initialization attributes
        struct ibv_qp_init_attr qp_init_attr;
        std::memset(&qp_init_attr, 0, sizeof(qp_init_attr));
        qp_init_attr.qp_type = IBV_QPT_RC; // Reliable Connected
        qp_init_attr.sq_size = MAX_WR;
        qp_init_attr.rq_size = MAX_WR;
        qp_init_attr.send_cq = cq;
        qp_init_attr.recv_cq = cq;
        qp_init_attr.cap.max_send_wr = MAX_WR;
        qp_init_attr.cap.max_recv_wr = MAX_WR;
        qp_init_attr.cap.max_send_sge = 1;
        qp_init_attr.cap.max_recv_sge = 1;

        qp = ibv_create_qp(pd, &qp_init_attr);
        CHECK_NULL(qp, "Failed to create Queue Pair");
        std::cout << "Queue Pair created. QP Num: " << qp->qp_num << std::endl;

        // Get local QP info for exchange
        local_info.qp_num = qp->qp_num;
        local_info.remote_addr = (uint64_t)mr->addr; // For symmetric MR address usage, though not strictly required for client
        local_info.rkey = mr->rkey;

        // Get LID (Local ID)
        struct ibv_port_attr port_attr;
        CHECK_NON_ZERO(ibv_query_port(ctx, 1, &port_attr), "Failed to query port info");
        local_info.lid = port_attr.lid;
        std::cout << "Local LID: " << local_info.lid << std::endl;
    }

    void setup_tcp_connection() {
        tcp_conn_sock = socket(AF_INET, SOCK_STREAM, 0);
        CHECK_NEGATIVE_ONE(tcp_conn_sock, "Failed to create TCP connection socket");

        struct sockaddr_in server_addr_in;
        std::memset(&server_addr_in, 0, sizeof(server_addr_in));
        server_addr_in.sin_family = AF_INET;
        server_addr_in.sin_port = htons(TCP_PORT);
        CHECK_NEGATIVE_ONE(inet_pton(AF_INET, server_ip.c_str(), &server_addr_in.sin_addr), "Invalid server IP address");

        std::cout << "Connecting to TCP server " << server_ip << ":" << TCP_PORT << "..." << std::endl;
        CHECK_NEGATIVE_ONE(connect(tcp_conn_sock, (struct sockaddr*)&server_addr_in, sizeof(server_addr_in)), "Failed to connect to TCP server");
        std::cout << "Connected to TCP server." << std::endl;

        // Exchange connection info
        CHECK_NEGATIVE_ONE(recv(tcp_conn_sock, &remote_info, sizeof(remote_info), 0), "Failed to receive remote info");
        std::cout << "Received remote RDMA info from server (QP Num: " << remote_info.qp_num
                  << ", LID: " << remote_info.lid << ", Addr: " << (void*)remote_info.remote_addr
                  << ", RKey: " << remote_info.rkey << ")." << std::endl;

        CHECK_NEGATIVE_ONE(send(tcp_conn_sock, &local_info, sizeof(local_info), 0), "Failed to send local info");
        std::cout << "Sent local RDMA info to server." << std::endl;
    }

    void transition_qp_to_rtr() {
        struct ibv_qp_attr qp_attr;
        std::memset(&qp_attr, 0, sizeof(qp_attr));
        qp_attr.qp_state = IBV_QPS_INIT;
        qp_attr.qp_access_flags = IBV_ACCESS_REMOTE_READ | IBV_ACCESS_REMOTE_WRITE | IBV_ACCESS_LOCAL_WRITE;
        qp_attr.port_num = 1;

        CHECK_NON_ZERO(ibv_modify_qp(qp, &qp_attr,
                                     IBV_QP_STATE | IBV_QP_ACCESS_FLAGS | IBV_QP_PORT),
                       "Failed to modify QP to INIT state");
        std::cout << "QP transitioned to INIT state." << std::endl;
        print_qp_state(qp);

        // Transition QP to RTR (Ready To Receive)
        std::memset(&qp_attr, 0, sizeof(qp_attr));
        qp_attr.qp_state = IBV_QPS_RTR;
        qp_attr.path_mtu = IBV_MTU_1024;
        qp_attr.dest_qp_num = remote_info.qp_num;
        qp_attr.rq_psn = 0;
        qp_attr.max_dest_rd_atomic = 1;
        qp_attr.min_rnr_timer = 12;

        qp_attr.ah_attr.dlid = remote_info.lid;
        qp_attr.ah_attr.sl = 0;
        qp_attr.ah_attr.src_path_bits = 0;
        qp_attr.ah_attr.port_num = 1;

        CHECK_NON_ZERO(ibv_modify_qp(qp, &qp_attr,
                                     IBV_QP_STATE | IBV_QP_PATH_MTU | IBV_QP_DEST_QP_NUM |
                                     IBV_QP_RQ_PSN | IBV_QP_MAX_DEST_RD_ATOMIC | IBV_QP_MIN_RNR_TIMER |
                                     IBV_QP_AH_ATTR),
                       "Failed to modify QP to RTR state");
        std::cout << "QP transitioned to RTR state." << std::endl;
        print_qp_state(qp);
    }

    void transition_qp_to_rts() {
        // Transition QP to RTS (Ready To Send)
        struct ibv_qp_attr qp_attr;
        std::memset(&qp_attr, 0, sizeof(qp_attr));
        qp_attr.qp_state = IBV_QPS_RTS;
        qp_attr.timeout = 14;
        qp_attr.retry_cnt = 7;
        qp_attr.rnr_retry = 7;
        qp_attr.sq_psn = 0;
        qp_attr.max_rd_atomic = 1;

        CHECK_NON_ZERO(ibv_modify_qp(qp, &qp_attr,
                                     IBV_QP_STATE | IBV_QP_TIMEOUT | IBV_QP_RETRY_CNT |
                                     IBV_QP_RNR_RETRY | IBV_QP_SQ_PSN | IBV_QP_MAX_RD_ATOMIC),
                       "Failed to modify QP to RTS state");
        std::cout << "QP transitioned to RTS state." << std::endl;
        print_qp_state(qp);
    }

    void perform_rdma_write() {
        std::cout << "n--- Performing RDMA Write ---" << std::endl;
        const char *message = "Hello from Client via RDMA Write! This is direct memory access.";
        size_t msg_len = std::strlen(message);
        if (msg_len > BUFFER_SIZE) msg_len = BUFFER_SIZE; // Prevent overflow

        std::memcpy(buffer, message, msg_len);
        std::cout << "Local buffer content before RDMA Write: '" << std::string(buffer, msg_len) << "'" << std::endl;

        struct ibv_sge sge;
        sge.addr = (uint64_t)buffer;
        sge.length = msg_len;
        sge.lkey = mr->lkey;

        struct ibv_send_wr wr, *bad_wr = nullptr;
        std::memset(&wr, 0, sizeof(wr));
        wr.wr_id = 1; // User defined ID for completion tracking
        wr.sg_list = &sge;
        wr.num_sge = 1;
        wr.opcode = IBV_WR_RDMA_WRITE; // RDMA Write operation
        wr.send_flags = IBV_SEND_SIGNALED; // Request a completion notification
        wr.wr.rdma.remote_addr = remote_info.remote_addr; // Server's MR address
        wr.wr.rdma.rkey = remote_info.rkey; // Server's MR rkey

        CHECK_NON_ZERO(ibv_post_send(qp, &wr, &bad_wr), "Failed to post RDMA Write WR");
        std::cout << "RDMA Write WR posted. Waiting for completion..." << std::endl;

        struct ibv_wc wc;
        CHECK_NON_ZERO(ibv_poll_cq(cq, 1, &wc), "Failed to poll CQ for RDMA Write completion");

        if (wc.status != IBV_WC_SUCCESS) {
            std::cerr << "ERROR: RDMA Write failed with status: " << ibv_wc_status_str(wc.status) << std::endl;
            exit(EXIT_FAILURE);
        }
        std::cout << "RDMA Write completed successfully. Bytes written: " << wc.byte_len << std::endl;
    }

    void perform_rdma_read() {
        std::cout << "n--- Performing RDMA Read ---" << std::endl;
        std::memset(buffer, 0, BUFFER_SIZE); // Clear local buffer before read

        struct ibv_sge sge;
        sge.addr = (uint64_t)buffer; // Local buffer to receive data
        sge.length = BUFFER_SIZE;
        sge.lkey = mr->lkey;

        struct ibv_send_wr wr, *bad_wr = nullptr;
        std::memset(&wr, 0, sizeof(wr));
        wr.wr_id = 2; // User defined ID
        wr.sg_list = &sge;
        wr.num_sge = 1;
        wr.opcode = IBV_WR_RDMA_READ; // RDMA Read operation
        wr.send_flags = IBV_SEND_SIGNALED;
        wr.wr.rdma.remote_addr = remote_info.remote_addr; // Server's MR address
        wr.wr.rdma.rkey = remote_info.rkey; // Server's MR rkey

        CHECK_NON_ZERO(ibv_post_send(qp, &wr, &bad_wr), "Failed to post RDMA Read WR");
        std::cout << "RDMA Read WR posted. Waiting for completion..." << std::endl;

        struct ibv_wc wc;
        CHECK_NON_ZERO(ibv_poll_cq(cq, 1, &wc), "Failed to poll CQ for RDMA Read completion");

        if (wc.status != IBV_WC_SUCCESS) {
            std::cerr << "ERROR: RDMA Read failed with status: " << ibv_wc_status_str(wc.status) << std::endl;
            exit(EXIT_FAILURE);
        }
        std::cout << "RDMA Read completed successfully. Bytes read: " << wc.byte_len << std::endl;
        std::cout << "Local buffer content after RDMA Read from server: '" << std::string(buffer, wc.byte_len) << "'" << std::endl;
    }

    void run() {
        setup_rdma_device();
        setup_tcp_connection();
        transition_qp_to_rtr();
        transition_qp_to_rts();

        std::cout << "Client is ready for RDMA operations." << std::endl;

        perform_rdma_write();
        perform_rdma_read();
    }
};

int main(int argc, char *argv[]) {
    if (argc != 2) {
        std::cerr << "Usage: " << argv[0] << " <server_ip>" << std::endl;
        return EXIT_FAILURE;
    }
    std::string server_ip = argv[1];

    RdmaClient client(server_ip);
    client.run();
    return 0;
}

编译客户端: g++ -std=c++11 Client.cpp -o client -libverbs -lrdmacm -pthread

3.5 运行与验证

  1. 在服务器上运行:
    ./server
    服务器将启动并等待客户端连接。

  2. 在客户端上运行:
    ./client <服务器IP地址>
    客户端将连接服务器,执行RDMA Write和RDMA Read操作。

预期输出:

  • 服务器端: 将看到QP状态转换,以及在客户端完成RDMA Write后,服务器本地buffer内容被修改的打印。
  • 客户端端: 将看到QP状态转换,RDMA Write操作成功,以及RDMA Read操作后,从服务器读取到的数据与之前写入数据一致的打印。

这个示例清晰地展示了客户端如何通过 ibv_post_send 函数,指定 IBV_WR_RDMA_WRITEIBV_WR_RDMA_READ 操作码,提供服务器端的 remote_addrrkey,直接对服务器的内存进行读写。整个数据传输过程,服务器端的CPU在接受到客户端的RDMA Read/Write请求后,无需执行任何数据拷贝指令,也无需上下文切换,完全由HCA硬件完成数据传输。CPU只会在客户端轮询CQ以获取完成通知时,才感知到操作的完成。

4. 零 CPU 参与的机制解析

“零 CPU 参与”并非指CPU完全不参与RDMA的任何环节,而是特指CPU不参与数据路径上的直接数据拷贝和协议处理。其核心机制如下:

  1. 内存注册与 pinning: 当应用程序调用 ibv_reg_mr() 注册内存时,HCA会获取这块内存的物理地址,并将其“钉住”(pinning)在物理内存中,防止操作系统将其交换到磁盘或重新映射。这样,HCA可以直接通过DMA访问这些物理地址。
  2. HCA 作为 DMA master: HCA是一个智能的PCIe设备,它具有DMA控制器,可以直接作为PCIe总线上的master,访问主机内存。当HCA接收到RDMA请求时(例如RDMA Write),它会解析请求,获取远程目标内存的rkey和地址。
  3. HCA 之间直接通信: HCA之间通过专用的RDMA协议进行通信。对于RDMA Write,发起方HCA从本地注册内存读取数据,通过网络传输到目标HCA。目标HCA接收数据后,根据请求中的rkey和地址,直接将数据DMA写入到目标机器已注册的内存区域。整个过程不经过目标机器的CPU、操作系统内核或TCP/IP协议栈。
  4. Completion Queue (CQ) 通知: 当HCA完成一个RDMA操作时,它会在对应的Completion Queue中写入一个完成事件(CQE)。应用程序通过 ibv_poll_cq() 轮询CQ来获取这些事件。轮询操作本身是CPU参与的,但这是为了获取操作结果,而不是为了传输数据。也可以配置CQ为事件通知模式,当有CQE时通过中断通知CPU,但为了最低延迟,通常采用忙轮询。

这种机制使得数据流完全在HCA和物理内存之间移动,CPU得以解放,从而实现极致的性能和资源效率。

5. 实际考量与进阶话题

  • 错误处理: RDMA编程中错误处理至关重要。网络故障、内存访问违规、QP状态不匹配等都可能导致操作失败。必须仔细检查每个 ibv_ 函数的返回值,并在出错时进行适当的清理和恢复。
  • 性能优化:
    • 批处理 WR: 尽可能一次性提交多个Work Request (ibv_post_sendibv_post_recv 都支持 wr 链表),减少每次系统调用的开销。
    • 忙轮询 CQ: 对于延迟敏感的应用,使用忙轮询 (ibv_poll_cq) 而非事件通知(中断),以避免中断处理的延迟。
    • NUMA 亲和性: 将应用程序进程、内存缓冲区和HCA绑定到同一个NUMA节点,以减少跨NUMA节点的内存访问延迟。
    • Huge Pages: 使用大页内存(Huge Pages)可以减少TLB(Translation Lookaside Buffer)未命中,提高内存访问效率。
  • 连接管理: 示例中使用了TCP进行QP信息交换,这在简单场景下可行。在复杂的生产环境中,建议使用 librdmacm 库。它提供了更高级的API来管理RDMA连接,包括地址解析、QP创建和状态转换,简化了RDMA应用的开发。
  • 内存管理: 除了 ibv_reg_mr 注册内存外,还需要注意内存的生命周期。注册的内存必须在RDMA操作完成后才能释放。在Linux上,mlock() 函数可以用于防止内存被交换出物理RAM,但这通常由 ibv_reg_mr 内部处理。
  • RoCE vs. InfiniBand: InfiniBand是RDMA的原始协议,运行在专用的InfiniBand网络上。RoCE (RDMA over Converged Ethernet) 允许RDMA在标准以太网上运行,这大大降低了RDMA的部署成本。RoCEv2是目前主流的RoCE版本,支持IP路由。在编程接口层面,libibverbs 对两者提供了统一的抽象,但在网络配置和某些细节上(如寻址使用LID还是GID)有所不同。
  • 异步操作和事件驱动: 虽然我们使用了同步的 ibv_poll_cq,但 libibverbs 也支持将CQ与事件文件描述符关联,然后通过 epoll 等机制异步等待CQ事件,实现更高效的资源利用。

6. RDMA 的未来展望

RDMA技术正变得越来越普及,尤其是在数据中心和云计算环境中。随着网络带宽的持续增长和对低延迟、高效率通信的不断追求,RDMA将成为构建下一代高性能分布式系统的基石。从传统HPC到AI、大数据、存储等领域,RDMA的“零 CPU 参与”特性将持续赋能开发者,构建出更强大、更高效的应用。

RDMA提供了一种革命性的网络通信方式,通过将数据路径从CPU卸载到智能网卡,实现了跨机器内存的直接、高效访问。C++开发者可以利用 libibverbs 库,结合对RDMA核心概念的理解,构建出性能卓越、CPU利用率极高的分布式应用。虽然初始学习曲线可能较陡峭,但其带来的性能提升足以证明投入的价值。

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注