C++ 向量数据库内核:基于 HNSW 算法的底层索引构建与多线程检索优化

各位开发者、架构师以及对高性能系统充满热情的同仁们,大家好!

今天,我们将深入探讨向量数据库内核的构建艺术,特别是如何基于HNSW(Hierarchical Navigable Small World)算法,从底层搭建高效的索引结构,并在此基础上实现卓越的多线程检索优化。这是一个既充满理论深度又极具工程实践价值的课题,它不仅是现代AI应用,如语义搜索、推荐系统、图像识别等的核心驱动力,更是我们理解如何在海量高维数据中迅速定位“相似”信息的一扇窗户。

1. 向量数据库:连接数字世界的语义桥梁

在当今数据爆炸的时代,传统的关系型数据库在处理高维、非结构化数据,特别是需要进行“语义相似性”搜索的场景时,显得力不从心。例如,用户输入一个自然语言查询,我们希望找到语义最接近的文档,而不是简单地匹配关键词;或者给定一张图片,我们想找出数据库中最相似的图片。这些需求的核心在于理解数据的“含义”或“特征”,并将其转化为可在高维空间中度量的数学表示——向量。

向量数据库应运而生,它专门设计用于存储、索引和查询这些高维向量。其核心能力在于执行“近似最近邻”(Approximate Nearest Neighbor, ANN)搜索。与精确最近邻(Exact Nearest Neighbor, ENN)搜索不同,ANN算法牺牲了一点点精度,以换取在海量数据上的巨大性能提升。在大多数实际应用中,这种微小的精度损失是完全可以接受的,因为我们通常只需要“足够好”的相似结果。

在众多ANN算法中,HNSW以其卓越的性能和相对较高的召回率脱颖而出,成为了业界构建向量索引的优选方案之一。它巧妙地结合了图结构和多层级设计,使得在高维空间中的搜索效率得到了显著提升。

2. HNSW算法:高维空间导航的艺术

HNSW,全称Hierarchical Navigable Small World,是一种基于图的ANN搜索算法。它的核心思想是构建一个多层级的“小世界网络”,其中每一层都是一个图,上层图连接的节点更少,但连接的距离更远(跳数更少),提供了一个粗粒度的搜索路径;下层图连接的节点更多,提供更精细的搜索。通过这种层级结构,HNSW能够在搜索时快速收敛到目标区域,然后在局部进行精细搜索,从而实现高效的查询。

2.1 HNSW的核心概念与构建原理

Navigable Small World (NSW) 图: HNSW的基础是NSW图。在一个NSW图中,任意两个节点之间都存在一条相对较短的路径。通过随机或启发式地建立连接,NSW图能够模拟“小世界效应”,即在社交网络中,任意两个人之间只需经过很少的中间人就能建立联系。在ANN搜索中,这意味着我们可以通过较少的“跳跃”找到相似的向量。

层级结构: HNSW的关键创新在于引入了层级结构。每个向量在被添加到索引时,会被随机分配一个层级 L。这个向量将存在于从第 0 层到第 L 层的所有层中。

  • 第 0 层 (Base Layer): 包含所有向量,连接最稠密,用于进行最精细的局部搜索。
  • 更高层 (Top Layers): 包含更少的向量(只有那些被分配到较高层级的向量),连接稀疏,用于进行快速的全局导航,跳过大量不相关的区域。

图的构建(向量插入过程):
当一个新向量 q 被插入HNSW索引时,其过程如下:

  1. 随机分配层级: 为新向量 q 随机分配一个最大层级 L_q。这个层级通常通过一个概率函数(如 exp(-lambda * L))生成,使得高层级的节点数量呈指数级下降。
  2. 寻找入口点: 索引维护一个全局的“入口点”(entry_point),通常是第一个插入的向量或最近一次插入的最高层级向量。从 entry_point 开始,在当前最高层级 max_level_ 进行贪婪搜索,找到与 q 最近的节点 e_q
  3. 层级下降搜索:max_level_L_q + 1,逐层下降。在每一层,从上一层找到的最近邻 e_l 开始,进行贪婪搜索(searchLayer),找到 efConstruction 个与 q 最接近的节点作为候选集 W。然后,将 W 中距离 q 最近的节点作为下一层的入口点 e_{l-1}。这一步的目标是为新节点在所有层上找到一个合适的“落脚点”。
  4. 连接建立: 对于从 L_q 层到 0 层,逐层进行操作:
    • 在当前层 l,从 e_l 开始,进行贪婪搜索,找到 efConstruction 个最近邻节点。
    • 从这 efConstruction 个候选节点中,选择 M 个(或 M_max 个,对于第0层可能是 M_max0 个)距离 q 最近的节点作为 q 的邻居。
    • q 与这些选定的邻居建立连接。
    • 同时,对于这些被选为 q 邻居的节点 p,如果 p 的邻居数量超过了 M(或 M_max),则需要进行“邻居修剪”(pruneConnections),移除其中距离 p 最远的邻居,以保持图的稀疏性和效率。
  5. 更新入口点: 如果 L_q 大于当前的 max_level_,则更新 entry_pointq,并更新 max_level_

