在现代数据处理领域,性能是永恒的追求。随着数据量的爆炸式增长和分析型查询复杂度的提升,传统的关系型数据库系统在应对高吞吐量的扫描和聚合操作时,逐渐暴露出其固有的局限性。特别是在数据库内核层面,如何高效地组织和访问数据,以充分利用现代硬件的并行计算能力,是决定系统性能的关键。本讲座将深入探讨C++数据库内核中一种强大的技术——向量化存储(Vectorized Storage),以及它如何通过列式存储的原理,显著提升算子(operator)的扫描性能。
1. 性能瓶颈与传统存储的局限性
数据库系统面临的核心挑战之一,是如何在海量数据中快速定位、读取并处理所需的信息。对于联机事务处理(OLTP)工作负载,关注点是快速的单行插入、更新和删除,以及小范围的查询。而对于联机分析处理(OLAP)工作负载,其特点是扫描大量数据、执行复杂的聚合、连接和过滤操作。正是后者,对数据存储和处理效率提出了极高的要求。
1.1 CPU缓存与内存墙
现代CPU的速度远超内存。当CPU需要访问数据时,它会首先尝试从其多级缓存(L1、L2、L3)中获取。如果数据不在缓存中(缓存未命中),CPU就必须等待数据从主内存加载,这个过程被称为“内存墙”(Memory Wall)瓶颈。一次主内存访问可能需要数百个CPU周期,而一次缓存访问可能只需要几个周期。因此,优化数据访问模式,提高缓存命中率,是提升数据库性能的关键。
1.2 传统行式存储的问题
大多数传统关系型数据库采用行式存储(Row-Oriented Storage)模型。在这种模型中,一行数据的所有列值都连续地存储在一起。例如,一个包含id、name和value的表,在磁盘或内存中会按以下方式布局:
| Row 1 (id, name, value) | Row 2 (id, name, value) | Row 3 (id, name, value) | … |
用C++结构体表示,大致如下:
struct EmployeeRow {
int id;
char name[64]; // Fixed size for simplicity
double salary;
bool isActive;
};
当数据库系统需要执行一个分析型查询,例如“计算所有员工的平均工资”:SELECT AVG(salary) FROM Employees;
在这种行式存储下,即使查询只关心salary这一列,系统也必须加载每一行完整的EmployeeRow数据到内存中。这意味着id、name和isActive等无关数据也会被加载,占据宝贵的缓存空间。对于大型表,这将导致:
- 低缓存效率: 大量无关数据被加载到缓存,很快就会将真正需要的数据挤出,导致频繁的缓存未命中。
- 高I/O开销: 从磁盘读取的数据量远大于实际所需,增加I/O延迟。
- CPU利用率低下: CPU在处理每一行时,需要跳过不相关的列,导致非连续内存访问和分支预测失败,难以充分利用其向量化处理能力。
为了克服这些局限性,列式存储应运而生。
2. 列式存储:一种范式转变
列式存储(Column-Oriented Storage)是解决上述问题的一种根本性方法。顾名思义,它将同一列的所有值连续地存储在一起,而不是同一行的所有值。对于我们之前的EmployeeRow例子,数据布局将变成:
| ID Column (id1, id2, id3, …) | Name Column (name1, name2, name3, …) | Salary Column (salary1, salary2, salary3, …) | IsActive Column (isActive1, isActive2, isActive3, …) |
用C++的数据结构来模拟,可能看起来像这样:
// 假设数据存储在独立的向量中
std::vector<int> ids;
std::vector<std::string> names; // For variable-length strings, this is a simplification
std::vector<double> salaries;
std::vector<bool> activeStatuses;
2.1 列式存储的显著优势
列式存储带来了多方面的性能提升,尤其是在分析型工作负载下:
- 极高的缓存效率: 当查询仅涉及少数几列时,系统只需加载这些列的数据。例如,计算平均工资的查询只需加载
salaries向量。这使得所需数据在内存中高度集中,极大地提高了缓存命中率。 - 优异的压缩比: 同一列的数据通常具有相同的数据类型和相似的分布特征。这使得应用各种高效的压缩算法(如字典编码、行程长度编码 RLE、位图编码、增量编码等)变得非常有效。数据压缩不仅减少了存储空间,更重要的是减少了磁盘I/O和内存带宽的消耗。
- SIMD(Single Instruction, Multiple Data)友好: 列中的数据是同构且连续的,这正是现代CPU SIMD指令集(如SSE、AVX、AVX-512)的理想输入。SIMD指令允许CPU在一个时钟周期内对多个数据元素执行相同的操作,从而实现数据级并行。
- 减少I/O: 由于只读取相关列,并且数据经过高效压缩,从存储介质读取的数据量大大减少。
- 延迟物化(Late Materialization): 行式存储通常在查询处理的早期阶段就将所有列组合成完整的行。而列式存储可以延迟行的物化,直到所有必要的过滤和聚合操作都完成,甚至在需要将结果返回给用户时才重构行。
当然,列式存储并非万能药。对于需要频繁更新单行所有列的OLTP场景,或者需要频繁插入新行的场景,列式存储可能会引入额外的复杂性和开销,因为单行数据的修改可能涉及到多个独立列的更新。然而,对于OLAP场景,其优势是压倒性的。
3. 向量化处理:列式存储的引擎
仅仅将数据存储为列式结构还不足以实现极致性能。真正的性能飞跃来自于将数据处理方式从“一次一元组”(tuple-at-a-time)模式转变为“一次一批数据”(batch-at-a-time)或“向量化”(vectorized)模式。
3.1 什么是向量化处理?
向量化处理是指数据库算子不再逐行处理数据,而是接收一个包含多行数据的“向量”或“批次”(batch/chunk),并对整个批次的数据执行操作。例如,一个过滤算子不再逐行检查条件,而是接收一个包含1024个整数的列向量,然后一次性对这1024个整数应用过滤条件,并生成一个包含通过过滤的整数的新向量(或一个位图来指示哪些通过了)。
这种批处理的方式有以下几个核心优点:
- 减少函数调用开销: 传统模式下,每个元组都可能涉及一次函数调用,而向量化处理将多次操作合并为一次,显著减少了函数调用和虚函数调用的开销。
- 更好的缓存局部性: 处理一个数据批次时,相关数据被连续访问,进一步提高了缓存命中率。
- 充分利用SIMD指令: 批处理模式天然地与SIMD指令兼容。CPU可以并行处理批次中的多个数据元素。
- 更少的控制流分支: 传统处理可能在每行数据上都有条件判断(if-else),导致分支预测失败。向量化处理可以将条件判断转换为位图操作,减少分支。
一个典型的向量化批次(Chunk)可能包含数千到数万行数据。这个大小需要根据缓存大小、SIMD寄存器宽度和具体工作负载进行调整。
3.2 C++中向量化存储的数据结构
在C++数据库内核中实现向量化存储,我们需要设计能够高效存储和操作列数据的结构。
3.2.1 基础列抽象
首先,定义一个通用的列接口,以便处理不同数据类型的列。
#include <vector>
#include <string>
#include <memory> // For std::unique_ptr
#include <variant> // C++17 for heterogeneous types, or base class + derived classes
#include <iostream>
#include <numeric> // For std::iota
#include <algorithm> // For std::transform, std::copy_if etc.
// Optional: For handling NULLs using a bitmap
using Bitmap = std::vector<bool>; // Simplified, real bitmaps are packed bits
// Base class for a column
class Column {
public:
virtual ~Column() = default;
virtual size_t size() const = 0;
virtual std::string toString(size_t index) const = 0;
virtual std::unique_ptr<Column> filter(const Bitmap& selection_vector) const = 0;
virtual void appendNull(size_t count = 1) = 0; // For appending nulls
virtual bool isNull(size_t index) const = 0;
virtual void setNull(size_t index) = 0; // Set an existing entry to null
protected:
// A real implementation would have a nullable bitmap
Bitmap null_bitmap_; // True if value is NULL
};
// Typed column for integers
class IntColumn : public Column {
public:
IntColumn(std::vector<int> data) : data_(std::move(data)) {
null_bitmap_.resize(data_.size(), false);
}
IntColumn(std::vector<int> data, Bitmap null_bitmap)
: data_(std::move(data)), null_bitmap_(std::move(null_bitmap)) {}
size_t size() const override { return data_.size(); }
std::string toString(size_t index) const override {
if (isNull(index)) return "NULL";
return std::to_string(data_[index]);
}
std::unique_ptr<Column> filter(const Bitmap& selection_vector) const override {
std::vector<int> filtered_data;
Bitmap filtered_null_bitmap;
for (size_t i = 0; i < size(); ++i) {
if (selection_vector[i]) {
filtered_data.push_back(data_[i]);
filtered_null_bitmap.push_back(null_bitmap_[i]);
}
}
return std::make_unique<IntColumn>(std::move(filtered_data), std::move(filtered_null_bitmap));
}
// Accessors for specific types (downcasting needed, or use templates/variant)
int get(size_t index) const {
if (isNull(index)) throw std::runtime_error("Accessing NULL value");
return data_[index];
}
const std::vector<int>& getData() const { return data_; } // For SIMD operations
void appendNull(size_t count = 1) override {
for (size_t i = 0; i < count; ++i) {
data_.push_back(0); // Default value for NULL, actual value doesn't matter
null_bitmap_.push_back(true);
}
}
bool isNull(size_t index) const override {
if (index >= null_bitmap_.size()) return true; // Out of bounds is implicitly null
return null_bitmap_[index];
}
void setNull(size_t index) override {
if (index >= null_bitmap_.size()) {
// Handle error or resize if allowed
throw std::out_of_range("Index out of bounds for setting null.");
}
null_bitmap_[index] = true;
}
private:
std::vector<int> data_;
};
// Typed column for doubles
class DoubleColumn : public Column {
public:
DoubleColumn(std::vector<double> data) : data_(std::move(data)) {
null_bitmap_.resize(data_.size(), false);
}
DoubleColumn(std::vector<double> data, Bitmap null_bitmap)
: data_(std::move(data)), null_bitmap_(std::move(null_bitmap)) {}
size_t size() const override { return data_.size(); }
std::string toString(size_t index) const override {
if (isNull(index)) return "NULL";
return std::to_string(data_[index]);
}
std::unique_ptr<Column> filter(const Bitmap& selection_vector) const override {
std::vector<double> filtered_data;
Bitmap filtered_null_bitmap;
for (size_t i = 0; i < size(); ++i) {
if (selection_vector[i]) {
filtered_data.push_back(data_[i]);
filtered_null_bitmap.push_back(null_bitmap_[i]);
}
}
return std::make_unique<DoubleColumn>(std::move(filtered_data), std::move(filtered_null_bitmap));
}
double get(size_t index) const {
if (isNull(index)) throw std::runtime_error("Accessing NULL value");
return data_[index];
}
const std::vector<double>& getData() const { return data_; }
void appendNull(size_t count = 1) override {
for (size_t i = 0; i < count; ++i) {
data_.push_back(0.0);
null_bitmap_.push_back(true);
}
}
bool isNull(size_t index) const override {
if (index >= null_bitmap_.size()) return true;
return null_bitmap_[index];
}
void setNull(size_t index) override {
if (index >= null_bitmap_.size()) {
throw std::out_of_range("Index out of bounds for setting null.");
}
null_bitmap_[index] = true;
}
private:
std::vector<double> data_;
};
// For string columns, handling variable-length strings is more complex.
// Typically, it involves an offset array and a contiguous data buffer.
class StringColumn : public Column {
public:
StringColumn(std::vector<std::string> data) : data_strings_(std::move(data)) {
null_bitmap_.resize(data_strings_.size(), false);
}
StringColumn(std::vector<std::string> data, Bitmap null_bitmap)
: data_strings_(std::move(data)), null_bitmap_(std::move(null_bitmap)) {}
size_t size() const override { return data_strings_.size(); }
std::string toString(size_t index) const override {
if (isNull(index)) return "NULL";
return data_strings_[index];
}
std::unique_ptr<Column> filter(const Bitmap& selection_vector) const override {
std::vector<std::string> filtered_data;
Bitmap filtered_null_bitmap;
for (size_t i = 0; i < size(); ++i) {
if (selection_vector[i]) {
filtered_data.push_back(data_strings_[i]);
filtered_null_bitmap.push_back(null_bitmap_[i]);
}
}
return std::make_unique<StringColumn>(std::move(filtered_data), std::move(filtered_null_bitmap));
}
const std::string& get(size_t index) const {
if (isNull(index)) throw std::runtime_error("Accessing NULL value");
return data_strings_[index];
}
void appendNull(size_t count = 1) override {
for (size_t i = 0; i < count; ++i) {
data_strings_.push_back(""); // Default value for NULL
null_bitmap_.push_back(true);
}
}
bool isNull(size_t index) const override {
if (index >= null_bitmap_.size()) return true;
return null_bitmap_[index];
}
void setNull(size_t index) override {
if (index >= null_bitmap_.size()) {
throw std::out_of_range("Index out of bounds for setting null.");
}
null_bitmap_[index] = true;
}
private:
std::vector<std::string> data_strings_; // Simplified: in real systems, this would be `char*` buffer + offset array
};
关于NULL值的处理:
在实际的数据库系统中,Bitmap通常是一个位数组(std::vector<bool>在内部可能不是位数组,需要自定义Bitmask类),每个位代表一行数据是否为NULL。这种方式非常节省空间,并且可以与SIMD指令配合,高效地处理NULL值。
3.2.2 批次(Chunk)的表示
一个批次(Chunk)由多个列组成,代表了多行数据的一个逻辑子集。
class Chunk {
public:
Chunk() = default;
// Add a column to the chunk
void addColumn(std::unique_ptr<Column> col) {
if (!columns_.empty() && col->size() != columns_[0]->size()) {
throw std::invalid_argument("All columns in a chunk must have the same size.");
}
columns_.push_back(std::move(col));
}
// Get a column by index
const Column& getColumn(size_t col_idx) const {
if (col_idx >= columns_.size()) {
throw std::out_of_range("Column index out of range.");
}
return *columns_[col_idx];
}
// Get the number of rows in this chunk
size_t size() const {
if (columns_.empty()) return 0;
return columns_[0]->size();
}
// Get the number of columns
size_t numColumns() const {
return columns_.size();
}
// Print the chunk (for demonstration)
void print() const {
if (size() == 0) {
std::cout << "Empty Chunk" << std::endl;
return;
}
// Print header (assuming fixed column names for simplicity)
for (size_t col_idx = 0; col_idx < numColumns(); ++col_idx) {
std::cout << "Col" << col_idx << "t";
}
std::cout << std::endl;
for (size_t row_idx = 0; row_idx < size(); ++row_idx) {
for (size_t col_idx = 0; col_idx < numColumns(); ++col_idx) {
std::cout << getColumn(col_idx).toString(row_idx) << "t";
}
std::cout << std::endl;
}
}
private:
std::vector<std::unique_ptr<Column>> columns_;
};
4. 实现向量化扫描算子
数据库查询通常由一系列算子(如扫描、过滤、投影、连接、聚合)组成,它们形成一个查询计划树。在向量化处理模型中,这些算子不再处理单行数据,而是处理Chunk。
4.1 算子接口
定义一个通用的算子接口,其中最核心的方法是next(),它返回下一个处理过的Chunk。
// Base class for all query operators
class Operator {
public:
virtual ~Operator() = default;
// The core method: processes and returns the next chunk of data
virtual std::unique_ptr<Chunk> next() = 0;
};
4.2 表扫描算子(TableScanOperator)
TableScanOperator是查询计划的起点,负责从存储层读取数据并将其组织成Chunk。
// A simplified "Table" that holds all the data in columns
class Table {
public:
Table(std::vector<std::unique_ptr<Column>> columns) : columns_(std::move(columns)) {
if (!columns_.empty()) {
num_rows_ = columns_[0]->size();
for (const auto& col : columns_) {
if (col->size() != num_rows_) {
throw std::invalid_argument("All columns in a table must have the same size.");
}
}
} else {
num_rows_ = 0;
}
}
size_t numRows() const { return num_rows_; }
size_t numColumns() const { return columns_.size(); }
const Column& getColumn(size_t col_idx) const { return *columns_[col_idx]; }
private:
std::vector<std::unique_ptr<Column>> columns_;
size_t num_rows_;
};
// TableScanOperator: reads data from a table and produces chunks
class TableScanOperator : public Operator {
public:
TableScanOperator(const Table& table, size_t chunk_size)
: table_(table), chunk_size_(chunk_size), current_row_idx_(0) {
// We need to know which columns to project (return)
// For simplicity, let's assume it projects all columns initially
projected_column_indices_.reserve(table_.numColumns());
for (size_t i = 0; i < table_.numColumns(); ++i) {
projected_column_indices_.push_back(i);
}
}
// A more advanced scan would allow specifying projected columns
TableScanOperator(const Table& table, size_t chunk_size, std::vector<size_t> projected_cols)
: table_(table), chunk_size_(chunk_size), current_row_idx_(0),
projected_column_indices_(std::move(projected_cols)) {}
std::unique_ptr<Chunk> next() override {
if (current_row_idx_ >= table_.numRows()) {
return nullptr; // No more data
}
auto chunk = std::make_unique<Chunk>();
size_t rows_to_read = std::min(chunk_size_, table_.numRows() - current_row_idx_);
for (size_t col_idx : projected_column_indices_) {
// In a real system, this would involve reading from storage and
// creating a new column with a slice of data.
// For this example, we'll simulate by creating new column objects with copied data.
// This is NOT efficient for real systems but demonstrates the concept.
// Real systems would use views (std::span) or shared buffers.
const Column& source_col = table_.getColumn(col_idx);
// A more efficient way for primitive types is to get a pointer/span and create a new column
// For this example, we'll create new vectors for simplicity.
if (auto int_col = dynamic_cast<const IntColumn*>(&source_col)) {
std::vector<int> data_slice;
Bitmap null_slice;
for (size_t i = 0; i < rows_to_read; ++i) {
data_slice.push_back(int_col->getData()[current_row_idx_ + i]);
null_slice.push_back(int_col->isNull(current_row_idx_ + i));
}
chunk->addColumn(std::make_unique<IntColumn>(std::move(data_slice), std::move(null_slice)));
} else if (auto double_col = dynamic_cast<const DoubleColumn*>(&source_col)) {
std::vector<double> data_slice;
Bitmap null_slice;
for (size_t i = 0; i < rows_to_read; ++i) {
data_slice.push_back(double_col->getData()[current_row_idx_ + i]);
null_slice.push_back(double_col->isNull(current_row_idx_ + i));
}
chunk->addColumn(std::make_unique<DoubleColumn>(std::move(data_slice), std::move(null_slice)));
} else if (auto string_col = dynamic_cast<const StringColumn*>(&source_col)) {
std::vector<std::string> data_slice;
Bitmap null_slice;
for (size_t i = 0; i < rows_to_read; ++i) {
data_slice.push_back(string_col->get(current_row_idx_ + i)); // get() returns const ref
null_slice.push_back(string_col->isNull(current_row_idx_ + i));
}
chunk->addColumn(std::make_unique<StringColumn>(std::move(data_slice), std::move(null_slice)));
} else {
throw std::runtime_error("Unsupported column type during scan.");
}
}
current_row_idx_ += rows_to_read;
return chunk;
}
private:
const Table& table_;
size_t chunk_size_;
size_t current_row_idx_;
std::vector<size_t> projected_column_indices_;
};
注意: 上述TableScanOperator在next()中通过复制数据来创建新的Column对象。在生产级数据库内核中,为了避免数据复制的开销,通常会使用以下策略:
- 零拷贝(Zero-copy): 返回指向原始数据缓冲区的
std::span或自定义的ColumnView对象,这些对象不拥有数据,只是提供一个视图。 - 引用计数/共享指针: 对于需要修改的场景,可能使用
std::shared_ptr管理底层数据缓冲区,但尽量避免。 - 内存池: 从预分配的内存池中分配
Chunk和Column数据,减少堆分配开销。
4.3 过滤算子(FilterOperator)
过滤算子接收上游算子(例如TableScanOperator)生成的Chunk,应用谓词(predicate),并生成一个只包含满足条件的行的新Chunk。
// Predicate interface (for filtering)
class Predicate {
public:
virtual ~Predicate() = default;
// Evaluates the predicate on a column and returns a selection vector (bitmap)
// `column_idx` specifies which column to apply the predicate to within the chunk
virtual Bitmap evaluate(const Chunk& chunk) const = 0;
};
// Example: Integer equality predicate (e.g., col_id == value)
class IntEqualityPredicate : public Predicate {
public:
IntEqualityPredicate(size_t column_idx, int value)
: column_idx_(column_idx), value_(value) {}
Bitmap evaluate(const Chunk& chunk) const override {
Bitmap selection_vector(chunk.size(), false);
if (column_idx_ >= chunk.numColumns()) {
throw std::out_of_range("Predicate column index out of range.");
}
const Column& column = chunk.getColumn(column_idx_);
if (auto int_col = dynamic_cast<const IntColumn*>(&column)) {
const auto& data = int_col->getData();
for (size_t i = 0; i < chunk.size(); ++i) {
if (!int_col->isNull(i) && data[i] == value_) {
selection_vector[i] = true;
}
}
} else {
throw std::runtime_error("IntEqualityPredicate applied to non-integer column.");
}
return selection_vector;
}
private:
size_t column_idx_;
int value_;
};
// FilterOperator: applies a predicate to an incoming chunk
class FilterOperator : public Operator {
public:
FilterOperator(std::unique_ptr<Operator> source, std::unique_ptr<Predicate> predicate)
: source_(std::move(source)), predicate_(std::move(predicate)) {}
std::unique_ptr<Chunk> next() override {
std::unique_ptr<Chunk> input_chunk = source_->next();
if (!input_chunk) {
return nullptr; // No more data from source
}
// Evaluate the predicate on the input chunk to get a selection vector
Bitmap selection_vector = predicate_->evaluate(*input_chunk);
// Create a new chunk containing only the selected rows
auto filtered_chunk = std::make_unique<Chunk>();
for (size_t col_idx = 0; col_idx < input_chunk->numColumns(); ++col_idx) {
const Column& source_col = input_chunk->getColumn(col_idx);
// The filter method in Column takes care of creating a new column with selected rows
filtered_chunk->addColumn(source_col.filter(selection_vector));
}
// If no rows passed the filter, we might return an empty chunk or recurse to get the next non-empty chunk
if (filtered_chunk->size() == 0) {
// Option 1: Return empty chunk (might be inefficient if many empty chunks are returned)
// return filtered_chunk;
// Option 2: Recursively call next() to get the next non-empty chunk
return next();
}
return filtered_chunk;
}
private:
std::unique_ptr<Operator> source_;
std::unique_ptr<Predicate> predicate_;
};
示例查询流程:
假设我们有一个Employees表,包含id、name和salary列。我们想查找salary大于50000的员工。
// 1. Create the table data
std::vector<int> ids = {1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15};
std::vector<std::string> names = {"Alice", "Bob", "Charlie", "David", "Eve", "Frank", "Grace", "Heidi", "Ivan", "Judy", "Kyle", "Liam", "Mia", "Noah", "Olivia"};
std::vector<double> salaries = {45000.0, 60000.0, 50000.0, 75000.0, 48000.0, 90000.0, 55000.0, 42000.0, 62000.0, 51000.0, 80000.0, 49000.0, 70000.0, 53000.0, 47000.0};
// Introduce some NULLs for demonstration
Bitmap salary_nulls(salaries.size(), false);
salary_nulls[2] = true; // Charlie's salary is NULL
salaries[2] = 0.0; // Placeholder for NULL
std::vector<std::unique_ptr<Column>> table_columns;
table_columns.push_back(std::make_unique<IntColumn>(std::move(ids)));
table_columns.push_back(std::make_unique<StringColumn>(std::move(names)));
table_columns.push_back(std::make_unique<DoubleColumn>(std::move(salaries), std::move(salary_nulls)));
Table employees_table(std::move(table_columns));
// 2. Define the query plan: TableScan -> Filter
// We want to filter on salary (column index 2)
size_t salary_col_idx = 2;
double filter_value = 50000.0;
// Predicate: salary > 50000 (need to define a greater-than predicate)
class DoubleGreaterThanPredicate : public Predicate {
public:
DoubleGreaterThanPredicate(size_t column_idx, double value)
: column_idx_(column_idx), value_(value) {}
Bitmap evaluate(const Chunk& chunk) const override {
Bitmap selection_vector(chunk.size(), false);
const Column& column = chunk.getColumn(column_idx_);
if (auto double_col = dynamic_cast<const DoubleColumn*>(&column)) {
const auto& data = double_col->getData();
for (size_t i = 0; i < chunk.size(); ++i) {
if (!double_col->isNull(i) && data[i] > value_) {
selection_vector[i] = true;
}
}
} else {
throw std::runtime_error("DoubleGreaterThanPredicate applied to non-double column.");
}
return selection_vector;
}
private:
size_t column_idx_;
double value_;
};
// 3. Construct the operator tree
// Scan all columns (0, 1, 2)
std::vector<size_t> projected_cols = {0, 1, 2};
auto scan_op = std::make_unique<TableScanOperator>(employees_table, 5 /* chunk size */, projected_cols);
auto filter_predicate = std::make_unique<DoubleGreaterThanPredicate>(salary_col_idx, filter_value);
auto filter_op = std::make_unique<FilterOperator>(std::move(scan_op), std::move(filter_predicate));
// 4. Execute the query
std::cout << "Employees with salary > " << filter_value << ":" << std::endl;
while (auto result_chunk = filter_op->next()) {
result_chunk->print();
}
5. 利用SIMD指令提升极致性能
向量化处理的真正威力在于其能够充分利用现代CPU的SIMD指令集。SIMD允许CPU的单个指令同时操作多个数据元素。例如,一个SIMD寄存器可以存储4个int或2个double,一条指令就可以对这4个int或2个double进行加法、比较等操作。
5.1 SIMD基础
- 寄存器: x86架构有SSE(128位)、AVX(256位)、AVX-512(512位)等SIMD寄存器。
- 数据类型: 对应有
__m128i(128位整数)、__m128d(128位双精度浮点)、__m256i、__m256d等。 - 内在函数(Intrinsics): C++编译器提供了一系列内在函数,允许直接调用SIMD指令,例如
_mm_loadu_si128(加载128位整数)、_mm_cmpeq_epi32(比较128位整数是否相等)、_mm_and_si128(按位与)。
5.2 SIMD加速的过滤操作示例
考虑一个整数列的过滤操作:column_value == target_value。使用SIMD可以大大加速这个过程。
#include <immintrin.h> // For SSE/AVX intrinsics
// SIMD-accelerated IntEqualityPredicate for AVX2 (256-bit registers)
class IntEqualityPredicateSIMD : public Predicate {
public:
IntEqualityPredicateSIMD(size_t column_idx, int value)
: column_idx_(column_idx), value_(value) {}
Bitmap evaluate(const Chunk& chunk) const override {
Bitmap selection_vector(chunk.size(), false);
if (column_idx_ >= chunk.numColumns()) {
throw std::out_of_range("Predicate column index out of range.");
}
const Column& column = chunk.getColumn(column_idx_);
if (auto int_col = dynamic_cast<const IntColumn*>(&column)) {
const std::vector<int>& data = int_col->getData();
size_t num_elements = data.size();
// Load the target value into an AVX2 register, replicating it across all 8 32-bit lanes
__m256i target_vec = _mm256_set1_epi32(value_);
// Process data in chunks of 8 integers (since AVX2 operates on 256 bits, and int is 32 bits)
size_t i = 0;
for (; i + 7 < num_elements; i += 8) {
// Load 8 integers from the data vector
__m256i data_vec = _mm256_loadu_si256(reinterpret_cast<const __m256i*>(&data[i]));
// Perform the equality comparison: result is a mask where 0xFFFFFFFF means true, 0x00000000 means false
__m256i cmp_result = _mm256_cmpeq_epi32(data_vec, target_vec);
// Convert the comparison result mask to a bitmask (1 for true, 0 for false)
// This converts each 32-bit lane's MSB into a bit in a 8-bit integer
int mask = _mm256_movemask_epi8(cmp_result); // Each 32-bit lane gets its MSB moved.
// For cmpeq_epi32, if equal, all bits are 1, so MSB is 1.
// For cmpeq_epi32, if not equal, all bits are 0, so MSB is 0.
// We need to extract the 8-bit mask representing 8 integers.
// Correct approach: `_mm256_movemask_ps` for floats, or
// custom bit extraction for integers.
// A simpler way often involves `_mm256_testz_si256` or similar,
// or just using the comparison mask directly for further operations.
// For simple boolean output, we need to convert the 32-bit mask per lane
// into a single bit per lane.
// Example: (mask & (1 << (j * 4 + 3))) != 0 for j-th 32-bit element.
// A more direct way to get a bitmask per 32-bit element is:
unsigned int comparison_mask = 0;
for (int j = 0; j < 8; ++j) {
if ((_mm256_extract_epi32(cmp_result, j) & 0xFFFFFFFF) == 0xFFFFFFFF) { // Check if all bits are set (true)
comparison_mask |= (1 << j);
}
}
for (int j = 0; j < 8; ++j) {
if ((comparison_mask >> j) & 1) { // If the j-th element was true
if (!int_col->isNull(i + j)) { // Also check for NULLs
selection_vector[i + j] = true;
}
}
}
}
// Handle remaining elements (if num_elements is not a multiple of 8)
for (; i < num_elements; ++i) {
if (!int_col->isNull(i) && data[i] == value_) {
selection_vector[i] = true;
}
}
} else {
throw std::runtime_error("IntEqualityPredicateSIMD applied to non-integer column.");
}
return selection_vector;
}
private:
size_t column_idx_;
int value_;
};
SIMD代码的复杂性与考量:
- 平台相关性: SIMD指令集(SSE, AVX, AVX-512, ARM SVE)是平台特定的。代码需要针对不同的CPU架构进行优化或提供回退方案。
- 对齐:
_mm256_loadu_si256是非对齐加载。如果数据能够保证对齐(例如,使用_mm_malloc或C++17std::aligned_alloc),可以使用对齐加载指令(如_mm256_load_si256),它们通常更快。 - NULL值处理: 上述示例在SIMD循环后依然检查NULLs。更优化的方式是将NULL位图也加载到SIMD寄存器中,并与比较结果进行位运算。
- 编译器自动向量化: 现代C++编译器(GCC, Clang, MSVC)在开启优化(如
-O3)时,能够自动识别一些简单的循环并将其向量化。但对于复杂的数据结构和逻辑,手动使用内在函数通常能获得更好的控制和性能。 - 可读性和维护性: SIMD内在函数代码通常比普通的C++代码更难读懂和维护。
5.3 生产级SIMD库
为了简化SIMD编程,许多库被开发出来,例如:
- Intel ISPC (Implicit SPMD Program Compiler): 专门为CPU SIMD编程设计的编译器。
- Vector Class Library (VCL): 一个C++模板库,提供了一个面向对象的SIMD接口。
- xsimd: 另一个C++模板库,支持多种SIMD指令集。
- Eigen: 一个用于线性代数的C++模板库,其内部也大量使用了SIMD优化。
这些库提供了更高级的抽象,使得开发者可以编写更接近常规C++代码的SIMD优化代码,同时保持高性能。
6. 向量化存储与处理的进阶话题
6.1 混合存储(Hybrid Storage)
在实际系统中,纯粹的行式或列式存储都有其局限性。许多现代数据库系统采用混合存储模型:
- 行式存储(或小批量的行式存储) 用于最近写入的少量数据,以支持快速的OLTP写入和单行读取。
- 列式存储 用于历史数据或冷数据,以优化OLAP查询性能。
- 数据会周期性地从行式存储区域迁移到列式存储区域。
6.2 压缩技术
列式存储为高效压缩提供了天然的优势。除了前面提到的,还有:
- 字典编码(Dictionary Encoding): 对于低基数(distinct值少)的列(如枚举类型、国家名称),可以将所有唯一值存储在一个字典中,列中存储的只是字典的索引。
- Delta编码(Delta Encoding): 对于递增或递减的序列(如时间戳、ID),只存储相邻值之间的差值,这些差值通常更小,更易于压缩。
- 位图索引(Bitmap Index): 对于低基数离散值,可以为每个唯一值创建一个位图,快速进行过滤操作。
| 压缩技术 | 适用场景 | 优势 |
|---|---|---|
| 字典编码 | 低基数字符串、枚举 | 节省空间,加速字符串比较 |
| 行程长度编码 | 包含大量重复值的列 | 极大压缩连续重复值 |
| Bitpacking | 小范围整数(如索引、delta值) | 精确分配位,极致压缩 |
| Delta编码 | 有序序列(时间戳、ID) | 减小存储值范围,利于其他压缩 |
| LZ4/Snappy等通用 | 无明显模式的混合数据,或作为二级压缩 | 速度快,适用于各种数据 |
6.3 延迟物化(Late Materialization)
延迟物化是向量化处理的一个重要优化。在列式数据库中,直到查询的最后阶段,例如需要将结果返回给客户端时,才将不同列的数据组合成完整的行。在此之前,所有中间操作(过滤、聚合、连接)都直接在列数据上执行,避免了不必要的行重构开销,进一步提升性能。
6.4 JIT编译(Just-In-Time Compilation)
一些高性能数据库系统(如HyPer、Vectorwise)利用JIT编译技术。它们在运行时根据具体的查询计划,生成高度优化的机器码。这使得数据库能够为每个查询量身定制执行逻辑,包括:
- 消除虚函数调用: 直接调用具体实现。
- 常量折叠和传播: 在编译时处理已知常量。
- 循环展开和SIMD指令生成: 编译器可以更好地利用JIT信息生成高效的向量化代码。
- 谓词融合: 将多个过滤条件合并到一个循环中,减少数据遍历次数。
6.5 内存管理
对于高性能数据库内核,定制的内存分配器是必不可少的。标准库的std::allocator或new/delete可能存在碎片化、锁竞争和分配/释放开销过大的问题。
- Arena Allocator / Bump Allocator: 预先分配一大块内存,然后通过简单地移动指针来快速分配小块内存。非常适合生命周期相似的临时对象(如查询中间结果的
Chunk)。 - Fixed-size Block Allocator: 对于固定大小的
Column对象或Chunk对象,可以实现专用的分配器。 - Huge Pages: 利用操作系统的巨页功能,减少TLB(Translation Lookaside Buffer)未命中,提高内存访问效率。
7. 挑战与考量
尽管向量化存储和处理在OLAP场景下优势显著,但也存在一些挑战:
- 更新和删除: 在列式存储中更新或删除单行数据可能效率低下,因为它可能需要修改多个独立的列。常见的策略是使用多版本并发控制(MVCC)和“追加式”(append-only)存储,通过标记旧版本为无效并追加新版本来实现逻辑更新。
- 事务处理(OLTP): 对于高并发的单行读写事务,行式存储通常更优。列式存储通常更适合批量的、只读的分析查询。
- 模式演化: 增加新列或修改现有列的类型可能需要重新组织数据,这在列式存储中可能更复杂。
- 实现复杂性: 向量化算子、SIMD优化和高级内存管理增加了数据库内核的实现复杂性。
8. 展望
向量化存储和处理是现代分析型数据库系统性能的基石。从开源项目如Apache Arrow、ClickHouse,到商业系统如Snowflake、Google Dremel,都广泛采用了这些技术。未来,随着新的硬件架构(如ARM SVE、TPU等)和指令集不断涌现,数据库内核将继续演进,更紧密地与硬件协同,实现更高的数据处理吞吐量。同时,向量化技术也将与机器学习、人工智能等领域更紧密地结合,为复杂的数据科学工作负载提供强大的底层支持。
这种将数据按列组织、以批次为单位进行处理、并利用SIMD指令加速计算的方法,彻底改变了数据库内核的性能格局。它使得数据库系统能够以前所未有的效率,驾驭海量数据,满足日益增长的分析需求。