C++ 向量化存储(Vectorized Storage):在 C++ 数据库内核中利用列式存储提升算子扫描性能

在现代数据处理领域,性能是永恒的追求。随着数据量的爆炸式增长和分析型查询复杂度的提升,传统的关系型数据库系统在应对高吞吐量的扫描和聚合操作时,逐渐暴露出其固有的局限性。特别是在数据库内核层面,如何高效地组织和访问数据,以充分利用现代硬件的并行计算能力,是决定系统性能的关键。本讲座将深入探讨C++数据库内核中一种强大的技术——向量化存储(Vectorized Storage),以及它如何通过列式存储的原理,显著提升算子(operator)的扫描性能。

1. 性能瓶颈与传统存储的局限性

数据库系统面临的核心挑战之一,是如何在海量数据中快速定位、读取并处理所需的信息。对于联机事务处理(OLTP)工作负载,关注点是快速的单行插入、更新和删除,以及小范围的查询。而对于联机分析处理(OLAP)工作负载,其特点是扫描大量数据、执行复杂的聚合、连接和过滤操作。正是后者,对数据存储和处理效率提出了极高的要求。

1.1 CPU缓存与内存墙

现代CPU的速度远超内存。当CPU需要访问数据时,它会首先尝试从其多级缓存(L1、L2、L3)中获取。如果数据不在缓存中(缓存未命中),CPU就必须等待数据从主内存加载,这个过程被称为“内存墙”(Memory Wall)瓶颈。一次主内存访问可能需要数百个CPU周期,而一次缓存访问可能只需要几个周期。因此,优化数据访问模式,提高缓存命中率,是提升数据库性能的关键。

1.2 传统行式存储的问题

大多数传统关系型数据库采用行式存储(Row-Oriented Storage)模型。在这种模型中,一行数据的所有列值都连续地存储在一起。例如,一个包含idnamevalue的表,在磁盘或内存中会按以下方式布局:

| Row 1 (id, name, value) | Row 2 (id, name, value) | Row 3 (id, name, value) | … |

用C++结构体表示,大致如下:

struct EmployeeRow {
    int id;
    char name[64]; // Fixed size for simplicity
    double salary;
    bool isActive;
};

当数据库系统需要执行一个分析型查询,例如“计算所有员工的平均工资”:SELECT AVG(salary) FROM Employees;

在这种行式存储下,即使查询只关心salary这一列,系统也必须加载每一行完整的EmployeeRow数据到内存中。这意味着idnameisActive等无关数据也会被加载,占据宝贵的缓存空间。对于大型表,这将导致:

  1. 低缓存效率: 大量无关数据被加载到缓存,很快就会将真正需要的数据挤出,导致频繁的缓存未命中。
  2. 高I/O开销: 从磁盘读取的数据量远大于实际所需,增加I/O延迟。
  3. CPU利用率低下: CPU在处理每一行时,需要跳过不相关的列,导致非连续内存访问和分支预测失败,难以充分利用其向量化处理能力。

为了克服这些局限性,列式存储应运而生。

2. 列式存储:一种范式转变

列式存储(Column-Oriented Storage)是解决上述问题的一种根本性方法。顾名思义,它将同一列的所有值连续地存储在一起,而不是同一行的所有值。对于我们之前的EmployeeRow例子,数据布局将变成:

| ID Column (id1, id2, id3, …) | Name Column (name1, name2, name3, …) | Salary Column (salary1, salary2, salary3, …) | IsActive Column (isActive1, isActive2, isActive3, …) |

用C++的数据结构来模拟,可能看起来像这样:

// 假设数据存储在独立的向量中
std::vector<int> ids;
std::vector<std::string> names; // For variable-length strings, this is a simplification
std::vector<double> salaries;
std::vector<bool> activeStatuses;

2.1 列式存储的显著优势

列式存储带来了多方面的性能提升,尤其是在分析型工作负载下:

  1. 极高的缓存效率: 当查询仅涉及少数几列时,系统只需加载这些列的数据。例如,计算平均工资的查询只需加载salaries向量。这使得所需数据在内存中高度集中,极大地提高了缓存命中率。
  2. 优异的压缩比: 同一列的数据通常具有相同的数据类型和相似的分布特征。这使得应用各种高效的压缩算法(如字典编码、行程长度编码 RLE、位图编码、增量编码等)变得非常有效。数据压缩不仅减少了存储空间,更重要的是减少了磁盘I/O和内存带宽的消耗。
  3. SIMD(Single Instruction, Multiple Data)友好: 列中的数据是同构且连续的,这正是现代CPU SIMD指令集(如SSE、AVX、AVX-512)的理想输入。SIMD指令允许CPU在一个时钟周期内对多个数据元素执行相同的操作,从而实现数据级并行。
  4. 减少I/O: 由于只读取相关列,并且数据经过高效压缩,从存储介质读取的数据量大大减少。
  5. 延迟物化(Late Materialization): 行式存储通常在查询处理的早期阶段就将所有列组合成完整的行。而列式存储可以延迟行的物化,直到所有必要的过滤和聚合操作都完成,甚至在需要将结果返回给用户时才重构行。

