C++ 与 远程内存(Remote Memory):在分布式 C++ 集群中通过 RDMA 实现跨节点共享内存池分配机制

C++ 与远程内存:在分布式 C++ 集群中通过 RDMA 实现跨节点共享内存池分配机制

各位来宾,各位技术爱好者,大家好!

今天,我们将深入探讨一个在高性能分布式系统中至关重要的话题:如何在 C++ 分布式集群中,利用 RDMA(Remote Direct Memory Access)技术,实现一个高效、低延迟的跨节点共享内存池分配机制。随着数据规模的爆炸式增长和计算需求的日益复杂,传统的网络通信模型已经无法满足现代分布式应用对极致性能的追求。RDMA 的出现,为 C++ 开发者打开了一扇通往全新内存管理范式的大门。

1. 引言:分布式C++集群的内存管理困境与RDMA的曙光

在构建大规模分布式 C++ 集群时,无论是数据分析、机器学习、金融交易系统还是高性能计算(HPC)领域,节点间的数据交换和共享都是核心操作。传统上,我们依赖 TCP/IP 协议栈进行通信,数据从应用程序缓冲区拷贝到内核缓冲区,再通过网络接口发送,接收方进行反向操作。这个过程涉及多次内存拷贝、CPU 上下文切换以及操作系统内核的参与,导致显著的延迟和 CPU 消耗。

想象一下,一个 C++ 应用程序需要频繁地在不同节点上操作同一块逻辑上的“共享”数据。如果每次操作都需要通过繁琐的网络协议栈,那么性能瓶颈将不言而喻。mallocnew 只能在本地节点上分配内存,它们无法感知远程节点的内存资源。我们需要一种机制,让一个节点上的 C++ 程序能够像访问本地内存一样,直接、高效地访问另一个节点上的内存。

这就是 RDMA 技术的用武之地。RDMA 允许网络适配器(通常称为 HCA, Host Channel Adapter)直接访问远程节点的内存,绕过远程节点的 CPU 和操作系统内核。这种“零拷贝”(Zero-copy)、“CPU 卸载”(CPU Offload)的特性,使得 RDMA 能够提供极低的延迟和极高的吞吐量,从而彻底改变了分布式系统中内存访问的范式。

本次讲座的目标是,不仅要理解 RDMA 的基本原理,更要探讨如何将其融入 C++ 的内存管理体系,设计并实现一个能够跨越节点边界,提供统一内存分配接口的共享内存池。这不仅仅是性能的提升,更是编程模型的一次革新。

2. 深入理解RDMA与远程内存

在构建我们的跨节点共享内存池之前,我们必须对 RDMA 的核心概念有一个清晰的认识。

2.1 什么是RDMA?核心优势解析

RDMA (Remote Direct Memory Access),顾名思义,是一种允许一台计算机直接访问另一台计算机内存的技术,无需目标计算机的 CPU 参与。这通过特殊的网络硬件(HCA)实现。

RDMA 的核心优势在于:

  • 零拷贝 (Zero-copy): 数据直接从发送方的应用内存传输到接收方的应用内存,无需经过操作系统内核的缓冲区。
  • CPU 卸载 (CPU Offload): 数据传输由 HCA 完全处理,释放 CPU 去执行其他计算任务。
  • 低延迟 (Low Latency): 减少了软件路径上的开销,通常可将网络延迟降低到微秒级别。
  • 高带宽 (High Bandwidth): 能够充分利用网络硬件的全部带宽,实现接近线速的数据传输。

这些特性使得 RDMA 非常适合需要极高性能和低延迟的分布式应用。

2.2 RDMA工作原理概述与关键组件

RDMA 的实现依赖于几个关键组件和概念:

组件名称 英文全称 描述
HCA Host Channel Adapter 专用网络接口卡,支持 RDMA 功能,负责处理 RDMA 请求。
QP Queue Pair RDMA 通信的基本单元,包含一个发送队列 (Send Queue, SQ) 和一个接收队列 (Receive Queue, RQ)。节点间通过 QP 建立连接。
MR Memory Region 应用程序内存区域的注册,使其可被 HCA 访问。注册后会生成一个 lkey (local key) 和 rkey (remote key)。
CQ Completion Queue 用于接收 RDMA 操作完成通知的队列。当一个 RDMA 操作完成时,HCA 会将一个完成事件 (Work Completion, WC) 放入 CQ。
SR Scatter/Gather List (或 SGE) 描述内存缓冲区和其长度的结构体,用于 RDMA 操作中指定数据源和目标。
WQE Work Queue Entry 放入 SQ 或 RQ 的描述 RDMA 操作的条目。

RDMA 操作流程简化版:

  1. 内存注册 (Memory Registration): 应用程序需要将用于 RDMA 传输的内存区域注册到 HCA。注册后,HCA 会为这块内存生成一个本地键(lkey)和一个远程键(rkey)。lkey 用于本地发起 RDMA 操作时标识内存,rkey 用于远程节点发起 RDMA 操作时标识内存。
  2. 建立连接: 两个节点通过 QP 建立连接。这通常涉及交换 QP 号、GID(全局标识符)等信息。
  3. 发起 RDMA 操作: 发起方将一个 Work Queue Entry (WQE) 提交到其 QP 的发送队列 (SQ)。WQE 中包含了操作类型(读、写、原子操作)、目标远程内存的 rkey 和地址、本地内存的 lkey 和地址等信息。
  4. HCA 处理: HCA 接管 WQE,直接从本地注册内存读取数据(对于 RDMA_WRITE)或将数据写入本地注册内存(对于 RDMA_READ),并通过网络将数据传输到远程 HCA。
  5. 远程 HCA 处理: 远程 HCA 根据收到的 RDMA 请求,直接访问其本地注册内存,完成读或写操作。
  6. 完成通知: 操作完成后,两个 HCA 都会将一个 Work Completion (WC) 放入各自的完成队列 (CQ)。应用程序通过轮询或事件通知机制从 CQ 获取 WC,以确认操作完成。

2.3 远程内存抽象与RDMA操作类型

在 C++ 层面,我们可以将远程内存抽象为一个可以通过 RDMA 访问的地址空间。关键在于,我们不能像本地指针那样直接解引用一个远程地址。所有对远程内存的操作都必须通过 RDMA verbs 或其封装层来完成。

常见的 RDMA 操作类型:

  • RDMA WRITE: 将本地注册内存中的数据写入远程注册内存。这是最常用的单向数据传输方式。
  • RDMA READ: 从远程注册内存读取数据到本地注册内存。
  • RDMA ATOMIC_COMPARE_AND_SWAP (CAS): 原子地比较远程内存中的一个值与期望值,如果相等则替换为新值。常用于实现分布式锁或无锁数据结构。
  • RDMA FETCH_AND_ADD (FAA): 原子地读取远程内存中的一个值,并将其加上一个指定值,然后将结果写回远程内存。常用于实现分布式计数器。

RDMA 操作通常是异步的。应用程序提交一个操作后,可以继续执行其他任务,然后稍后查询 CQ 以获取操作完成状态。

// 概念性 RDMA API 封装,实际 RDMA verbs API 更加复杂
// 这是一个高度简化的抽象,用于演示目的

#include <cstdint>
#include <vector>
#include <string>
#include <stdexcept>
#include <iostream>
#include <memory>
#include <mutex>
#include <map>

// 模拟 RDMA 内存区域 (Memory Region)
struct RDMA_MemoryRegion {
    void* addr;         // 本地虚拟地址
    size_t length;      // 区域大小
    uint32_t lkey;      // 本地键
    uint32_t rkey;      // 远程键

    RDMA_MemoryRegion(void* a, size_t len, uint32_t lk, uint32_t rk)
        : addr(a), length(len), lkey(lk), rkey(rk) {}
};

// 模拟远程节点的内存信息,用于发起 RDMA 操作
struct RemoteMemoryInfo {
    uint64_t remote_addr; // 远程节点的内存起始物理地址(或虚拟地址,取决于实现)
    uint32_t rkey;        // 远程键
    size_t length;        // 远程内存区域长度
    // 可以添加更多信息,例如节点ID
};

// 模拟 RDMA 通信层
class RDMADevice {
public:
    RDMADevice() : next_lkey(1), next_rkey(1) {}

