各位开发者、架构师以及对高性能系统充满热情的同仁们,大家好!
今天,我们将深入探讨向量数据库内核的构建艺术,特别是如何基于HNSW(Hierarchical Navigable Small World)算法,从底层搭建高效的索引结构,并在此基础上实现卓越的多线程检索优化。这是一个既充满理论深度又极具工程实践价值的课题,它不仅是现代AI应用,如语义搜索、推荐系统、图像识别等的核心驱动力,更是我们理解如何在海量高维数据中迅速定位“相似”信息的一扇窗户。
1. 向量数据库:连接数字世界的语义桥梁
在当今数据爆炸的时代,传统的关系型数据库在处理高维、非结构化数据,特别是需要进行“语义相似性”搜索的场景时,显得力不从心。例如,用户输入一个自然语言查询,我们希望找到语义最接近的文档,而不是简单地匹配关键词;或者给定一张图片,我们想找出数据库中最相似的图片。这些需求的核心在于理解数据的“含义”或“特征”,并将其转化为可在高维空间中度量的数学表示——向量。
向量数据库应运而生,它专门设计用于存储、索引和查询这些高维向量。其核心能力在于执行“近似最近邻”(Approximate Nearest Neighbor, ANN)搜索。与精确最近邻(Exact Nearest Neighbor, ENN)搜索不同,ANN算法牺牲了一点点精度,以换取在海量数据上的巨大性能提升。在大多数实际应用中,这种微小的精度损失是完全可以接受的,因为我们通常只需要“足够好”的相似结果。
在众多ANN算法中,HNSW以其卓越的性能和相对较高的召回率脱颖而出,成为了业界构建向量索引的优选方案之一。它巧妙地结合了图结构和多层级设计,使得在高维空间中的搜索效率得到了显著提升。
2. HNSW算法:高维空间导航的艺术
HNSW,全称Hierarchical Navigable Small World,是一种基于图的ANN搜索算法。它的核心思想是构建一个多层级的“小世界网络”,其中每一层都是一个图,上层图连接的节点更少,但连接的距离更远(跳数更少),提供了一个粗粒度的搜索路径;下层图连接的节点更多,提供更精细的搜索。通过这种层级结构,HNSW能够在搜索时快速收敛到目标区域,然后在局部进行精细搜索,从而实现高效的查询。
2.1 HNSW的核心概念与构建原理
Navigable Small World (NSW) 图: HNSW的基础是NSW图。在一个NSW图中,任意两个节点之间都存在一条相对较短的路径。通过随机或启发式地建立连接,NSW图能够模拟“小世界效应”,即在社交网络中,任意两个人之间只需经过很少的中间人就能建立联系。在ANN搜索中,这意味着我们可以通过较少的“跳跃”找到相似的向量。
层级结构: HNSW的关键创新在于引入了层级结构。每个向量在被添加到索引时,会被随机分配一个层级 L。这个向量将存在于从第 0 层到第 L 层的所有层中。
- 第 0 层 (Base Layer): 包含所有向量,连接最稠密,用于进行最精细的局部搜索。
- 更高层 (Top Layers): 包含更少的向量(只有那些被分配到较高层级的向量),连接稀疏,用于进行快速的全局导航,跳过大量不相关的区域。
图的构建(向量插入过程):
当一个新向量 q 被插入HNSW索引时,其过程如下:
- 随机分配层级: 为新向量
q随机分配一个最大层级L_q。这个层级通常通过一个概率函数(如exp(-lambda * L))生成,使得高层级的节点数量呈指数级下降。 - 寻找入口点: 索引维护一个全局的“入口点”(
entry_point),通常是第一个插入的向量或最近一次插入的最高层级向量。从entry_point开始,在当前最高层级max_level_进行贪婪搜索,找到与q最近的节点e_q。 - 层级下降搜索: 从
max_level_到L_q + 1,逐层下降。在每一层,从上一层找到的最近邻e_l开始,进行贪婪搜索(searchLayer),找到efConstruction个与q最接近的节点作为候选集W。然后,将W中距离q最近的节点作为下一层的入口点e_{l-1}。这一步的目标是为新节点在所有层上找到一个合适的“落脚点”。 - 连接建立: 对于从
L_q层到0层,逐层进行操作:- 在当前层
l,从e_l开始,进行贪婪搜索,找到efConstruction个最近邻节点。 - 从这
efConstruction个候选节点中,选择M个(或M_max个,对于第0层可能是M_max0个)距离q最近的节点作为q的邻居。 - 将
q与这些选定的邻居建立连接。 - 同时,对于这些被选为
q邻居的节点p,如果p的邻居数量超过了M(或M_max),则需要进行“邻居修剪”(pruneConnections),移除其中距离p最远的邻居,以保持图的稀疏性和效率。
- 在当前层
- 更新入口点: 如果
L_q大于当前的max_level_,则更新entry_point为q,并更新max_level_。
图的查询(向量检索过程):
当一个查询向量 q_query 到来时,其搜索过程与插入过程类似:
- 寻找入口点: 从
entry_point开始,在当前最高层级max_level_进行贪婪搜索,找到与q_query最近的节点e_q。 - 层级下降搜索: 从
max_level_到1层,逐层下降。在每一层,从上一层找到的最近邻e_l开始,进行贪婪搜索(searchLayer),找到efSearch个与q_query最接近的节点作为候选集W。然后,将W中距离q_query最近的节点作为下一层的入口点e_{l-1}。 - 最终搜索: 在第
0层,从e_0开始,进行贪婪搜索,找到efSearch个最近邻节点。 - 返回结果: 从这
efSearch个节点中,选出距离q_query最近的k个节点作为最终的搜索结果。
2.2 HNSW的关键参数
HNSW的性能和精度受到几个关键参数的影响:
| 参数名称 | 描述 | 影响 |
|---|---|---|
M |
每个节点在构建时,在任何一层中最多建立的邻居连接数。通常建议在 10-20 之间。 |
索引质量与内存: 越大,邻居连接越多,搜索路径选择更多,召回率可能更高,但内存消耗和构建时间增加。 |
M_max |
内部参数,通常是 2 * M。在构建过程中,节点可能暂时拥有多达 M_max 个邻居,之后会被修剪到 M 个。 |
构建效率: 影响邻居选择和修剪的复杂性。 |
M_max0 |
第 0 层每个节点的最大邻居连接数。通常是 2 * M,可以更大,因为第0层是所有节点的基础层。 |
第0层密度: 越大,第0层连接越密集,对召回率影响最大,但也会增加内存消耗和搜索时间。 |
efConstruction |
索引构建时,在 searchLayer 阶段,从当前层返回的最近邻候选集的数量。通常建议在 100-2000 之间。 |
构建质量与时间: 越大,构建过程中的搜索范围越广,找到更优的邻居连接的可能性越大,索引质量和召回率越高,但构建时间显著增加。 |
efSearch |
查询时,在 searchLayer 阶段,从当前层返回的最近邻候选集的数量。通常建议在 10-1000 之间,可动态调整。 |
搜索速度与召回率: 越大,搜索范围越广,召回率越高,但搜索时间增加。可在运行时调整,以平衡查询速度和精度。 |
max_levels |
整个索引的最大层级数。通常通过 log(num_elements) / log(M) 来估算,并结合 lambda 参数进行概率分配。 |
层级深度: 影响索引的层级结构深度。过深可能导致不必要的层级,过浅可能导致高层导航效率不足。通常不是直接设置,而是由 lambda 和元素数量隐式决定。 |
lambda (或 mL) |
用于控制节点层级分配的指数分布参数。mL = 1/log(M) 是一个常用值,使得 exp(-lambda * L) 能够有效地控制高层节点数量。 |
层级分布: 决定了HNSW中层级节点的分布密度。lambda 值越小,高层节点越多,高层导航能力越强,但层级构建和内存消耗也相应增加。通常 1/log(M) 是一个较好的经验值,使得在每层平均 M 个邻居的情况下,高层节点数量呈合理下降。 |
这些参数之间的权衡是构建高效HNSW索引的关键。M 控制了图的稠密程度,efConstruction 影响了构建质量,而 efSearch 则直接影响查询时的速度和召回率。
3. C++ 向量数据库内核架构设计
为了实现一个高性能的向量数据库内核,我们需要精心设计其C++架构。以下是核心组件和它们之间的协作关系:
3.1 核心组件
-
VectorStore:- 职责: 负责实际的高维向量数据的存储和管理。它不直接参与HNSW图的构建,而是作为HNSW索引的底层数据源。
- 接口: 提供
getVector(id)方法,根据向量ID获取原始的float*或std::vector<float>数据。 - 实现考量: 考虑到内存效率,向量数据通常存储为连续的
float数组。可以使用std::vector<float>的二维结构,或者更高效地,将所有向量拼接成一个大的float数组,通过偏移量访问。
-
DistanceMetric:- 职责: 定义向量之间距离的计算方法。HNSW依赖于这个距离函数来评估向量的相似性。
- 接口: 抽象基类,提供
virtual float calculate(const float* vec1, const float* vec2, size_t dim) const = 0;纯虚函数。 - 实现考量:
- L2 Distance (Euclidean Distance):
sum_sq = sum((vec1[i] - vec2[i])^2)。通常返回平方L2距离以避免开根号,因为比较大小时平方距离和实际距离的顺序是一致的。 - Cosine Similarity:
dot_product(vec1, vec2) / (norm(vec1) * norm(vec2))。通常转换为距离形式1 - cosine_similarity。 - 性能优化: 距离计算是热点代码,应考虑SIMD指令(如SSE/AVX)优化。
- L2 Distance (Euclidean Distance):
-
HNSWNode:- 职责: 代表HNSW图中的一个节点,存储其原始向量ID和在不同层级的邻居连接信息。
-
数据结构:
struct HNSWNode { size_t id; // 原始向量在VectorStore中的ID int max_level; // 该节点被分配到的最高层级 std::vector<std::vector<int>> neighbors; // neighbors[level] -> list of neighbor IDs // 构造函数 HNSWNode(size_t node_id, int node_max_level, int M_max_per_layer) : id(node_id), max_level(node_max_level) { // neighbors[0] to neighbors[max_level] neighbors.resize(node_max_level + 1); // 预留空间,避免频繁realloc for (int i = 0; i <= node_max_level; ++i) { neighbors[i].reserve(M_max_per_layer); } } // 线程安全访问邻居列表的局部锁 (如果需要细粒度锁) // mutable std::mutex node_mutex; }; - 考量:
neighbors使用std::vector<int>存储邻居ID。如果需要细粒度锁定,每个节点可以有一个自己的std::mutex,但会增加内存开销和锁竞争复杂性。
-
HNSWIndex:- 职责: 封装HNSW算法的所有核心逻辑,包括图的构建、向量插入、最近邻搜索等。
- 内部数据:
std::vector<std::unique_ptr<HNSWNode>> nodes;:存储所有HNSW节点。unique_ptr管理内存。std::atomic<int> entry_point_id;:当前索引的全局入口点ID,原子操作保证线程安全。std::atomic<int> current_max_level;:当前索引中所有节点达到的最高层级,原子操作保证线程安全。std::unique_ptr<DistanceMetric> dist_metric;:距离计算器。- HNSW参数:
M,efConstruction,efSearch,max_connections_0,max_connections_other,lambda等。 std::mutex graph_modification_mutex;:用于保护整个图的结构性修改(如nodes容器的push_back或resize),以及entry_point_id和current_max_level的非原子性更新。std::mt19937 rng;:随机数生成器,用于层级分配。std::uniform_real_distribution<> level_dist;:层级分配的概率分布。
- 核心方法:
addVector(size_t id, const float* vector_data);:插入新向量到HNSW索引。std::vector<size_t> searchKnn(const float* query_vector, size_t k, int ef) const;:执行K近邻搜索。- 内部辅助方法:
getRandomLevel():根据概率生成节点层级。searchLayer(...):在指定层级进行贪婪搜索。selectNeighbors(...):从候选集中选择最佳邻居。pruneConnections(...):修剪邻居,保持连接数限制。
-
ThreadPool:- 职责: 管理一组工作线程,用于异步执行任务,实现多线程并发。
- 接口: 提供
enqueue(task)方法,将任务提交到队列,并返回std::future以获取结果。 - 实现考量: 经典的生产者-消费者模型,使用
std::queue存储任务,std::mutex保护队列,std::condition_variable进行线程同步。
-
QueryExecutor:- 职责: 作为对外暴露的接口层,接收查询请求,调度
HNSWIndex的searchKnn方法,并将结果返回。可以利用ThreadPool来并发处理多个查询。
- 职责: 作为对外暴露的接口层,接收查询请求,调度
3.2 架构概览
graph TD
A[Client Application] --> B[QueryExecutor];
B --> C[ThreadPool];
C --> D[HNSWIndex];
D --> E[VectorStore];
D --> F[DistanceMetric];
D --> G[HNSWNode];
E -- vector data --> D;
F -- distance calculations --> D;
G -- graph structure --> D;
4. 实施细节:核心HNSW组件的C++代码示例
我们将聚焦于 DistanceMetric、HNSWNode 和 HNSWIndex 的关键部分。为了保持简洁,VectorStore 将被抽象为 HNSWIndex 内部的一个方法 getVectorData。
4.1 距离度量 (DistanceMetric)
#include <vector>
#include <cmath>
#include <memory>
#include <queue>
#include <random>
#include <atomic>
#include <mutex>
#include <thread>
#include <future>
#include <functional>
#include <unordered_set>
#include <algorithm> // For std::sort
// 抽象距离度量基类
class DistanceMetric {
public:
// 计算两个向量之间的距离
// 返回值通常是距离的平方,以避免开根号,提高性能
virtual float calculate(const float* vec1, const float* vec2, size_t dim) const = 0;
virtual ~DistanceMetric() = default;
};
// L2距离(欧氏距离)的平方实现
class L2SquaredDistance : public DistanceMetric {
public:
float calculate(const float* vec1, const float* vec2, size_t dim) const override {
float sum_sq = 0.0f;
for (size_t i = 0; i < dim; ++i) {
float diff = vec1[i] - vec2[i];
sum_sq += diff * diff;
}
return sum_sq; // 返回平方L2距离
}
};
// 余弦相似度转换为距离实现 (1 - similarity)
class CosineDistance : public DistanceMetric {
public:
float calculate(const float* vec1, const float* vec2, size_t dim) const override {
float dot_product = 0.0f;
float norm_vec1 = 0.0f;
float norm_vec2 = 0.0f;
for (size_t i = 0; i < dim; ++i) {
dot_product += vec1[i] * vec2[i];
norm_vec1 += vec1[i] * vec1[i];
norm_vec2 += vec2[i] * vec2[i];
}
if (norm_vec1 == 0.0f || norm_vec2 == 0.0f) {
// 处理零向量情况,可以返回最大距离或特定值
return 1.0f; // 假设距离为1.0f (不相似)
}
float similarity = dot_product / (std::sqrt(norm_vec1) * std::sqrt(norm_vec2));
return 1.0f - similarity; // 将相似度转换为距离
}
};
4.2 HNSW节点 (HNSWNode)
// HNSW图中的一个节点
struct HNSWNode {
size_t id; // 原始向量在VectorStore中的ID
int max_level; // 该节点被分配到的最高层级
// 存储邻居ID。neighbors[level] 是一个vector<int>
// mutable 用于在const方法中修改(如果需要细粒度锁)
std::vector<std::vector<int>> neighbors;
// 构造函数
HNSWNode(size_t node_id, int node_max_level, int M_max_per_layer)
: id(node_id), max_level(node_max_level) {
neighbors.resize(node_max_level + 1); // 从0到max_level
for (int i = 0; i <= node_max_level; ++i) {
neighbors[i].reserve(M_max_per_layer); // 预留空间
}
}
};
4.3 HNSW索引 (HNSWIndex)
class HNSWIndex {
public:
HNSWIndex(size_t dim, std::unique_ptr<DistanceMetric> metric,
int M, int efConstruction, int efSearch, float level_mult);
// 添加向量到索引
void addVector(size_t id, const float* vector_data);
// 搜索K近邻
std::vector<size_t> searchKnn(const float* query_vector, size_t k, int ef) const;
private:
size_t dimension;
std::unique_ptr<DistanceMetric> dist_metric;
int M_; // 每个节点在各层中的最大邻居数
int M_max0_; // 第0层每个节点的最大邻居数 (通常是2*M_)
int M_max_other_layers_;// 其他层每个节点的最大邻居数 (通常是M_)
int ef_construction_; // 构建时的搜索范围参数
int ef_search_; // 搜索时的搜索范围参数
float level_multiplier_;// 用于随机层级分配的参数,通常是1/log(M_)
std::vector<std::unique_ptr<HNSWNode>> nodes_; // 存储HNSWNode对象
// 注意:实际的向量数据需要一个单独的VectorStore来管理
// 为了简化示例,我们假设有一个内部函数可以根据ID获取向量数据
// std::vector<std::vector<float>> raw_vectors_; // 实际项目中应由VectorStore管理
// 假设这是获取原始向量数据的接口
// 在实际项目中,这会通过VectorStore::getVectorData(id)实现
const float* getVectorData(size_t node_id) const {
// 示例:这里只是一个占位符,实际需要从一个全局的向量存储中获取
// 在实际应用中,nodes_[node_id]->data 应该指向存储在VectorStore中的向量
// 这里为了演示,我们假设nodes_的ID与raw_vectors_的索引一致
if (node_id >= raw_vectors_.size()) return nullptr; // 边界检查
return raw_vectors_[node_id].data();
}
// 临时存储原始向量数据,以便示例运行
// 实际项目中应由 VectorStore 管理
std::vector<std::vector<float>> raw_vectors_;
std::atomic<size_t> next_vector_id_; // 用于分配新的向量ID
std::atomic<int> entry_point_id_; // 当前索引的入口点ID
std::atomic<int> current_max_level_; // 当前索引中节点达到的最高层级
mutable std::mutex graph_modification_mutex_; // 保护图结构修改(如nodes_增删)
std::mt19937 rng_; // 随机数生成器
std::uniform_real_distribution<> level_dist_; // 层级分配的概率分布
// 内部辅助函数
int getRandomLevel();
// searchLayer: 在指定层级进行贪婪搜索
// 返回一个优先级队列,包含(距离, 节点ID)对,按距离从小到大排序
std::priority_queue<std::pair<float, int>, std::vector<std::pair<float, int>>, std::greater<std::pair<float, int>>>
searchLayer(const float* query_vec, int entry_point, int layer, int ef_param) const;
// selectNeighbors: 从候选集中选择最佳邻居
// candidates: 优先级队列,包含(距离, 节点ID)对
// M_limit: 允许的最大邻居数
// level: 当前层级
// result_neighbors: 存储选定邻居ID的vector
void selectNeighbors(
std::priority_queue<std::pair<float, int>, std::vector<std::pair<float, int>>, std::greater<std::pair<float, int>>>& candidates,
int M_limit, int level, std::vector<int>& result_neighbors) const;
// pruneConnections: 修剪节点的邻居列表
void pruneConnections(HNSWNode* node, int M_limit, int level);
};
HNSWIndex::HNSWIndex(size_t dim, std::unique_ptr<DistanceMetric> metric,
int M, int efConstruction, int efSearch, float level_mult)
: dimension(dim), dist_metric(std::move(metric)),
M_(M),
M_max0_(M * 2), // 第0层通常允许更多连接
M_max_other_layers_(M), // 其他层保持M个
ef_construction_(efConstruction), ef_search_(efSearch),
level_multiplier_(level_mult),
entry_point_id_(-1), current_max_level_(-1),
next_vector_id_(0), // 初始化下一个向量ID
rng_(std::random_device{}()), // 使用随机设备初始化随机数生成器
level_dist_(0.0, 1.0) // [0.0, 1.0) 均匀分布
{
// 如果M_max0_与M_max_other_layers_相同,则不需要区分
// 确保level_multiplier_是正数,通常是 1.0 / log(M_)
if (level_multiplier_ <= 0) {
throw std::invalid_argument("Level multiplier must be positive.");
}
}
int HNSWIndex::getRandomLevel() {
// 概率分布:P(L) = exp(-L / level_multiplier_) * (1 - exp(-1 / level_multiplier_))
// 这里的实现简化为 -log(rand()) * level_multiplier_
// 使得高层级的节点更少
double r = level_dist_(rng_);
return static_cast<int>(-std::log(r) * level_multiplier_);
}
// searchLayer实现
std::priority_queue<std::pair<float, int>, std::vector<std::pair<float, int>>, std::greater<std::pair<float, int>>>
HNSWIndex::searchLayer(const float* query_vec, int entry_point_id, int layer, int ef_param) const {
// 存储已访问的节点,避免重复访问
std::unordered_set<int> visited_nodes;
// 存储候选节点,按距离从小到大排序
std::priority_queue<std::pair<float, int>, std::vector<std::pair<float, int>>, std::greater<std::pair<float, int>>> candidates;
// 存储结果节点,按距离从大到小排序 (因为我们想要ef_param个最近的)
std::priority_queue<std::pair<float, int>> results; // max-heap
if (entry_point_id == -1 || entry_point_id >= nodes_.size() || !nodes_[entry_point_id]) {
return results; // 没有有效的入口点
}
const float* entry_vec = getVectorData(entry_point_id);
if (!entry_vec) return results;
float dist_entry = dist_metric->calculate(query_vec, entry_vec, dimension);
candidates.push({dist_entry, entry_point_id});
results.push({dist_entry, entry_point_id});
visited_nodes.insert(entry_point_id);
while (!candidates.empty()) {
float current_dist = candidates.top().first;
int current_node_id = candidates.top().second;
candidates.pop();
// 优化:如果当前最远的结果比最近的候选节点还远,可以提前终止
if (results.top().first < current_dist && results.size() >= ef_param) {
break;
}
// 访问当前节点的邻居
// 注意:这里需要对HNSWNode::neighbors进行线程安全访问,尤其是在addVector中
// 但在searchLayer中,如果图结构被认为是稳定的(只有addVector修改),则可以是const访问
// 如果addVector可能在searchLayer运行时修改图,则需要细粒度锁
const auto& current_node_neighbors = nodes_[current_node_id]->neighbors[layer];
for (int neighbor_id : current_node_neighbors) {
if (visited_nodes.find(neighbor_id) == visited_nodes.end()) {
visited_nodes.insert(neighbor_id);
const float* neighbor_vec = getVectorData(neighbor_id);
if (!neighbor_vec) continue; // 确保向量数据存在
float dist_neighbor = dist_metric->calculate(query_vec, neighbor_vec, dimension);
if (results.size() < ef_param || dist_neighbor < results.top().first) {
candidates.push({dist_neighbor, neighbor_id});
results.push({dist_neighbor, neighbor_id});
while (results.size() > ef_param) {
results.pop();
}
}
}
}
}
return results; // 返回ef_param个最佳候选
}
// selectNeighbors实现
void HNSWIndex::selectNeighbors(
std::priority_queue<std::pair<float, int>, std::vector<std::pair<float, int>>, std::greater<std::pair<float, int>>>& candidates,
int M_limit, int level, std::vector<int>& result_neighbors) const {
// Simple approach: Take M_limit closest neighbors
// More sophisticated: Use "heuristic" or "extendCandidates" as in original HNSW
// For simplicity, we just take the M_limit closest.
// Transfer from priority queue to vector for sorting/pruning
std::vector<std::pair<float, int>> temp_neighbors;
while (!candidates.empty()) {
temp_neighbors.push_back(candidates.top());
candidates.pop();
}
// Sort by distance (already sorted by PQ, but useful for later pruning)
std::sort(temp_neighbors.begin(), temp_neighbors.end());
// Take up to M_limit neighbors
for (int i = 0; i < temp_neighbors.size() && i < M_limit; ++i) {
result_neighbors.push_back(temp_neighbors[i].second);
}
}
// pruneConnections实现
void HNSWIndex::pruneConnections(HNSWNode* node, int M_limit, int level) {
if (node->neighbors[level].size() <= M_limit) {
return; // 不需要修剪
}
// 计算当前节点与所有邻居的距离,并排序
std::vector<std::pair<float, int>> current_neighbors_with_dist;
const float* node_vec = getVectorData(node->id);
for (int neighbor_id : node->neighbors[level]) {
const float* neighbor_vec = getVectorData(neighbor_id);
if (node_vec && neighbor_vec) {
current_neighbors_with_dist.push_back({dist_metric->calculate(node_vec, neighbor_vec, dimension), neighbor_id});
}
}
// 按距离从小到大排序
std::sort(current_neighbors_with_dist.begin(), current_neighbors_with_dist.end());
// 保留M_limit个最近的邻居
node->neighbors[level].clear();
for (int i = 0; i < M_limit && i < current_neighbors_with_dist.size(); ++i) {
node->neighbors[level].push_back(current_neighbors_with_dist[i].second);
}
}
// addVector实现
void HNSWIndex::addVector(size_t id, const float* vector_data) {
// 假设id是递增的,并且与raw_vectors_的索引对应
// 在实际项目中,id可能是一个外部提供的,需要映射到内部存储
{
std::lock_guard<std::mutex> lock(graph_modification_mutex_);
// 确保raw_vectors_能够容纳新向量
if (id >= raw_vectors_.size()) {
raw_vectors_.resize(id + 1);
}
raw_vectors_[id].assign(vector_data, vector_data + dimension);
if (id >= nodes_.size()) {
nodes_.resize(id + 1);
}
}
int new_node_level = getRandomLevel(); // 为新节点分配层级
// 确保新节点级别不超过一个合理的最大值,防止极端情况
if (new_node_level >= 32) new_node_level = 31; // 例如,限制最大层级
std::unique_ptr<HNSWNode> new_node = std::make_unique<HNSWNode>(id, new_node_level, M_ * 2); // M_*2 for M_max
int current_entry_point_id = entry_point_id_.load();
int current_index_max_level = current_max_level_.load();
// 如果是第一个节点或者新节点层级更高,更新全局入口点
if (current_entry_point_id == -1) {
entry_point_id_.store(id);
current_max_level_.store(new_node_level);
// 直接将新节点添加到nodes_,并解锁
{
std::lock_guard<std::mutex> lock(graph_modification_mutex_);
nodes_[id] = std::move(new_node);
}
return;
}
// 从最高层级开始搜索
int entry_point_for_search = current_entry_point_id;
for (int lc = current_index_max_level; lc > new_node_level; --lc) {
auto search_results = searchLayer(vector_data, entry_point_for_search, lc, 1); // ef=1 for simple greedy entry point
if (!search_results.empty()) {
entry_point_for_search = search_results.top().second;
} else {
// 如果某一层没有找到邻居,可能索引太稀疏,或者entry_point_for_search不合适
// 在这种情况下,可以尝试使用当前层的其他已知节点作为入口,或者直接跳过
// 这里简化处理,如果找不到,就保持原入口点或使用一个默认值
// 更健壮的实现可能需要更复杂的逻辑
}
}
// 将新节点添加到nodes_
{
std::lock_guard<std::mutex> lock(graph_modification_mutex_);
nodes_[id] = std::move(new_node); // 移动语义,所有权转移
}
// 从新节点的最高层级开始,向下建立连接
for (int lc = new_node_level; lc >= 0; --lc) {
int M_limit = (lc == 0) ? M_max0_ : M_max_other_layers_;
// 搜索当前层级的efConstruction个最佳邻居
auto candidates = searchLayer(vector_data, entry_point_for_search, lc, ef_construction_);
std::vector<int> selected_neighbors_ids;
selectNeighbors(candidates, M_limit, lc, selected_neighbors_ids);
// 建立双向连接
for (int neighbor_id : selected_neighbors_ids) {
// 确保访问nodes_是线程安全的
HNSWNode* neighbor_node = nullptr;
{
std::lock_guard<std::mutex> lock(graph_modification_mutex_); // 保护nodes_访问
if (neighbor_id < nodes_.size() && nodes_[neighbor_id]) {
neighbor_node = nodes_[neighbor_id].get();
}
}
if (!neighbor_node) continue;
// 锁定被修改的两个节点(如果使用细粒度锁),这里简化为图修改锁
{
std::lock_guard<std::mutex> lock(graph_modification_mutex_);
nodes_[id]->neighbors[lc].push_back(neighbor_id);
neighbor_node->neighbors[lc].push_back(id);
}
// 修剪邻居
{
std::lock_guard<std::mutex> lock(graph_modification_mutex_); // 保护节点邻居列表修改
pruneConnections(nodes_[id].get(), M_limit, lc);
pruneConnections(neighbor_node, M_limit, lc);
}
}
// 更新下一层的入口点
if (!candidates.empty()) {
entry_point_for_search = candidates.top().second; // 离查询最近的那个
}
}
// 更新全局入口点和最高层级
if (new_node_level > current_index_max_level) {
current_max_level_.store(new_node_level);
entry_point_id_.store(id); // 新节点成为新的入口点
}
}
// searchKnn实现
std::vector<size_t> HNSWIndex::searchKnn(const float* query_vector, size_t k, int ef) const {
if (entry_point_id_.load() == -1) {
return {}; // 索引为空
}
int current_entry_point = entry_point_id_.load();
int current_index_max_level = current_max_level_.load();
// 从最高层级开始,找到第0层的最佳入口点
for (int lc = current_index_max_level; lc > 0; --lc) {
auto search_results = searchLayer(query_vector, current_entry_point, lc, 1); // ef=1 for greedy
if (!search_results.empty()) {
current_entry_point = search_results.top().second;
}
}
// 在第0层进行最终的ef_search_范围搜索
auto final_candidates = searchLayer(query_vector, current_entry_point, 0, ef);
// 从结果中取出K个最近的节点
std::vector<size_t> result_ids;
while (!final_candidates.empty() && result_ids.size() < k) {
result_ids.push_back(final_candidates.top().second);
final_candidates.pop();
}
std::reverse(result_ids.begin(), result_ids.end()); // 使得最近的在前
return result_ids;
}
代码说明:
HNSWNode的neighbors列表在HNSWIndex::addVector中被修改。为了线程安全,对nodes_容器本身(如resize或push_back)以及每个节点内部的neighbors列表的修改,都需要适当的锁保护。上述代码中,我使用了graph_modification_mutex_对所有对图结构和节点邻居列表的修改进行保护,这是一种粗粒度锁,可能会限制插入操作的并行度。searchLayer方法是const的,因为它只读取图结构。但在实践中,如果addVector正在进行,并且修改了某个节点的邻居列表,searchLayer可能会遇到不一致的状态。使用std::shared_mutex可以更好地处理读写并发。
5. 多线程检索优化
在向量数据库内核中,多线程优化是实现高性能的关键。我们需要区分两种主要场景:并发检索 和 并发索引构建/更新。
5.1 并发检索优化
检索操作(searchKnn)通常是读操作,不修改HNSW图的结构。因此,多个查询可以并发执行。
策略:
-
查询并行化: 最直接的方式是使用
ThreadPool来处理并发的searchKnn请求。当客户端提交多个查询时,每个查询被封装成一个任务,提交给线程池。// ThreadPool 示例 (简化版) class ThreadPool { public: ThreadPool(size_t num_threads) : stop(false) { for (size_t i = 0; i < num_threads; ++i) { workers.emplace_back([this] { for (;;) { std::function<void()> task; { std::unique_lock<std::mutex> lock(queue_mutex); condition.wait(lock, [this] { return stop || !tasks.empty(); }); if (stop && tasks.empty()) return; task = std::move(tasks.front()); tasks.pop(); } task(); } }); } } template<class F, class... Args> auto enqueue(F&& f, Args&&... args) -> std::future<typename std::result_of<F(Args...)>::type> { using return_type = typename std::result_of<F(Args...)>::type; auto task = std::make_shared<std::packaged_task<return_type()>>( std::bind(std::forward<F>(f), std::forward<Args>(args)...) ); std::future<return_type> res = task->get_future(); { std::unique_lock<std::mutex> lock(queue_mutex); if (stop) throw std::runtime_error("enqueue on stopped ThreadPool"); tasks.emplace([task]() { (*task)(); }); } condition.notify_one(); return res; } ~ThreadPool() { { std::unique_lock<std::mutex> lock(queue_mutex); stop = true; } condition.notify_all(); for (std::thread& worker : workers) { worker.join(); } } private: std::vector<std::thread> workers; std::queue<std::function<void()>> tasks; std::mutex queue_mutex; std::condition_variable condition; bool stop; }; // QueryExecutor 示例 class QueryExecutor { public: QueryExecutor(size_t num_threads, HNSWIndex& index) : pool_(num_threads), hnsw_index_(index) {} std::future<std::vector<size_t>> submitSearch(const float* query_vector, size_t k, int ef) { // 注意:这里需要确保query_vector在异步任务执行期间是有效的。 // 最好是复制一份query_vector,或者确保其生命周期足够长。 // 这里为了简化,假设query_vector是安全的。 const float* vec_copy = new float[hnsw_index_.getDimension()]; std::copy(query_vector, query_vector + hnsw_index_.getDimension(), const_cast<float*>(vec_copy)); return pool_.enqueue([this, vec_copy, k, ef]() { std::vector<size_t> results = hnsw_index_.searchKnn(vec_copy, k, ef); delete[] vec_copy; // 释放复制的向量 return results; }); } private: ThreadPool pool_; HNSWIndex& hnsw_index_; }; -
读写锁 (
std::shared_mutex):HNSWIndex在addVector时会修改图结构(写操作),而searchKnn只是读取图结构(读操作)。std::shared_mutex允许:- 多个读取者同时持有共享锁。
- 一个写入者独占互斥锁,阻止所有其他读写操作。
这比std::mutex提供了更好的并发性,因为std::mutex不区分读写,任何操作都会阻塞其他操作。// 在HNSWIndex中 mutable std::shared_mutex graph_rw_mutex_; // 读写锁
// searchKnn方法内部 (读操作)
std::shared_lock lock(graph_rwmutex);
// … 执行搜索逻辑 …// addVector方法内部 (写操作)
std::unique_lock lock(graph_rwmutex);
// … 执行图修改逻辑 …上述 `addVector` 示例代码中使用了 `std::mutex graph_modification_mutex_`,将其替换为 `std::shared_mutex` 将提高并发读的性能。
5.2 并发索引构建/更新优化
HNSW的索引构建(addVector)是一个写操作,涉及修改图结构(添加节点、修改邻居列表)。并发地添加向量是一个挑战。
挑战:
nodes_容器的修改:std::vector::push_back或resize可能会导致内存重新分配,使现有HNSWNode*指针失效。- 邻居列表修改: 多个线程可能尝试同时修改同一个节点的
neighbors列表。 - 全局状态更新:
entry_point_id_和current_max_level_是全局状态,需要原子操作或锁保护。
策略:
-
粗粒度锁: 最简单但并发度最低的方法是使用一个全局
std::mutex(如上面的graph_modification_mutex_)来保护整个addVector方法。这意味着一次只能有一个线程进行向量插入。void HNSWIndex::addVector(size_t id, const float* vector_data) { std::unique_lock<std::shared_mutex> lock(graph_rw_mutex_); // 使用shared_mutex的unique_lock实现写锁 // ... 向量添加逻辑 ... }这种方式虽然简单,但在多核CPU上,插入性能无法线性扩展。
-
细粒度锁和无锁技术:
-
Per-Node Mutex: 每个
HNSWNode内部可以有一个std::mutex来保护其neighbors列表。当修改某个节点的邻居时,只锁定该节点。// HNSWNode 结构中添加 // mutable std::mutex node_mutex; // 保护该节点的邻居列表 // 在addVector中,修改neighbor_node的邻居时 // std::lock_guard<std::mutex> neighbor_lock(neighbor_node->node_mutex); // neighbor_node->neighbors[lc].push_back(id);这种方式提高了并发性,但增加了锁的开销和死锁的可能性(如果两个节点同时尝试锁定对方)。需要谨慎设计锁的顺序。
- 写时复制 (Copy-on-Write): 对于
nodes_容器,可以考虑写时复制。当需要修改nodes_时,创建一个副本,在副本上进行修改,然后原子地替换旧的nodes_指针。这对于读取者是无锁的,但写入者开销大。 - 无锁数据结构: 使用像
std::atomic或专门的无锁数据结构(如Intel TBB的并发容器)来管理邻居列表,避免显式锁。这通常非常复杂且容易出错。 - 批量插入: 将多个向量的插入操作打包成一个批次。在批次内部,可以通过一些技巧减少锁竞争。例如,预先分配
nodes_空间,避免resize。
推荐的折衷方案:
对于HNSWIndex的nodes_容器,因为std::vector的push_back可能导致重新分配,一个安全的做法是:- 在索引初始化时,预估最大容量并
reserve足够的空间。 - 如果
nodes_必须动态增长,每次resize都需要graph_rw_mutex_的写锁。 - 对于每个节点的
neighbors列表的修改,std::unique_lock<std::shared_mutex> lock(graph_rw_mutex_);是一种相对安全且实现简单的方案,它保证了在图结构(包括所有节点的邻居列表)被修改时,没有其他读写操作并发进行。虽然它限制了addVector的并行度,但在很多场景下,索引构建通常是离线或低频操作,而查询是高频操作,这种牺牲是值得的。如果插入并发性是核心需求,那么需要更复杂的细粒度锁或无锁设计。
-
5.3 SIMD指令优化
距离计算是HNSW中的热点操作,尤其是在高维向量上。利用CPU的SIMD(Single Instruction, Multiple Data)指令集可以显著加速这些计算。例如,SSE(Streaming SIMD Extensions)和AVX(Advanced Vector Extensions)可以在单个指令中处理多个浮点数。
#include <immintrin.h> // For AVX intrinsics
// L2SquaredDistance with AVX (for float vectors, dimension must be a multiple of 8)
class L2SquaredDistanceAVX : public DistanceMetric {
public:
float calculate(const float* vec1, const float* vec2, size_t dim) const override {
float sum_sq = 0.0f;
// 确保维度是8的倍数,以便AVX-256处理
size_t i = 0;
for (; i + 7 < dim; i += 8) {
__m256 v1 = _mm256_loadu_ps(vec1 + i); // 加载8个float
__m256 v2 = _mm256_loadu_ps(vec2 + i);
__m256 diff = _mm256_sub_ps(v1, v2); // 8个差值
__m256 sq = _mm256_mul_ps(diff, diff); // 8个平方
__m256 hsum = _mm256_hadd_ps(sq, sq); // 水平相加 (x0+x1, x2+x3, x4+x5, x6+x7, ...)
hsum = _mm256_hadd_ps(hsum, hsum); // (x0+x1+x2+x3, x4+x5+x6+x7, ..., ...)
// 提取结果
float temp_array[8];
_mm256_storeu_ps(temp_array, hsum);
sum_sq += temp_array[0] + temp_array[4]; // AVX-256的hadd结果
}
// 处理剩余部分 (如果维度不是8的倍数)
for (; i < dim; ++i) {
float diff = vec1[i] - vec2[i];
sum_sq += diff * diff;
}
return sum_sq;
}
};
注意: SIMD优化需要编译器的支持(如GCC/Clang的-mavx或-msse标志),并确保向量数据按16或32字节对齐以获得最佳性能(尽管_mm256_loadu_ps可以处理未对齐数据,但性能可能受影响)。
6. 性能考量与调优
构建一个高性能的向量数据库内核,除了算法实现和多线程优化,还需要深入理解性能瓶颈并进行细致调优。
6.1 参数调优
| 参数 | 调优目标 |
| M | 构建与查询: M 越大,HNSW图的连接越密集,搜索路径选择越多,理论上召回率越高,但内存消耗和构建时间增加,搜索时间也可能略微增加。M 越小,图越稀疏,召回率可能下降,但构建和搜索更快。 经验值: 10-20。 |
| efConstruction | 索引质量与构建时间: efConstruction 越大,在构建 HNSW 索引时,每个新向量寻找邻居的搜索范围越大,找到的邻居越精确,最终索引的召回率越高。然而,这也意味着更长的构建时间。 经验值: 100-2000。