当然,列式存储并非万能药。对于需要频繁更新单行所有列的OLTP场景,或者需要频繁插入新行的场景,列式存储可能会引入额外的复杂性和开销,因为单行数据的修改可能涉及到多个独立列的更新。然而,对于OLAP场景,其优势是压倒性的。

3. 向量化处理:列式存储的引擎

仅仅将数据存储为列式结构还不足以实现极致性能。真正的性能飞跃来自于将数据处理方式从“一次一元组”(tuple-at-a-time)模式转变为“一次一批数据”(batch-at-a-time)或“向量化”(vectorized)模式。

3.1 什么是向量化处理?

向量化处理是指数据库算子不再逐行处理数据,而是接收一个包含多行数据的“向量”或“批次”(batch/chunk),并对整个批次的数据执行操作。例如,一个过滤算子不再逐行检查条件,而是接收一个包含1024个整数的列向量,然后一次性对这1024个整数应用过滤条件,并生成一个包含通过过滤的整数的新向量(或一个位图来指示哪些通过了)。

这种批处理的方式有以下几个核心优点:

  1. 减少函数调用开销: 传统模式下,每个元组都可能涉及一次函数调用,而向量化处理将多次操作合并为一次,显著减少了函数调用和虚函数调用的开销。
  2. 更好的缓存局部性: 处理一个数据批次时,相关数据被连续访问,进一步提高了缓存命中率。
  3. 充分利用SIMD指令: 批处理模式天然地与SIMD指令兼容。CPU可以并行处理批次中的多个数据元素。
  4. 更少的控制流分支: 传统处理可能在每行数据上都有条件判断(if-else),导致分支预测失败。向量化处理可以将条件判断转换为位图操作,减少分支。

一个典型的向量化批次(Chunk)可能包含数千到数万行数据。这个大小需要根据缓存大小、SIMD寄存器宽度和具体工作负载进行调整。

3.2 C++中向量化存储的数据结构

在C++数据库内核中实现向量化存储,我们需要设计能够高效存储和操作列数据的结构。

3.2.1 基础列抽象

首先,定义一个通用的列接口,以便处理不同数据类型的列。

#include <vector>
#include <string>
#include <memory> // For std::unique_ptr
#include <variant> // C++17 for heterogeneous types, or base class + derived classes
#include <iostream>
#include <numeric> // For std::iota
#include <algorithm> // For std::transform, std::copy_if etc.

// Optional: For handling NULLs using a bitmap
using Bitmap = std::vector<bool>; // Simplified, real bitmaps are packed bits

// Base class for a column
class Column {
public:
    virtual ~Column() = default;
    virtual size_t size() const = 0;
    virtual std::string toString(size_t index) const = 0;
    virtual std::unique_ptr<Column> filter(const Bitmap& selection_vector) const = 0;
    virtual void appendNull(size_t count = 1) = 0; // For appending nulls
    virtual bool isNull(size_t index) const = 0;
    virtual void setNull(size_t index) = 0; // Set an existing entry to null

protected:
    // A real implementation would have a nullable bitmap
    Bitmap null_bitmap_; // True if value is NULL
};

// Typed column for integers
class IntColumn : public Column {
public:
    IntColumn(std::vector<int> data) : data_(std::move(data)) {
        null_bitmap_.resize(data_.size(), false);
    }
    IntColumn(std::vector<int> data, Bitmap null_bitmap)
        : data_(std::move(data)), null_bitmap_(std::move(null_bitmap)) {}

    size_t size() const override { return data_.size(); }
    std::string toString(size_t index) const override {
        if (isNull(index)) return "NULL";
        return std::to_string(data_[index]);
    }

    std::unique_ptr<Column> filter(const Bitmap& selection_vector) const override {
        std::vector<int> filtered_data;
        Bitmap filtered_null_bitmap;
        for (size_t i = 0; i < size(); ++i) {
            if (selection_vector[i]) {
                filtered_data.push_back(data_[i]);
                filtered_null_bitmap.push_back(null_bitmap_[i]);
            }
        }
        return std::make_unique<IntColumn>(std::move(filtered_data), std::move(filtered_null_bitmap));
    }

    // Accessors for specific types (downcasting needed, or use templates/variant)
    int get(size_t index) const {
        if (isNull(index)) throw std::runtime_error("Accessing NULL value");
        return data_[index];
    }
    const std::vector<int>& getData() const { return data_; } // For SIMD operations