    // 模拟内存注册
    std::shared_ptr<RDMA_MemoryRegion> register_memory(void* addr, size_t length) {
        // 在真实 RDMA 中,这里会调用 ibv_reg_mr
        // 模拟生成 lkey 和 rkey
        uint32_t lkey = next_lkey++;
        uint32_t rkey = next_rkey++;
        auto mr = std::make_shared<RDMA_MemoryRegion>(addr, length, lkey, rkey);
        registered_mrs[lkey] = mr;
        std::cout << "[RDMADevice] Registered memory at " << addr << " len " << length
                  << ", lkey=" << lkey << ", rkey=" << rkey << std::endl;
        return mr;
    }

    // 模拟 RDMA Write 操作
    // src_mr: 本地源内存区域
    // src_offset: 本地源内存偏移
    // dest_info: 远程目标内存信息
    // dest_offset: 远程目标内存偏移
    // size: 传输大小
    void rdma_write(std::shared_ptr<RDMA_MemoryRegion> src_mr, size_t src_offset,
                    const RemoteMemoryInfo& dest_info, size_t dest_offset, size_t size) {
        if (!src_mr) {
            throw std::runtime_error("Source MR is null.");
        }
        if (src_offset + size > src_mr->length) {
            throw std::runtime_error("Source buffer overflow.");
        }
        if (dest_offset + size > dest_info.length) {
            throw std::runtime_error("Destination buffer overflow.");
        }

        // 模拟 HCA 进行数据传输
        // 实际中,这里会提交一个 WQE 到 SQ
        std::cout << "[RDMADevice] Simulating RDMA WRITE from local MR(lkey=" << src_mr->lkey
                  << ") addr " << (static_cast<char*>(src_mr->addr) + src_offset)
                  << " to remote addr " << (dest_info.remote_addr + dest_offset)
                  << " (rkey=" << dest_info.rkey << ") size " << size << std::endl;

        // 在实际 RDMA 中,这里是异步操作,需要轮询 CQ
        // 为了演示目的,我们假设它是同步完成的
    }

    // 模拟 RDMA Read 操作
    // dest_mr: 本地目标内存区域
    // dest_offset: 本地目标内存偏移
    // src_info: 远程源内存信息
    // src_offset: 远程源内存偏移
    // size: 传输大小
    void rdma_read(std::shared_ptr<RDMA_MemoryRegion> dest_mr, size_t dest_offset,
                   const RemoteMemoryInfo& src_info, size_t src_offset, size_t size) {
        if (!dest_mr) {
            throw std::runtime_error("Destination MR is null.");
        }
        if (dest_offset + size > dest_mr->length) {
            throw std::runtime_error("Destination buffer overflow.");
        }
        if (src_offset + size > src_info.length) {
            throw std::runtime_error("Source buffer overflow.");
        }

        std::cout << "[RDMADevice] Simulating RDMA READ from remote addr " << (src_info.remote_addr + src_offset)
                  << " (rkey=" << src_info.rkey << ") to local MR(lkey=" << dest_mr->lkey
                  << ") addr " << (static_cast<char*>(dest_mr->addr) + dest_offset)
                  << " size " << size << std::endl;

        // 实际中,这里会提交一个 WQE 到 SQ
    }

    // 模拟 RDMA Atomic Compare And Swap (CAS)
    // target_info: 远程目标内存信息
    // target_offset: 远程目标内存偏移
    // expected: 期望值
    // new_value: 新值
    // 返回:远程内存中的旧值
    uint64_t rdma_atomic_cas(const RemoteMemoryInfo& target_info, size_t target_offset,
                             uint64_t expected, uint64_t new_value) {
        if (target_offset + sizeof(uint64_t) > target_info.length) {
            throw std::runtime_error("Target buffer overflow for CAS.");
        }
        std::cout << "[RDMADevice] Simulating RDMA ATOMIC_CAS on remote addr " << (target_info.remote_addr + target_offset)
                  << " (rkey=" << target_info.rkey << ") expected=" << expected
                  << ", new_value=" << new_value << std::endl;
        // 真实 RDMA 中,HCA 会在远程执行 CAS 并返回旧值
        // 这里我们无法模拟远程内存的实际值,假设总是成功并返回期望值
        return expected; // 简化处理,实际需要远程节点配合模拟
    }

private:
    uint32_t next_lkey;
    uint32_t next_rkey;
    std::map<uint32_t, std::shared_ptr<RDMA_MemoryRegion>> registered_mrs; // 模拟 MR 存储
};

3. 分布式共享内存池的设计挑战与机遇

现在我们已经理解了 RDMA 的基础,是时候考虑如何将其应用于分布式共享内存池了。

3.1 传统内存分配的局限性

在单个节点上,newmalloc 是我们分配内存的常用工具。它们从操作系统或 C++ 运行时库管理的堆中获取内存。但在分布式环境中,它们的局限性显而易见:

  • 本地性: new 只能在当前进程的地址空间中分配内存。
  • 通信开销: 如果一个对象需要跨节点访问,传统方式是序列化对象,通过网络发送,然后在远程节点反序列化。这带来了巨大的 CPU 和带宽开销。
  • 语义不匹配: C++ 指针的语义是针对本地地址空间的。一个远程内存地址无法直接被本地指针解引用。

3.2 共享内存池的优势

内存池(Memory Pool)是一种预先分配一大块内存,然后从中划分子块进行分配和回收的机制。在传统单机多线程环境中,内存池有以下优势:

  • 减少碎片: 避免频繁的 malloc/free 导致的内存碎片。
  • 提高性能: 预分配减少了系统调用开销,分配/回收操作通常更快。
  • 局部性: 相同类型的对象可以分配在相邻的内存区域,有利于缓存命中。

将这些优势扩展到分布式环境,一个跨节点的共享内存池将带来革命性的改变。

3.3 跨节点共享内存池的挑战与RDMA的解决方案

实现跨节点的共享内存池面临诸多挑战,而 RDMA 正是解决这些挑战的关键。

挑战点 描述 RDMA 如何解决
地址空间管理 如何让不同节点的应用程序感知并访问同一逻辑地址? RDMA 通过 rkey 和远程地址 (remote_addr) 来唯一标识远程内存。本地程序通过这些元数据发起操作。物理地址的差异由 HCA 抽象。
所有权与生命周期 谁拥有内存块?何时可以回收?如何跟踪所有权? 内存块的所有权通常归属于提供该内存的节点。分配器需要管理这些远程块的逻辑所有权和引用计数。RDMA 操作本身不涉及所有权概念,但其原子操作可用于实现分布式所有权协议。
并发控制与一致性 多个节点同时访问同一块远程内存时,如何保证数据一致性? RDMA 提供了原子操作(CAS, FAA),可以直接在远程内存上执行原子读-改-写操作,无需远程 CPU 参与,是实现分布式锁、无锁数据结构、或共享计数器的强大工具。
故障恢复 如果提供内存的节点宕机,正在使用该内存的其他节点如何处理? RDMA 本身不提供故障恢复。这需要上层应用或框架实现。例如,可以引入冗余机制、心跳检测和重新分配策略。但 RDMA 的高性能可以加速故障发现和恢复过程。
语义转换 如何将 C++ 的指针语义映射到远程内存访问? 需要封装一个“远程指针”类型,它内部存储 rkeyremote_addr,并重载操作符,使其通过 RDMA READ/WRITE 实现类似指针的行为,或者在本地缓存数据。
元数据同步 内存池的空闲列表、位图等元数据如何在节点间高效同步? 可以将元数据本身也注册为 RDMA 内存区域,通过 RDMA READ/WRITE 同步,或更高效地使用 RDMA 原子操作来维护共享的空闲列表头部指针或位图。

RDMA 最直接的贡献在于,它将网络通信的延迟和 CPU 开销降低到接近内存访问的水平,使得“远程内存”在性能上变得可行。

4. 基于RDMA的跨节点共享内存池分配机制实现

现在,让我们来构建这个机制。我们将设计一个模块化的系统,其中包含 RDMA 通信层、远程内存区域抽象、远程内存池管理器以及最终的 C++ 分配器接口。

4.1 架构概览

