C++ 数据压缩算法集成:在 C++ 存储引擎中无缝嵌入 Zstd 或 LZ4 提升磁盘空间利用率与 I/O 效率

各位技术同仁,下午好!

今天,我们将深入探讨一个对于现代数据存储至关重要的议题:如何在 C++ 存储引擎中无缝集成高性能数据压缩算法,特别是 Zstd 和 LZ4,以显著提升磁盘空间利用率和 I/O 效率。在数据爆炸式增长的时代,存储成本和数据吞吐量已成为任何大型系统面临的核心挑战。有效的压缩策略不仅能缓解这些压力,还能间接提升整体系统性能。

作为一名编程专家,我将以讲座的形式,结合理论知识、工程实践与 C++ 代码示例,为大家详细剖析这一过程。我们将从压缩算法的基础原理出发,深入比较 Zstd 和 LZ4 的特性,探讨在存储引擎中集成它们的策略与最佳实践,并最终展望一些高级优化方向。

I. 引言:数据压缩在存储引擎中的核心价值

存储引擎,无论是关系型数据库、NoSQL 键值存储,还是分布式文件系统,都面临着海量数据的存储与访问挑战。随着数据规模的几何级增长,传统的存储方案很快就会遇到瓶颈。

存储引擎面临的挑战:

  1. 数据量激增: 业务数据、日志、多媒体文件等持续累积,导致存储硬件成本高昂。
  2. I/O 瓶颈: 磁盘(尤其是传统 HDD)的随机读写性能远低于 CPU 和内存,成为系统性能的短板。即使是 SSD,在海量小文件或高并发随机读写场景下,也可能达到其 I/O 极限。
  3. 网络带宽限制: 分布式存储系统中,数据在节点间传输的带宽也可能成为瓶颈。

数据压缩的直接效益:

  1. 磁盘空间利用率提升: 这是最直观的好处。通过减少存储数据量,可以显著降低存储硬件成本。
  2. 减少 I/O 操作: 存储的数据量减少意味着在相同逻辑数据量下,物理写入和读取的字节数更少。这直接降低了磁盘 I/O 负载,提高了吞吐量。对于机械硬盘,I/O 次数的减少还能降低寻道时间;对于 SSD,减少写入量有助于延长其寿命。
  3. 网络传输效率提升: 在分布式系统中,压缩数据可以减少网络传输量,从而降低网络延迟,提高数据同步和备份的速度。

数据压缩的间接效益:

  1. 提升缓存命中率: 更少的数据量意味着更多的数据可以被加载到内存或 CPU 缓存中。当应用程序需要访问这些数据时,更有可能直接从缓存中获取,避免了昂贵的磁盘 I/O,从而加速数据处理。
  2. 延长 SSD 寿命: 对于 SSD,写入放大是其寿命的主要影响因素。压缩能够减少写入到闪存的物理数据量,从而降低写入放大,延长 SSD 的使用寿命。

为何选择 Zstd 与 LZ4?
在众多通用数据压缩算法中,Zstandard (Zstd) 和 LZ4 因其卓越的性能和广泛的工业应用而脱颖而出。它们代表了两种不同的设计哲学:

  • LZ4: 极致的压缩速度和解压速度,尤其适用于对延迟要求极高的场景,尽管压缩比相对较低。
  • Zstd: 提供一个可调节的压缩级别范围,从极快的速度(接近 LZ4)到极高的压缩比(媲美 Zlib/Deflate),且解压速度始终保持顶尖水平。

这两种算法都由 Facebook(现 Meta)开源,拥有活跃的社区支持和高度优化的 C 语言实现,并提供了易于集成的 C++ 接口,非常适合在高性能 C++ 存储引擎中应用。

II. 压缩算法基础:Zstd 与 LZ4 的工作原理概览

理解压缩算法的核心原理有助于我们更好地选择和集成它们。

通用压缩原理回顾

大多数现代通用数据压缩算法都基于以下几种核心技术:

  1. LZ77 (Lempel-Ziv 77) 字典匹配: 这是许多流行压缩算法(如 Deflate, Gzip, Zlib, Zstd, LZ4)的基础。其核心思想是查找数据中的重复模式。当发现一个重复的字节序列时,它不会存储原始序列,而是存储一个指向之前出现过的相同序列的“指针”(偏移量和长度)。例如,"ABABAB" 可以被编码为 "AB" 后跟着一个指向 "AB" 的指针。
  2. 霍夫曼编码 (Huffman Coding): 一种变长编码方法,根据字符出现的频率赋予不同的二进制码。频率高的字符使用短码,频率低的字符使用长码,从而实现整体编码长度的缩减。
  3. 行程长度编码 (Run-Length Encoding, RLE): 对于连续重复的字符序列(如 "AAAAABBBB"),RLE 可以将其编码为 "5A4B",存储重复次数和字符本身。

LZ4 核心机制

LZ4 的设计哲学是“速度为王”。它通过一系列优化实现了惊人的压缩和解压速度:

  • 快速的字节匹配查找: LZ4 使用哈希表来快速查找重复的字节序列。它通常只查找长度为 4 字节的序列(minMatchLength),因为更长的匹配会增加查找时间。这种快速匹配机制是其速度的关键。
  • 固定长度的偏移编码: LZ4 的匹配指针通常包含一个相对偏移量和一个匹配长度。为了简化编码和解码,LZ4 对这些长度和偏移量的编码方式进行了优化,使其尽可能紧凑和易于解析。
  • 极简的字面量和匹配对结构: LZ4 的输出流由一系列的“字面量” (Literal) 和“匹配对” (Match) 组成。字面量是未压缩的原始数据,匹配对则包含一个匹配长度和一个后向偏移。其格式设计非常简洁,减少了解析开销。
  • 牺牲压缩比换取速度: 为了达到极致的速度,LZ4 在匹配策略上不如其他算法“贪婪”。它可能不会总是找到最长的匹配或最优的编码,而是选择第一个足够好的匹配。这使得压缩比相对较低,但极大地提升了速度。

简而言之,LZ4 就像一个高效的速记员,它专注于快速识别并标记简单的重复模式,而不追求完美无缺的压缩。

Zstd 核心机制

Zstd 旨在提供一个在速度和压缩比之间具有出色平衡的通用压缩算法。它结合了多种先进技术:

  • 基于有限状态熵编码 (FSE) 和 非对称数字系统 (ANS): Zstd 使用这些比传统霍夫曼编码更先进的熵编码器。它们能够以更低的计算成本实现更高的编码效率,尤其是在处理小块数据时。
  • 高级字典匹配 (Lz77变体): Zstd 采用了更复杂的 LZ77 变体,包括多级哈希表和后缀数组,以实现更长、更远的匹配查找。它能够更“贪婪”地寻找重复模式,从而获得更高的压缩比。
  • 强大的贪婪匹配策略: Zstd 拥有可配置的匹配查找器,可以根据压缩级别在速度和匹配深度之间进行权衡。在更高的压缩级别下,它会投入更多 CPU 周期来寻找最优的匹配,从而获得更高的压缩比。
  • 可调的压缩级别,平衡速度与比率: Zstd 提供了一个从 1 到 22 的压缩级别范围。级别 1-3 类似于 LZ4 的速度,但通常有更好的压缩比;级别 7-10 是一个很好的通用平衡点;级别 20+ 则追求极致压缩比,可能需要更长的压缩时间,但解压速度依然很快。