    void appendNull(size_t count = 1) override {
        for (size_t i = 0; i < count; ++i) {
            data_.push_back(0); // Default value for NULL, actual value doesn't matter
            null_bitmap_.push_back(true);
        }
    }

    bool isNull(size_t index) const override {
        if (index >= null_bitmap_.size()) return true; // Out of bounds is implicitly null
        return null_bitmap_[index];
    }

    void setNull(size_t index) override {
        if (index >= null_bitmap_.size()) {
            // Handle error or resize if allowed
            throw std::out_of_range("Index out of bounds for setting null.");
        }
        null_bitmap_[index] = true;
    }

private:
    std::vector<int> data_;
};

// Typed column for doubles
class DoubleColumn : public Column {
public:
    DoubleColumn(std::vector<double> data) : data_(std::move(data)) {
        null_bitmap_.resize(data_.size(), false);
    }
    DoubleColumn(std::vector<double> data, Bitmap null_bitmap)
        : data_(std::move(data)), null_bitmap_(std::move(null_bitmap)) {}

    size_t size() const override { return data_.size(); }
    std::string toString(size_t index) const override {
        if (isNull(index)) return "NULL";
        return std::to_string(data_[index]);
    }

    std::unique_ptr<Column> filter(const Bitmap& selection_vector) const override {
        std::vector<double> filtered_data;
        Bitmap filtered_null_bitmap;
        for (size_t i = 0; i < size(); ++i) {
            if (selection_vector[i]) {
                filtered_data.push_back(data_[i]);
                filtered_null_bitmap.push_back(null_bitmap_[i]);
            }
        }
        return std::make_unique<DoubleColumn>(std::move(filtered_data), std::move(filtered_null_bitmap));
    }

    double get(size_t index) const {
        if (isNull(index)) throw std::runtime_error("Accessing NULL value");
        return data_[index];
    }
    const std::vector<double>& getData() const { return data_; }

    void appendNull(size_t count = 1) override {
        for (size_t i = 0; i < count; ++i) {
            data_.push_back(0.0);
            null_bitmap_.push_back(true);
        }
    }

    bool isNull(size_t index) const override {
        if (index >= null_bitmap_.size()) return true;
        return null_bitmap_[index];
    }

    void setNull(size_t index) override {
        if (index >= null_bitmap_.size()) {
            throw std::out_of_range("Index out of bounds for setting null.");
        }
        null_bitmap_[index] = true;
    }

private:
    std::vector<double> data_;
};

// For string columns, handling variable-length strings is more complex.
// Typically, it involves an offset array and a contiguous data buffer.
class StringColumn : public Column {
public:
    StringColumn(std::vector<std::string> data) : data_strings_(std::move(data)) {
        null_bitmap_.resize(data_strings_.size(), false);
    }
    StringColumn(std::vector<std::string> data, Bitmap null_bitmap)
        : data_strings_(std::move(data)), null_bitmap_(std::move(null_bitmap)) {}

    size_t size() const override { return data_strings_.size(); }
    std::string toString(size_t index) const override {
        if (isNull(index)) return "NULL";
        return data_strings_[index];
    }

    std::unique_ptr<Column> filter(const Bitmap& selection_vector) const override {
        std::vector<std::string> filtered_data;
        Bitmap filtered_null_bitmap;
        for (size_t i = 0; i < size(); ++i) {
            if (selection_vector[i]) {
                filtered_data.push_back(data_strings_[i]);
                filtered_null_bitmap.push_back(null_bitmap_[i]);
            }
        }
        return std::make_unique<StringColumn>(std::move(filtered_data), std::move(filtered_null_bitmap));
    }

    const std::string& get(size_t index) const {
        if (isNull(index)) throw std::runtime_error("Accessing NULL value");
        return data_strings_[index];
    }

    void appendNull(size_t count = 1) override {
        for (size_t i = 0; i < count; ++i) {
            data_strings_.push_back(""); // Default value for NULL
            null_bitmap_.push_back(true);
        }
    }

    bool isNull(size_t index) const override {
        if (index >= null_bitmap_.size()) return true;
        return null_bitmap_[index];
    }

    void setNull(size_t index) override {
        if (index >= null_bitmap_.size()) {
            throw std::out_of_range("Index out of bounds for setting null.");
        }
        null_bitmap_[index] = true;
    }

private:
    std::vector<std::string> data_strings_; // Simplified: in real systems, this would be `char*` buffer + offset array
};

关于NULL值的处理:
在实际的数据库系统中,Bitmap通常是一个位数组(std::vector<bool>在内部可能不是位数组,需要自定义Bitmask类),每个位代表一行数据是否为NULL。这种方式非常节省空间,并且可以与SIMD指令配合,高效地处理NULL值。

3.2.2 批次(Chunk)的表示

一个批次(Chunk)由多个列组成,代表了多行数据的一个逻辑子集。