我们的系统将由以下几个主要部分组成:

  1. RDMA 通信层 (RDMADevice): 封装低级 RDMA verbs API,提供 rdma_read, rdma_write, rdma_atomic_cas 等高级接口。
  2. 远程内存池服务器 (RemoteMemoryPoolServer): 运行在每个节点上,负责预分配大块本地内存,将其注册为 RDMA 内存区域,并管理其内部的内存块分配状态(空闲/已用)。它会通过某种协议将自身的 RemoteMemoryInfo 暴露给其他节点。
  3. 远程内存池客户端 (RemoteMemoryPoolClient): 运行在需要远程内存的节点上,它维护一份所有远程节点提供的内存池元数据。当需要分配内存时,它会向合适的远程节点发起 RDMA 操作。
  4. C++ 跨节点分配器 (CrossNodeAllocator): 提供 std::allocator 兼容接口,供 C++ 应用程序直接使用。它会通过 RemoteMemoryPoolClient 来获取和释放内存。
  5. 远程指针 (RDMARemotePtr): 一个智能指针类型,封装远程内存地址和 rkey,提供安全的远程内存访问。

基本流程:

  1. 初始化: 每个节点启动时,初始化 RDMADevice,并创建一个或多个 RemoteMemoryPoolServer 实例,预分配并注册内存。
  2. 节点间握手: 节点间通过传统网络(如 TCP)建立初始连接,交换彼此的 RemoteMemoryInfo(包括 HCA 信息、QP 信息、以及所有已注册内存池的 rkey 和基地址)。
  3. 分配请求: 当一个 C++ 应用程序通过 CrossNodeAllocator 请求内存时,分配器会向 RemoteMemoryPoolClient 发出请求。
  4. 远程分配: RemoteMemoryPoolClient 根据策略(例如,轮询、负载均衡)选择一个远程 RemoteMemoryPoolServer,并使用 RDMA 原子操作(如 CAS)去修改远程服务器内存池的空闲列表或位图,从而“占有”一个内存块。
  5. 返回远程指针: 分配器返回一个 RDMARemotePtr,指向新分配的远程内存块。
  6. 远程操作: 应用程序通过 RDMARemotePtr 的方法(如 read_value, write_value)间接执行 RDMA READ/WRITE 操作,访问远程内存。
  7. 释放: 当内存不再需要时,CrossNodeAllocator 通过 RemoteMemoryPoolClient 向拥有该内存块的 RemoteMemoryPoolServer 发送 RDMA 原子操作,将其归还到空闲列表。

4.2 核心组件设计与代码示例

我们将逐步构建这些组件。

4.2.1 远程指针抽象 (RDMARemotePtr)

这是 C++ 应用程序与远程内存交互的桥梁。它不直接指向本地地址,而是封装了远程内存的 remote_addrrkey

// rdma_remote_ptr.h
#pragma once

#include <cstdint>
#include <stdexcept>
#include <memory> // For std::shared_ptr

// 前向声明 RDMADevice
class RDMADevice;

// 封装远程内存地址和 rkey 的智能指针
template<typename T>
class RDMARemotePtr {
public:
    // 默认构造函数,表示一个空的远程指针
    RDMARemotePtr() : remote_addr_(0), rkey_(0), length_(0), rdma_device_(nullptr) {}

    // 构造函数,需要远程地址、rkey、内存块长度以及RDMA设备实例
    RDMARemotePtr(uint64_t remote_addr, uint32_t rkey, size_t length, std::shared_ptr<RDMADevice> device)
        : remote_addr_(remote_addr), rkey_(rkey), length_(length), rdma_device_(device) {
        if (!rdma_device_) {
            throw std::runtime_error("RDMARemotePtr requires a valid RDMADevice instance.");
        }
    }

    // 检查指针是否有效
    bool is_valid() const {
        return remote_addr_ != 0 && rkey_ != 0 && rdma_device_ != nullptr;
    }

    // 获取原始远程地址
    uint64_t get_remote_addr() const { return remote_addr_; }

    // 获取 rkey
    uint32_t get_rkey() const { return rkey_; }

    // 获取内存块长度
    size_t get_length() const { return length_; }

    // 远程写入数据:将本地数据写入远程内存
    // 注意:这里需要一个本地的注册内存区域作为源。
    // 为了简化,我们假设数据直接从一个栈或堆分配的缓冲区复制。
    // 实际实现中,这个缓冲区也需要是 RDMA 注册过的。
    void write(const T* local_data, size_t count = 1, size_t offset_bytes = 0) const {
        if (!is_valid()) throw std::runtime_error("Invalid RDMARemotePtr for write.");
        if (offset_bytes + count * sizeof(T) > length_) throw std::runtime_error("Write out of bounds.");
        if (!rdma_device_) throw std::runtime_error("RDMA device not available for write.");

        // 在真实 RDMA 中,local_data 对应的内存也需要是注册过的 MR。
        // 为了演示,我们在这里模拟一个临时注册的 MR,或者假设 RDMADevice 内部处理了。
        // 更常见的方式是,write 操作接受一个已经注册的本地 MR 和偏移量。
        // 这里为了接口简单,我们假设 RDMADevice 有能力处理非注册内存的临时写入(通常是内部拷贝到已注册缓冲区)。

        // 模拟一个临时的本地 MR 用于写入
        std::shared_ptr<RDMA_MemoryRegion> temp_mr = rdma_device_->register_memory(const_cast<T*>(local_data), count * sizeof(T));

        RemoteMemoryInfo dest_info = {remote_addr_, rkey_, length_};
        rdma_device_->rdma_write(temp_mr, 0, dest_info, offset_bytes, count * sizeof(T));

        // 理论上这里需要销毁 temp_mr,但我们的模拟器没有提供销毁接口
        // 实际 RDMA API 中,MR 的生命周期需要严格管理。
    }

    // 远程读取数据:从远程内存读取到本地缓冲区
    // local_data 必须是 RDMA 注册过的内存区域的地址,或 RDMADevice 内部处理。
    void read(T* local_data, size_t count = 1, size_t offset_bytes = 0) const {
        if (!is_valid()) throw std::runtime_error("Invalid RDMARemotePtr for read.");
        if (offset_bytes + count * sizeof(T) > length_) throw std::runtime_error("Read out of bounds.");
        if (!rdma_device_) throw std::runtime_error("RDMA device not available for read.");

        // 模拟一个临时的本地 MR 用于读取
        std::shared_ptr<RDMA_MemoryRegion> temp_mr = rdma_device_->register_memory(local_data, count * sizeof(T));

        RemoteMemoryInfo src_info = {remote_addr_, rkey_, length_};
        rdma_device_->rdma_read(temp_mr, 0, src_info, offset_bytes, count * sizeof(T));

        // 理论上这里需要销毁 temp_mr
    }

    // 远程原子比较并交换 (CAS)
    // 只能用于 uint64_t 类型,并且需要确保远程内存中的数据也是 uint64_t 对齐
    uint64_t atomic_cas(uint64_t expected, uint64_t new_value, size_t offset_bytes = 0) const {
        if (!is_valid()) throw std::runtime_error("Invalid RDMARemotePtr for atomic_cas.");
        if (sizeof(T) != sizeof(uint64_t) || offset_bytes % sizeof(uint64_t) != 0) {
            throw std::runtime_error("Atomic CAS only supported for uint64_t aligned data.");
        }
        if (offset_bytes + sizeof(uint64_t) > length_) throw std::runtime_error("Atomic CAS out of bounds.");
        if (!rdma_device_) throw std::runtime_error("RDMA device not available for atomic_cas.");

        RemoteMemoryInfo target_info = {remote_addr_, rkey_, length_};
        return rdma_device_->rdma_atomic_cas(target_info, offset_bytes, expected, new_value);
    }

    // 转换为不同类型,但保持远程地址和 rkey 不变
    template<typename U>
    RDMARemotePtr<U> as() const {
        return RDMARemotePtr<U>(remote_addr_, rkey_, length_, rdma_device_);
    }

private:
    uint64_t remote_addr_;
    uint32_t rkey_;
    size_t length_; // 远程内存块的实际长度
    std::shared_ptr<RDMADevice> rdma_device_; // 关联的 RDMA 设备实例
};

注意: 上述 RDMARemotePtr::writeread 方法为了简化,内部模拟了对本地内存的注册。在实际的 RDMA 编程中,用于 rdma_writerdma_read 的本地缓冲区也必须是预先注册过的 RDMA_MemoryRegion。这通常意味着你需要维护一个本地的、预注册的“暂存区”(staging buffer)或要求用户传递一个已注册的 MR。

