各位编程爱好者、系统架构师,大家好!
今天,我们将深入探讨C++环境下,如何实现一个高性能、高可控性的物理内存分配器。我们所说的“物理内存”,并非指直接操作硬件地址,而是在操作系统提供的巨大连续内存区域(通常通过mmap等系统调用获取)之上,模拟硬件层级的内存管理,以实现对连续内存块的动态调度。我们的核心策略将围绕“隔离区域”(Contiguous Memory Allocator, CMA)展开。
在现代软件开发中,我们通常依赖于操作系统提供的malloc/new等通用内存分配机制。然而,在某些特定场景下,例如嵌入式系统、游戏引擎、高性能计算、实时系统或需要直接与硬件(如DMA控制器、GPU)交互的应用中,对内存的连续性、分配速度和可预测性有着极高的要求。通用分配器往往无法满足这些严苛的需求,因为它们可能引入不可控的碎片、过高的系统调用开销,或者无法保证分配的内存块在物理上(或至少在虚拟地址空间中是连续的,且底层映射到连续的物理页)是连续的。
这就是自定义内存分配器大显身手的地方,而CMA策略正是为了解决连续性问题而生。本讲座将从内存管理的基础概念出发,逐步深入到CMA分配器的设计哲学、核心数据结构、算法实现、线程安全及性能优化,最终提供一个可操作的C++代码示例。
第一部分:内存管理基础与隔离区域(CMA)概念
1.1 物理内存与虚拟内存概览
在深入自定义分配器之前,我们首先需要对计算机内存管理的基本概念有一个清晰的认识。
- 物理内存 (Physical Memory):这是计算机中实际存在的RAM芯片。每个存储单元都有一个唯一的物理地址。
- 虚拟内存 (Virtual Memory):这是操作系统为每个进程提供的一个抽象概念。每个进程都拥有一个独立的、从0开始的虚拟地址空间。当程序访问一个虚拟地址时,操作系统通过内存管理单元(MMU)将其翻译成对应的物理地址。
通用内存分配器(如malloc)工作在虚拟内存层面。它们从操作系统请求一块或多块虚拟内存区域,然后在这个区域内进行管理和分配。即便malloc返回了一个看似连续的内存块,其底层映射到的物理页在物理内存中也可能是不连续的。这对于大多数应用来说是透明且无关紧要的,但对于某些硬件交互来说,物理连续性是强制性的。
我们的CMA分配器,目标是在一个由操作系统一次性分配的大型、虚拟地址上连续的内存区域内,进一步实现内部管理上的连续性,并尽可能地模拟对“物理”内存的管理。我们假定这个大型区域是我们的“物理内存池”,并且我们将在这个池中寻找连续的块。
1.2 内存碎片问题
内存碎片是内存管理中一个普遍且令人头疼的问题,它主要分为两种:
- 内部碎片 (Internal Fragmentation):当分配器为一个请求分配了一个比请求大小更大的内存块时,多余的部分就形成了内部碎片。例如,如果分配器只能分配固定大小的块(如4KB),而应用程序只请求了1KB,那么剩余的3KB就是内部碎片。
- 外部碎片 (External Fragmentation):当总的空闲内存足够满足一个分配请求,但这些空闲内存分布在多个不连续的小块中,无法找到一个足够大的连续块来满足请求时,就发生了外部碎片。这是CMA分配器最需要关注和解决的问题。
CMA分配器由于其核心目标是提供连续内存,因此对外部碎片问题尤为敏感。一个设计不佳的CMA分配器在长时间运行后,可能会因为外部碎片而导致大内存块分配失败,即使系统仍有大量空闲内存。
1.3 隔离区域(CMA)的核心思想
隔离区域(Contiguous Memory Allocator, CMA)并非一种单一的算法,而是一种内存管理策略,其核心目标是确保为特定用途分配连续的内存块。它通常在系统启动时或程序初始化阶段,从操作系统预留一大块连续的内存区域,然后在这个区域内部进行精细化管理。
CMA的主要特点和应用场景包括:
- 保证连续性:这是其最核心的价值。例如,DMA(直接内存访问)控制器通常需要物理上连续的内存地址来传输数据,否则无法正常工作。
- 专用性:CMA区域通常是为特定目的而保留的,而不是作为通用内存池。这有助于减少与其他通用分配器之间的干扰和碎片化。
- 性能和可预测性:通过避免频繁的系统调用和复杂的页表操作,CMA分配器可以提供更快的分配/释放速度和更可预测的延迟。
- 资源受限环境:在嵌入式系统等内存资源受限的环境中,精细的内存控制至关重要。
与通用分配器(如Buddy System、Slab Allocator、Free List等)不同,CMA更侧重于如何有效地发现、管理和合并连续的空闲区域,以确保在需要时能够找到足够大的连续块。它通常会采用最佳适应(Best Fit)或首次适应(First Fit)等策略,并强调内存合并(Coalescing)机制来对抗外部碎片。
第二部分:内存分配器设计考量
在着手实现CMA分配器之前,我们需要仔细考虑其设计目标、约束以及关键的技术选择。
2.1 目标与约束
主要目标:
- 高效分配与释放连续内存:尽可能降低分配和释放操作的时间开销。
- 最小化外部碎片:这是CMA的核心挑战,需要通过合理的算法和合并机制来解决。
- 支持不同大小的请求:能够灵活地处理从几十字节到几兆字节甚至更大的内存请求。
- 支持内存对齐:许多硬件设备和特定数据结构对内存地址有对齐要求。
- 线程安全:在多线程环境中能够安全地进行内存操作。
主要约束:
- 内存开销(元数据):跟踪内存块状态和信息的元数据需要占用一定的内存。应尽量精简。
- 初始化开销:预分配整个内存池的开销。
- 避免系统调用开销:一旦内存池初始化完成,后续的分配和释放应尽量避免再次调用操作系统接口。
2.2 元数据管理
为了管理内存池中的各个内存块,我们必须存储其状态和属性。这些信息称为元数据。
-
需要存储的信息:
- 块的起始地址:唯一标识一个内存块。
- 块的大小:包括头部和数据区。
- 块的状态:是空闲(Free)还是已分配(Allocated)。
- 相邻块的信息:为了实现高效的合并,我们需要知道一个块的前一个和后一个块。
-
元数据的存储位置:
- 块内头部 (In-band):将元数据直接存储在每个内存块的起始位置。这种方式简单直观,但会占用用户请求内存的一部分,导致返回给用户的指针不是块的起始地址,且增加了内部碎片。
- 独立数据结构 (Out-of-band):将所有内存块的元数据集中存储在一个或多个独立的数据结构中(例如,一个
std::map或std::vector)。这种方式将元数据与用户数据分离,更灵活,但在查找块信息时可能需要额外的哈希表或搜索操作。
考虑到CMA需要频繁地查找相邻块进行合并,且用户对返回地址的“纯净性”可能要求较高,独立数据结构通常是更优的选择,因为它允许我们维护更复杂的索引结构来加速操作。
2.3 分配策略
当接收到一个内存分配请求时,分配器需要从空闲内存块中选择一个合适的块来满足请求。常见的策略有:
- 首次适应 (First Fit):
- 描述:遍历空闲块列表,找到第一个大小足够满足请求的块。
- 优点:实现简单,查找速度快。
- 缺点:倾向于在内存池的低地址端分配,可能导致低地址端产生大量小的碎片,使得后续的大型分配难以成功。
- 最佳适应 (Best Fit):
- 描述:遍历所有空闲块,找到大小最接近请求大小且能满足请求的块。
- 优点:尽量不浪费内存,有助于减少内部碎片,并保留较大的空闲块,从而减缓外部碎片化。
- 缺点:需要遍历或搜索整个空闲块列表,查找速度慢于首次适应。
- 最差适应 (Worst Fit):
- 描述:找到最大的空闲块,从中分割出所需大小的内存。
- 优点:希望通过分割大块来产生中等大小的空闲块,而不是非常小的碎片。
- 缺点:查找速度慢,且实践中可能导致最大的空闲块迅速消失,反而更难满足后续的大型请求。
对于CMA,最佳适应通常是更好的选择,因为它试图保留大的空闲块,从而更好地应对连续内存的需求。然而,为了提高查找效率,我们需要设计合适的空闲块数据结构。
2.4 内存合并 (Coalescing)
内存合并是CMA分配器对抗外部碎片的关键机制。当一个内存块被释放时,分配器会检查其物理(或虚拟地址空间中的)前后相邻的内存块。如果相邻块也是空闲的,那么这些空闲块就会被合并成一个更大的连续空闲块。
- 如何高效查找相邻块:
- 地址排序的列表/树:如果所有空闲块都按照它们的起始地址排序存储,那么找到一个块的相邻块就非常高效。
std::map<void*, BlockDescriptor>就是一个很好的选择。 - 双向链表:在每个块的元数据中包含指向前后相邻块的指针。但在释放时,需要先找到被释放块的元数据,然后才能通过指针访问其邻居。
- 地址排序的列表/树:如果所有空闲块都按照它们的起始地址排序存储,那么找到一个块的相邻块就非常高效。
内存合并的及时性对分配器的性能至关重要。立即合并(Immediate Coalescing)是最常见的策略,即在每次释放操作时立即进行合并。
第三部分:CMA分配器的核心实现
现在,我们开始构建CMA分配器的核心组件。
3.1 基石:内存区域的获取
首先,我们需要从操作系统获取一个大的、连续的内存区域作为我们的内存池。在Linux/Unix系统上,mmap系统调用是实现这一目的的常用方法。
#include <sys/mman.h> // For mmap, munmap
#include <unistd.h> // For sysconf (page size)
#include <stddef.h> // For size_t
#include <cstdint> // For uintptr_t
#include <map>
#include <set>
#include <mutex> // For std::mutex
#include <iostream>
#include <algorithm> // For std::max
// 辅助函数,用于获取系统页大小
size_t get_page_size() {
static size_t page_size = 0;
if (page_size == 0) {
page_size = sysconf(_SC_PAGESIZE);
}
return page_size;
}
// 模拟的物理内存池初始化
void* initialize_memory_pool(size_t total_size) {
// 确保请求大小是页面大小的倍数
size_t page_size = get_page_size();
if (total_size % page_size != 0) {
total_size = (total_size / page_size + 1) * page_size;
}
// 使用mmap分配一个匿名、私有、可读写的内存区域
// MAP_ANONYMOUS: 映射不与任何文件关联
// MAP_PRIVATE: 映射是私有的,修改不会影响其他进程
void* base_address = mmap(
nullptr, // 让系统选择映射地址
total_size, // 映射区域的大小
PROT_READ | PROT_WRITE, // 可读写
MAP_PRIVATE | MAP_ANONYMOUS, // 私有匿名映射
-1, // 文件描述符,-1表示匿名映射
0 // 偏移量
);
if (base_address == MAP_FAILED) {
perror("Failed to mmap memory pool");
return nullptr;
}
std::cout << "Memory pool initialized at: " << base_address << ", size: " << total_size << " bytes." << std::endl;
return base_address;
}
// 释放内存池
void destroy_memory_pool(void* base_address, size_t total_size) {
if (base_address != nullptr) {
if (munmap(base_address, total_size) == -1) {
perror("Failed to munmap memory pool");
} else {
std::cout << "Memory pool at " << base_address << " destroyed." << std::endl;
}
}
}
3.2 内存块结构 (Block Descriptor)
我们将使用一个结构体来描述内存池中的每个块。为了方便管理,我们将元数据与实际的内存块分离开来。
// 内存块描述符
struct BlockDescriptor {
void* address; // 块的起始地址
size_t size; // 块的大小
bool is_free; // 块是否空闲
// 构造函数
BlockDescriptor(void* addr, size_t sz, bool free_status)
: address(addr), size(sz), is_free(free_status) {}
// 为了在std::map中作为值类型,需要默认构造函数
BlockDescriptor() : address(nullptr), size(0), is_free(false) {}
// 调试辅助
void print() const {
std::cout << "Block [Addr: " << address << ", Size: " << size
<< ", Status: " << (is_free ? "FREE" : "ALLOCATED") << "]" << std::endl;
}
};
3.3 核心数据结构:空闲块管理
为了实现最佳适应策略和高效的内存合并,我们需要一个能够快速查找空闲块并支持按地址查找相邻块的数据结构。std::map<void*, BlockDescriptor> 是一个很好的选择,它能自动按地址排序。
_all_blocks: 一个std::map<void*, BlockDescriptor>,存储内存池中所有(空闲和已分配)内存块的描述符,键是块的起始地址。这使得通过地址查找块和遍历相邻块变得高效。_free_blocks_by_size: 一个std::map<size_t, std::set<void*>>,用于实现最佳适应策略。键是空闲块的大小,值是一个std::set,存储所有具有该大小的空闲块的起始地址。std::set保证了同一大小的块可以被快速查找和移除。
3.4 CMAMemoryAllocator 类结构
现在,我们将以上组件整合到一个CMA分配器类中。
class CMAMemoryAllocator {
public:
CMAMemoryAllocator() : _base_address(nullptr), _total_size(0) {}
// 初始化分配器
bool initialize(size_t pool_size) {
if (_base_address != nullptr) {
std::cerr << "Allocator already initialized." << std::endl;
return false;
}
_base_address = initialize_memory_pool(pool_size);
if (_base_address == nullptr) {
return false;
}
_total_size = pool_size;
// 初始化时,整个内存池是一个大的空闲块
BlockDescriptor initial_block(_base_address, _total_size, true);
_all_blocks[_base_address] = initial_block;
_free_blocks_by_size[_total_size].insert(_base_address);
return true;
}
// 销毁分配器
void destroy() {
if (_base_address != nullptr) {
destroy_memory_pool(_base_address, _total_size);
_base_address = nullptr;
_total_size = 0;
_all_blocks.clear();
_free_blocks_by_size.clear();
}
}
// 分配内存
void* allocate(size_t size, size_t alignment = 1) {
std::lock_guard<std::mutex> lock(_mutex);
if (size == 0) return nullptr;
// 确保请求的大小和对齐是合理的
// 为了存储可能的对齐偏移量,我们可能需要额外字节
// 至少需要 sizeof(void*) 来存储原始指针以供deallocate使用
size_t effective_size = size + alignment + sizeof(void*);
// 寻找最佳适应块
auto it_size_map = _free_blocks_by_size.lower_bound(effective_size);
if (it_size_map == _free_blocks_by_size.end()) {
// 没有足够大的空闲块
std::cerr << "Allocation failed: No free block of size " << effective_size << " available." << std::endl;
return nullptr;
}
void* found_block_addr = nullptr;
BlockDescriptor* found_block_desc = nullptr;
// 在找到的第一个大小类别中,寻找地址最小的块 (或者任意一个,因为都是足够大的)
// 事实上,lower_bound 已经找到了满足最小大小要求的第一个类别
// 如果我们想实现纯粹的 Best Fit,我们需要遍历所有满足条件的 size_t 键
// 并从每个 set 中选择一个块,然后比较它们的大小。
// 但为了简化和效率,我们先从第一个足够大的 size_t 类别中找一个。
// 实际上 std::map 的 lower_bound 行为已经很接近 Best Fit 的思想了
// 因为它会找到第一个 (最小的) 足够大的 size_t 键。
// 然后我们从这个键对应的 set 中取一个地址 (通常是第一个,或者任意一个)。
size_t actual_block_size = 0;
for (auto current_it = it_size_map; current_it != _free_blocks_by_size.end(); ++current_it) {
if (!current_it->second.empty()) {
// 找到了一个符合大小要求的空闲块集
found_block_addr = *current_it->second.begin(); // 取出其中一个块的地址
actual_block_size = current_it->first;
// 检查这个块是否能满足对齐要求
BlockDescriptor& candidate_desc = _all_blocks.at(found_block_addr);
uintptr_t current_addr_val = reinterpret_cast<uintptr_t>(candidate_desc.address);
// 计算对齐后的起始地址
uintptr_t aligned_addr_val = (current_addr_val + sizeof(void*) + alignment - 1) & ~(alignment - 1);
// 计算实际用户数据区的起始地址
void* user_ptr = reinterpret_cast<void*>(aligned_addr_val);
// 计算头部(存储原始块地址)的起始地址
void* header_ptr = reinterpret_cast<void*>(aligned_addr_val - sizeof(void*));
// 检查是否在原始块内且大小足够
if (reinterpret_cast<uintptr_t>(header_ptr) >= current_addr_val &&
(reinterpret_cast<uintptr_t>(user_ptr) + size) <= (current_addr_val + candidate_desc.size)) {
found_block_desc = &candidate_desc;
// 由于我们找到了一个满足大小和对齐的块,我们可以立即使用它
// 如果要严格实现Best Fit,需要继续遍历找到最小的那个
// 但对于很多CMA场景,找到第一个合适的就足够了
break;
}
}
}
if (!found_block_desc) {
std::cerr << "Allocation failed: No suitable free block found with alignment " << alignment << "." << std::endl;
return nullptr;
}
// 从空闲列表中移除这个块
_free_blocks_by_size[found_block_desc->size].erase(found_block_desc->address);
if (_free_blocks_by_size[found_block_desc->size].empty()) {
_free_blocks_by_size.erase(found_block_desc->size);
}
// 计算用户实际将获得的地址和存储原始块地址的头部
uintptr_t original_block_addr_val = reinterpret_cast<uintptr_t>(found_block_desc->address);
uintptr_t aligned_user_addr_val = (original_block_addr_val + sizeof(void*) + alignment - 1) & ~(alignment - 1);
void* user_ptr = reinterpret_cast<void*>(aligned_user_addr_val);
void* header_ptr = reinterpret_cast<void*>(aligned_user_addr_val - sizeof(void*));
// 在header_ptr位置存储原始块的地址
*reinterpret_cast<void**>(header_ptr) = found_block_desc->address;
// 处理块的分割
// 1. 原始块的头部 (如果存在前导碎片)
size_t leading_fragment_size = reinterpret_cast<uintptr_t>(header_ptr) - original_block_addr_val;
if (leading_fragment_size > 0) {
BlockDescriptor leading_frag(found_block_desc->address, leading_fragment_size, true);
_all_blocks[leading_frag.address] = leading_frag;
_free_blocks_by_size[leading_frag.size].insert(leading_frag.address);
}
// 2. 分配给用户的块 (实际上是原始块的一部分,包含头部和用户数据)
found_block_desc->address = header_ptr; // 分配出去的是包含头部的完整块
found_block_desc->size = size + sizeof(void*) + (reinterpret_cast<uintptr_t>(user_ptr) - reinterpret_cast<uintptr_t>(header_ptr)); // 实际分配的大小
found_block_desc->is_free = false;
_all_blocks[found_block_desc->address] = *found_block_desc; // 更新all_blocks中的信息,键也可能变化
// 3. 原始块的尾部 (如果存在尾部碎片)
void* end_of_allocated_region = reinterpret_cast<void*>(reinterpret_cast<uintptr_t>(user_ptr) + size);
size_t trailing_fragment_size = (original_block_addr_val + actual_block_size) - reinterpret_cast<uintptr_t>(end_of_allocated_region);
if (trailing_fragment_size > 0) {
void* trailing_frag_addr = end_of_allocated_region;
BlockDescriptor trailing_frag(trailing_frag_addr, trailing_fragment_size, true);
_all_blocks[trailing_frag.address] = trailing_frag;
_free_blocks_by_size[trailing_frag.size].insert(trailing_frag.address);
}
std::cout << "Allocated " << size << " bytes at " << user_ptr << " (actual block from " << found_block_desc->address << ", total size " << found_block_desc->size << ") with alignment " << alignment << std::endl;
return user_ptr; // 返回给用户的是对齐后的地址
}
// 释放内存
void deallocate(void* ptr) {
std::lock_guard<std::mutex> lock(_mutex);
if (ptr == nullptr) return;
// 根据用户指针,回溯找到存储的原始块地址
void* original_block_base_ptr = *reinterpret_cast<void**>(reinterpret_cast<uintptr_t>(ptr) - sizeof(void*));
auto it = _all_blocks.find(original_block_base_ptr);
if (it == _all_blocks.end() || it->second.is_free) {
std::cerr << "Deallocation failed: Invalid or already freed pointer: " << ptr << std::endl;
return;
}
BlockDescriptor& block_to_free = it->second;
std::cout << "Deallocating block at " << ptr << " (original block " << block_to_free.address << ", size " << block_to_free.size << ")" << std::endl;
block_to_free.is_free = true; // 标记为已释放
// 尝试合并前后相邻的空闲块
void* current_block_addr = block_to_free.address;
size_t current_block_size = block_to_free.size;
// 1. 尝试与前一个块合并
auto prev_it = _all_blocks.lower_bound(current_block_addr);
if (prev_it != _all_blocks.begin()) {
--prev_it; // 移动到当前块的前一个块
// 检查前一个块是否真正是紧邻的且空闲
if (prev_it->second.is_free &&
reinterpret_cast<uintptr_t>(prev_it->second.address) + prev_it->second.size == reinterpret_cast<uintptr_t>(current_block_addr)) {
// 从空闲列表中移除前一个块
_free_blocks_by_size[prev_it->second.size].erase(prev_it->second.address);
if (_free_blocks_by_size[prev_it->second.size].empty()) {
_free_blocks_by_size.erase(prev_it->second.size);
}
// 合并:更新当前块的地址和大小
block_to_free.address = prev_it->second.address;
block_to_free.size += prev_it->second.size;
// 从_all_blocks中移除前一个块(因为它已经被合并到当前块了)
_all_blocks.erase(prev_it);
// 更新当前块在_all_blocks中的键(如果地址变了)
_all_blocks.erase(current_block_addr); // 移除旧地址的键
_all_blocks[block_to_free.address] = block_to_free; // 插入新地址的键
current_block_addr = block_to_free.address; // 更新当前块的地址
current_block_size = block_to_free.size;
}
}
// 2. 尝试与后一个块合并
auto next_it = _all_blocks.lower_bound(reinterpret_cast<void*>(reinterpret_cast<uintptr_t>(current_block_addr) + current_block_size));
if (next_it != _all_blocks.end()) {
// 检查后一个块是否真正是紧邻的且空闲
if (next_it->second.is_free &&
reinterpret_cast<uintptr_t>(current_block_addr) + current_block_size == reinterpret_cast<uintptr_t>(next_it->second.address)) {
// 从空闲列表中移除后一个块
_free_blocks_by_size[next_it->second.size].erase(next_it->second.address);
if (_free_blocks_by_size[next_it->second.size].empty()) {
_free_blocks_by_size.erase(next_it->second.size);
}
// 合并:更新当前块的大小
block_to_free.size += next_it->second.size;
// 从_all_blocks中移除后一个块
_all_blocks.erase(next_it);
// 更新当前块在_all_blocks中的信息 (大小已变)
_all_blocks[block_to_free.address] = block_to_free;
current_block_size = block_to_free.size;
}
}
// 将最终合并后的空闲块添加到空闲列表
_free_blocks_by_size[block_to_free.size].insert(block_to_free.address);
}
// 辅助函数:打印所有块的状态
void print_memory_map() const {
std::lock_guard<std::mutex> lock(_mutex);
std::cout << "n--- Memory Map ---" << std::endl;
for (const auto& pair : _all_blocks) {
pair.second.print();
}
std::cout << "--- End Memory Map ---n" << std::endl;
}
// 获取总空闲内存
size_t get_total_free_memory() const {
std::lock_guard<std::mutex> lock(_mutex);
size_t total_free = 0;
for (const auto& size_set_pair : _free_blocks_by_size) {
total_free += size_set_pair.first * size_set_pair.second.size();
}
return total_free;
}
// 获取总已分配内存 (大致估算,不包括元数据开销)
size_t get_total_allocated_memory() const {
std::lock_guard<std::mutex> lock(_mutex);
size_t total_allocated = 0;
for (const auto& pair : _all_blocks) {
if (!pair.second.is_free) {
total_allocated += pair.second.size;
}
}
return total_allocated;
}
private:
void* _base_address; // 内存池的起始地址
size_t _total_size; // 内存池的总大小
mutable std::mutex _mutex; // 线程安全锁
// 存储所有内存块的描述符,按地址排序
std::map<void*, BlockDescriptor> _all_blocks;
// 存储空闲内存块的描述符,按大小排序,方便Best Fit查找
// map的键是size,值是一个set,存储所有这个size的空闲块的地址
std::map<size_t, std::set<void*>> _free_blocks_by_size;
};
对allocate和deallocate的详细解释:
-
allocate(size_t size, size_t alignment):- 线程安全:使用
std::lock_guard确保分配过程的原子性。 - 有效大小计算:除了用户请求的
size,我们还需要额外的空间来存储对齐信息(sizeof(void*)用于存储原始块地址)和满足对齐要求可能产生的填充。 - 最佳适应查找:
_free_blocks_by_size.lower_bound(effective_size)会找到第一个大小大于或等于effective_size的空闲块类别。然后我们在这个类别中选择一个块。为了严格的Best Fit,理论上需要遍历所有满足条件的size类别,找到其中最小的一个,但这会增加开销。当前实现是选择第一个足够大的size类别中的任意一个块。 - 对齐处理:计算出在找到的空闲块中,能够满足
alignment要求的起始地址user_ptr。为了能在deallocate时找到原始块的描述符,我们在user_ptr之前sizeof(void*)的位置存储了原始块的起始地址。 - 块分割:
- 前导碎片:如果找到的空闲块的起始地址到
header_ptr之间有空闲空间,则将其作为一个新的空闲块放回管理系统。 - 已分配块:将
header_ptr到user_ptr + size的区域标记为已分配。 - 尾部碎片:如果
user_ptr + size之后到原始块结束还有空闲空间,则将其作为一个新的空闲块放回管理系统。
- 前导碎片:如果找到的空闲块的起始地址到
- 更新元数据:
_all_blocks和_free_blocks_by_size被相应地更新。
- 线程安全:使用
-
*`deallocate(void ptr)`**:
- 线程安全:同样使用
std::lock_guard。 - 回溯原始块:通过
ptr减去sizeof(void*)找到我们存储的原始块地址,再通过_all_blocks找到对应的BlockDescriptor。 - 标记空闲:将
block_to_free.is_free设为true。 - 内存合并:
- 查找前一个块:使用
_all_blocks.lower_bound(current_block_addr)找到当前块,然后--prev_it得到前一个块。 - 检查并合并:如果前一个块空闲且物理相邻,则将其从
_free_blocks_by_size和_all_blocks中移除,并扩展当前块的大小和地址。 - 查找后一个块:使用
_all_blocks.lower_bound(current_block_addr + current_block_size)找到后一个块。 - 检查并合并:如果后一个块空闲且物理相邻,则将其从
_free_blocks_by_size和_all_blocks中移除,并扩展当前块的大小。
- 查找前一个块:使用
- 添加到空闲列表:将最终合并后的空闲块(可能是一个更大的块)添加到
_free_blocks_by_size中。
- 线程安全:同样使用
3.5 示例用法
int main() {
CMAMemoryAllocator allocator;
const size_t POOL_SIZE = 1024 * 1024; // 1MB
if (!allocator.initialize(POOL_SIZE)) {
return 1;
}
allocator.print_memory_map();
void* p1 = allocator.allocate(100, 16); // 100 bytes, 16-byte alignment
allocator.print_memory_map();
void* p2 = allocator.allocate(500, 32); // 500 bytes, 32-byte alignment
allocator.print_memory_map();
void* p3 = allocator.allocate(200); // 200 bytes, default 1-byte alignment
allocator.print_memory_map();
allocator.deallocate(p2);
allocator.print_memory_map();
void* p4 = allocator.allocate(150, 64); // 150 bytes, 64-byte alignment
allocator.print_memory_map();
allocator.deallocate(p1);
allocator.print_memory_map();
allocator.deallocate(p3);
allocator.print_memory_map();
allocator.deallocate(p4);
allocator.print_memory_map();
std::cout << "Total free memory: " << allocator.get_total_free_memory() << " bytes." << std::endl;
std::cout << "Total allocated memory: " << allocator.get_total_allocated_memory() << " bytes." << std::endl;
allocator.destroy();
return 0;
}
第四部分:线程安全与高级特性
4.1 线程安全
在多线程环境中,多个线程可能同时请求分配或释放内存。如果没有适当的同步机制,将导致数据竞争和未定义行为。
- 互斥锁 (Mutex):最简单也是最常用的方法是使用
std::mutex保护分配器内部的所有关键数据结构(如_all_blocks和_free_blocks_by_size)。在allocate和deallocate方法的开头加上std::lock_guard<std::mutex> lock(_mutex);即可实现粗粒度锁定。 - 锁粒度:当前实现的锁粒度是整个分配器。这意味着任何时候只有一个线程能进行分配或释放操作。对于高并发场景,可以考虑更细粒度的锁(例如,为不同的空闲块列表或不同的内存区域设置独立的锁),但这会显著增加实现的复杂性。
- 无锁算法 (Lock-free Algorithms):对于极高性能要求的场景,可以探索使用原子操作和无锁数据结构(如CAS操作)来实现无锁分配器。这属于高级主题,实现难度和调试难度都非常高。
本实现采用了最简单的全局互斥锁,足以保证正确性,但在高并发下可能成为性能瓶颈。
4.2 内存对齐 (Alignment)
内存对齐是确保数据在内存中以特定边界开始的关键。例如,一个32位整数可能需要4字节对齐,一个DMA缓冲区可能需要64字节甚至更高的对齐。
-
对齐计算:
- 给定一个原始地址
addr和一个对齐值alignment(必须是2的幂),新的对齐地址可以通过(addr + alignment - 1) & ~(alignment - 1)计算得到。 - 我们的
allocate函数在找到一个足够大的块后,会计算出在这个块内能够满足对齐要求的起始地址。
- 给定一个原始地址
-
对齐信息存储:为了在
deallocate时能正确找到原始分配的块,我们必须在返回给用户的地址之前,存储一个指向原始块起始地址的指针。本例中,我们在对齐后的用户指针前sizeof(void*)的位置存储了这个信息。这是一种常见的“in-band”元数据存储方式,它占用了一小部分用户空间,但简化了deallocate的查找过程。
4.3 碎片整理 (Compaction)
碎片整理是消除外部碎片的最有效方法,它通过移动已分配的内存块来合并小的空闲块,形成大的连续空闲区域。
- 挑战:碎片整理是极其复杂的,因为它涉及移动正在使用的内存块。一旦内存块被移动,所有指向该块的指针都将失效。
- 解决方案:
- 句柄 (Handles):不直接返回内存指针,而是返回一个“句柄”(例如,一个指向指针的指针,或一个索引)。当内存块移动时,只需更新句柄指向的指针,而句柄本身不变。
- 垃圾回收 (Garbage Collection):与碎片整理紧密相关,通过跟踪所有内存引用来安全地移动内存。
- 不移动的内存:对于某些不能移动的内存(例如,被硬件DMA锁定的内存),碎片整理无法进行。
对于CMA分配器,由于其核心目标是提供连续性,并且通常用于对性能和实时性要求较高的场景,碎片整理的开销往往是不可接受的。因此,CMA分配器更多地依赖于良好的分配策略和高效的内存合并来预防外部碎片,而不是事后进行整理。本实现不包含碎片整理功能。
4.4 调试与监控
一个健壮的内存分配器需要强大的调试和监控能力。
- 断言 (Assertions):在开发阶段,使用
assert()来检查指针是否为空、大小是否合理、块状态是否一致等。 - 日志记录 (Logging):记录每次分配和释放操作,包括地址、大小、调用栈等信息,这对于追踪内存泄漏和错误非常有用。
- 内存使用统计:提供API来查询当前总空闲内存、总已分配内存、最大空闲块大小、碎片率等。这有助于理解分配器的行为和性能。
- 内存污染 (Memory Poisoning):在分配内存时用特定模式填充未初始化区域,在释放内存时用另一种模式填充已释放区域。这有助于检测未初始化内存的使用和使用已释放内存的错误(use-after-free)。
- 边界检查 (Red Zones):在分配的内存块前后添加一小块“红色区域”,用于检测缓冲区溢出/下溢。
第五部分:性能分析与优化
5.1 时间复杂度
initialize:O(1),主要是一个mmap系统调用和一次std::map插入。allocate:- 查找空闲块:
_free_blocks_by_size.lower_bound()是O(log N),其中N是不同大小的空闲块类别数。然后从std::set中取元素是O(log M),M是该大小类别中的块数。在最坏情况下,如果所有空闲块大小不同,N可能接近总块数。 - 块分割和元数据更新:
std::map的插入和删除操作是O(log K),K是_all_blocks中的总块数。 - 总体而言,分配操作的时间复杂度大致为O(log N + log K)。
- 查找空闲块:
deallocate:- 查找块:
_all_blocks.find()是O(log K)。 - 合并:
std::map的迭代器操作、插入和删除是O(log K)。 - 总体而言,释放操作的时间复杂度大致为O(log K)。
- 查找块:
5.2 空间复杂度
- 元数据开销:每个内存块(无论是空闲还是已分配)都需要一个
BlockDescriptor,存储在_all_blocks中。这会导致每个块的固定开销。如果内存池被分割成大量小块,这个开销会显著增加。 _free_blocks_by_size的开销:存储了空闲块的地址,其大小取决于空闲块的数量和不同大小类别的数量。
5.3 优化策略
- 多级空闲列表 (Multiple Free Lists):将空闲块按照大小范围(例如,小于64字节、64-256字节、256-1KB等)分成多个独立的空闲列表。对于小块请求,可以直接在对应的小块列表中快速查找,避免遍历大块列表。这使得
allocate的查找效率更高。 - Buddy System (伙伴系统):虽然与CMA不同,但伙伴系统是一种高效管理固定大小块的策略,它通过将内存块分裂成两半(伙伴)来满足请求,并在释放时合并伙伴。CMA可以借鉴其高效合并的思想。
- 固定大小块池 (Fixed-Size Pools / Slab Allocator):对于特定大小且频繁分配的对象,可以预先分配一个只管理该大小对象的内存池。这可以消除内部碎片和外部碎片,并极大地加速分配和释放。
- 缓存 (Caching):维护一个最近释放的、常用大小的块的本地缓存(例如,每个线程一个缓存)。在分配小块时,优先从缓存中获取,避免全局锁和复杂的数据结构操作。
- 延迟合并 (Deferred Coalescing):不是在每次释放时都立即合并,而是将释放的块放入一个“待合并”队列,然后定期或在需要时(例如,当分配失败时)批量执行合并操作。这可以减少
deallocate的平均开销,但可能暂时增加碎片。
第六部分:实际应用场景与注意事项
6.1 何时选择CMA分配器
- DMA操作:当硬件设备需要直接访问物理上连续的内存区域时,CMA是必不可少的。
- 图形和游戏开发:纹理、顶点缓冲区、渲染目标等通常需要连续的GPU内存或系统内存,以优化数据传输和渲染性能。
- 实时系统:对内存分配的延迟和抖动有严格要求。CMA可以提供更可预测的性能,避免操作系统调度和虚拟内存管理的开销。
- 嵌入式系统:资源受限,需要对内存进行精细控制,避免不必要的开销和碎片。
- 高性能计算:某些算法或数据结构需要处理大型连续数据集,CMA可以确保数据局部性并提高缓存效率。
6.2 潜在挑战
- 外部碎片化:尽管有合并机制,长时间运行和复杂的分配/释放模式仍然可能导致外部碎片,最终导致大块分配失败。
- 实现复杂性:相比于直接使用
malloc/new,实现一个健壮、高效且线程安全的CMA分配器需要更深入的内存管理知识和细致的编码。 - 调试难度:内存错误(如越界访问、use-after-free)在这种低级别分配器中可能更难发现和调试。
- 固定大小池限制:如果CMA内存池的大小是固定的,一旦耗尽,就无法再分配内存,除非进行碎片整理或扩展池(通常需要重新初始化,这会中断正在运行的应用)。
总结
本讲座详细探讨了在C++环境下实现基于隔离区域(CMA)的物理内存分配器。我们从内存管理的基础概念出发,深入分析了CMA策略的设计考量、核心数据结构和算法实现,包括内存池的获取、块描述符、空闲块列表管理、分配与释放逻辑、内存合并以及线程安全。通过一个具体的代码示例,我们展示了如何构建一个功能完备的CMA分配器。
CMA分配器是为满足特定高性能和连续性要求而设计的专业工具。它的实现虽然比通用分配器复杂,但在需要精细控制内存布局、避免操作系统开销或与硬件直接交互的场景中,其价值无可替代。理解其原理和实现细节,将使您能够构建更高效、更健壮的系统。