class Chunk {
public:
    Chunk() = default;

    // Add a column to the chunk
    void addColumn(std::unique_ptr<Column> col) {
        if (!columns_.empty() && col->size() != columns_[0]->size()) {
            throw std::invalid_argument("All columns in a chunk must have the same size.");
        }
        columns_.push_back(std::move(col));
    }

    // Get a column by index
    const Column& getColumn(size_t col_idx) const {
        if (col_idx >= columns_.size()) {
            throw std::out_of_range("Column index out of range.");
        }
        return *columns_[col_idx];
    }

    // Get the number of rows in this chunk
    size_t size() const {
        if (columns_.empty()) return 0;
        return columns_[0]->size();
    }

    // Get the number of columns
    size_t numColumns() const {
        return columns_.size();
    }

    // Print the chunk (for demonstration)
    void print() const {
        if (size() == 0) {
            std::cout << "Empty Chunk" << std::endl;
            return;
        }
        // Print header (assuming fixed column names for simplicity)
        for (size_t col_idx = 0; col_idx < numColumns(); ++col_idx) {
            std::cout << "Col" << col_idx << "t";
        }
        std::cout << std::endl;

        for (size_t row_idx = 0; row_idx < size(); ++row_idx) {
            for (size_t col_idx = 0; col_idx < numColumns(); ++col_idx) {
                std::cout << getColumn(col_idx).toString(row_idx) << "t";
            }
            std::cout << std::endl;
        }
    }

private:
    std::vector<std::unique_ptr<Column>> columns_;
};

4. 实现向量化扫描算子

数据库查询通常由一系列算子(如扫描、过滤、投影、连接、聚合)组成,它们形成一个查询计划树。在向量化处理模型中,这些算子不再处理单行数据,而是处理Chunk

4.1 算子接口

定义一个通用的算子接口,其中最核心的方法是next(),它返回下一个处理过的Chunk

// Base class for all query operators
class Operator {
public:
    virtual ~Operator() = default;
    // The core method: processes and returns the next chunk of data
    virtual std::unique_ptr<Chunk> next() = 0;
};

4.2 表扫描算子(TableScanOperator)

TableScanOperator是查询计划的起点,负责从存储层读取数据并将其组织成Chunk

// A simplified "Table" that holds all the data in columns
class Table {
public:
    Table(std::vector<std::unique_ptr<Column>> columns) : columns_(std::move(columns)) {
        if (!columns_.empty()) {
            num_rows_ = columns_[0]->size();
            for (const auto& col : columns_) {
                if (col->size() != num_rows_) {
                    throw std::invalid_argument("All columns in a table must have the same size.");
                }
            }
        } else {
            num_rows_ = 0;
        }
    }

    size_t numRows() const { return num_rows_; }
    size_t numColumns() const { return columns_.size(); }
    const Column& getColumn(size_t col_idx) const { return *columns_[col_idx]; }

private:
    std::vector<std::unique_ptr<Column>> columns_;
    size_t num_rows_;
};

// TableScanOperator: reads data from a table and produces chunks
class TableScanOperator : public Operator {
public:
    TableScanOperator(const Table& table, size_t chunk_size)
        : table_(table), chunk_size_(chunk_size), current_row_idx_(0) {
        // We need to know which columns to project (return)
        // For simplicity, let's assume it projects all columns initially
        projected_column_indices_.reserve(table_.numColumns());
        for (size_t i = 0; i < table_.numColumns(); ++i) {
            projected_column_indices_.push_back(i);
        }
    }

    // A more advanced scan would allow specifying projected columns
    TableScanOperator(const Table& table, size_t chunk_size, std::vector<size_t> projected_cols)
        : table_(table), chunk_size_(chunk_size), current_row_idx_(0),
          projected_column_indices_(std::move(projected_cols)) {}