图的查询(向量检索过程):
当一个查询向量 q_query 到来时,其搜索过程与插入过程类似:

  1. 寻找入口点:entry_point 开始,在当前最高层级 max_level_ 进行贪婪搜索,找到与 q_query 最近的节点 e_q
  2. 层级下降搜索:max_level_1 层,逐层下降。在每一层,从上一层找到的最近邻 e_l 开始,进行贪婪搜索(searchLayer),找到 efSearch 个与 q_query 最接近的节点作为候选集 W。然后,将 W 中距离 q_query 最近的节点作为下一层的入口点 e_{l-1}
  3. 最终搜索: 在第 0 层,从 e_0 开始,进行贪婪搜索,找到 efSearch 个最近邻节点。
  4. 返回结果: 从这 efSearch 个节点中,选出距离 q_query 最近的 k 个节点作为最终的搜索结果。

2.2 HNSW的关键参数

HNSW的性能和精度受到几个关键参数的影响:

参数名称 描述 影响
M 每个节点在构建时,在任何一层中最多建立的邻居连接数。通常建议在 10-20 之间。 索引质量与内存: 越大,邻居连接越多,搜索路径选择更多,召回率可能更高,但内存消耗和构建时间增加。
M_max 内部参数,通常是 2 * M。在构建过程中,节点可能暂时拥有多达 M_max 个邻居,之后会被修剪到 M 个。 构建效率: 影响邻居选择和修剪的复杂性。
M_max0 第 0 层每个节点的最大邻居连接数。通常是 2 * M,可以更大,因为第0层是所有节点的基础层。 第0层密度: 越大,第0层连接越密集,对召回率影响最大,但也会增加内存消耗和搜索时间。
efConstruction 索引构建时,在 searchLayer 阶段,从当前层返回的最近邻候选集的数量。通常建议在 100-2000 之间。 构建质量与时间: 越大,构建过程中的搜索范围越广,找到更优的邻居连接的可能性越大,索引质量和召回率越高,但构建时间显著增加。
efSearch 查询时,在 searchLayer 阶段,从当前层返回的最近邻候选集的数量。通常建议在 10-1000 之间,可动态调整。 搜索速度与召回率: 越大,搜索范围越广,召回率越高,但搜索时间增加。可在运行时调整,以平衡查询速度和精度。
max_levels 整个索引的最大层级数。通常通过 log(num_elements) / log(M) 来估算,并结合 lambda 参数进行概率分配。 层级深度: 影响索引的层级结构深度。过深可能导致不必要的层级,过浅可能导致高层导航效率不足。通常不是直接设置,而是由 lambda 和元素数量隐式决定。
lambda (或 mL) 用于控制节点层级分配的指数分布参数。mL = 1/log(M) 是一个常用值,使得 exp(-lambda * L) 能够有效地控制高层节点数量。 层级分布: 决定了HNSW中层级节点的分布密度。lambda 值越小,高层节点越多,高层导航能力越强,但层级构建和内存消耗也相应增加。通常 1/log(M) 是一个较好的经验值,使得在每层平均 M 个邻居的情况下,高层节点数量呈合理下降。

这些参数之间的权衡是构建高效HNSW索引的关键。M 控制了图的稠密程度,efConstruction 影响了构建质量,而 efSearch 则直接影响查询时的速度和召回率。

3. C++ 向量数据库内核架构设计

为了实现一个高性能的向量数据库内核,我们需要精心设计其C++架构。以下是核心组件和它们之间的协作关系:

3.1 核心组件

  1. VectorStore

    • 职责: 负责实际的高维向量数据的存储和管理。它不直接参与HNSW图的构建,而是作为HNSW索引的底层数据源。
    • 接口: 提供 getVector(id) 方法,根据向量ID获取原始的 float*std::vector<float> 数据。
    • 实现考量: 考虑到内存效率,向量数据通常存储为连续的 float 数组。可以使用 std::vector<float> 的二维结构,或者更高效地,将所有向量拼接成一个大的 float 数组,通过偏移量访问。
  2. DistanceMetric

    • 职责: 定义向量之间距离的计算方法。HNSW依赖于这个距离函数来评估向量的相似性。
    • 接口: 抽象基类,提供 virtual float calculate(const float* vec1, const float* vec2, size_t dim) const = 0; 纯虚函数。
    • 实现考量:
      • L2 Distance (Euclidean Distance): sum_sq = sum((vec1[i] - vec2[i])^2)。通常返回平方L2距离以避免开根号,因为比较大小时平方距离和实际距离的顺序是一致的。
      • Cosine Similarity: dot_product(vec1, vec2) / (norm(vec1) * norm(vec2))。通常转换为距离形式 1 - cosine_similarity
      • 性能优化: 距离计算是热点代码,应考虑SIMD指令(如SSE/AVX)优化。
  3. HNSWNode

    • 职责: 代表HNSW图中的一个节点,存储其原始向量ID和在不同层级的邻居连接信息。
    • 数据结构:

      struct HNSWNode {
          size_t id; // 原始向量在VectorStore中的ID
          int max_level; // 该节点被分配到的最高层级
          std::vector<std::vector<int>> neighbors; // neighbors[level] -> list of neighbor IDs
      
          // 构造函数
          HNSWNode(size_t node_id, int node_max_level, int M_max_per_layer)
              : id(node_id), max_level(node_max_level) {
              // neighbors[0] to neighbors[max_level]
              neighbors.resize(node_max_level + 1);
              // 预留空间,避免频繁realloc
              for (int i = 0; i <= node_max_level; ++i) {
                  neighbors[i].reserve(M_max_per_layer);
              }
          }
      
          // 线程安全访问邻居列表的局部锁 (如果需要细粒度锁)
          // mutable std::mutex node_mutex;
      };
    • 考量: neighbors 使用 std::vector<int> 存储邻居ID。如果需要细粒度锁定,每个节点可以有一个自己的 std::mutex,但会增加内存开销和锁竞争复杂性。
  4. HNSWIndex

    • 职责: 封装HNSW算法的所有核心逻辑,包括图的构建、向量插入、最近邻搜索等。
    • 内部数据:
      • std::vector<std::unique_ptr<HNSWNode>> nodes;:存储所有HNSW节点。unique_ptr 管理内存。
      • std::atomic<int> entry_point_id;:当前索引的全局入口点ID,原子操作保证线程安全。
      • std::atomic<int> current_max_level;:当前索引中所有节点达到的最高层级,原子操作保证线程安全。
      • std::unique_ptr<DistanceMetric> dist_metric;:距离计算器。
      • HNSW参数:M, efConstruction, efSearch, max_connections_0, max_connections_other, lambda 等。
      • std::mutex graph_modification_mutex;:用于保护整个图的结构性修改(如 nodes 容器的 push_backresize),以及 entry_point_idcurrent_max_level 的非原子性更新。
      • std::mt19937 rng;:随机数生成器,用于层级分配。
      • std::uniform_real_distribution<> level_dist;:层级分配的概率分布。
    • 核心方法:
      • addVector(size_t id, const float* vector_data);:插入新向量到HNSW索引。
      • std::vector<size_t> searchKnn(const float* query_vector, size_t k, int ef) const;:执行K近邻搜索。
      • 内部辅助方法:
        • getRandomLevel():根据概率生成节点层级。
        • searchLayer(...):在指定层级进行贪婪搜索。
        • selectNeighbors(...):从候选集中选择最佳邻居。
        • pruneConnections(...):修剪邻居,保持连接数限制。
  5. ThreadPool

    • 职责: 管理一组工作线程,用于异步执行任务,实现多线程并发。
    • 接口: 提供 enqueue(task) 方法,将任务提交到队列,并返回 std::future 以获取结果。
    • 实现考量: 经典的生产者-消费者模型,使用 std::queue 存储任务,std::mutex 保护队列,std::condition_variable 进行线程同步。
  6. QueryExecutor

    • 职责: 作为对外暴露的接口层,接收查询请求,调度 HNSWIndexsearchKnn 方法,并将结果返回。可以利用 ThreadPool 来并发处理多个查询。

3.2 架构概览

graph TD
    A[Client Application] --> B[QueryExecutor];
    B --> C[ThreadPool];
    C --> D[HNSWIndex];
    D --> E[VectorStore];
    D --> F[DistanceMetric];
    D --> G[HNSWNode];
    E -- vector data --> D;
    F -- distance calculations --> D;
    G -- graph structure --> D;

4. 实施细节:核心HNSW组件的C++代码示例

我们将聚焦于 DistanceMetricHNSWNodeHNSWIndex 的关键部分。为了保持简洁,VectorStore 将被抽象为 HNSWIndex 内部的一个方法 getVectorData

4.1 距离度量 (DistanceMetric)

#include <vector>
#include <cmath>
#include <memory>
#include <queue>
#include <random>
#include <atomic>
#include <mutex>
#include <thread>
#include <future>
#include <functional>
#include <unordered_set>
#include <algorithm> // For std::sort

// 抽象距离度量基类
class DistanceMetric {
public:
    // 计算两个向量之间的距离
    // 返回值通常是距离的平方,以避免开根号,提高性能
    virtual float calculate(const float* vec1, const float* vec2, size_t dim) const = 0;
    virtual ~DistanceMetric() = default;
};

// L2距离(欧氏距离)的平方实现
class L2SquaredDistance : public DistanceMetric {
public:
    float calculate(const float* vec1, const float* vec2, size_t dim) const override {
        float sum_sq = 0.0f;
        for (size_t i = 0; i < dim; ++i) {
            float diff = vec1[i] - vec2[i];
            sum_sq += diff * diff;
        }
        return sum_sq; // 返回平方L2距离
    }
};

// 余弦相似度转换为距离实现 (1 - similarity)
class CosineDistance : public DistanceMetric {
public:
    float calculate(const float* vec1, const float* vec2, size_t dim) const override {
        float dot_product = 0.0f;
        float norm_vec1 = 0.0f;
        float norm_vec2 = 0.0f;

        for (size_t i = 0; i < dim; ++i) {
            dot_product += vec1[i] * vec2[i];
            norm_vec1 += vec1[i] * vec1[i];
            norm_vec2 += vec2[i] * vec2[i];
        }

        if (norm_vec1 == 0.0f || norm_vec2 == 0.0f) {
            // 处理零向量情况,可以返回最大距离或特定值
            return 1.0f; // 假设距离为1.0f (不相似)
        }

        float similarity = dot_product / (std::sqrt(norm_vec1) * std::sqrt(norm_vec2));
        return 1.0f - similarity; // 将相似度转换为距离
    }
};

4.2 HNSW节点 (HNSWNode)

// HNSW图中的一个节点
struct HNSWNode {
    size_t id; // 原始向量在VectorStore中的ID
    int max_level; // 该节点被分配到的最高层级

    // 存储邻居ID。neighbors[level] 是一个vector<int>
    // mutable 用于在const方法中修改(如果需要细粒度锁)
    std::vector<std::vector<int>> neighbors; 