Zstd 就像一位经验丰富的速记师,他不仅速度快,而且能够根据任务要求,选择更精确的速记技巧,在保证速度的同时,尽可能地精简内容。

字典压缩的威力

Zstd 和 LZ4 都支持字典压缩。字典是预先从大量相似数据中训练得到的字节序列集合。当压缩器使用字典时,它可以将数据中与字典匹配的部分直接引用字典中的条目,而不是重新编码它们。

  • 优势: 对于结构化或半结构化数据(如 JSON 日志、数据库行、HTTP 请求),它们通常包含大量重复的字段名、关键字、固定格式等。通过预先训练一个包含这些常见模式的字典,可以显著提高压缩比,尤其是在处理小块数据时。字典压缩能够让小块数据享受到大块数据才能达到的高压缩比,因为它为压缩器提供了额外的“上下文”。

III. Zstd 与 LZ4 的深度对比与选型考量

选择合适的压缩算法是集成成功的关键。以下是 Zstd 和 LZ4 的详细对比:

性能指标对比

特性 / 指标 LZ4 Zstd (级别 1-3) Zstd (级别 7-10) Zstd (级别 20+)
压缩速度 极快(数 GB/s) 极快(接近 LZ4) 很快(数百 MB/s – GB/s) 较慢(数十 MB/s)
解压速度 极快(数 GB/s) 极快(数 GB/s,通常略快于压缩) 极快(数 GB/s) 极快(数 GB/s)
压缩比 较低(通常 1.5x – 2.5x) 良好(通常 2x – 3x,优于 LZ4) 优秀(通常 3x – 5x) 极高(通常 5x – 10x+,媲美 Zlib-9)
CPU 占用
内存占用 极低(上下文通常几十 KB) 低(上下文通常几十 KB – 几百 KB) 中(上下文通常几百 KB – 几 MB) 高(上下文可能几十 MB)
适用场景 实时数据、日志、缓存、低延迟网络传输 实时、I/O 密集、对速度和比率均有要求 通用数据库、文件归档、网络传输 冷数据归档、对存储空间极度敏感的场景
字典支持 是(LZ4_compress_fast_usingDict) 是(ZSTD_compress_usingCDict)
流式 API 是(LZ4F Frame API) 是 (ZSTD_compressStream)
数据完整性 LZ4F 帧格式提供校验和 Zstd 帧格式提供校验和 Zstd 帧格式提供校验和 Zstd 帧格式提供校验和

内存占用

  • LZ4: 压缩和解压上下文的内存占用都非常小,通常在几十 KB 范围内。这使其在内存受限的环境中表现出色。
  • Zstd: 内存占用随压缩级别升高而增加。在低级别时(1-3),上下文内存占用与 LZ4 相当;在高级别时(20+),可能需要几十 MB 的内存用于维护内部数据结构(如哈希表、匹配查找器),但解压器的内存占用始终较低。

适用场景分析

  • 选择 LZ4 的场景:

    • 极致速度优先: 对压缩和解压速度有最高要求,即使牺牲部分压缩比也在所不惜。
    • 实时数据处理: 如高吞吐量日志系统、实时消息队列、内存数据库缓存层。
    • 频繁的小块数据压缩: 例如,数据库中的小记录,如果对每条记录进行压缩,LZ4 的低延迟至关重要。
    • CPU 资源有限: LZ4 的 CPU 占用极低,适合在 CPU 负载已经很高的系统中运行。
    • 短期或临时数据: 数据生命周期短,不需要追求极致的存储效率。
  • 选择 Zstd 的场景:

    • 通用数据库存储: 在大多数数据库场景中,Zstd 的中等级别(7-10)可以提供极佳的压缩比和仍然非常快的解压速度,是完美的平衡点。
    • 文件归档与备份: 对于需要长期存储的数据,可以选用更高级别的 Zstd 来最大化存储效率。
    • 网络数据传输: 结合其流式 API 和良好的压缩比,Zstd 是高效网络传输的理想选择。
    • 对解压速度有严格要求,但压缩速度可以接受适度牺牲: Zstd 的一个显著优势是其解压速度在所有压缩级别下都非常快且稳定。
    • 字典压缩的潜力: 如果数据具有重复模式,通过字典训练可以显著提升 Zstd 在小块数据上的压缩表现。

动态选型策略

在实际存储引擎中,可以考虑实现动态压缩策略:

  1. 根据数据类型: 文本日志可能适合 Zstd 高级压缩,而二进制图片或视频数据可能压缩效果不佳,甚至可以不压缩。
  2. 根据访问模式: 热数据(频繁访问)可能选择 LZ4 或 Zstd 低级别,以确保快速解压;冷数据(不常访问)可以考虑 Zstd 高级别,以最大化空间利用率。
  3. 根据存储层级: 内存缓存中的数据可能不压缩或 LZ4;SSD 上的数据可能 Zstd 中级别;HDD 上的数据可能 Zstd 高级别。
  4. 用户可配置: 允许用户根据其工作负载和成本预算选择压缩算法和级别。

IV. 存储引擎中的压缩集成策略

在存储引擎中集成压缩,需要仔细考虑数据的粒度、元数据的管理以及读写路径的改造。

块级压缩 (Block-level Compression)

这是在存储引擎中最常见、最有效的压缩策略。

  • 定义: 将多个逻辑记录打包成一个物理数据块(例如 4KB、16KB、64KB),然后对整个数据块进行压缩。
  • 优势:
    • 批量处理: 压缩器在处理大块数据时通常效率更高,因为有更多重复模式可供查找,字典效果更好。
    • 减少元数据开销: 每个块只需要存储一份压缩元数据,而不是每条记录一份。
    • I/O 效率高: 磁盘 I/O 通常以块为单位进行,读取一个压缩块意味着一次 I/O 操作就能获取多条记录。
  • 劣势:
    • 随机访问单条记录的开销: 如果需要访问块中的某一条记录,即使只读取一个字节,也可能需要解压整个数据块。这会增加随机读的延迟。
    • 更新操作复杂: 更新块中的一条记录可能需要解压整个块,修改数据,然后重新压缩整个块并写回,这被称为“读-修改-写”周期,效率较低。
    • 填充问题 (Fragmentation): 如果块内数据被删除,可能导致块内出现空洞,需要进行垃圾回收或重写。
  • 数据块大小选择: 这是一个重要的参数。
    • 大块: 压缩比通常更高,I/O 次数更少。但随机访问延迟可能更高,更新开销也大。
    • 小块: 随机访问延迟较低,更新开销较小。但压缩比可能降低,元数据开销相对增加,I/O 次数可能增多。
    • 常见大小:4KB, 8KB, 16KB, 32KB, 64KB, 128KB。根据实际数据和工作负载进行基准测试来确定最佳值。

记录级压缩 (Record-level Compression)

  • 定义: 对存储引擎中的每一条逻辑记录进行独立压缩。
  • 优势:
    • 精确的随机访问: 仅需解压所需的单条记录。
    • 灵活处理不同大小记录: 每条记录独立压缩,不受其他记录影响。
    • 更新操作简单: 只需解压、修改、重新压缩并写回单条记录。
  • 劣势:
    • 元数据开销大: 每条记录都需要存储其压缩元数据(压缩类型、原始大小、压缩后大小),这会增加存储开销。
    • 压缩效率可能不如块级: 短记录的可压缩性通常较差,压缩算法难以找到足够的重复模式。
    • CPU 开销高: 每次读写单条记录都需要进行独立的压缩/解压操作,如果记录数量巨大,CPU 负载会很高。
    • 字典效果有限: 字典通常需要一定量的数据才能发挥最佳效果,单条短记录可能无法充分利用字典。