    std::unique_ptr<Chunk> next() override {
        if (current_row_idx_ >= table_.numRows()) {
            return nullptr; // No more data
        }

        auto chunk = std::make_unique<Chunk>();
        size_t rows_to_read = std::min(chunk_size_, table_.numRows() - current_row_idx_);

        for (size_t col_idx : projected_column_indices_) {
            // In a real system, this would involve reading from storage and
            // creating a new column with a slice of data.
            // For this example, we'll simulate by creating new column objects with copied data.
            // This is NOT efficient for real systems but demonstrates the concept.
            // Real systems would use views (std::span) or shared buffers.

            const Column& source_col = table_.getColumn(col_idx);

            // A more efficient way for primitive types is to get a pointer/span and create a new column
            // For this example, we'll create new vectors for simplicity.
            if (auto int_col = dynamic_cast<const IntColumn*>(&source_col)) {
                std::vector<int> data_slice;
                Bitmap null_slice;
                for (size_t i = 0; i < rows_to_read; ++i) {
                    data_slice.push_back(int_col->getData()[current_row_idx_ + i]);
                    null_slice.push_back(int_col->isNull(current_row_idx_ + i));
                }
                chunk->addColumn(std::make_unique<IntColumn>(std::move(data_slice), std::move(null_slice)));
            } else if (auto double_col = dynamic_cast<const DoubleColumn*>(&source_col)) {
                std::vector<double> data_slice;
                Bitmap null_slice;
                for (size_t i = 0; i < rows_to_read; ++i) {
                    data_slice.push_back(double_col->getData()[current_row_idx_ + i]);
                    null_slice.push_back(double_col->isNull(current_row_idx_ + i));
                }
                chunk->addColumn(std::make_unique<DoubleColumn>(std::move(data_slice), std::move(null_slice)));
            } else if (auto string_col = dynamic_cast<const StringColumn*>(&source_col)) {
                std::vector<std::string> data_slice;
                Bitmap null_slice;
                for (size_t i = 0; i < rows_to_read; ++i) {
                    data_slice.push_back(string_col->get(current_row_idx_ + i)); // get() returns const ref
                    null_slice.push_back(string_col->isNull(current_row_idx_ + i));
                }
                chunk->addColumn(std::make_unique<StringColumn>(std::move(data_slice), std::move(null_slice)));
            } else {
                throw std::runtime_error("Unsupported column type during scan.");
            }
        }

        current_row_idx_ += rows_to_read;
        return chunk;
    }

private:
    const Table& table_;
    size_t chunk_size_;
    size_t current_row_idx_;
    std::vector<size_t> projected_column_indices_;
};

注意: 上述TableScanOperatornext()中通过复制数据来创建新的Column对象。在生产级数据库内核中,为了避免数据复制的开销,通常会使用以下策略:

  • 零拷贝(Zero-copy): 返回指向原始数据缓冲区的std::span或自定义的ColumnView对象,这些对象不拥有数据,只是提供一个视图。
  • 引用计数/共享指针: 对于需要修改的场景,可能使用std::shared_ptr管理底层数据缓冲区,但尽量避免。
  • 内存池: 从预分配的内存池中分配ChunkColumn数据,减少堆分配开销。

4.3 过滤算子(FilterOperator)

过滤算子接收上游算子(例如TableScanOperator)生成的Chunk,应用谓词(predicate),并生成一个只包含满足条件的行的新Chunk

// Predicate interface (for filtering)
class Predicate {
public:
    virtual ~Predicate() = default;
    // Evaluates the predicate on a column and returns a selection vector (bitmap)
    // `column_idx` specifies which column to apply the predicate to within the chunk
    virtual Bitmap evaluate(const Chunk& chunk) const = 0;
};

// Example: Integer equality predicate (e.g., col_id == value)
class IntEqualityPredicate : public Predicate {
public:
    IntEqualityPredicate(size_t column_idx, int value)
        : column_idx_(column_idx), value_(value) {}

    Bitmap evaluate(const Chunk& chunk) const override {
        Bitmap selection_vector(chunk.size(), false);
        if (column_idx_ >= chunk.numColumns()) {
            throw std::out_of_range("Predicate column index out of range.");
        }

        const Column& column = chunk.getColumn(column_idx_);
        if (auto int_col = dynamic_cast<const IntColumn*>(&column)) {
            const auto& data = int_col->getData();
            for (size_t i = 0; i < chunk.size(); ++i) {
                if (!int_col->isNull(i) && data[i] == value_) {
                    selection_vector[i] = true;
                }
            }
        } else {
            throw std::runtime_error("IntEqualityPredicate applied to non-integer column.");
        }
        return selection_vector;
    }

private:
    size_t column_idx_;
    int value_;
};

// FilterOperator: applies a predicate to an incoming chunk
class FilterOperator : public Operator {
public:
    FilterOperator(std::unique_ptr<Operator> source, std::unique_ptr<Predicate> predicate)
        : source_(std::move(source)), predicate_(std::move(predicate)) {}

    std::unique_ptr<Chunk> next() override {
        std::unique_ptr<Chunk> input_chunk = source_->next();
        if (!input_chunk) {
            return nullptr; // No more data from source
        }

        // Evaluate the predicate on the input chunk to get a selection vector
        Bitmap selection_vector = predicate_->evaluate(*input_chunk);

        // Create a new chunk containing only the selected rows
        auto filtered_chunk = std::make_unique<Chunk>();
        for (size_t col_idx = 0; col_idx < input_chunk->numColumns(); ++col_idx) {
            const Column& source_col = input_chunk->getColumn(col_idx);
            // The filter method in Column takes care of creating a new column with selected rows
            filtered_chunk->addColumn(source_col.filter(selection_vector));
        }

        // If no rows passed the filter, we might return an empty chunk or recurse to get the next non-empty chunk
        if (filtered_chunk->size() == 0) {
            // Option 1: Return empty chunk (might be inefficient if many empty chunks are returned)
            // return filtered_chunk;
            // Option 2: Recursively call next() to get the next non-empty chunk
            return next();
        }

        return filtered_chunk;
    }

private:
    std::unique_ptr<Operator> source_;
    std::unique_ptr<Predicate> predicate_;
};