4.2.2 远程内存池服务器 (RemoteMemoryPoolServer)

每个节点都会运行一个这样的服务器,它管理本地预分配的 RDMA 内存。

// remote_memory_pool_server.h
#pragma once

#include "rdma_remote_ptr.h" // 包含 RDMA_MemoryRegion 和 RDMADevice
#include <vector>
#include <mutex>
#include <atomic>
#include <algorithm> // For std::find

// 内存块的元数据
struct MemoryBlockMetadata {
    uint64_t offset;    // 相对于内存池基地址的偏移量
    size_t size;        // 块大小
    std::atomic<bool> in_use; // 是否正在使用,通过 RDMA CAS 远程修改
    // 可以在这里添加更多字段,例如拥有者节点ID,以便追踪
};

class RemoteMemoryPoolServer {
public:
    // 构造函数:初始化内存池
    // device: 宿主 RDMA 设备实例
    // total_pool_size: 整个内存池的总大小
    // block_size: 内存池中每个分配块的大小
    RemoteMemoryPoolServer(std::shared_ptr<RDMADevice> device, size_t total_pool_size, size_t block_size)
        : rdma_device_(device), total_pool_size_(total_pool_size), block_size_(block_size) {

        // 1. 在本地分配大块内存
        // 使用 std::vector<char> 作为缓冲区,确保连续性
        // 注意:实际生产环境可能使用 mmap 或其他方式分配大页内存
        local_buffer_ = std::make_unique<char[]>(total_pool_size_);
        if (!local_buffer_) {
            throw std::runtime_error("Failed to allocate local memory for pool.");
        }

        // 2. 将本地内存注册到 RDMA HCA
        main_mr_ = rdma_device_->register_memory(local_buffer_.get(), total_pool_size_);
        if (!main_mr_) {
            throw std::runtime_error("Failed to register memory region for pool.");
        }

        // 3. 初始化内存块元数据和空闲列表
        num_blocks_ = total_pool_size_ / block_size_;
        if (num_blocks_ == 0) {
            throw std::runtime_error("Total pool size is too small for given block size.");
        }

        // 为每个块分配元数据。这些元数据也需要被 RDMA 访问,因此它们也应在注册内存中。
        // 为了简化,我们假定元数据存储在内存池的开头,并且是 RDMA 可访问的。
        // 实际中,元数据可以存储在单独的 RDMA 注册区域,或者在本地维护并通过 RDMA 原子操作更新。
        // 这里我们选择在本地维护元数据,并通过 RDMA CAS 操作远程修改其状态。
        // 这意味着元数据本身不是远程可直接修改的,只有其状态(in_use)通过 CAS 远程修改。
        // 这是一个设计选择,可以避免复杂的元数据RDMA读写。

        // 假设每个块的元数据都存放在一个本地维护的 vector 中
        // 远程节点通过 RDMA CAS 操作去修改这些元数据,这要求元数据本身是可远程访问的。
        // 更现实的场景是,元数据也注册为 MR,然后远程节点直接 CAS 内存中的原子变量。
        // 简化:这里我们假设一个远程节点通过某种方式(例如,另一个注册的MR)知道如何找到并操作这些元数据。

        // 为了演示 RDMA 原子操作,我们将设计一个简单的空闲列表,其头部指针存储在 RDMA 可访问的内存中。
        // 每一个内存块的头部也可以存储指向下一个空闲块的指针,构成一个链表。

        // 简化版:我们假设一个简单的位图或一个原子计数器来管理空闲块。
        // 最简单的:每个块的元数据都是本地的,远程节点通过 CAS 尝试“预定”一个块。
        // 我们可以将空闲链表的头指针本身存储在 RDMA 注册内存中,供远程节点 CAS。

        // 让我们采用一个更简单的策略:一个共享的“下一个可用块索引”原子变量。
        // 这个变量本身可以放在主 MR 的某个固定偏移量处,并用 CAS 保护。
        // 但这会导致所有分配都争用这一个变量,效率不高。
        // 更合理的:每个块自身携带一个 `std::atomic<bool>` 标志,表示是否被占用。
        // 远程分配时,客户端遍历块,找到第一个空闲的,然后尝试 CAS 它的 `in_use` 标志。

        // 我们将元数据也放在主 MR 中,紧接在实际数据块之后或之前。
        // 假设元数据也需要远程访问,所以它也应该在 main_mr_ 中
        // 为了简化,我们不把 MemoryBlockMetadata 对象本身放在 RDMA 区域
        // 而是模拟:远程节点通过某种方式知道某个远程地址对应的块是否空闲。

        // 我们将使用一个简单的位图来管理空闲块,位图本身也注册为 RDMA 内存。
        // 假设位图存储在 main_mr_ 的开始部分。
        size_t bitmap_size_bytes = (num_blocks_ + 7) / 8; // 位图所需字节数
        if (bitmap_size_bytes + block_size_ > total_pool_size_) {
             throw std::runtime_error("Pool size too small to even hold bitmap and one block.");
        }

        bitmap_ = static_cast<uint8_t*>(local_buffer_.get()); // 位图从缓冲区的开始处
        std::memset(bitmap_, 0, bitmap_size_bytes); // 所有块初始为空闲

        // 数据区从位图之后开始
        data_start_offset_ = bitmap_size_bytes;

        std::cout << "[RemoteMemoryPoolServer] Pool initialized: total_size=" << total_pool_size_
                  << ", block_size=" << block_size_ << ", num_blocks=" << num_blocks_
                  << ", bitmap_size=" << bitmap_size_bytes << " bytes." << std::endl;
        std::cout << "[RemoteMemoryPoolServer] Main MR: addr=" << main_mr_->addr
                  << ", lkey=" << main_mr_->lkey << ", rkey=" << main_mr_->rkey << std::endl;
    }

    // 获取内存池的远程信息,供其他节点连接
    RemoteMemoryInfo get_pool_info() const {
        return {reinterpret_cast<uint64_t>(main_mr_->addr), main_mr_->rkey, main_mr_->length};
    }

    // 获取 RDMA 设备
    std::shared_ptr<RDMADevice> get_rdma_device() const {
        return rdma_device_;
    }

    // 获取内存池的基地址 (本地)
    void* get_local_base_addr() const {
        return local_buffer_.get();
    }

    // 获取指定块的本地地址
    void* get_local_block_addr(size_t block_idx) const {
        if (block_idx >= num_blocks_) {
            throw std::out_of_range("Block index out of bounds.");
        }
        return static_cast<char*>(local_buffer_.get()) + data_start_offset_ + block_idx * block_size_;
    }

    // 本地分配一个块 (用于测试或本地使用)
    void* local_allocate() {
        std::lock_guard<std::mutex> lock(mtx_); // 保护本地访问
        for (size_t i = 0; i < num_blocks_; ++i) {
            if (!get_bit(i)) { // 如果块空闲
                set_bit(i, true); // 标记为占用
                std::cout << "[RemoteMemoryPoolServer] Local allocated block " << i << std::endl;
                return get_local_block_addr(i);
            }
        }
        return nullptr; // 没有可用块
    }

    // 本地释放一个块 (用于测试或本地使用)
    void local_deallocate(void* ptr) {
        std::lock_guard<std::mutex> lock(mtx_);
        uint64_t offset = static_cast<char*>(ptr) - static_cast<char*>(local_buffer_.get()) - data_start_offset_;
        if (offset < 0 || offset % block_size_ != 0) {
            throw std::runtime_error("Invalid pointer for local deallocate.");
        }
        size_t block_idx = offset / block_size_;
        if (block_idx >= num_blocks_) {
            throw std::out_of_range("Block index out of bounds for deallocate.");
        }
        if (!get_bit(block_idx)) {
            std::cerr << "[RemoteMemoryPoolServer] Warning: Deallocating an already free block " << block_idx << std::endl;
        }
        set_bit(block_idx, false); // 标记为空闲
        std::cout << "[RemoteMemoryPoolServer] Local deallocated block " << block_idx << std::endl;
    }

    // 远程分配一个块
    // 远程客户端会遍历并尝试 CAS 位图中的位
    // 这个方法是服务器端内部使用的,不是直接被远程调用的
    // 远程客户端通过 RDMA 原子操作直接修改位图。
    // 但是,为了在服务器端模拟本地分配,我们需要一个内部函数来操作位图。