混合模式与自适应策略

结合块级和记录级压缩的优点是一种常见的优化思路:

  • 块内记录压缩: 块级压缩应用于整个数据块,但块内部的每条记录也可以选择性地进行记录级压缩。例如,可以对整个块使用 LZ4,但对块内某些特别大的字段使用 Zstd。
  • 动态阈值: 根据记录大小决定是否进行压缩。例如,小于某个阈值的记录不压缩,大于阈值的记录进行压缩。
  • 自适应压缩算法: 根据数据类型或数据块的特性,动态选择 LZ4 或 Zstd,甚至不同的 Zstd 级别。这需要存储额外的元数据来指示所用的压缩算法和级别。

字典管理

字典在存储引擎中的应用非常关键,尤其对于 Zstd。

  • 全局字典: 从大量历史数据或代表性数据中训练一个或少数几个全局字典。所有数据块都使用相同的字典进行压缩。
    • 优点: 简单、高效,尤其适用于数据模式相似的场景。
    • 缺点: 如果数据模式多样化,单个全局字典可能效果不佳;字典本身需要存储和加载。
  • 数据块字典: 每个数据块或每组数据块拥有自己的字典。
    • 优点: 字典更具针对性,压缩效果可能更好。
    • 缺点: 字典数量多,管理复杂;每个字典都需要存储,增加了元数据开销。
  • 字典更新: 随着数据模式的变化,字典可能需要定期更新。这涉及到字典训练、发布、版本管理以及旧字典数据的兼容性问题。

V. Zstd C++ API 的高效使用

Zstd 库提供了简洁而强大的 C API,可以很容易地在 C++ 中使用。

环境配置

首先,确保你的项目链接了 libzstd 库。如果使用 CMake,可以这样配置:

find_package(ZSTD REQUIRED)
target_link_libraries(YourProject PRIVATE ZSTD::zstd)

基本压缩与解压

最简单的使用方式是 ZSTD_compressZSTD_decompress

#include <zstd.h>
#include <vector>
#include <string>
#include <iostream>
#include <stdexcept> // For std::runtime_error

// 辅助函数:检查Zstd返回码并抛出异常
void check_zstd_error(size_t result, const std::string& msg) {
    if (ZSTD_isError(result)) {
        throw std::runtime_error(msg + ": " + ZSTD_getErrorName(result));
    }
}

// 示例:Zstd 基础压缩与解压
std::vector<char> zstd_compress_data(const std::vector<char>& input_data, int compression_level) {
    if (input_data.empty()) {
        return {};
    }

    // 计算压缩后可能的最大大小
    size_t compressed_buffer_size = ZSTD_compressBound(input_data.size());
    std::vector<char> compressed_data(compressed_buffer_size);

    // 执行压缩
    size_t actual_compressed_size = ZSTD_compress(
        compressed_data.data(), compressed_buffer_size,
        input_data.data(), input_data.size(),
        compression_level
    );
    check_zstd_error(actual_compressed_size, "ZSTD compression failed");

    compressed_data.resize(actual_compressed_size); // 调整到实际大小
    return compressed_data;
}

std::vector<char> zstd_decompress_data(const std::vector<char>& compressed_data, size_t original_size) {
    if (compressed_data.empty()) {
        return {};
    }

    std::vector<char> decompressed_data(original_size);

    // 执行解压
    size_t actual_decompressed_size = ZSTD_decompress(
        decompressed_data.data(), original_size,
        compressed_data.data(), compressed_data.size()
    );
    check_zstd_error(actual_decompressed_size, "ZSTD decompression failed");

    // 实际解压大小必须与原始大小匹配
    if (actual_decompressed_size != original_size) {
        throw std::runtime_error("ZSTD decompression size mismatch");
    }

    return decompressed_data;
}

// int main() {
//     std::string original_str = "This is a sample string that will be compressed using Zstd. "
//                                "It contains some repetitive patterns for better compression demo.";
//     std::vector<char> original_data(original_str.begin(), original_str.end());
//     size_t original_size = original_data.size();

//     try {
//         // 压缩
//         std::vector<char> compressed_data = zstd_compress_data(original_data, 3); // 级别3
//         std::cout << "Original size: " << original_size << " bytesn";
//         std::cout << "Compressed size (Zstd): " << compressed_data.size() << " bytesn";

//         // 解压
//         std::vector<char> decompressed_data = zstd_decompress_data(compressed_data, original_size);
//         std::string decompressed_str(decompressed_data.begin(), decompressed_data.end());
//         std::cout << "Decompressed string: " << decompressed_str << "n";
//         std::cout << "Decompressed size: " << decompressed_data.size() << " bytesn";

//         if (decompressed_str == original_str) {
//             std::cout << "Compression and decompression successful!n";
//         } else {
//             std::cout << "Error: Data mismatch after decompression!n";
//         }
//     } catch (const std::exception& e) {
//         std::cerr << "Error: " << e.what() << std::endl;
//     }
//     return 0;
// }

关键点:

  • ZSTD_compressBound(size_t srcSize): 这个函数用于计算压缩操作所需的最大输出缓冲区大小。总是建议使用它来预分配缓冲区,以避免缓冲区溢出。
  • compression_level: Zstd 压缩级别,范围通常为 1-22。
  • 原始大小的重要性: 解压时,通常需要知道原始数据的确切大小。在存储引擎中,这个原始大小必须作为元数据与压缩数据一起保存。
  • 错误处理: ZSTD_isError(size_t code) 用于检查 Zstd 函数的返回值是否表示错误。ZSTD_getErrorName(size_t code) 可以获取错误的字符串描述。

上下文管理

对于频繁的压缩/解压操作,创建和销毁上下文(ZSTD_CCtx 用于压缩,ZSTD_DCtx 用于解压)会有性能开销。推荐复用这些上下文。

// Zstd 上下文复用示例
class ZstdCompressor {
public:
    ZstdCompressor(int level = 3) : cctx_(ZSTD_createCCtx()), compression_level_(level) {
        if (!cctx_) {
            throw std::runtime_error("Failed to create ZSTD_CCtx");
        }
        ZSTD_CCtx_setParameter(cctx_, ZSTD_c_compressionLevel, compression_level_);
    }

    ~ZstdCompressor() {
        ZSTD_freeCCtx(cctx_);
    }

    std::vector<char> compress(const std::vector<char>& input_data) {
        if (input_data.empty()) {
            return {};
        }
        size_t compressed_buffer_size = ZSTD_compressBound(input_data.size());
        std::vector<char> compressed_data(compressed_buffer_size);
        size_t actual_compressed_size = ZSTD_compressCCtx(
            cctx_, compressed_data.data(), compressed_buffer_size,
            input_data.data(), input_data.size(),
            compression_level_
        );
        check_zstd_error(actual_compressed_size, "ZSTD_compressCCtx failed");
        compressed_data.resize(actual_compressed_size);
        return compressed_data;
    }

private:
    ZSTD_CCtx* cctx_;
    int compression_level_;
};