    // 构造函数
    HNSWNode(size_t node_id, int node_max_level, int M_max_per_layer)
        : id(node_id), max_level(node_max_level) {
        neighbors.resize(node_max_level + 1); // 从0到max_level
        for (int i = 0; i <= node_max_level; ++i) {
            neighbors[i].reserve(M_max_per_layer); // 预留空间
        }
    }
};

4.3 HNSW索引 (HNSWIndex)

class HNSWIndex {
public:
    HNSWIndex(size_t dim, std::unique_ptr<DistanceMetric> metric,
              int M, int efConstruction, int efSearch, float level_mult);

    // 添加向量到索引
    void addVector(size_t id, const float* vector_data);

    // 搜索K近邻
    std::vector<size_t> searchKnn(const float* query_vector, size_t k, int ef) const;

private:
    size_t dimension;
    std::unique_ptr<DistanceMetric> dist_metric;

    int M_;                 // 每个节点在各层中的最大邻居数
    int M_max0_;            // 第0层每个节点的最大邻居数 (通常是2*M_)
    int M_max_other_layers_;// 其他层每个节点的最大邻居数 (通常是M_)
    int ef_construction_;   // 构建时的搜索范围参数
    int ef_search_;         // 搜索时的搜索范围参数
    float level_multiplier_;// 用于随机层级分配的参数,通常是1/log(M_)

    std::vector<std::unique_ptr<HNSWNode>> nodes_; // 存储HNSWNode对象
    // 注意:实际的向量数据需要一个单独的VectorStore来管理
    // 为了简化示例,我们假设有一个内部函数可以根据ID获取向量数据
    // std::vector<std::vector<float>> raw_vectors_; // 实际项目中应由VectorStore管理

    // 假设这是获取原始向量数据的接口
    // 在实际项目中,这会通过VectorStore::getVectorData(id)实现
    const float* getVectorData(size_t node_id) const {
        // 示例:这里只是一个占位符,实际需要从一个全局的向量存储中获取
        // 在实际应用中,nodes_[node_id]->data 应该指向存储在VectorStore中的向量
        // 这里为了演示,我们假设nodes_的ID与raw_vectors_的索引一致
        if (node_id >= raw_vectors_.size()) return nullptr; // 边界检查
        return raw_vectors_[node_id].data();
    }
    // 临时存储原始向量数据,以便示例运行
    // 实际项目中应由 VectorStore 管理
    std::vector<std::vector<float>> raw_vectors_; 
    std::atomic<size_t> next_vector_id_; // 用于分配新的向量ID

    std::atomic<int> entry_point_id_;     // 当前索引的入口点ID
    std::atomic<int> current_max_level_;  // 当前索引中节点达到的最高层级

    mutable std::mutex graph_modification_mutex_; // 保护图结构修改(如nodes_增删)
    std::mt19937 rng_; // 随机数生成器
    std::uniform_real_distribution<> level_dist_; // 层级分配的概率分布

    // 内部辅助函数
    int getRandomLevel();

    // searchLayer: 在指定层级进行贪婪搜索
    // 返回一个优先级队列,包含(距离, 节点ID)对,按距离从小到大排序
    std::priority_queue<std::pair<float, int>, std::vector<std::pair<float, int>>, std::greater<std::pair<float, int>>>
    searchLayer(const float* query_vec, int entry_point, int layer, int ef_param) const;

    // selectNeighbors: 从候选集中选择最佳邻居
    // candidates: 优先级队列,包含(距离, 节点ID)对
    // M_limit: 允许的最大邻居数
    // level: 当前层级
    // result_neighbors: 存储选定邻居ID的vector
    void selectNeighbors(
        std::priority_queue<std::pair<float, int>, std::vector<std::pair<float, int>>, std::greater<std::pair<float, int>>>& candidates,
        int M_limit, int level, std::vector<int>& result_neighbors) const;

    // pruneConnections: 修剪节点的邻居列表
    void pruneConnections(HNSWNode* node, int M_limit, int level);
};

HNSWIndex::HNSWIndex(size_t dim, std::unique_ptr<DistanceMetric> metric,
                     int M, int efConstruction, int efSearch, float level_mult)
    : dimension(dim), dist_metric(std::move(metric)),
      M_(M), 
      M_max0_(M * 2), // 第0层通常允许更多连接
      M_max_other_layers_(M), // 其他层保持M个
      ef_construction_(efConstruction), ef_search_(efSearch),
      level_multiplier_(level_mult),
      entry_point_id_(-1), current_max_level_(-1),
      next_vector_id_(0), // 初始化下一个向量ID
      rng_(std::random_device{}()), // 使用随机设备初始化随机数生成器
      level_dist_(0.0, 1.0) // [0.0, 1.0) 均匀分布
{
    // 如果M_max0_与M_max_other_layers_相同,则不需要区分
    // 确保level_multiplier_是正数,通常是 1.0 / log(M_)
    if (level_multiplier_ <= 0) {
        throw std::invalid_argument("Level multiplier must be positive.");
    }
}

int HNSWIndex::getRandomLevel() {
    // 概率分布:P(L) = exp(-L / level_multiplier_) * (1 - exp(-1 / level_multiplier_))
    // 这里的实现简化为 -log(rand()) * level_multiplier_
    // 使得高层级的节点更少
    double r = level_dist_(rng_);
    return static_cast<int>(-std::log(r) * level_multiplier_);
}