示例查询流程:

假设我们有一个Employees表,包含idnamesalary列。我们想查找salary大于50000的员工。

// 1. Create the table data
std::vector<int> ids = {1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15};
std::vector<std::string> names = {"Alice", "Bob", "Charlie", "David", "Eve", "Frank", "Grace", "Heidi", "Ivan", "Judy", "Kyle", "Liam", "Mia", "Noah", "Olivia"};
std::vector<double> salaries = {45000.0, 60000.0, 50000.0, 75000.0, 48000.0, 90000.0, 55000.0, 42000.0, 62000.0, 51000.0, 80000.0, 49000.0, 70000.0, 53000.0, 47000.0};

// Introduce some NULLs for demonstration
Bitmap salary_nulls(salaries.size(), false);
salary_nulls[2] = true; // Charlie's salary is NULL
salaries[2] = 0.0; // Placeholder for NULL

std::vector<std::unique_ptr<Column>> table_columns;
table_columns.push_back(std::make_unique<IntColumn>(std::move(ids)));
table_columns.push_back(std::make_unique<StringColumn>(std::move(names)));
table_columns.push_back(std::make_unique<DoubleColumn>(std::move(salaries), std::move(salary_nulls)));

Table employees_table(std::move(table_columns));

// 2. Define the query plan: TableScan -> Filter
// We want to filter on salary (column index 2)
size_t salary_col_idx = 2;
double filter_value = 50000.0;

// Predicate: salary > 50000 (need to define a greater-than predicate)
class DoubleGreaterThanPredicate : public Predicate {
public:
    DoubleGreaterThanPredicate(size_t column_idx, double value)
        : column_idx_(column_idx), value_(value) {}

    Bitmap evaluate(const Chunk& chunk) const override {
        Bitmap selection_vector(chunk.size(), false);
        const Column& column = chunk.getColumn(column_idx_);
        if (auto double_col = dynamic_cast<const DoubleColumn*>(&column)) {
            const auto& data = double_col->getData();
            for (size_t i = 0; i < chunk.size(); ++i) {
                if (!double_col->isNull(i) && data[i] > value_) {
                    selection_vector[i] = true;
                }
            }
        } else {
            throw std::runtime_error("DoubleGreaterThanPredicate applied to non-double column.");
        }
        return selection_vector;
    }
private:
    size_t column_idx_;
    double value_;
};

// 3. Construct the operator tree
// Scan all columns (0, 1, 2)
std::vector<size_t> projected_cols = {0, 1, 2};
auto scan_op = std::make_unique<TableScanOperator>(employees_table, 5 /* chunk size */, projected_cols);

auto filter_predicate = std::make_unique<DoubleGreaterThanPredicate>(salary_col_idx, filter_value);
auto filter_op = std::make_unique<FilterOperator>(std::move(scan_op), std::move(filter_predicate));

// 4. Execute the query
std::cout << "Employees with salary > " << filter_value << ":" << std::endl;
while (auto result_chunk = filter_op->next()) {
    result_chunk->print();
}

5. 利用SIMD指令提升极致性能

向量化处理的真正威力在于其能够充分利用现代CPU的SIMD指令集。SIMD允许CPU的单个指令同时操作多个数据元素。例如,一个SIMD寄存器可以存储4个int或2个double,一条指令就可以对这4个int或2个double进行加法、比较等操作。

5.1 SIMD基础

  • 寄存器: x86架构有SSE(128位)、AVX(256位)、AVX-512(512位)等SIMD寄存器。
  • 数据类型: 对应有__m128i(128位整数)、__m128d(128位双精度浮点)、__m256i__m256d等。
  • 内在函数(Intrinsics): C++编译器提供了一系列内在函数,允许直接调用SIMD指令,例如_mm_loadu_si128(加载128位整数)、_mm_cmpeq_epi32(比较128位整数是否相等)、_mm_and_si128(按位与)。

5.2 SIMD加速的过滤操作示例

考虑一个整数列的过滤操作:column_value == target_value。使用SIMD可以大大加速这个过程。

#include <immintrin.h> // For SSE/AVX intrinsics

// SIMD-accelerated IntEqualityPredicate for AVX2 (256-bit registers)
class IntEqualityPredicateSIMD : public Predicate {
public:
    IntEqualityPredicateSIMD(size_t column_idx, int value)
        : column_idx_(column_idx), value_(value) {}