    // 获取位图中的位
    bool get_bit(size_t idx) const {
        return (bitmap_[idx / 8] >> (idx % 8)) & 0x1;
    }

    // 设置位图中的位
    void set_bit(size_t idx, bool value) {
        if (value) {
            bitmap_[idx / 8] |= (1 << (idx % 8));
        } else {
            bitmap_[idx / 8] &= ~(1 << (idx % 8));
        }
    }

    size_t get_num_blocks() const { return num_blocks_; }
    size_t get_block_size() const { return block_size_; }
    size_t get_data_start_offset() const { return data_start_offset_; }

    // 获取位图的远程信息
    RemoteMemoryInfo get_bitmap_info() const {
        return {reinterpret_cast<uint64_t>(bitmap_), main_mr_->rkey, (num_blocks_ + 7) / 8};
    }

private:
    std::shared_ptr<RDMADevice> rdma_device_;
    std::unique_ptr<char[]> local_buffer_; // 本地内存缓冲区
    std::shared_ptr<RDMA_MemoryRegion> main_mr_; // 注册的 RDMA 内存区域

    size_t total_pool_size_;
    size_t block_size_;
    size_t num_blocks_;

    uint8_t* bitmap_; // 用于跟踪空闲块的位图,位于 local_buffer_ 内部
    size_t data_start_offset_; // 数据区在 local_buffer_ 中的起始偏移量

    mutable std::mutex mtx_; // 保护本地访问的互斥锁 (主要用于本地调试和测试)
};
4.2.3 远程内存池客户端 (RemoteMemoryPoolClient)

客户端负责与一个或多个 RemoteMemoryPoolServer 交互,进行实际的远程内存分配和释放。

// remote_memory_pool_client.h
#pragma once

#include "rdma_remote_ptr.h"
#include "remote_memory_pool_server.h" // 获取 RemoteMemoryInfo
#include <vector>
#include <map>
#include <mutex>
#include <random>

// 代表一个远程内存池的信息,客户端持有
struct RemotePoolClientInfo {
    uint64_t base_remote_addr; // 远程内存池的基地址
    uint32_t rkey;             // 远程内存池的 rkey
    size_t total_length;       // 远程内存池的总长度
    size_t block_size;         // 块大小
    size_t num_blocks;         // 块数量

    RemoteMemoryInfo bitmap_info; // 位图的远程信息
};

class RemoteMemoryPoolClient {
public:
    RemoteMemoryPoolClient(std::shared_ptr<RDMADevice> device)
        : rdma_device_(device) {
        if (!rdma_device_) {
            throw std::runtime_error("RemoteMemoryPoolClient requires a valid RDMADevice instance.");
        }
    }

    // 添加一个远程服务器的内存池信息
    void add_remote_pool(const RemotePoolClientInfo& info) {
        std::lock_guard<std::mutex> lock(mtx_);
        remote_pools_.push_back(info);
        std::cout << "[RemoteMemoryPoolClient] Added remote pool: addr=" << info.base_remote_addr
                  << ", rkey=" << info.rkey << ", blocks=" << info.num_blocks << std::endl;
    }

    // 分配一个远程内存块
    template<typename T>
    RDMARemotePtr<T> allocate_block() {
        std::lock_guard<std::mutex> lock(mtx_); // 保护远程池列表访问

        if (remote_pools_.empty()) {
            throw std::runtime_error("No remote memory pools available.");
        }

        // 简单轮询策略选择一个远程池
        // 实际场景可能需要更复杂的负载均衡或NUMA感知策略
        size_t start_idx = random_engine_() % remote_pools_.size();

        for (size_t i = 0; i < remote_pools_.size(); ++i) {
            size_t pool_idx = (start_idx + i) % remote_pools_.size();
            const auto& pool_info = remote_pools_[pool_idx];

            // 尝试在这个远程池中分配一个块
            // 客户端需要遍历远程位图,找到一个空闲位,并尝试通过 CAS 抢占它
            // 这是最复杂的部分,需要 RDMA READ 整个位图,或者分块读取,然后 CAS。
            // 假设我们一次性读取整个位图到本地,然后找到一个空闲位,再 CAS 远程位图。
            // 这会导致竞争,如果多个客户端同时读取并尝试 CAS 同一个位,只有一个会成功。
            // 失败的客户端需要重试。

            // 1. 读取远程位图到本地缓存
            size_t bitmap_size_bytes = (pool_info.num_blocks + 7) / 8;
            std::vector<uint8_t> local_bitmap_cache(bitmap_size_bytes);

            // 为了简化,我们假设 RDMA_MemoryRegion 可以在内部创建临时缓冲区
            // 实际中,local_bitmap_cache 的内存也需要被注册。
            std::shared_ptr<RDMA_MemoryRegion> temp_mr_bitmap = rdma_device_->register_memory(local_bitmap_cache.data(), bitmap_size_bytes);

            RemoteMemoryInfo remote_bitmap_info = pool_info.bitmap_info; // 远程位图信息
            rdma_device_->rdma_read(temp_mr_bitmap, 0, remote_bitmap_info, 0, bitmap_size_bytes);

            // 2. 在本地缓存中查找空闲块
            for (size_t block_idx = 0; block_idx < pool_info.num_blocks; ++block_idx) {
                size_t byte_idx = block_idx / 8;
                size_t bit_offset = block_idx % 8;

                if (!((local_bitmap_cache[byte_idx] >> bit_offset) & 0x1)) { // 如果本地缓存显示空闲
                    // 3. 尝试通过 RDMA CAS 远程抢占这个块
                    uint64_t expected_byte = local_bitmap_cache[byte_idx];
                    uint64_t new_byte = expected_byte | (1ULL << bit_offset); // 设置为占用

                    // RDMA CAS 操作要求操作的是 uint64_t
                    // 因此,我们需要将整个字节(uint8_t)提升为 uint64_t 进行 CAS
                    // 或者,更精确地,如果 HCA 支持,可以直接 CAS 字节。
                    // 假设我们的 rdma_atomic_cas 能够处理对齐的 uint64_t,且操作的是整个字节所在的 uint64_t 字。
                    // 这意味着我们可能要读取包含该字节的 8 字节字,CAS 后写回。
                    // 为了简化,我们假设 CAS 字节是可行的,或者我们操作的是一个8字节的位图区域

                    // 更真实的 CAS 需要读出整个 uint64_t,然后修改其中的位,再 CAS 整个 uint64_t
                    // 假设 bitmap_info.remote_addr 是 uint64_t 对齐的,并且我们 CAS 整个 uint64_t 块
                    // 这里简化为直接 CAS 字节所在的 uint64_t

                    // 假设位图是 uint64_t 数组
                    uint64_t expected_val_64 = 0; // 这里的 'expected' 应该从远程读取的 uint64_t 完整值来构建
                    uint64_t new_val_64 = 0;      // 这里的 'new_value' 应该从远程读取的 uint64_t 完整值来构建

                    // 精确的 CAS 位图操作:
                    // 1. 读取包含目标位(block_idx)的整个 uint64_t 到本地
                    // 2. 在本地检查该位是否空闲
                    // 3. 如果空闲,计算设置该位后的新 uint64_t 值
                    // 4. 使用旧的 uint64_t 值作为 expected,新的 uint64_t 值作为 new_value 进行 RDMA CAS
                    // 5. 如果 CAS 成功,则分配成功;否则重试。

                    // 为了简化演示,我们假设可以直接 CAS 字节
                    // 这种简化在实际中可能不完全符合 RDMA CAS 的语义,因为它通常操作 uint64_t。
                    // 假设我们的 RDMADevice::rdma_atomic_cas 内部能处理字节级的 CAS

                    // Simplified CAS for demonstration:
                    uint64_t actual_old_byte = rdma_device_->rdma_atomic_cas(
                        pool_info.bitmap_info, // 远程位图的 RemoteMemoryInfo
                        byte_idx,              // 远程位图中的字节偏移
                        expected_byte,         // 期望的字节值 (本地缓存的)
                        new_byte               // 要设置的字节值 (设置了目标位)
                    );

                    if (actual_old_byte == expected_byte) {
                        // CAS 成功,我们成功占用了这个块
                        uint64_t remote_block_addr = pool_info.base_remote_addr + pool_info.data_start_offset + block_idx * pool_info.block_size;
                        std::cout << "[RemoteMemoryPoolClient] Allocated remote block " << block_idx
                                  << " from pool " << pool_idx << " at remote addr " << remote_block_addr << std::endl;

                        // 记录这个分配,以便后续释放
                        std::lock_guard<std::mutex> alloc_lock(allocated_blocks_mtx_);
                        allocated_blocks_[RDMARemotePtr<void>(remote_block_addr, pool_info.rkey, pool_info.block_size, rdma_device_)] = pool_info;

                        return RDMARemotePtr<T>(remote_block_addr, pool_info.rkey, pool_info.block_size, rdma_device_);
                    }
                    // CAS 失败,说明其他客户端在我们读取后和 CAS 之前已经占用了这个块
                    // 继续查找下一个空闲块
                }
            }
        }
        throw std::runtime_error("No available remote memory blocks found in any pool.");
    }