// searchLayer实现
std::priority_queue<std::pair<float, int>, std::vector<std::pair<float, int>>, std::greater<std::pair<float, int>>>
HNSWIndex::searchLayer(const float* query_vec, int entry_point_id, int layer, int ef_param) const {
    // 存储已访问的节点,避免重复访问
    std::unordered_set<int> visited_nodes;
    // 存储候选节点,按距离从小到大排序
    std::priority_queue<std::pair<float, int>, std::vector<std::pair<float, int>>, std::greater<std::pair<float, int>>> candidates;
    // 存储结果节点,按距离从大到小排序 (因为我们想要ef_param个最近的)
    std::priority_queue<std::pair<float, int>> results; // max-heap

    if (entry_point_id == -1 || entry_point_id >= nodes_.size() || !nodes_[entry_point_id]) {
        return results; // 没有有效的入口点
    }

    const float* entry_vec = getVectorData(entry_point_id);
    if (!entry_vec) return results;

    float dist_entry = dist_metric->calculate(query_vec, entry_vec, dimension);
    candidates.push({dist_entry, entry_point_id});
    results.push({dist_entry, entry_point_id});
    visited_nodes.insert(entry_point_id);

    while (!candidates.empty()) {
        float current_dist = candidates.top().first;
        int current_node_id = candidates.top().second;
        candidates.pop();

        // 优化:如果当前最远的结果比最近的候选节点还远,可以提前终止
        if (results.top().first < current_dist && results.size() >= ef_param) {
            break;
        }

        // 访问当前节点的邻居
        // 注意:这里需要对HNSWNode::neighbors进行线程安全访问,尤其是在addVector中
        // 但在searchLayer中,如果图结构被认为是稳定的(只有addVector修改),则可以是const访问
        // 如果addVector可能在searchLayer运行时修改图,则需要细粒度锁
        const auto& current_node_neighbors = nodes_[current_node_id]->neighbors[layer];
        for (int neighbor_id : current_node_neighbors) {
            if (visited_nodes.find(neighbor_id) == visited_nodes.end()) {
                visited_nodes.insert(neighbor_id);

                const float* neighbor_vec = getVectorData(neighbor_id);
                if (!neighbor_vec) continue; // 确保向量数据存在

                float dist_neighbor = dist_metric->calculate(query_vec, neighbor_vec, dimension);

                if (results.size() < ef_param || dist_neighbor < results.top().first) {
                    candidates.push({dist_neighbor, neighbor_id});
                    results.push({dist_neighbor, neighbor_id});

                    while (results.size() > ef_param) {
                        results.pop();
                    }
                }
            }
        }
    }
    return results; // 返回ef_param个最佳候选
}

// selectNeighbors实现
void HNSWIndex::selectNeighbors(
    std::priority_queue<std::pair<float, int>, std::vector<std::pair<float, int>>, std::greater<std::pair<float, int>>>& candidates,
    int M_limit, int level, std::vector<int>& result_neighbors) const {

    // Simple approach: Take M_limit closest neighbors
    // More sophisticated: Use "heuristic" or "extendCandidates" as in original HNSW
    // For simplicity, we just take the M_limit closest.

    // Transfer from priority queue to vector for sorting/pruning
    std::vector<std::pair<float, int>> temp_neighbors;
    while (!candidates.empty()) {
        temp_neighbors.push_back(candidates.top());
        candidates.pop();
    }

    // Sort by distance (already sorted by PQ, but useful for later pruning)
    std::sort(temp_neighbors.begin(), temp_neighbors.end());

    // Take up to M_limit neighbors
    for (int i = 0; i < temp_neighbors.size() && i < M_limit; ++i) {
        result_neighbors.push_back(temp_neighbors[i].second);
    }
}

// pruneConnections实现
void HNSWIndex::pruneConnections(HNSWNode* node, int M_limit, int level) {
    if (node->neighbors[level].size() <= M_limit) {
        return; // 不需要修剪
    }

    // 计算当前节点与所有邻居的距离,并排序
    std::vector<std::pair<float, int>> current_neighbors_with_dist;
    const float* node_vec = getVectorData(node->id);

    for (int neighbor_id : node->neighbors[level]) {
        const float* neighbor_vec = getVectorData(neighbor_id);
        if (node_vec && neighbor_vec) {
            current_neighbors_with_dist.push_back({dist_metric->calculate(node_vec, neighbor_vec, dimension), neighbor_id});
        }
    }

    // 按距离从小到大排序
    std::sort(current_neighbors_with_dist.begin(), current_neighbors_with_dist.end());

    // 保留M_limit个最近的邻居
    node->neighbors[level].clear();
    for (int i = 0; i < M_limit && i < current_neighbors_with_dist.size(); ++i) {
        node->neighbors[level].push_back(current_neighbors_with_dist[i].second);
    }
}