    Bitmap evaluate(const Chunk& chunk) const override {
        Bitmap selection_vector(chunk.size(), false);
        if (column_idx_ >= chunk.numColumns()) {
            throw std::out_of_range("Predicate column index out of range.");
        }

        const Column& column = chunk.getColumn(column_idx_);
        if (auto int_col = dynamic_cast<const IntColumn*>(&column)) {
            const std::vector<int>& data = int_col->getData();
            size_t num_elements = data.size();

            // Load the target value into an AVX2 register, replicating it across all 8 32-bit lanes
            __m256i target_vec = _mm256_set1_epi32(value_);

            // Process data in chunks of 8 integers (since AVX2 operates on 256 bits, and int is 32 bits)
            size_t i = 0;
            for (; i + 7 < num_elements; i += 8) {
                // Load 8 integers from the data vector
                __m256i data_vec = _mm256_loadu_si256(reinterpret_cast<const __m256i*>(&data[i]));

                // Perform the equality comparison: result is a mask where 0xFFFFFFFF means true, 0x00000000 means false
                __m256i cmp_result = _mm256_cmpeq_epi32(data_vec, target_vec);

                // Convert the comparison result mask to a bitmask (1 for true, 0 for false)
                // This converts each 32-bit lane's MSB into a bit in a 8-bit integer
                int mask = _mm256_movemask_epi8(cmp_result); // Each 32-bit lane gets its MSB moved.
                                                            // For cmpeq_epi32, if equal, all bits are 1, so MSB is 1.
                                                            // For cmpeq_epi32, if not equal, all bits are 0, so MSB is 0.
                                                            // We need to extract the 8-bit mask representing 8 integers.
                                                            // Correct approach: `_mm256_movemask_ps` for floats, or
                                                            // custom bit extraction for integers.
                                                            // A simpler way often involves `_mm256_testz_si256` or similar,
                                                            // or just using the comparison mask directly for further operations.
                                                            // For simple boolean output, we need to convert the 32-bit mask per lane
                                                            // into a single bit per lane.
                                                            // Example: (mask & (1 << (j * 4 + 3))) != 0 for j-th 32-bit element.
                                                            // A more direct way to get a bitmask per 32-bit element is:
                unsigned int comparison_mask = 0;
                for (int j = 0; j < 8; ++j) {
                    if ((_mm256_extract_epi32(cmp_result, j) & 0xFFFFFFFF) == 0xFFFFFFFF) { // Check if all bits are set (true)
                        comparison_mask |= (1 << j);
                    }
                }

                for (int j = 0; j < 8; ++j) {
                    if ((comparison_mask >> j) & 1) { // If the j-th element was true
                        if (!int_col->isNull(i + j)) { // Also check for NULLs
                             selection_vector[i + j] = true;
                        }
                    }
                }
            }

            // Handle remaining elements (if num_elements is not a multiple of 8)
            for (; i < num_elements; ++i) {
                if (!int_col->isNull(i) && data[i] == value_) {
                    selection_vector[i] = true;
                }
            }
        } else {
            throw std::runtime_error("IntEqualityPredicateSIMD applied to non-integer column.");
        }
        return selection_vector;
    }

private:
    size_t column_idx_;
    int value_;
};

SIMD代码的复杂性与考量:

  • 平台相关性: SIMD指令集(SSE, AVX, AVX-512, ARM SVE)是平台特定的。代码需要针对不同的CPU架构进行优化或提供回退方案。
  • 对齐: _mm256_loadu_si256是非对齐加载。如果数据能够保证对齐(例如,使用_mm_malloc或C++17 std::aligned_alloc),可以使用对齐加载指令(如_mm256_load_si256),它们通常更快。
  • NULL值处理: 上述示例在SIMD循环后依然检查NULLs。更优化的方式是将NULL位图也加载到SIMD寄存器中,并与比较结果进行位运算。
  • 编译器自动向量化: 现代C++编译器(GCC, Clang, MSVC)在开启优化(如-O3)时,能够自动识别一些简单的循环并将其向量化。但对于复杂的数据结构和逻辑,手动使用内在函数通常能获得更好的控制和性能。
  • 可读性和维护性: SIMD内在函数代码通常比普通的C++代码更难读懂和维护。

5.3 生产级SIMD库

为了简化SIMD编程,许多库被开发出来,例如:

  • Intel ISPC (Implicit SPMD Program Compiler): 专门为CPU SIMD编程设计的编译器。
  • Vector Class Library (VCL): 一个C++模板库,提供了一个面向对象的SIMD接口。
  • xsimd: 另一个C++模板库,支持多种SIMD指令集。
  • Eigen: 一个用于线性代数的C++模板库,其内部也大量使用了SIMD优化。

这些库提供了更高级的抽象,使得开发者可以编写更接近常规C++代码的SIMD优化代码,同时保持高性能。