    // 释放一个远程内存块
    template<typename T>
    void deallocate_block(RDMARemotePtr<T> ptr) {
        if (!ptr.is_valid()) {
            throw std::runtime_error("Attempted to deallocate an invalid remote pointer.");
        }

        std::lock_guard<std::mutex> alloc_lock(allocated_blocks_mtx_);
        auto it = allocated_blocks_.find(ptr.as<void>()); // 使用 void 类型的远程指针作为 key
        if (it == allocated_blocks_.end()) {
            throw std::runtime_error("Attempted to deallocate an unmanaged remote pointer.");
        }

        const auto& pool_info = it->second;

        // 计算块在远程池中的索引
        uint64_t offset_in_pool = ptr.get_remote_addr() - pool_info.base_remote_addr - pool_info.data_start_offset;
        if (offset_in_pool < 0 || offset_in_pool % pool_info.block_size != 0) {
            throw std::runtime_error("Invalid remote pointer address for deallocation.");
        }
        size_t block_idx = offset_in_pool / pool_info.block_size;

        if (block_idx >= pool_info.num_blocks) {
            throw std::out_of_range("Block index out of bounds for deallocate.");
        }

        // 尝试通过 RDMA CAS 远程释放这个块(将位图中的位清零)
        size_t byte_idx = block_idx / 8;
        size_t bit_offset = block_idx % 8;

        // 再次,简化 CAS 字节操作
        // 实际需要读取包含该字节的 uint64_t,修改位,再 CAS 整个 uint64_t
        uint64_t expected_byte = 0; // 假设之前是占用的,现在是 1
        uint64_t new_byte = 0;      // 设置为 0 (空闲)

        // 精确的释放 CAS 操作:
        // 1. 读取包含目标位(block_idx)的整个 uint64_t 到本地
        // 2. 在本地检查该位是否被占用
        // 3. 如果占用,计算清除该位后的新 uint64_t 值
        // 4. 使用旧的 uint64_t 值作为 expected,新的 uint64_t 值作为 new_value 进行 RDMA CAS
        // 5. 如果 CAS 成功,则释放成功;否则重试。

        // Simplified CAS for demonstration:
        // Assume we want to change bit from 1 to 0.
        // We read the byte, if the bit is 1, we try to CAS it to 0.
        std::vector<uint8_t> temp_byte_vec(1);
        std::shared_ptr<RDMA_MemoryRegion> temp_mr_byte = rdma_device_->register_memory(temp_byte_vec.data(), 1);
        rdma_device_->rdma_read(temp_mr_byte, 0, pool_info.bitmap_info, byte_idx, 1);

        expected_byte = temp_byte_vec[0];
        if (!((expected_byte >> bit_offset) & 0x1)) {
            std::cerr << "[RemoteMemoryPoolClient] Warning: Deallocating an already free remote block " << block_idx << std::endl;
            // 即使已经空闲,也从管理列表中移除
            allocated_blocks_.erase(it);
            return;
        }
        new_byte = expected_byte & ~(1ULL << bit_offset); // 清除位

        uint64_t actual_old_byte = rdma_device_->rdma_atomic_cas(
            pool_info.bitmap_info, byte_idx, expected_byte, new_byte
        );

        if (actual_old_byte == expected_byte) {
            std::cout << "[RemoteMemoryPoolClient] Deallocated remote block " << block_idx
                      << " from pool." << std::endl;
            allocated_blocks_.erase(it);
        } else {
            // CAS 失败,说明其他客户端在我们读取后和 CAS 之前已经修改了状态(可能已被释放或错误地被其他客户端占用)
            std::cerr << "[RemoteMemoryPoolClient] Error: Failed to CAS deallocate remote block " << block_idx << ". "
                      << "Expected byte: " << std::hex << expected_byte << ", Actual old byte: " << actual_old_byte << std::dec << std::endl;
            throw std::runtime_error("Failed to atomically deallocate remote block.");
        }
    }

private:
    std::shared_ptr<RDMADevice> rdma_device_;
    std::vector<RemotePoolClientInfo> remote_pools_;
    mutable std::mutex mtx_; // 保护 remote_pools_

    // 存储已分配的远程块及其对应的池信息,便于释放时查找
    std::map<RDMARemotePtr<void>, RemotePoolClientInfo> allocated_blocks_;
    mutable std::mutex allocated_blocks_mtx_;

    std::default_random_engine random_engine_; // 用于随机选择池
};

// 为 std::map 定义 RDMARemotePtr 的比较函数,因为它是模板类
namespace std {
    template<typename T>
    struct less<RDMARemotePtr<T>> {
        bool operator()(const RDMARemotePtr<T>& a, const RDMARemotePtr<T>& b) const {
            if (a.get_remote_addr() != b.get_remote_addr()) {
                return a.get_remote_addr() < b.get_remote_addr();
            }
            return a.get_rkey() < b.get_rkey(); // 确保唯一性
        }
    };
}

关于 RDMA CAS 的重要说明:
RDMA verbs 中的原子操作 (ibv_post_send with IBV_WR_ATOMIC_CMP_AND_SWPIBV_WR_ATOMIC_FETCH_AND_ADD) 通常操作 8 字节(uint64_t)数据。这意味着在位图场景中,我们不能直接 CAS 单个位或单个字节。正确的做法是:

  1. 通过 RDMA_READ 读取包含目标位的整个 uint64_t 字。
  2. 在本地修改这个 uint64_t 字,设置或清除目标位。
  3. 通过 RDMA_ATOMIC_CAS 尝试将远程的 uint64_t 字从步骤 1 读取的值更新为步骤 2 计算出的新值。
  4. 如果 RDMA_ATOMIC_CAS 返回的值不等于步骤 1 读取的值,说明在此期间远程 uint64_t 字已被其他节点修改,CAS 失败,需要重试整个过程。

我的 RemoteMemoryPoolClient::allocate_blockdeallocate_block 中的 CAS 逻辑为了简化,假设了可以直接 CAS 字节。在实际生产代码中,需要实现更复杂的 uint64_t 级别的 CAS 逻辑。

4.2.4 C++ 跨节点分配器 (CrossNodeAllocator)

最后,我们提供一个符合 std::allocator 接口的 C++ 分配器,让应用程序可以透明地使用远程内存。

// cross_node_allocator.h
#pragma once

#include "rdma_remote_ptr.h"
#include "remote_memory_pool_client.h"
#include <limits> // For std::numeric_limits

template<typename T>
class CrossNodeAllocator {
public:
    using value_type = T;
    using pointer = RDMARemotePtr<T>;
    using const_pointer = RDMARemotePtr<const T>;
    using reference = void; // Cannot directly reference remote memory
    using const_reference = void; // Cannot directly reference remote memory
    using size_type = std::size_t;
    using difference_type = std::ptrdiff_t;

    template<typename U>
    struct rebind {
        using other = CrossNodeAllocator<U>;
    };

    // 默认构造函数
    CrossNodeAllocator() noexcept : client_(nullptr) {}

    // 构造函数,需要一个 RemoteMemoryPoolClient 实例
    explicit CrossNodeAllocator(std::shared_ptr<RemoteMemoryPoolClient> client) noexcept
        : client_(client) {}

    // 拷贝构造函数
    template<typename U>
    CrossNodeAllocator(const CrossNodeAllocator<U>& other) noexcept
        : client_(other.get_client()) {}