// addVector实现
void HNSWIndex::addVector(size_t id, const float* vector_data) {
    // 假设id是递增的,并且与raw_vectors_的索引对应
    // 在实际项目中,id可能是一个外部提供的,需要映射到内部存储
    {
        std::lock_guard<std::mutex> lock(graph_modification_mutex_);
        // 确保raw_vectors_能够容纳新向量
        if (id >= raw_vectors_.size()) {
            raw_vectors_.resize(id + 1);
        }
        raw_vectors_[id].assign(vector_data, vector_data + dimension);

        if (id >= nodes_.size()) {
            nodes_.resize(id + 1);
        }
    }

    int new_node_level = getRandomLevel(); // 为新节点分配层级
    // 确保新节点级别不超过一个合理的最大值,防止极端情况
    if (new_node_level >= 32) new_node_level = 31; // 例如,限制最大层级

    std::unique_ptr<HNSWNode> new_node = std::make_unique<HNSWNode>(id, new_node_level, M_ * 2); // M_*2 for M_max

    int current_entry_point_id = entry_point_id_.load();
    int current_index_max_level = current_max_level_.load();

    // 如果是第一个节点或者新节点层级更高,更新全局入口点
    if (current_entry_point_id == -1) {
        entry_point_id_.store(id);
        current_max_level_.store(new_node_level);
        // 直接将新节点添加到nodes_,并解锁
        {
            std::lock_guard<std::mutex> lock(graph_modification_mutex_);
            nodes_[id] = std::move(new_node);
        }
        return;
    }

    // 从最高层级开始搜索
    int entry_point_for_search = current_entry_point_id;
    for (int lc = current_index_max_level; lc > new_node_level; --lc) {
        auto search_results = searchLayer(vector_data, entry_point_for_search, lc, 1); // ef=1 for simple greedy entry point
        if (!search_results.empty()) {
            entry_point_for_search = search_results.top().second;
        } else {
            // 如果某一层没有找到邻居,可能索引太稀疏,或者entry_point_for_search不合适
            // 在这种情况下,可以尝试使用当前层的其他已知节点作为入口,或者直接跳过
            // 这里简化处理,如果找不到,就保持原入口点或使用一个默认值
            // 更健壮的实现可能需要更复杂的逻辑
        }
    }

    // 将新节点添加到nodes_
    {
        std::lock_guard<std::mutex> lock(graph_modification_mutex_);
        nodes_[id] = std::move(new_node); // 移动语义,所有权转移
    }

    // 从新节点的最高层级开始,向下建立连接
    for (int lc = new_node_level; lc >= 0; --lc) {
        int M_limit = (lc == 0) ? M_max0_ : M_max_other_layers_;

        // 搜索当前层级的efConstruction个最佳邻居
        auto candidates = searchLayer(vector_data, entry_point_for_search, lc, ef_construction_);

        std::vector<int> selected_neighbors_ids;
        selectNeighbors(candidates, M_limit, lc, selected_neighbors_ids);

        // 建立双向连接
        for (int neighbor_id : selected_neighbors_ids) {
            // 确保访问nodes_是线程安全的
            HNSWNode* neighbor_node = nullptr;
            {
                std::lock_guard<std::mutex> lock(graph_modification_mutex_); // 保护nodes_访问
                if (neighbor_id < nodes_.size() && nodes_[neighbor_id]) {
                    neighbor_node = nodes_[neighbor_id].get();
                }
            }
            if (!neighbor_node) continue;

            // 锁定被修改的两个节点(如果使用细粒度锁),这里简化为图修改锁
            {
                std::lock_guard<std::mutex> lock(graph_modification_mutex_);
                nodes_[id]->neighbors[lc].push_back(neighbor_id);
                neighbor_node->neighbors[lc].push_back(id);
            }

            // 修剪邻居
            {
                std::lock_guard<std::mutex> lock(graph_modification_mutex_); // 保护节点邻居列表修改
                pruneConnections(nodes_[id].get(), M_limit, lc);
                pruneConnections(neighbor_node, M_limit, lc);
            }
        }
        // 更新下一层的入口点
        if (!candidates.empty()) {
            entry_point_for_search = candidates.top().second; // 离查询最近的那个
        }
    }

    // 更新全局入口点和最高层级
    if (new_node_level > current_index_max_level) {
        current_max_level_.store(new_node_level);
        entry_point_id_.store(id); // 新节点成为新的入口点
    }
}

// searchKnn实现
std::vector<size_t> HNSWIndex::searchKnn(const float* query_vector, size_t k, int ef) const {
    if (entry_point_id_.load() == -1) {
        return {}; // 索引为空
    }

    int current_entry_point = entry_point_id_.load();
    int current_index_max_level = current_max_level_.load();

    // 从最高层级开始,找到第0层的最佳入口点
    for (int lc = current_index_max_level; lc > 0; --lc) {
        auto search_results = searchLayer(query_vector, current_entry_point, lc, 1); // ef=1 for greedy
        if (!search_results.empty()) {
            current_entry_point = search_results.top().second;
        }
    }

    // 在第0层进行最终的ef_search_范围搜索
    auto final_candidates = searchLayer(query_vector, current_entry_point, 0, ef);

    // 从结果中取出K个最近的节点
    std::vector<size_t> result_ids;
    while (!final_candidates.empty() && result_ids.size() < k) {
        result_ids.push_back(final_candidates.top().second);
        final_candidates.pop();
    }
    std::reverse(result_ids.begin(), result_ids.end()); // 使得最近的在前

    return result_ids;
}

代码说明:

  • HNSWNodeneighbors 列表在 HNSWIndex::addVector 中被修改。为了线程安全,对 nodes_ 容器本身(如 resizepush_back)以及每个节点内部的 neighbors 列表的修改,都需要适当的锁保护。上述代码中,我使用了 graph_modification_mutex_ 对所有对图结构和节点邻居列表的修改进行保护,这是一种粗粒度锁,可能会限制插入操作的并行度。
  • searchLayer 方法是 const 的,因为它只读取图结构。但在实践中,如果 addVector 正在进行,并且修改了某个节点的邻居列表,searchLayer 可能会遇到不一致的状态。使用 std::shared_mutex 可以更好地处理读写并发。