6. 向量化存储与处理的进阶话题

6.1 混合存储(Hybrid Storage)

在实际系统中,纯粹的行式或列式存储都有其局限性。许多现代数据库系统采用混合存储模型:

  • 行式存储(或小批量的行式存储) 用于最近写入的少量数据,以支持快速的OLTP写入和单行读取。
  • 列式存储 用于历史数据或冷数据,以优化OLAP查询性能。
  • 数据会周期性地从行式存储区域迁移到列式存储区域。

6.2 压缩技术

列式存储为高效压缩提供了天然的优势。除了前面提到的,还有:

  • 字典编码(Dictionary Encoding): 对于低基数(distinct值少)的列(如枚举类型、国家名称),可以将所有唯一值存储在一个字典中,列中存储的只是字典的索引。
  • Delta编码(Delta Encoding): 对于递增或递减的序列(如时间戳、ID),只存储相邻值之间的差值,这些差值通常更小,更易于压缩。
  • 位图索引(Bitmap Index): 对于低基数离散值,可以为每个唯一值创建一个位图,快速进行过滤操作。
压缩技术 适用场景 优势
字典编码 低基数字符串、枚举 节省空间,加速字符串比较
行程长度编码 包含大量重复值的列 极大压缩连续重复值
Bitpacking 小范围整数(如索引、delta值) 精确分配位,极致压缩
Delta编码 有序序列(时间戳、ID) 减小存储值范围,利于其他压缩
LZ4/Snappy等通用 无明显模式的混合数据,或作为二级压缩 速度快,适用于各种数据

6.3 延迟物化(Late Materialization)

延迟物化是向量化处理的一个重要优化。在列式数据库中,直到查询的最后阶段,例如需要将结果返回给客户端时,才将不同列的数据组合成完整的行。在此之前,所有中间操作(过滤、聚合、连接)都直接在列数据上执行,避免了不必要的行重构开销,进一步提升性能。

6.4 JIT编译(Just-In-Time Compilation)

一些高性能数据库系统(如HyPer、Vectorwise)利用JIT编译技术。它们在运行时根据具体的查询计划,生成高度优化的机器码。这使得数据库能够为每个查询量身定制执行逻辑,包括:

  • 消除虚函数调用: 直接调用具体实现。
  • 常量折叠和传播: 在编译时处理已知常量。
  • 循环展开和SIMD指令生成: 编译器可以更好地利用JIT信息生成高效的向量化代码。
  • 谓词融合: 将多个过滤条件合并到一个循环中,减少数据遍历次数。

6.5 内存管理

对于高性能数据库内核,定制的内存分配器是必不可少的。标准库的std::allocatornew/delete可能存在碎片化、锁竞争和分配/释放开销过大的问题。

  • Arena Allocator / Bump Allocator: 预先分配一大块内存,然后通过简单地移动指针来快速分配小块内存。非常适合生命周期相似的临时对象(如查询中间结果的Chunk)。
  • Fixed-size Block Allocator: 对于固定大小的Column对象或Chunk对象,可以实现专用的分配器。
  • Huge Pages: 利用操作系统的巨页功能,减少TLB(Translation Lookaside Buffer)未命中,提高内存访问效率。

7. 挑战与考量

尽管向量化存储和处理在OLAP场景下优势显著,但也存在一些挑战:

  • 更新和删除: 在列式存储中更新或删除单行数据可能效率低下,因为它可能需要修改多个独立的列。常见的策略是使用多版本并发控制(MVCC)和“追加式”(append-only)存储,通过标记旧版本为无效并追加新版本来实现逻辑更新。
  • 事务处理(OLTP): 对于高并发的单行读写事务,行式存储通常更优。列式存储通常更适合批量的、只读的分析查询。
  • 模式演化: 增加新列或修改现有列的类型可能需要重新组织数据,这在列式存储中可能更复杂。
  • 实现复杂性: 向量化算子、SIMD优化和高级内存管理增加了数据库内核的实现复杂性。

8. 展望

向量化存储和处理是现代分析型数据库系统性能的基石。从开源项目如Apache Arrow、ClickHouse,到商业系统如Snowflake、Google Dremel,都广泛采用了这些技术。未来,随着新的硬件架构(如ARM SVE、TPU等)和指令集不断涌现,数据库内核将继续演进,更紧密地与硬件协同,实现更高的数据处理吞吐量。同时,向量化技术也将与机器学习、人工智能等领域更紧密地结合,为复杂的数据科学工作负载提供强大的底层支持。

这种将数据按列组织、以批次为单位进行处理、并利用SIMD指令加速计算的方法,彻底改变了数据库内核的性能格局。它使得数据库系统能够以前所未有的效率,驾驭海量数据,满足日益增长的分析需求。

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注