    // 获取内部的 RemoteMemoryPoolClient 实例
    std::shared_ptr<RemoteMemoryPoolClient> get_client() const {
        return client_;
    }

    // 分配内存
    pointer allocate(size_type n) {
        if (n == 0) {
            return pointer();
        }
        if (n * sizeof(T) > std::numeric_limits<size_type>::max()) {
            throw std::bad_alloc(); // 请求过大
        }
        if (!client_) {
            throw std::runtime_error("CrossNodeAllocator not initialized with a RemoteMemoryPoolClient.");
        }

        // 远程内存池分配的是固定大小的块。
        // 这里我们简化处理,假设分配的块大小 >= n * sizeof(T)。
        // 实际中,如果 n * sizeof(T) > block_size,可能需要抛出错误或实现多块分配。
        if (n * sizeof(T) > client_->get_rdma_device()->get_registered_mr_length()) { // 假设 client_ 知道 block_size
             // 这里的判断需要更精确,client_ 应该知道它连接的池的 block_size
             // 为了简化,我们暂时跳过这个精确检查,假设我们请求的 T 大小符合块大小
             // 在实际中,分配器会请求 client 分配一个 `block_size` 大小的块
        }

        return client_->template allocate_block<T>();
    }

    // 释放内存
    void deallocate(pointer p, size_type n) {
        if (!p.is_valid()) {
            return;
        }
        if (!client_) {
            throw std::runtime_error("CrossNodeAllocator not initialized with a RemoteMemoryPoolClient.");
        }
        client_->deallocate_block(p);
    }

    // 构造对象(在远程内存上)
    // 注意:C++ 对象的构造函数需要在本地 CPU 上执行。
    // 这意味着我们不能直接在远程内存上“原地”调用构造函数。
    // 如果 T 包含指针,这些指针也需要指向远程内存。
    // 最常见的做法是,分配远程内存后,通过 RDMA_WRITE 将序列化后的对象数据写入。
    // 或者,远程内存中存储的是 POD 类型或只包含 RDMARemotePtr 的结构体。
    // 这里我们只是分配内存,不执行构造函数。
    // 如果需要构造函数,需要特殊处理,例如通过 RPC 触发远程节点的构造。
    template<typename U, typename... Args>
    void construct(U* p, Args&&... args) {
        // Warning: This `construct` method does NOT call the constructor on the remote node.
        // It operates on a local pointer `p`. For remote memory, this is complex.
        // For POD types, you might just write data. For complex types, you need a remote agent.
        // This is a common limitation when directly mapping C++ allocators to remote memory.
        // For demonstration, we'll assume `T` is a POD type or its construction is trivial
        // (i.e., just zeroing memory, which can be done via RDMA_WRITE).
        // If `p` is an `RDMARemotePtr`, this method is semantically incorrect.
        // We should instead provide a custom method like `remote_construct`.

        // For simplicity, we will assume `T` is a POD type and its construction
        // is effectively just writing the initial data, which can be done remotely.
        // For non-POD types, a remote agent (RPC) would be required to call the constructor.
        // For this allocator, `p` should ideally be a local pointer *to a staging buffer*,
        // then the constructed object is written to remote memory.
        // Or, we might only allow POD types or types that are "remote-constructible"
        // (i.e., their constructor can be emulated via RDMA writes).

        // Let's assume `T` is a POD type for this example.
        // If `T` had a constructor, this part would be problematic for `RDMARemotePtr`.
        // To make it work with `RDMARemotePtr`, we would need to rethink `construct` signature
        // or ensure `T` is trivial.
        // For now, let's just assert that `p` is a local pointer if `construct` is called.
        // std::cout << "Warning: CrossNodeAllocator::construct called. "
        //           << "Actual remote object construction is not directly supported by RDMA." << std::endl;
        // new (p) U(std::forward<Args>(args)...); // This would construct locally!

        // For a true RDMARemotePtr, we would need to write the data
        // For this example, we will ignore construct calls on remote pointers directly.
        // If `p` is `RDMARemotePtr<U>*`, it's trying to construct a `U` on the remote memory.
        // This is where RDMA_WRITE or a remote agent comes in.
        // This is a fundamental challenge for remote memory allocators with C++ semantics.
    }

    // 销毁对象
    template<typename U>
    void destroy(U* p) {
        // Similar to construct, `destroy` operates on local pointers.
        // For remote memory, this means cleaning up data or calling remote destructor via RPC.
        // For POD types, nothing to do. For non-POD, a remote agent is needed.
        // p->~U(); // This would destroy locally!
    }

    size_type max_size() const noexcept {
        return std::numeric_limits<size_type>::max() / sizeof(T);
    }

    bool operator==(const CrossNodeAllocator& other) const noexcept {
        return client_ == other.client_;
    }

    bool operator!=(const CrossNodeAllocator& other) const noexcept {
        return !(*this == other);
    }

private:
    std::shared_ptr<RemoteMemoryPoolClient> client_;

    // 允许其他类型的分配器访问私有成员,用于 rebind
    template<typename U> friend class CrossNodeAllocator;
};

关于 CrossNodeAllocator::constructdestroy 的说明:
这是 C++ 分配器接口与远程内存语义之间最大的不匹配点。C++ 对象的构造函数和析构函数是本地 CPU 执行的。你不能直接通过 RDMA 在远程内存上“调用”一个构造函数。

  • POD 类型或简单数据结构: 如果 T 是 Plain Old Data (POD) 类型,或者其构造函数/析构函数是 trivial 的,那么 construct 可能意味着简单地通过 RDMA_WRITE 写入初始数据(例如,零初始化)。destroy 则可能什么都不做。
  • 复杂对象: 如果 T 是包含指针、虚函数表或其他复杂成员的非 POD 类型,那么在远程内存上“构造”它将需要一个远程代理。这通常意味着在远程节点上运行一个 RPC 服务,由它来分配本地内存,构造对象,然后将构造好的对象数据通过 RDMA WRITE 回写到预先分配的远程 RDMA 内存区域。析构也是类似,通过 RPC 触发远程析构。

因此,CrossNodeAllocatorconstructdestroy 方法在处理 RDMARemotePtr 时,其语义是高度受限的。在实际应用中,通常会选择以下策略之一:

  1. 只在远程内存中存储 POD 类型或专门设计的、不依赖本地构造/析构的结构体。
  2. 为复杂对象实现一个“远程构造器”,它通过 RPC 协调远程节点进行构造。
  3. 使用本地暂存区: 在本地分配并构造对象,然后通过 RDMA_WRITE 将整个对象的数据拷贝到远程内存。这会产生一次额外的内存拷贝,但避免了复杂的远程构造问题。

在我们的示例中,我们假设 T 是一个 POD 类型,并省略了 constructdestroy 的实际实现,因为它们无法直接应用于 RDMARemotePtr。如果用户需要操作 RDMARemotePtr<T>,他们将直接使用 RDMARemotePtrreadwrite 方法。

4.3 整体演示流程

现在,让我们把这些组件放在一起,展示一个简单的端到端流程。

// main.cpp
#include <iostream>
#include <vector>
#include <thread>
#include <chrono>

#include "rdma_remote_ptr.h"
#include "remote_memory_pool_server.h"
#include "remote_memory_pool_client.h"
#include "cross_node_allocator.h"

// 模拟一个数据结构,假设是 POD 类型
struct MyData {
    int id;
    double value;
    char name[16];

    // 构造函数用于本地初始化
    MyData(int i = 0, double v = 0.0, const char* n = "") : id(i), value(v) {
        std::strncpy(name, n, sizeof(name) - 1);
        name[sizeof(name) - 1] = '';
    }

    void print() const {
        std::cout << "  ID: " << id << ", Value: " << value << ", Name: " << name << std::endl;
    }
};