5. 多线程检索优化

在向量数据库内核中,多线程优化是实现高性能的关键。我们需要区分两种主要场景:并发检索并发索引构建/更新

5.1 并发检索优化

检索操作(searchKnn)通常是读操作,不修改HNSW图的结构。因此,多个查询可以并发执行。

策略:

  1. 查询并行化: 最直接的方式是使用 ThreadPool 来处理并发的 searchKnn 请求。当客户端提交多个查询时,每个查询被封装成一个任务,提交给线程池。

    // ThreadPool 示例 (简化版)
    class ThreadPool {
    public:
        ThreadPool(size_t num_threads) : stop(false) {
            for (size_t i = 0; i < num_threads; ++i) {
                workers.emplace_back([this] {
                    for (;;) {
                        std::function<void()> task;
                        {
                            std::unique_lock<std::mutex> lock(queue_mutex);
                            condition.wait(lock, [this] { return stop || !tasks.empty(); });
                            if (stop && tasks.empty()) return;
                            task = std::move(tasks.front());
                            tasks.pop();
                        }
                        task();
                    }
                });
            }
        }
    
        template<class F, class... Args>
        auto enqueue(F&& f, Args&&... args) 
            -> std::future<typename std::result_of<F(Args...)>::type> {
            using return_type = typename std::result_of<F(Args...)>::type;
            auto task = std::make_shared<std::packaged_task<return_type()>>(
                std::bind(std::forward<F>(f), std::forward<Args>(args)...)
            );
            std::future<return_type> res = task->get_future();
            {
                std::unique_lock<std::mutex> lock(queue_mutex);
                if (stop) throw std::runtime_error("enqueue on stopped ThreadPool");
                tasks.emplace([task]() { (*task)(); });
            }
            condition.notify_one();
            return res;
        }
    
        ~ThreadPool() {
            {
                std::unique_lock<std::mutex> lock(queue_mutex);
                stop = true;
            }
            condition.notify_all();
            for (std::thread& worker : workers) {
                worker.join();
            }
        }
    private:
        std::vector<std::thread> workers;
        std::queue<std::function<void()>> tasks;
        std::mutex queue_mutex;
        std::condition_variable condition;
        bool stop;
    };
    
    // QueryExecutor 示例
    class QueryExecutor {
    public:
        QueryExecutor(size_t num_threads, HNSWIndex& index) : pool_(num_threads), hnsw_index_(index) {}
    
        std::future<std::vector<size_t>> submitSearch(const float* query_vector, size_t k, int ef) {
            // 注意:这里需要确保query_vector在异步任务执行期间是有效的。
            // 最好是复制一份query_vector,或者确保其生命周期足够长。
            // 这里为了简化,假设query_vector是安全的。
            const float* vec_copy = new float[hnsw_index_.getDimension()];
            std::copy(query_vector, query_vector + hnsw_index_.getDimension(), const_cast<float*>(vec_copy));
    
            return pool_.enqueue([this, vec_copy, k, ef]() {
                std::vector<size_t> results = hnsw_index_.searchKnn(vec_copy, k, ef);
                delete[] vec_copy; // 释放复制的向量
                return results;
            });
        }
    private:
        ThreadPool pool_;
        HNSWIndex& hnsw_index_;
    };
  2. 读写锁 (std::shared_mutex): HNSWIndexaddVector 时会修改图结构(写操作),而 searchKnn 只是读取图结构(读操作)。std::shared_mutex 允许:

    • 多个读取者同时持有共享锁。
    • 一个写入者独占互斥锁,阻止所有其他读写操作。
      这比 std::mutex 提供了更好的并发性,因为 std::mutex 不区分读写,任何操作都会阻塞其他操作。

      
      // 在HNSWIndex中
      mutable std::shared_mutex graph_rw_mutex_; // 读写锁

    // searchKnn方法内部 (读操作)
    std::shared_lock lock(graph_rwmutex);
    // … 执行搜索逻辑 …

    // addVector方法内部 (写操作)
    std::unique_lock lock(graph_rwmutex);
    // … 执行图修改逻辑 …

    
    上述 `addVector` 示例代码中使用了 `std::mutex graph_modification_mutex_`,将其替换为 `std::shared_mutex` 将提高并发读的性能。

5.2 并发索引构建/更新优化

HNSW的索引构建(addVector)是一个写操作,涉及修改图结构(添加节点、修改邻居列表)。并发地添加向量是一个挑战。

挑战:

  • nodes_ 容器的修改: std::vector::push_backresize 可能会导致内存重新分配,使现有 HNSWNode* 指针失效。
  • 邻居列表修改: 多个线程可能尝试同时修改同一个节点的 neighbors 列表。
  • 全局状态更新: entry_point_id_current_max_level_ 是全局状态,需要原子操作或锁保护。