class ZstdDecompressor {
public:
    ZstdDecompressor() : dctx_(ZSTD_createDCtx()) {
        if (!dctx_) {
            throw std::runtime_error("Failed to create ZSTD_DCtx");
        }
    }

    ~ZstdDecompressor() {
        ZSTD_freeDCtx(dctx_);
    }

    std::vector<char> decompress(const std::vector<char>& compressed_data, size_t original_size) {
        if (compressed_data.empty()) {
            return {};
        }
        std::vector<char> decompressed_data(original_size);
        size_t actual_decompressed_size = ZSTD_decompressDCtx(
            dctx_, decompressed_data.data(), original_size,
            compressed_data.data(), compressed_data.size()
        );
        check_zstd_error(actual_decompressed_size, "ZSTD_decompressDCtx failed");
        if (actual_decompressed_size != original_size) {
            throw std::runtime_error("ZSTD_decompressDCtx size mismatch");
        }
        return decompressed_data;
    }

private:
    ZSTD_DCtx* dctx_;
};

// int main() {
//     // ... (同上,但使用 ZstdCompressor 和 ZstdDecompressor 实例)
//     ZstdCompressor compressor(5);
//     ZstdDecompressor decompressor;
//     // ...
// }

关键点:

  • ZSTD_createCCtx() / ZSTD_createDCtx(): 创建上下文对象。
  • ZSTD_freeCCtx() / ZSTD_freeDCtx(): 释放上下文对象。
  • ZSTD_compressCCtx() / ZSTD_decompressDCtx(): 使用上下文进行压缩/解压。
  • 在多线程环境中,每个线程应该有自己的上下文,或者使用线程安全的上下文池。

字典训练与应用

// Zstd 字典训练与应用示例
// 假设我们有一批相似的文本数据用于训练
std::vector<std::string> sample_data_for_dict = {
    "{"timestamp": "2023-10-27T10:00:00Z", "level": "INFO", "message": "User logged in.", "user_id": "user123"}",
    "{"timestamp": "2023-10-27T10:00:01Z", "level": "WARN", "message": "Failed attempt to access resource.", "user_id": "user456", "resource": "/api/data"}",
    "{"timestamp": "2023-10-27T10:00:02Z", "level": "INFO", "message": "Data processed successfully.", "task_id": "task789"}",
    // ... 更多相似的日志行
};

std::vector<char> zstd_train_dictionary(const std::vector<std::string>& samples, size_t max_dict_size) {
    std::vector<const void*> sample_bufs;
    std::vector<size_t> sample_sizes;
    size_t total_samples_size = 0;

    for (const auto& s : samples) {
        sample_bufs.push_back(s.data());
        sample_sizes.push_back(s.size());
        total_samples_size += s.size();
    }

    std::vector<char> dictionary_buffer(max_dict_size);
    size_t actual_dict_size = ZSTD_trainFromBuffer(
        dictionary_buffer.data(), max_dict_size,
        sample_bufs.data(), sample_sizes.data(), samples.size()
    );
    check_zstd_error(actual_dict_size, "ZSTD dictionary training failed");
    dictionary_buffer.resize(actual_dict_size);
    return dictionary_buffer;
}

// CDict / DDict 是压缩/解压字典的优化表示
// 它们是线程安全的,可以在多个上下文之间共享
class ZstdDictCompressor {
public:
    ZstdDictCompressor(const std::vector<char>& dictionary, int level = 3)
        : cctx_(ZSTD_createCCtx()), cdict_(ZSTD_createCDict(dictionary.data(), dictionary.size())) {
        if (!cctx_ || !cdict_) {
            throw std::runtime_error("Failed to create ZSTD_CCtx or ZSTD_CDict");
        }
        ZSTD_CCtx_setParameter(cctx_, ZSTD_c_compressionLevel, level);
    }

    ~ZstdDictCompressor() {
        ZSTD_freeCCtx(cctx_);
        ZSTD_freeCDict(cdict_); // 注意:CDict 可以被多个 CCtx 共享,但这里为了示例简化,每个实例拥有自己的CDict
    }

    std::vector<char> compress(const std::vector<char>& input_data) {
        if (input_data.empty()) {
            return {};
        }
        size_t compressed_buffer_size = ZSTD_compressBound(input_data.size());
        std::vector<char> compressed_data(compressed_buffer_size);
        size_t actual_compressed_size = ZSTD_compress_usingCDict(
            cctx_, compressed_data.data(), compressed_buffer_size,
            input_data.data(), input_data.size(),
            cdict_
        );
        check_zstd_error(actual_compressed_size, "ZSTD_compress_usingCDict failed");
        compressed_data.resize(actual_compressed_size);
        return compressed_data;
    }

private:
    ZSTD_CCtx* cctx_;
    ZSTD_CDict* cdict_; // 压缩字典
};

class ZstdDictDecompressor {
public:
    ZstdDictDecompressor(const std::vector<char>& dictionary)
        : dctx_(ZSTD_createDCtx()), ddict_(ZSTD_createDDict(dictionary.data(), dictionary.size())) {
        if (!dctx_ || !ddict_) {
            throw std::runtime_error("Failed to create ZSTD_DCtx or ZSTD_DDict");
        }
    }

    ~ZstdDictDecompressor() {
        ZSTD_freeDCtx(dctx_);
        ZSTD_freeDDict(ddict_); // 同理,DDict 可以共享
    }

    std::vector<char> decompress(const std::vector<char>& compressed_data, size_t original_size) {
        if (compressed_data.empty()) {
            return {};
        }
        std::vector<char> decompressed_data(original_size);
        size_t actual_decompressed_size = ZSTD_decompress_usingDDict(
            dctx_, decompressed_data.data(), original_size,
            compressed_data.data(), compressed_data.size(),
            ddict_
        );
        check_zstd_error(actual_decompressed_size, "ZSTD_decompress_usingDDict failed");
        if (actual_decompressed_size != original_size) {
            throw std::runtime_error("ZSTD_decompress_usingDDict size mismatch");
        }
        return decompressed_data;
    }

private:
    ZSTD_DCtx* dctx_;
    ZSTD_DDict* ddict_; // 解压字典
};

// int main() {
//     // ... 训练字典
//     std::vector<char> dict = zstd_train_dictionary(sample_data_for_dict, 131072); // 128KB 字典
//     std::cout << "Trained dictionary size: " << dict.size() << " bytesn";

//     ZstdDictCompressor dict_compressor(dict);
//     ZstdDictDecompressor dict_decompressor(dict);

//     std::string log_entry = "{"timestamp": "2023-10-27T10:00:03Z", "level": "ERROR", "message": "Database connection lost.", "db_id": "main_db"}";
//     std::vector<char> original_data(log_entry.begin(), log_entry.end());
//     size_t original_size = original_data.size();

//     std::vector<char> compressed_data = dict_compressor.compress(original_data);
//     std::cout << "Original size: " << original_size << " bytesn";
//     std::cout << "Compressed size (Zstd with dict): " << compressed_data.size() << " bytesn";

//     std::vector<char> decompressed_data = dict_decompressor.decompress(compressed_data, original_size);
//     std::string decompressed_str(decompressed_data.begin(), decompressed_data.end());
//     std::cout << "Decompressed string: " << decompressed_str << "n";
//     // ... 验证
// }