// 模拟 Node 1 (作为 RemoteMemoryPoolServer)
void node1_main() {
    std::cout << "--- Node 1 (Server) Starting ---" << std::endl;
    auto rdma_dev = std::make_shared<RDMADevice>();
    RemoteMemoryPoolServer server(rdma_dev, 4096, 64); // 4KB pool, 64-byte blocks

    // 模拟将服务器信息(包括 RDMA 信息)通过某个发现服务发布
    RemotePoolClientInfo server_info;
    server_info.base_remote_addr = server.get_pool_info().remote_addr;
    server_info.rkey = server.get_pool_info().rkey;
    server_info.total_length = server.get_pool_info().length;
    server_info.block_size = server.get_block_size();
    server_info.num_blocks = server.get_num_blocks();
    server_info.bitmap_info = server.get_bitmap_info();

    // 存储服务器信息,供客户端获取 (实际通过网络传输)
    static RemotePoolClientInfo global_server_info = server_info; // 简化为全局变量

    std::cout << "Node 1 Server ready. Waiting for clients..." << std::endl;
    // 实际服务器会运行一个循环处理请求,这里简化为等待
    std::this_thread::sleep_for(std::chrono::seconds(5)); // 给客户端时间操作

    // 可以检查服务器端内存状态
    // void* local_ptr = server.get_local_block_addr(0);
    // MyData* local_data = static_cast<MyData*>(local_ptr);
    // std::cout << "Node 1 local block 0 after client write: " << std::endl;
    // if (local_data) local_data->print();

    std::cout << "--- Node 1 (Server) Exiting ---" << std::endl;
}

// 模拟 Node 2 (作为 RemoteMemoryPoolClient)
void node2_main() {
    std::cout << "--- Node 2 (Client) Starting ---" << std::endl;
    auto rdma_dev = std::make_shared<RDMADevice>();
    auto client = std::make_shared<RemoteMemoryPoolClient>(rdma_dev);

    // 模拟从发现服务获取服务器信息
    std::this_thread::sleep_for(std::chrono::milliseconds(100)); // 等待服务器启动
    client->add_remote_pool(global_server_info); // 从全局变量获取,实际通过网络

    // 使用 CrossNodeAllocator
    CrossNodeAllocator<MyData> allocator(client);

    std::vector<RDMARemotePtr<MyData>> remote_ptrs;

    // 分配远程内存并写入数据
    for (int i = 0; i < 3; ++i) {
        try {
            RDMARemotePtr<MyData> remote_ptr = allocator.allocate(1); // 分配一个 MyData 大小的块
            if (remote_ptr.is_valid()) {
                remote_ptrs.push_back(remote_ptr);
                MyData local_data(100 + i, 3.14 * i, ("Node2_Data_" + std::to_string(i)).c_str());
                remote_ptr.write(&local_data); // 将本地数据写入远程内存
                std::cout << "Node 2 Client: Allocated and wrote data to remote addr "
                          << remote_ptr.get_remote_addr() << ", rkey " << remote_ptr.get_rkey() << std::endl;
            }
        } catch (const std::exception& e) {
            std::cerr << "Node 2 Client Error during allocation: " << e.what() << std::endl;
        }
    }

    // 从远程内存读取数据
    std::cout << "nNode 2 Client: Reading data from remote memory:" << std::endl;
    for (const auto& ptr : remote_ptrs) {
        MyData read_data;
        ptr.read(&read_data); // 从远程内存读取到本地
        std::cout << "  Read from remote addr " << ptr.get_remote_addr() << ": ";
        read_data.print();
    }

    // 模拟原子操作:尝试 CAS 远程内存中的某个值
    // 假设 MyData 的 id 字段是 uint64_t 对齐的,并且我们想 CAS 它
    // 这需要 MyData 的 id 在远程内存中的偏移量是 8 字节对齐的
    // MyData 是 28 字节,int 是 4 字节,假设它位于 0 偏移
    // CAS 需要操作 uint64_t,所以这里我们不能直接 CAS int
    // 为了演示,假设我们有一个专门的 uint64_t 区域可以 CAS

    // 假设我们有一个远程的计数器,也分配在远程内存中
    // 分配一个 uint64_t 块来存储计数器
    try {
        RDMARemotePtr<uint64_t> remote_counter_ptr = allocator.allocate(1);
        uint64_t initial_val = 0;
        remote_counter_ptr.write(&initial_val); // 初始化为 0

        std::cout << "nNode 2 Client: Performing atomic CAS on remote counter at "
                  << remote_counter_ptr.get_remote_addr() << std::endl;

        uint64_t expected = 0;
        uint64_t new_val = 100;
        uint64_t old_val = remote_counter_ptr.atomic_cas(expected, new_val);
        std::cout << "  CAS result: old value was " << old_val << ". Expected " << expected << ", new " << new_val << std::endl;

        // 验证 CAS 结果
        uint64_t verify_val;
        remote_counter_ptr.read(&verify_val);
        std::cout << "  Verified remote counter value: " << verify_val << std::endl;

        allocator.deallocate(remote_counter_ptr, 1);
    } catch (const std::exception& e) {
        std::cerr << "Node 2 Client Error during CAS/counter ops: " << e.what() << std::endl;
    }

    // 释放远程内存
    std::cout << "nNode 2 Client: Deallocating remote memory:" << std::endl;
    for (auto& ptr : remote_ptrs) {
        try {
            allocator.deallocate(ptr, 1);
            std::cout << "  Deallocated remote addr " << ptr.get_remote_addr() << std::endl;
        } catch (const std::exception& e) {
            std::cerr << "Node 2 Client Error during deallocation: " << e.what() << std::endl;
        }
    }

    std::cout << "--- Node 2 (Client) Exiting ---" << std::endl;
}

// 全局变量用于模拟服务器信息交换
RemotePoolClientInfo global_server_info;

int main() {
    // 启动两个线程模拟两个节点
    std::thread node1_thread(node1_main);
    std::thread node2_thread(node2_main);

    node1_thread.join();
    node2_thread.join();

    std::cout << "All nodes finished." << std::endl;

    return 0;
}

编译与运行(概念性):
这个示例是高度简化的,模拟了 RDMA 的行为。实际编译和运行需要 Infiniband 或 RoCE 驱动和库,例如 libibverbs
例如,在 Linux 上,通常需要安装 libibverbs-devlibrdmacm-dev
编译命令可能类似于:
g++ -std=c++17 main.cpp -o rdma_mem_alloc -libverbs -lrdmacm -pthread
但请注意,我的 RDMADevice 是一个模拟器,无法直接链接到实际的 RDMA 库。这个 main.cpp 只能与我提供的模拟器代码一起运行。

运行结果(示例):


--- Node 1 (Server) Starting ---
[RDMADevice] Registered memory at 0x... len 4096, lkey=1, rkey=1
[RemoteMemoryPoolServer] Pool initialized: total_size=4096, block_size=64, num_blocks=64, bitmap_size=8 bytes.
[RemoteMemoryPoolServer] Main MR: addr=0x..., lkey=1, rkey=1
Node 1 Server ready. Waiting for clients...
--- Node 2 (Client) Starting ---
[RDMADevice] Registered memory at 0x... len 64, lkey=2, rkey=2
[RDMADevice] Simulating RDMA READ from remote addr 0x... (rkey=1) to local MR(lkey=2) addr 0x... size 8
[RDMADevice] Registered memory at 0x... len 64, lkey=3, rkey=3
[RDMADevice] Simulating RDMA ATOMIC_CAS on remote addr 0x... (rkey=1) expected=0, new_value=1
[RemoteMemoryPoolClient] Allocated remote block 0 from pool 0 at remote addr 0x...
[RDMADevice] Registered memory at 0x... len 28, lkey=4, rkey=4
[RDMADevice] Simulating RDMA WRITE from local MR(lkey=4) addr 0x... to remote addr 0x... (rkey=1) size 28
Node 2 Client: Allocated and wrote data to remote addr 0x..., rkey 1
[RDMADevice] Registered memory at 0x... len 64, lkey=5, rkey=5
[RDMADevice] Simulating RDMA READ from remote addr 0x... (rkey=1) to local MR(lkey=5) addr 0x... size 8
[RDMADevice] Registered memory at 0x... len 64, lkey=6, rkey=6
[RDMADevice] Simulating RDMA ATOMIC_CAS on remote addr 0x... (rkey=1) expected=0, new_value=2
[RemoteMemoryPoolClient] Allocated remote block 1 from pool 0 at remote addr 0x...
[RDMADevice] Registered memory at 0x... len 28, lkey=7, rkey=7
[RDMADevice] Simulating RDMA WRITE from local MR(lkey=7) addr 0x... to remote addr 0x... (rkey=1) size 28
Node 2 Client: Allocated and wrote data to remote addr 0x..., rkey 1
[RDMADevice] Registered memory at 0x... len 64, lkey=8, rkey=8
[RDMADevice]

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注