策略:

  1. 粗粒度锁: 最简单但并发度最低的方法是使用一个全局 std::mutex(如上面的 graph_modification_mutex_)来保护整个 addVector 方法。这意味着一次只能有一个线程进行向量插入。

    void HNSWIndex::addVector(size_t id, const float* vector_data) {
        std::unique_lock<std::shared_mutex> lock(graph_rw_mutex_); // 使用shared_mutex的unique_lock实现写锁
        // ... 向量添加逻辑 ...
    }

    这种方式虽然简单,但在多核CPU上,插入性能无法线性扩展。

  2. 细粒度锁和无锁技术:

    • Per-Node Mutex: 每个 HNSWNode 内部可以有一个 std::mutex 来保护其 neighbors 列表。当修改某个节点的邻居时,只锁定该节点。

      // HNSWNode 结构中添加
      // mutable std::mutex node_mutex; // 保护该节点的邻居列表
      
      // 在addVector中,修改neighbor_node的邻居时
      // std::lock_guard<std::mutex> neighbor_lock(neighbor_node->node_mutex);
      // neighbor_node->neighbors[lc].push_back(id);

      这种方式提高了并发性,但增加了锁的开销和死锁的可能性(如果两个节点同时尝试锁定对方)。需要谨慎设计锁的顺序。

    • 写时复制 (Copy-on-Write): 对于 nodes_ 容器,可以考虑写时复制。当需要修改 nodes_ 时,创建一个副本,在副本上进行修改,然后原子地替换旧的 nodes_ 指针。这对于读取者是无锁的,但写入者开销大。
    • 无锁数据结构: 使用像 std::atomic 或专门的无锁数据结构(如Intel TBB的并发容器)来管理邻居列表,避免显式锁。这通常非常复杂且容易出错。
    • 批量插入: 将多个向量的插入操作打包成一个批次。在批次内部,可以通过一些技巧减少锁竞争。例如,预先分配 nodes_ 空间,避免 resize

    推荐的折衷方案:
    对于 HNSWIndexnodes_ 容器,因为 std::vectorpush_back 可能导致重新分配,一个安全的做法是:

    • 在索引初始化时,预估最大容量并 reserve 足够的空间。
    • 如果 nodes_ 必须动态增长,每次 resize 都需要 graph_rw_mutex_ 的写锁。
    • 对于每个节点的 neighbors 列表的修改,std::unique_lock<std::shared_mutex> lock(graph_rw_mutex_); 是一种相对安全且实现简单的方案,它保证了在图结构(包括所有节点的邻居列表)被修改时,没有其他读写操作并发进行。虽然它限制了 addVector 的并行度,但在很多场景下,索引构建通常是离线或低频操作,而查询是高频操作,这种牺牲是值得的。如果插入并发性是核心需求,那么需要更复杂的细粒度锁或无锁设计。

5.3 SIMD指令优化

距离计算是HNSW中的热点操作,尤其是在高维向量上。利用CPU的SIMD(Single Instruction, Multiple Data)指令集可以显著加速这些计算。例如,SSE(Streaming SIMD Extensions)和AVX(Advanced Vector Extensions)可以在单个指令中处理多个浮点数。

#include <immintrin.h> // For AVX intrinsics

// L2SquaredDistance with AVX (for float vectors, dimension must be a multiple of 8)
class L2SquaredDistanceAVX : public DistanceMetric {
public:
    float calculate(const float* vec1, const float* vec2, size_t dim) const override {
        float sum_sq = 0.0f;
        // 确保维度是8的倍数,以便AVX-256处理
        size_t i = 0;
        for (; i + 7 < dim; i += 8) {
            __m256 v1 = _mm256_loadu_ps(vec1 + i); // 加载8个float
            __m256 v2 = _mm256_loadu_ps(vec2 + i);
            __m256 diff = _mm256_sub_ps(v1, v2);  // 8个差值
            __m256 sq = _mm256_mul_ps(diff, diff); // 8个平方
            __m256 hsum = _mm256_hadd_ps(sq, sq); // 水平相加 (x0+x1, x2+x3, x4+x5, x6+x7, ...)
            hsum = _mm256_hadd_ps(hsum, hsum);   // (x0+x1+x2+x3, x4+x5+x6+x7, ..., ...)

            // 提取结果
            float temp_array[8];
            _mm256_storeu_ps(temp_array, hsum);
            sum_sq += temp_array[0] + temp_array[4]; // AVX-256的hadd结果
        }

        // 处理剩余部分 (如果维度不是8的倍数)
        for (; i < dim; ++i) {
            float diff = vec1[i] - vec2[i];
            sum_sq += diff * diff;
        }
        return sum_sq;
    }
};

注意: SIMD优化需要编译器的支持(如GCC/Clang的-mavx-msse标志),并确保向量数据按16或32字节对齐以获得最佳性能(尽管_mm256_loadu_ps可以处理未对齐数据,但性能可能受影响)。

6. 性能考量与调优

构建一个高性能的向量数据库内核,除了算法实现和多线程优化,还需要深入理解性能瓶颈并进行细致调优。

6.1 参数调优

| 参数 | 调优目标 |
| M | 构建与查询: M 越大,HNSW图的连接越密集,搜索路径选择越多,理论上召回率越高,但内存消耗和构建时间增加,搜索时间也可能略微增加。M 越小,图越稀疏,召回率可能下降,但构建和搜索更快。 经验值: 10-20。 |
| efConstruction | 索引质量与构建时间: efConstruction 越大,在构建 HNSW 索引时,每个新向量寻找邻居的搜索范围越大,找到的邻居越精确,最终索引的召回率越高。然而,这也意味着更长的构建时间。 经验值: 100-2000

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注