关键点:

  • ZSTD_trainFromBuffer(): 用于从一组原始数据样本中训练字典。
  • ZSTD_createCDict() / ZSTD_createDDict(): 创建经过优化、线程安全的压缩/解压字典对象。这些对象比原始字典数据更高效。
  • ZSTD_compress_usingCDict() / ZSTD_decompress_usingDDict(): 使用字典进行压缩/解压。

流式压缩与解压 (Streaming API)

流式 API 适用于数据无法一次性全部加载到内存,或者需要管道处理的场景。

// Zstd 流式压缩与解压示例 (简化版)
// 实际应用中需要更完善的循环和缓冲区管理

std::vector<char> zstd_compress_stream(const std::vector<char>& input_data, int compression_level) {
    ZSTD_CCtx* const cctx = ZSTD_createCCtx();
    if (!cctx) throw std::runtime_error("ZSTD_createCCtx failed");
    ZSTD_CCtx_setParameter(cctx, ZSTD_c_compressionLevel, compression_level);

    std::vector<char> compressed_output_buffer;
    size_t const input_buffer_size = ZSTD_CStreamInSize();
    size_t const output_buffer_size = ZSTD_CStreamOutSize();

    // 理论上这里应该分块读取 input_data,并循环处理
    // 为了简化,我们一次性将所有数据作为输入
    ZSTD_inBuffer input = { input_data.data(), input_data.size(), 0 };

    // 预估输出大小,并初始化一个足够大的向量,或者动态增长
    compressed_output_buffer.reserve(ZSTD_compressBound(input_data.size())); 

    while (input.pos < input.size) {
        ZSTD_outBuffer output = { nullptr, output_buffer_size, 0 };
        // 确保 output.dst 指向有效内存
        size_t current_output_start_idx = compressed_output_buffer.size();
        compressed_output_buffer.resize(current_output_start_idx + output_buffer_size);
        output.dst = compressed_output_buffer.data() + current_output_start_idx;

        size_t const ret = ZSTD_compressStream(cctx, &output, &input);
        check_zstd_error(ret, "ZSTD_compressStream failed");
        compressed_output_buffer.resize(current_output_start_idx + output.pos); // 调整到实际写入大小
    }

    // Flush any remaining data
    for (;;) {
        ZSTD_outBuffer output = { nullptr, output_buffer_size, 0 };
        size_t current_output_start_idx = compressed_output_buffer.size();
        compressed_output_buffer.resize(current_output_start_idx + output_buffer_size);
        output.dst = compressed_output_buffer.data() + current_output_start_idx;

        size_t const remaining = ZSTD_endStream(cctx, &output);
        check_zstd_error(remaining, "ZSTD_endStream failed");
        compressed_output_buffer.resize(current_output_start_idx + output.pos);
        if (remaining == 0) break; // All data flushed
    }

    ZSTD_freeCCtx(cctx);
    return compressed_output_buffer;
}

std::vector<char> zstd_decompress_stream(const std::vector<char>& compressed_data, size_t original_size) {
    ZSTD_DCtx* const dctx = ZSTD_createDCtx();
    if (!dctx) throw std::runtime_error("ZSTD_createDCtx failed");

    std::vector<char> decompressed_output_buffer;
    decompressed_output_buffer.reserve(original_size); // 预分配,或动态增长

    size_t const input_buffer_size = ZSTD_DStreamInSize();
    size_t const output_buffer_size = ZSTD_DStreamOutSize();

    ZSTD_inBuffer input = { compressed_data.data(), compressed_data.size(), 0 };

    while (input.pos < input.size) {
        ZSTD_outBuffer output = { nullptr, output_buffer_size, 0 };
        size_t current_output_start_idx = decompressed_output_buffer.size();
        decompressed_output_buffer.resize(current_output_start_idx + output_buffer_size);
        output.dst = decompressed_output_buffer.data() + current_output_start_idx;

        size_t const ret = ZSTD_decompressStream(dctx, &output, &input);
        check_zstd_error(ret, "ZSTD_decompressStream failed");
        decompressed_output_buffer.resize(current_output_start_idx + output.pos);

        if (ret == 0) { // All input consumed, and all output produced.
            break;
        }
    }

    ZSTD_freeDCtx(dctx);

    if (decompressed_output_buffer.size() != original_size) {
        throw std::runtime_error("ZSTD_decompress_stream size mismatch");
    }
    return decompressed_output_buffer;
}

// int main() {
//     std::string original_str = "Long string for streaming compression demonstration... "
//                                "This string is intentionally made longer to better illustrate "
//                                "the streaming capabilities of Zstd. In real-world scenarios, "
//                                "streaming is crucial for handling data of unknown or very large sizes. "
//                                "The input data would typically come from a file stream or network socket, "
//                                "and the output would go to another stream. The buffer management "
//                                "in this example is simplified, but the core API usage is shown.";
//     for (int i = 0; i < 5; ++i) original_str += original_str; // Make it really long
//     std::vector<char> original_data(original_str.begin(), original_str.end());
//     size_t original_size = original_data.size();

//     try {
//         std::vector<char> compressed_data = zstd_compress_stream(original_data, 7);
//         std::cout << "Original size: " << original_size << " bytesn";
//         std::cout << "Compressed size (Zstd Stream): " << compressed_data.size() << " bytesn";

//         std::vector<char> decompressed_data = zstd_decompress_stream(compressed_data, original_size);
//         std::string decompressed_str(decompressed_data.begin(), decompressed_data.end());
//         std::cout << "Decompressed size: " << decompressed_data.size() << " bytesn";

//         if (decompressed_str == original_str) {
//             std::cout << "Streaming compression and decompression successful!n";
//         } else {
//             std::cout << "Error: Streaming data mismatch!n";
//         }
//     } catch (const std::exception& e) {
//         std::cerr << "Error: " << e.what() << std::endl;
//     }
//     return 0;
// }

关键点:

  • ZSTD_CStreamInSize() / ZSTD_CStreamOutSize(): 建议的输入/输出缓冲区大小。
  • ZSTD_inBuffer / ZSTD_outBuffer: 结构体用于管理输入/输出数据的指针、大小和当前位置。
  • ZSTD_compressStream() / ZSTD_decompressStream(): 核心流处理函数。
  • ZSTD_endStream(): 在压缩结束时调用,用于刷新任何剩余的内部缓冲区数据并写入帧结束标记。
  • 缓冲区管理: 在实际应用中,需要更精细地管理输入/输出缓冲区,例如从文件读取数据到输入缓冲区,然后将输出缓冲区的数据写入文件。

VI. LZ4 C++ API 的实践指南

LZ4 库同样提供了高效的 C API。

环境配置

链接 liblz4 库。

find_package(LZ4 REQUIRED)
target_link_libraries(YourProject PRIVATE LZ4::lz4)

基本压缩与解压

LZ4 的 API 通常比 Zstd 更直接,因为它专注于速度。

#include <lz4.h>
#include <lz4hc.h> // for high compression mode
#include <vector>
#include <string>
#include <iostream>
#include <stdexcept>

// 示例:LZ4 基础压缩与解压
std::vector<char> lz4_compress_data(const std::vector<char>& input_data, bool high_compression = false) {
    if (input_data.empty()) {
        return {};
    }

    // 计算压缩后可能的最大大小
    int compressed_buffer_size = LZ4_compressBound(input_data.size());
    std::vector<char> compressed_data(compressed_buffer_size);

    int actual_compressed_size;
    if (high_compression) {
        // LZ4_compress_HC 提供更高的压缩比,但速度较慢
        actual_compressed_size = LZ4_compress_HC(
            input_data.data(), compressed_data.data(), input_data.size(), compressed_buffer_size, 0
        ); // compressionLevel 0 uses default HC level
    } else {
        // LZ4_compress_default 是最快的模式
        actual_compressed_size = LZ4_compress_default(
            input_data.data(), compressed_data.data(), input_data.size(), compressed_buffer_size
        );
    }

    if (actual_compressed_size <= 0) {
        throw std::runtime_error("LZ4 compression failed");
    }

    compressed_data.resize(actual_compressed_size);
    return compressed_data;
}

std::vector<char> lz4_decompress_data(const std::vector<char>& compressed_data, size_t original_size) {
    if (compressed_data.empty()) {
        return {};
    }

    std::vector<char> decompressed_data(original_size);

    int actual_decompressed_size = LZ4_decompress_safe(
        compressed_data.data(), decompressed_data.data(), compressed_data.size(), original_size
    );

    if (actual_decompressed_size <= 0) {
        throw std::runtime_error("LZ4 decompression failed");
    }
    if (static_cast<size_t>(actual_decompressed_size) != original_size) {
        throw std::runtime_error("LZ4 decompression size mismatch");
    }

    return decompressed_data;
}

// int main() {
//     std::string original_str = "This is a sample string that will be compressed using LZ4. "
//                                "It contains some repetitive patterns for better compression demo.";
//     std::vector<char> original_data(original_str.begin(), original_str.end());
//     size_t original_size = original_data.size();

//     try {
//         // 压缩 (默认速度模式)
//         std::vector<char> compressed_data_fast = lz4_compress_data(original_data, false);
//         std::cout << "Original size: " << original_size << " bytesn";
//         std::cout << "Compressed size (LZ4 Fast): " << compressed_data_fast.size() << " bytesn";
//         // 解压
//         std::vector<char> decompressed_data_fast = lz4_decompress_data(compressed_data_fast, original_size);
//         std::string decompressed_str_fast(decompressed_data_fast.begin(), decompressed_data_fast.end());
//         if (decompressed_str_fast == original_str) std::cout << "LZ4 Fast successful!n";

//         // 压缩 (高压缩比模式)
//         std::vector<char> compressed_data_hc = lz4_compress_data(original_data, true);
//         std::cout << "Compressed size (LZ4 HC): " << compressed_data_hc.size() << " bytesn";
//         // 解压
//         std::vector<char> decompressed_data_hc = lz4_decompress_data(compressed_data_hc, original_size);
//         std::string decompressed_str_hc(decompressed_data_hc.begin(), decompressed_data_hc.end());
//         if (decompressed_str_hc == original_str) std::cout << "LZ4 HC successful!n";

//     } catch (const std::exception& e) {
//         std::cerr << "Error: " << e.what() << std::endl;
//     }
//     return 0;
// }

关键点:

  • LZ4_compressBound(size_t srcSize): 计算压缩所需的最大输出缓冲区大小。
  • LZ4_compress_default(): LZ4 最快的压缩函数。
  • LZ4_compress_HC(): LZ4 的高压缩比模式,速度比 _default 慢,但压缩比更好。
  • LZ4_decompress_safe(): 安全解压函数,需要提供压缩数据大小和原始数据最大大小,会进行边界检查。
  • LZ4 没有像 Zstd 那样的上下文对象,因为它的内部状态非常小,可以直接通过函数参数传递。

帧格式与流式处理 (LZ4 Frame Format)

LZ4 库也提供了 LZ4F API,它实现了 LZ4 帧格式,这是一种更健壮的格式,包含魔数、头部、块校验和、帧校验和等,适用于流式处理和确保数据完整性。

#include <lz4frame.h> // For LZ4 Frame API
#include <vector>
#include <string>
#include <iostream>
#include <stdexcept>

// 辅助函数:检查LZ4F返回码
void check_lz4f_error(size_t result, const std::string& msg) {
    if (LZ4F_isError(result)) {
        throw std::runtime_error(msg + ": " + LZ4F_getErrorName(result));
    }
}

std::vector<char> lz4f_compress_stream(const std::vector<char>& input_data, int compression_level = 0) {
    LZ4F_compressionContext_t cctx;
    LZ4F_preferences_t prefs;
    memset(&prefs, 0, sizeof(prefs));
    prefs.frameInfo.blockMode = LZ4F_blockLinked; // 块之间链接
    prefs.frameInfo.contentChecksumFlag = LZ4F_contentChecksumEnabled; // 启用帧校验和
    prefs.compressionLevel = compression_level; // 0 for default, 9 for HC-like

    check_lz4f_error(LZ4F_createCompressionContext(&cctx, LZ4F_VERSION), "LZ4F_createCompressionContext failed");

    std::vector<char> compressed_output_buffer;
    size_t const output_buffer_size = LZ4F_compressFrameBound(input_data.size(), &prefs);
    compressed_output_buffer.reserve(output_buffer_size); // 预分配足够大的空间

    // 写入帧头
    size_t const header_size = LZ4F_compressBegin(cctx, compressed_output_buffer.data(), output_buffer_size, &prefs);
    check_lz4f_error(header_size, "LZ4F_compressBegin failed");
    compressed_output_buffer.resize(header_size);

    // 压缩数据块
    size_t const body_size = LZ4F_compressUpdate(cctx,
                                                 compressed_output_buffer.data() + compressed_output_buffer.size(),
                                                 output_buffer_size - compressed_output_buffer.size(),
                                                 input_data.data(), input_data.size(),
                                                 nullptr); // no preferences for update
    check_lz4f_error(body_size, "LZ4F_compressUpdate failed");
    compressed_output_buffer.resize(compressed_output_buffer.size() + body_size);

    // 写入帧尾
    size_t const end_size = LZ4F_compressEnd(cctx,
                                              compressed_output_buffer.data() + compressed_output_buffer.size(),
                                              output_buffer_size - compressed_output_buffer.size(),
                                              nullptr); // no preferences for end
    check_lz4f_error(end_size, "LZ4F_compressEnd failed");
    compressed_output_buffer.resize(compressed_output_buffer.size() + end_size);

    LZ4F_freeCompressionContext(cctx);
    return compressed_output_buffer;
}

std::vector<char> lz4f_decompress_stream(const std::vector<char>& compressed_data) {
    LZ4F_decompressionContext_t dctx;
    check_lz4f_error(LZ4F_createDecompressionContext(&dctx, LZ4F_VERSION), "LZ4F_createDecompressionContext failed");

    std::vector<char> decompressed_output_buffer;
    size_t const output_buffer_size = 65536; // 典型的解压缓冲区大小
    decompressed_output_buffer.reserve(compressed_data.size() * 2); // 预估一个大小

    size_t src_size = compressed_data.size();
    size_t src_offset = 0;

    for (;;) {
        size_t dst_buf_offset = decompressed_output_buffer.size();
        decompressed_output_buffer.resize(dst_buf_offset + output_buffer_size);

        size_t current_dst_size = output_buffer_size;
        size_t current_src_size = src_size - src_offset;

        size_t const ret = LZ4F_decompress(dctx,
                                           decompressed_output_buffer.data() + dst_buf_offset, &current_dst_size,
                                           compressed_data.data() + src_offset, &current_src_size,
                                           nullptr); // no preferences for decompress

        check_lz4f_error(ret, "LZ4F_decompress failed");

        decompressed_output_buffer.resize(dst_buf_offset + current_dst_size);
        src_offset += current_src_size;

        if (ret == 0) { // End of frame
            break;
        }
        if (src_offset >= src_size && ret != 0) {
            // Reached end of input but not end of frame, implies truncated data
            throw std::runtime_error("LZ4F_decompress: truncated data");
        }
    }

    LZ4F_freeDecompressionContext(dctx);
    return decompressed_output_buffer;
}

// int main() {
//     std::string original_str = "A longer string for LZ4 Frame streaming. "
//                                "This format is more robust and includes checksums, "
//                                "making it suitable for reliable data transfer over networks or disk storage. "
//                                "It handles data in blocks and ensures integrity.";
//     for (int i = 0; i < 3; ++i) original_str += original_str; // Make it longer
//     std::vector<char> original_data(original_str.begin(), original_str.end());
//     size_t original_size = original_data.size();

//     try {
//         std::vector<char> compressed_data = lz4f_compress_stream(original_data); // Default level
//         std::cout << "Original size: " << original_size << " bytesn";
//         std::cout << "Compressed size (LZ4F Stream): " << compressed_data.size() << " bytesn";

//         std::vector<char> decompressed_data = lz4f_decompress_stream(compressed_data);
//         std::string decompressed_str(decompressed_data.begin(), decompressed_data.end());
//         std::cout << "Decompressed size: " << decompressed_data.size() << " bytesn";

//         if (decompressed_str == original_str) {
//             std::cout << "LZ4F streaming compression and decompression successful!n";
//         } else {
//             std::cout << "Error: LZ4F streaming data mismatch!n";
//         }
//     } catch (const std::exception& e) {
//         std::cerr << "Error: " << e.what() << std::endl;
//     }
//     return 0;
// }

关键点:

  • LZ4F_createCompressionContext() / LZ4F_createDecompressionContext(): 创建帧上下文。
  • LZ4F_compressBegin(): 写入帧头。
  • LZ4F_compressUpdate(): 压缩数据块。
  • LZ4F_compressEnd(): 写入帧尾。
  • LZ4F_decompress(): 解压数据,一次处理一个或多个块。
  • LZ4F_preferences_t: 配置压缩选项,如块模式、校验和等。
  • LZ4F 提供了内置的数据完整性校验机制(如帧校验和),这在 Zstd 的帧格式中也有体现。

VII. 存储引擎架构的改造与集成点

将压缩集成到存储引擎中,需要对数据块的存储格式、读写路径以及元数据管理进行改造。

数据块结构定义

每个存储在磁盘上的数据块,无论是否压缩,都应包含一个头部,用于描述其内容。

#include <cstdint> // For fixed-width integers like uint8_t, uint32_t, uint64_t

// 抽象的存储块头结构
struct StorageBlockHeader {
    // 压缩算法类型枚举
    enum CompressionType : uint8_t {
        NONE = 0,
        LZ4 = 1,
        ZSTD_LVL_1 = 2,  // Zstd 级别 1
        ZSTD_LVL_3 = 3,  // Zstd 级别 3
        ZSTD_LVL_7 = 4,  // Zstd 级别 7
        ZSTD_LVL_MAX = 5 // Zstd 级别 22 (或某个最大级别)
        // ... 可以根据需要添加更多级别或算法
    };

    CompressionType type;      // 实际使用的压缩算法和级别
    uint32_t original_size;    // 原始(未压缩)数据的大小
    uint32_t compressed_size;  // 压缩后数据的大小(如果未压缩,则与original_size相同)
    uint64_t checksum;         // 数据校验和,用于完整性检查 (e.g., XXH64, CRC32)
    uint32_t dictionary_id;    // 如果使用了字典压缩,这里存储字典的ID (0表示无字典)
    // ... 其他元数据,如版本号、时间戳等
    uint8_t padding[7]; // 填充字节,使头部对齐到 64 字节,方便I/O
}; // 假设总大小为 32 字节或 64 字节,方便磁盘I/O对齐

// 存储引擎中的核心接口(伪代码)
class StorageEngine {
public:
    // 初始化,可以加载字典等
    StorageEngine();
    ~StorageEngine();

    // 写入数据块
    // block_id 是逻辑块ID,数据是原始未压缩数据
    void writeBlock(uint64_t block_id, const std::vector<char>& raw_data);

    // 读取数据块
    // 返回原始未压缩数据
    std::vector<char> readBlock(uint64_t block_id);

private:
    // 内部函数:执行压缩
    // 返回压缩后的数据和头部
    std::pair<StorageBlockHeader, std::vector<char>> internal_compress(const std::vector<char>& raw_data);

    // 内部函数:执行解压
    // 从磁盘读取的原始数据可能包含头部和压缩数据
    std::vector<char> internal_decompress(const StorageBlockHeader& header, const std::vector<char>& compressed_payload);

    // 实际的磁盘读写操作
    void write_to_disk(uint64_t block_id, const StorageBlockHeader& header, const std::vector<char>& data);
    std::pair<StorageBlockHeader, std::vector<char>> read_from_disk(uint64_t block_id);

    // Zstd/LZ4 压缩器/解压器实例或上下文池
    ZstdCompressor zstd_compressor_lvl3_; // 示例:使用 Zstd 级别 3 压缩器
    ZstdDecompressor zstd_decompressor_;
    // LZ4Compressor lz4_compressor_; // 如果也支持 LZ4
    // LZ4Decompressor lz4_decompressor_;

    // 字典管理
    std::map<uint32_t, std::vector<char>> dictionaries_; // 存储字典数据
    std::map<uint32_t, ZSTD_DDict*> ddicts_; // 存储预处理的 Zstd 解压字典

    // 线程安全的上下文池 (针对多线程环境)
    // ThreadSafeObjectPool<ZSTD_CCtx> cctx_pool_;
    // ThreadSafeObjectPool<ZSTD_DCtx> dctx_pool_;
};

写路径 (Write Path) 改造

  1. 数据收集: 应用程序将逻辑数据(例如多条记录)写入存储引擎。存储引擎将这些数据累积到内存缓冲区,形成一个待处理的原始数据块。
  2. 选择压缩策略: 根据配置(如全局设置、数据类型、用户指定),选择合适的压缩算法(Zstd 或 LZ4)和压缩级别。
  3. 执行压缩: 调用选定算法的压缩 API,对原始数据块进行压缩。
    • 计算原始数据块的校验和。
    • 分配压缩输出缓冲区。
    • 执行压缩操作。
    • 记录压缩后的数据大小。
  4. 构建块头部: 填充 StorageBlockHeader 结构体,包括压缩类型、原始大小、压缩后大小、校验和、字典 ID 等。
  5. 写入磁盘: 将块头部和压缩后的数据序列化并写入磁盘。
    • 原子性: 确保写入操作的原子性,防止数据损坏。例如,先写入临时文件,成功后再重命名,或者使用日志写前模式。
    • 对齐: 确保写入的数据块在磁盘上对齐,以优化 I/O 性能。
// 伪代码:StorageEngine::writeBlock 实现
void StorageEngine::writeBlock(uint64_t block_id, const std::vector<char>& raw_data) {
    // 1. 选择压缩策略 (这里简化为固定使用 Zstd 级别 3)
    StorageBlockHeader::CompressionType compression_type = StorageBlockHeader::ZSTD_LVL_3;
    int compression_level = 3; // 对应 ZSTD_LVL_3

    // 2. 计算原始数据校验和 (假设使用 XXH64)
    uint64_t original_checksum = XXH64(raw_data.data(), raw_data.size(), 0); // 0 is seed

    // 3. 执行压缩
    std::vector<char> compressed_payload;
    if (compression_type == StorageBlockHeader::NONE || raw_data.empty()) {
        compressed_payload = raw_data;
        compression_type = StorageBlockHeader::NONE;
    } else if (compression_type == StorageBlockHeader::ZSTD_LVL_3) {
        // 使用 Zstd 压缩器
        // 实际中可能从池中获取 ZSTD_CCtx 或 ZstdCompressor 实例
        compressed_payload = zstd_compressor_lvl3_.compress(raw_data);
        if (compressed_payload.size() >= raw_data.size()) { // 如果压缩反而变大,则不压缩
            compressed_payload = raw_data;
            compression_type = StorageBlockHeader::NONE;
        }
    }
    // else if (compression_type == StorageBlockHeader::LZ4) { ... }

    // 4. 构建块头部
    StorageBlockHeader header;
    header.type = compression_type;
    header.original_size = static_cast<uint32_t>(raw_data.size());
    header.compressed_size = static_cast<uint32_t>(compressed_payload.size());
    header.checksum = original_checksum;
    header.dictionary_id = 0; // 示例:无字典

    // 5. 写入磁盘
    write_to_disk(block_id, header, compressed_payload);
    // 更新索引等元数据...
}

读路径 (Read Path) 改造

  1. 从磁盘读取: 根据逻辑块 ID,从磁盘读取对应的物理数据(包含块头部和压缩数据)。
  2. 解析块头部: 从读取的数据中解析出 StorageBlockHeader
  3. 校验:
    • 检查块头部是否有效。
    • 重要: 如果存储引擎支持块级校验和(例如对整个物理块进行校验),则首先进行此校验以确保读取的数据未损坏。
  4. 选择解压策略: 根据头部中的 CompressionTypedictionary_id,选择正确的解压算法和字典。
  5. 执行解压: 调用选定算法的解压 API。
    • 分配原始数据大小的解压输出缓冲区。
    • 执行解压操作。
    • 重要: 解压完成后,根据头部中的 original_size 检查解压后的数据大小是否匹配。
    • 重要: 计算解压后数据的校验和,并与头部中存储的 checksum 进行比对,确保数据完整性。
  6. 返回原始数据: 将解压后的原始数据返回给应用程序。
// 伪代码:StorageEngine::readBlock 实现
std::vector<char> StorageEngine::readBlock(uint64_t block_id) {
    // 1. 从磁盘读取整个物理块 (头部 + 载荷)
    std::pair<StorageBlockHeader, std::vector<char>> disk_data = read_from_disk(block_id);
    const StorageBlockHeader& header = disk_data.first;
    const std::vector<char>& compressed_payload = disk_data.second;

    // 2. 验证基本一致性
    if (compressed_payload.size() != header.compressed_size) {
        throw std::runtime_error("Read compressed size mismatch with header for block " + std::to_string(block_id));
    }

    // 3. 执行解压
    std::vector<char> raw_data;
    if (header.type == StorageBlockHeader::NONE) {
        raw_data = compressed_payload; // 未压缩数据直接返回
    } else if (header.type >= StorageBlockHeader::ZSTD_LVL_1 && header.type <= StorageBlockHeader::ZSTD_LVL_MAX) {
        // 使用 Zstd 解压器
        // 实际中可能从池中获取 ZSTD_DCtx 或 ZstdDecompressor 实例
        // 如果有字典,需要使用 ZstdDictDecompressor
        if (header.dictionary_id != 0) {
            // 假设已加载字典并创建了 ddict_
            // ZstdDictDecompressor dict_decompressor(dictionaries_[header.dictionary_id]);
            // raw_data = dict_decompressor.decompress(compressed_payload, header.original_size);
            throw std::runtime_error("Dictionary decompression not fully implemented in example.");
        } else {
            raw_data = zstd_decompressor_.decompress(compressed_payload, header.original_size);
        }
    }
    // else if (header.type == StorageBlockHeader::LZ4) { ... }
    else {
        throw std::runtime_error("Unsupported compression type for block " + std::to_string(block_id));
    }

    // 4. 校验解压后数据完整性
    if (raw_data.size() != header.original_size) {
        throw std::runtime_error("Decompressed size mismatch with header for block " + std::to_string(block_id));
    }
    uint64_t decompressed_checksum = XXH64(raw_data.data(), raw_data.size(), 0);
    if (decompressed_checksum != header.checksum) {
        throw std::runtime_error("Data checksum mismatch after decompression for block " + std::to_string(block_id));
    }

    // 5. 返回原始数据
    return raw_data;
}

缓冲区管理

  • 动态分配与复用: 压缩和解压操作需要临时缓冲区。频繁的 new/deletestd::vector 的重新分配会导致性能开销。
    • 可以使用内存池(Memory Pool)或缓冲区池(Buffer Pool)来复用这些缓冲区。
    • 对于 Zstd 的上下文,可以创建线程安全的上下文池,每个线程在需要时从池中获取,用完后归还。
  • 避免不必要的拷贝: 尽量直接在读取/写入的缓冲区上进行操作,减少数据在不同内存区域之间的拷贝。

元数据存储

  • 压缩相关的元数据(如 StorageBlockHeader)必须与数据块一同持久化。
  • 对于块级压缩,元数据通常存储在每个数据块的开头。
  • 对于记录级压缩,元数据可能存储在每条记录的头部,或者在一个单独的元数据区域中,通过偏移量索引到实际数据。

VIII. 性能优化与最佳实践

集成压缩不仅仅是调用 API,更需要全面的性能考量和最佳实践。

CPU 开销与吞吐量权衡

  • 压缩级别选择: 这是最直接的权衡点。
    • LZ4 / Zstd 1-3 级: 极高的速度,极低的 CPU 占用,但压缩比一般。适合 I/O 密集型但 CPU 资源有限,或对延迟极其敏感的场景。
    • Zstd 7-10 级: 良好的速度与优秀的压缩比平衡点,通常是数据库和通用存储的首选。CPU 占用适中。
    • Zstd 20+ 级: 牺牲压缩速度换取极致压缩比。适合冷数据归档,或写入不频繁但读取频繁的场景(因为解压速度依然很快)。
  • 动态调整: 可以根据系统负载、数据类型、存储层级等因素动态调整压缩级别。例如,在系统写入压力大时降低压缩级别以减少 CPU 负载;在夜间低峰期对旧数据进行重压缩以提高存储效率。

内存效率

  • 避免不必要的内存拷贝: 尽可能在原地操作数据,或者使用零拷贝技术(如果底层 I/O 栈支持)。
  • 缓冲区池化: 为压缩/解压操作创建固定大小的缓冲区池,而不是每次都动态分配。这可以减少内存碎片,提高内存分配效率。
  • 上下文复用: 之前提到的 Zstd CCtx/DCtx 应该被复用,而不是频繁创建和销毁。在多

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注