解析 ‘Multi-vector Indexing’:如何为同一张表格建立“文本摘要”与“结构化数据”双向索引?

各位同仁,各位对数据架构与智能检索充满热情的工程师们,大家好。

今天,我们将深入探讨一个在现代数据处理中日益重要且充满挑战性的主题——“Multi-vector Indexing”,即多向量索引。具体来说,我们将聚焦于如何为同一张数据表,例如一张商品表、用户档案表或文档库表,同时构建“文本摘要”(或称语义内容)与“结构化数据”(如价格、类别、标签等)的双向索引,并实现高效的混合检索。

传统上,我们处理数据的方式是分而治之:文本内容交由全文搜索引擎,结构化数据则由关系型数据库负责。然而,随着人工智能,特别是深度学习和向量嵌入技术的发展,用户查询的需求变得越来越复杂。他们可能希望通过一段自然语言描述来查找符合特定价格范围和材质要求的产品,或者反过来,通过结构化属性来发现具有特定语义特征的文本。这种跨模态、混合类型的检索需求,正是多向量索引所要解决的核心问题。

1. 挑战的起源:异构数据索引的困境

我们的数据世界是异构的。一张看似简单的商品表,可能包含商品名称、详细描述(文本)、SKU(字符串)、价格(数值)、库存量(数值)、分类(枚举)、品牌(字符串)、材质(字符串)等多种类型的数据。

当我们需要检索这些数据时:

  • 文本检索: 用户可能搜索“适用于户外运动的轻便防水背包”。这需要理解“户外运动”、“轻便”、“防水”等语义概念,而不仅仅是关键词匹配。传统的 LIKEMATCH AGAINST 语句在语义理解上力不从心。
  • 结构化数据检索: 用户可能搜索“价格在500-1000元之间,品牌为Nike的背包”。这需要精确的数值范围过滤和枚举类型匹配,关系型数据库对此驾轻就熟。
  • 混合检索: 真正的挑战在于用户搜索“寻找一款适合城市通勤、设计简约的真皮手提包,预算不超过2000元”。这里,“城市通勤”、“设计简约”是语义特征,“真皮”、“手提包”是结构化属性,而“预算不超过2000元”是数值过滤。如何在一个查询中同时满足这些条件,并高效地返回最相关的结果?

传统的索引体系,如B-tree索引、哈希索引、全文索引(倒排索引),各自擅长处理特定类型的数据和查询模式。它们难以有效地桥接语义鸿沟,也无法在单一查询中无缝融合不同模态的检索结果。这就是“Multi-vector Indexing”应运而生背景。

2. 基石:多向量索引的构建模块

要理解多向量索引,我们首先需要掌握几个核心技术概念。

2.1 向量嵌入(Vector Embeddings)

向量嵌入是现代AI和机器学习的基石之一,它将各种类型的数据(文本、图片、音频,甚至结构化数据)转换为高维空间中的实数向量。这些向量捕获了数据项的语义或结构特征,使得相似的数据项在向量空间中彼此靠近。

2.1.1 文本嵌入 (Text Embeddings)

文本嵌入是将文本内容(如句子、段落、文档)转换为固定长度的数值向量的过程。这些向量能够捕获文本的语义信息。

  • 技术原理: 通常使用预训练的深度学习模型,如BERT、RoBERTa、GPT系列、Sentence-BERT等。这些模型通过在海量文本数据上进行训练,学习到如何将词语和句子映射到有意义的向量空间。
  • 应用:
    • 语义搜索: 查询文本与文档文本之间的语义相似度。
    • 文本分类/聚类: 基于向量距离对文本进行分组。
    • 问答系统: 匹配问题与答案的语义。

代码示例:使用SentenceTransformer生成文本嵌入

from sentence_transformers import SentenceTransformer
import numpy as np

# 加载预训练模型
# 可以选择不同的模型,例如 'all-MiniLM-L6-v2' 适用于速度,'all-mpnet-base-v2' 适用于精度
model = SentenceTransformer('all-MiniLM-L6-v2')

def generate_text_embedding(text: str) -> np.ndarray:
    """
    生成文本的向量嵌入。
    """
    if not text:
        return np.zeros(model.get_sentence_embedding_dimension()) # 返回零向量或处理空文本
    embedding = model.encode(text, convert_to_numpy=True)
    return embedding

# 示例:商品描述
product_descriptions = [
    "这款轻便防水的背包非常适合徒步旅行和户外探险。",
    "优雅的真皮手提包,适合商务通勤和日常使用。",
    "高性能跑步鞋,提供卓越的缓震和支撑,助你轻松超越自我。",
    "高品质不锈钢保温杯,持久保冷保热,是你户外活动的好伴侣。"
]

text_embeddings = [generate_text_embedding(desc) for desc in product_descriptions]

print(f"第一个文本嵌入的维度: {text_embeddings[0].shape}")
# print(f"第一个文本嵌入的前5个值: {text_embeddings[0][:5]}")

# 计算相似度
query_text = "户外运动装备"
query_embedding = generate_text_embedding(query_text)

# 使用余弦相似度计算
from sklearn.metrics.pairwise import cosine_similarity

similarities = cosine_similarity([query_embedding], text_embeddings)[0]

print(f"n查询 '{query_text}' 与商品描述的相似度:")
for i, sim in enumerate(similarities):
    print(f"  - '{product_descriptions[i][:20]}...' 相似度: {sim:.4f}")

2.1.2 结构化数据嵌入 (Structured Data Embeddings)

将结构化数据(如分类、数值、日期等)转换为向量比文本更具挑战性,但同样重要。其目标是让具有相似属性的数据点在向量空间中靠近。

  • 技术原理:
    • 分类特征 (Categorical Features):
      • 独热编码 (One-Hot Encoding): 将每个类别转换为一个稀疏向量,但维度可能很高。
      • 标签编码 (Label Encoding): 将类别映射为整数,但引入了顺序关系,不适合直接使用。
      • 嵌入层 (Embedding Layers): 类似处理词嵌入,为每个类别学习一个密集向量。这在深度学习模型中很常见。
    • 数值特征 (Numerical Features):
      • 归一化/标准化 (Normalization/Standardization): 将数值缩放到特定范围(如0-1或均值0方差1)。
      • 分箱 (Binning): 将数值转换为分类特征,然后进行编码。
      • 直接使用: 经过归一化后,可以直接作为向量的一部分。
    • 组合特征: 将所有处理过的结构化特征(分类嵌入、归一化数值等)连接起来,形成一个综合的结构化数据向量。更高级的方法可能使用自编码器(Autoencoder)或其他神经网络模型来学习这些特征的低维表示。

代码示例:生成结构化数据嵌入

import pandas as pd
from sklearn.preprocessing import StandardScaler, OneHotEncoder
from sklearn.compose import ColumnTransformer
from sklearn.pipeline import Pipeline

# 示例:商品结构化数据
product_data = pd.DataFrame([
    {'product_id': 'P001', 'category': '背包', 'price': 899, 'material': '尼龙', 'brand': '户外之星'},
    {'product_id': 'P002', 'category': '手提包', 'price': 1899, 'material': '真皮', 'brand': '时尚风向'},
    {'product_id': 'P003', 'category': '鞋子', 'price': 699, 'material': '合成纤维', 'brand': '活力跑者'},
    {'product_id': 'P004', 'category': '杯具', 'price': 159, 'material': '不锈钢', 'brand': '生活优品'},
    {'product_id': 'P005', 'category': '背包', 'price': 399, 'material': '帆布', 'brand': '城市漫步'}
])

# 定义预处理步骤
# 数值特征:标准化
numerical_features = ['price']
numerical_transformer = StandardScaler()

# 分类特征:独热编码
categorical_features = ['category', 'material', 'brand']
categorical_transformer = OneHotEncoder(handle_unknown='ignore')

# 创建预处理器
preprocessor = ColumnTransformer(
    transformers=[
        ('num', numerical_transformer, numerical_features),
        ('cat', categorical_transformer, categorical_features)
    ])

# 构建管道并拟合数据
# 在实际应用中,这个预处理器应该在整个数据集上进行fit,然后对新数据进行transform
preprocessor.fit(product_data)

def generate_structured_embedding(data_row: dict, preprocessor: Pipeline) -> np.ndarray:
    """
    生成单行结构化数据的向量嵌入。
    注意:这里的“嵌入”是经过预处理的特征向量,并非通过神经网络学习的深度嵌入。
    若要深度嵌入,需要训练一个小型NN。
    """
    df_row = pd.DataFrame([data_row]) # 转换为DataFrame以便preprocessor处理
    processed_features = preprocessor.transform(df_row)
    return processed_features.toarray().flatten()

# 生成所有商品的结构化数据嵌入
structured_embeddings = []
for index, row in product_data.iterrows():
    # 移除product_id,因为它不是用于生成特征的
    row_dict = row.drop('product_id').to_dict()
    structured_embeddings.append(generate_structured_embedding(row_dict, preprocessor))

print(f"n第一个结构化数据嵌入的维度: {structured_embeddings[0].shape}")
# print(f"第一个结构化数据嵌入的前5个值: {structured_embeddings[0][:5]}")

# 示例查询:寻找价格较低的尼龙背包
query_structured_data = {'category': '背包', 'price': 400, 'material': '尼龙', 'brand': '任意'}
# 对于查询,需要处理 '任意' 或缺失值,这里简化处理为使用一个接近目标的数据点
query_structured_data_for_embedding = {'category': '背包', 'price': 400, 'material': '尼龙', 'brand': '户外之星'} # 假设一个近似品牌
query_structured_embedding = generate_structured_embedding(query_structured_data_for_embedding, preprocessor)

similarities_structured = cosine_similarity([query_structured_embedding], structured_embeddings)[0]

print(f"n查询结构化数据与商品的相似度:")
for i, sim in enumerate(similarities_structured):
    print(f"  - 产品ID '{product_data.iloc[i]['product_id']}' (类别: {product_data.iloc[i]['category']}, 价格: {product_data.iloc[i]['price']}, 材质: {product_data.iloc[i]['material']}) 相似度: {sim:.4f}")

请注意,上述结构化数据嵌入是一个相对简单的示例,它将预处理后的特征直接拼接。更复杂的场景可能需要训练一个神经网络(如自编码器或多层感知机)来学习一个更紧凑、更具语义的结构化数据表示。

2.2 向量数据库与ANN索引 (Vector Databases & ANN Indexes)

生成了向量,如何高效地存储和查询它们?这就是向量数据库和ANN(Approximate Nearest Neighbor,近似最近邻)索引的用武之地。

  • ANN索引的必要性: 当向量维度很高,数据量巨大时,精确地计算每个查询向量与所有存储向量的距离并排序(KNN,K-Nearest Neighbor)是计算密集型任务,效率极低。ANN算法通过牺牲一小部分精度来换取查询速度的巨大提升。
  • 常见ANN算法:
    • HNSW (Hierarchical Navigable Small World): 构建一个多层图结构,在搜索时从顶层开始,逐步向下层逼近目标,效率和精度都非常高。
    • IVF_FLAT (Inverted File Index with Flat Quantization): 将向量空间划分为多个聚类,查询时只在最近的几个聚类中进行搜索。
    • PQ (Product Quantization): 将高维向量分解为多个低维子向量,对每个子向量进行量化,从而大幅压缩存储空间并加速距离计算。
  • 向量数据库: 专为向量存储和查询设计,底层通常集成了各种ANN算法。它们提供API接口、数据管理、扩展性、高可用性等特性。
    • 独立向量数据库: Pinecone, Weaviate, Milvus, Qdrant。
    • 集成向量扩展: PostgreSQL with pgvector, OpenSearch, Redis.

2.3 双向索引(Bidirectional Indexing)的含义

在我们的语境中,“双向索引”意味着:

  1. 从文本到结构化数据: 通过语义查询(文本向量)来查找相关的商品,并根据这些商品的结构化属性进行筛选。
  2. 从结构化数据到文本: 通过结构化属性查询(结构化向量)来查找相关商品,并根据这些商品的文本描述进行语义筛选。
  3. 混合查询: 最强大的形式,同时利用文本语义和结构化属性进行组合查询。

3. 架构模式:实现多向量索引

如何将上述概念整合到实际系统中?这里介绍几种常见的架构模式。

3.1 模式1:关系型数据库集成向量列 (PostgreSQL with pgvector)

这种模式适用于数据量不是极其庞大(百万级到千万级),且对纯向量检索性能要求不是极致的场景。它利用了关系型数据库的成熟稳定性和ACID特性。

优点:

  • 架构简单,所有数据(包括向量)都在一个数据库中管理。
  • 利用现有关系型数据库的生态系统(备份、恢复、事务)。
  • pgvector 提供了基本的ANN索引(IVFFlat)和距离计算。

缺点:

  • 对于超高维向量和海量数据,性能可能不如专门的向量数据库。
  • 向量操作和高级功能相对有限。
  • 查询优化可能需要更多手动干预。

数据模型:

我们假设有一个 products 表。

字段名 数据类型 说明
id UUID / INT 商品唯一标识符
name TEXT 商品名称
description TEXT 商品详细描述
category VARCHAR(255) 商品分类
price NUMERIC(10, 2) 商品价格
material VARCHAR(255) 商品材质
brand VARCHAR(255) 商品品牌
description_embedding VECTOR(384) description 的文本向量嵌入(例如384维)
structured_embedding VECTOR(N) 结构化数据(分类、价格、材质、品牌)的向量嵌入

代码示例:PostgreSQL with pgvector

首先,确保你的PostgreSQL安装了 pgvector 扩展。

-- 连接到你的数据库
-- psql -U your_user -d your_database

-- 1. 创建 pgvector 扩展 (如果尚未创建)
CREATE EXTENSION vector;

-- 2. 创建 products 表
CREATE TABLE products (
    id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    name TEXT NOT NULL,
    description TEXT NOT NULL,
    category VARCHAR(255),
    price NUMERIC(10, 2),
    material VARCHAR(255),
    brand VARCHAR(255),
    description_embedding VECTOR(384), -- 假设我们的文本嵌入是384维
    structured_embedding VECTOR(15)   -- 假设我们的结构化嵌入是15维 (根据之前的preprocessor输出维度)
);

-- 3. 创建 ANN 索引
-- 对于 description_embedding (文本语义搜索)
CREATE INDEX ON products USING ivfflat (description_embedding vector_l2_ops) WITH (lists = 100);
-- 对于 structured_embedding (结构化属性语义搜索)
CREATE INDEX ON products USING ivfflat (structured_embedding vector_l2_ops) WITH (lists = 50); -- 结构化数据维度通常较低,lists可以少一些

Python 插入和查询代码:

import psycopg2
import numpy as np
from sentence_transformers import SentenceTransformer
from sklearn.preprocessing import StandardScaler, OneHotEncoder
from sklearn.compose import ColumnTransformer
from sklearn.pipeline import Pipeline
import pandas as pd
import json

# PostgreSQL连接信息
DB_CONFIG = {
    'host': 'localhost',
    'database': 'your_database',
    'user': 'your_user',
    'password': 'your_password'
}

# 文本嵌入模型
text_model = SentenceTransformer('all-MiniLM-L6-v2')

# 结构化数据预处理器 (与之前代码相同)
# 重新定义并fit,确保维度一致
product_data_for_preprocessor_fit = pd.DataFrame([
    {'category': '背包', 'price': 899, 'material': '尼龙', 'brand': '户外之星'},
    {'category': '手提包', 'price': 1899, 'material': '真皮', 'brand': '时尚风向'},
    {'category': '鞋子', 'price': 699, 'material': '合成纤维', 'brand': '活力跑者'},
    {'category': '杯具', 'price': 159, 'material': '不锈钢', 'brand': '生活优品'},
    {'category': '背包', 'price': 399, 'material': '帆布', 'brand': '城市漫步'},
    {'category': '手提包', 'price': 1200, 'material': '皮革', 'brand': '经典'}, # 增加一些数据以确保preprocessor泛化
    {'category': '服装', 'price': 250, 'material': '棉', 'brand': '休闲'}
])

numerical_features = ['price']
numerical_transformer = StandardScaler()
categorical_features = ['category', 'material', 'brand']
categorical_transformer = OneHotEncoder(handle_unknown='ignore')

preprocessor = ColumnTransformer(
    transformers=[
        ('num', numerical_transformer, numerical_features),
        ('cat', categorical_transformer, categorical_features)
    ])
preprocessor.fit(product_data_for_preprocessor_fit)

def get_text_embedding(text: str) -> list:
    return text_model.encode(text, convert_to_numpy=True).tolist()

def get_structured_embedding(data_row: dict) -> list:
    df_row = pd.DataFrame([data_row])
    processed_features = preprocessor.transform(df_row)
    return processed_features.toarray().flatten().tolist()

# 示例数据
products_to_insert = [
    {
        'name': '轻便防水徒步背包',
        'description': '这款背包采用高强度尼龙材料,轻便且完全防水,适合长途徒步和户外探险。多功能口袋设计,方便收纳。',
        'category': '背包', 'price': 899.00, 'material': '尼龙', 'brand': '户外之星'
    },
    {
        'name': '真皮商务手提包',
        'description': '优雅的真皮手提包,经典设计,内部空间宽敞,可容纳笔记本电脑,是商务通勤和日常使用的理想选择。',
        'category': '手提包', 'price': 1899.00, 'material': '真皮', 'brand': '时尚风向'
    },
    {
        'name': '高性能缓震跑鞋',
        'description': '专为跑步爱好者设计,提供卓越的缓震技术和稳定的支撑,轻量化设计,助你在赛道上轻松超越自我。',
        'category': '鞋子', 'price': 699.00, 'material': '合成纤维', 'brand': '活力跑者'
    },
    {
        'name': '不锈钢保温杯',
        'description': '高品质304不锈钢保温杯,双层真空隔热,持久保冷保热,是你户外活动、办公室的理想伴侣。',
        'category': '杯具', 'price': 159.00, 'material': '不锈钢', 'brand': '生活优品'
    },
    {
        'name': '帆布休闲背包',
        'description': '时尚简约的帆布背包,大容量设计,适合学生和城市日常出行,多色可选。',
        'category': '背包', 'price': 399.00, 'material': '帆布', 'brand': '城市漫步'
    }
]

def insert_products(products):
    conn = None
    try:
        conn = psycopg2.connect(**DB_CONFIG)
        cur = conn.cursor()
        for prod in products:
            desc_emb = get_text_embedding(prod['description'])
            struct_emb = get_structured_embedding({k: prod[k] for k in ['category', 'price', 'material', 'brand']})

            cur.execute("""
                INSERT INTO products (name, description, category, price, material, brand, description_embedding, structured_embedding)
                VALUES (%s, %s, %s, %s, %s, %s, %s, %s)
            """, (
                prod['name'], prod['description'], prod['category'], prod['price'],
                prod['material'], prod['brand'], desc_emb, struct_emb
            ))
        conn.commit()
        print(f"成功插入 {len(products)} 条产品数据。")
    except Exception as e:
        print(f"插入数据时发生错误: {e}")
    finally:
        if conn:
            cur.close()
            conn.close()

def query_products(text_query: str = None, structured_query_data: dict = None, price_range: tuple = None, limit: int = 5):
    conn = None
    try:
        conn = psycopg2.connect(**DB_CONFIG)
        cur = conn.cursor()

        query_parts = []
        params = []

        # 1. 文本语义查询
        if text_query:
            query_emb = get_text_embedding(text_query)
            query_parts.append("description_embedding <-> %s")
            params.append(query_emb)

        # 2. 结构化数据语义查询
        if structured_query_data:
            struct_query_emb = get_structured_embedding(structured_query_data)
            query_parts.append("structured_embedding <-> %s")
            params.append(struct_query_emb)

        # 3. 结构化数据精确过滤
        where_clauses = []
        if price_range:
            where_clauses.append("price BETWEEN %s AND %s")
            params.extend(price_range)
        if structured_query_data and 'category' in structured_query_data and structured_query_data['category'] != '任意':
            where_clauses.append("category = %s")
            params.append(structured_query_data['category'])
        if structured_query_data and 'brand' in structured_query_data and structured_query_data['brand'] != '任意':
            where_clauses.append("brand = %s")
            params.append(structured_query_data['brand'])
        # 可以添加更多结构化过滤条件

        # 构建最终SQL
        sql = "SELECT id, name, description, category, price, material, brand"

        if query_parts:
            # 计算平均相似度作为排序依据,这里简化为只考虑一个向量距离,或者取两者加权平均
            # 如果同时有文本和结构化向量查询,需要设计一个融合分数
            # 简单示例:如果两者都有,则对距离求和(L2距离越小越好)
            if len(query_parts) == 2:
                 sql += ", (description_embedding <-> %s + structured_embedding <-> %s) AS combined_distance"
                 params.extend([params[0], params[1]]) # 再次添加向量参数用于距离计算
            else: # 只有一个向量查询
                 sql += f", {query_parts[0]} AS distance"

        sql += " FROM products"

        if where_clauses:
            sql += " WHERE " + " AND ".join(where_clauses)

        if query_parts:
            sql += " ORDER BY combined_distance ASC" if len(query_parts) == 2 else " ORDER BY distance ASC"

        sql += f" LIMIT {limit}"

        # 调整params顺序,确保向量参数在WHERE子句参数之后,如果它们用于ORDER BY
        # 这里为了简化,假设query_parts的参数总是排在最前面,且用于ORDER BY
        # 如果SQL复杂,需要更精细的参数管理

        print("nGenerated SQL:")
        print(cur.mogrify(sql, tuple(params)).decode('utf-8')) # 打印带参数的SQL以便调试

        cur.execute(sql, tuple(params))
        results = cur.fetchall()

        columns = [desc[0] for desc in cur.description]
        if query_parts:
            # 移除距离列
            columns.pop() 

        return [dict(zip(columns, row)) for row in results]

    except Exception as e:
        print(f"查询数据时发生错误: {e}")
        return []
    finally:
        if conn:
            cur.close()
            conn.close()

# --- 执行示例 ---
# 插入数据
insert_products(products_to_insert)

# 示例1: 纯文本语义查询 (找适合户外探险的包)
print("n--- 示例1: 纯文本语义查询 ---")
text_q = "适合户外探险的轻便包"
results1 = query_products(text_query=text_q)
for r in results1:
    print(f"ID: {r['id']}, 名称: {r['name']}, 描述: {r['description'][:30]}..., 价格: {r['price']}")

# 示例2: 纯结构化数据语义查询 (找便宜的尼龙包)
print("n--- 示例2: 纯结构化数据语义查询 ---")
struct_q_data = {'category': '背包', 'price': 400, 'material': '尼龙', 'brand': '任意'}
results2 = query_products(structured_query_data=struct_q_data)
for r in results2:
    print(f"ID: {r['id']}, 名称: {r['name']}, 类别: {r['category']}, 材质: {r['material']}, 价格: {r['price']}")

# 示例3: 混合查询 (找适合商务通勤的真皮包,价格不超过2000)
print("n--- 示例3: 混合查询 (文本语义 + 结构化过滤) ---")
text_q_mix = "适合商务通勤的时尚包"
struct_q_data_mix = {'category': '手提包', 'price': 1500, 'material': '真皮', 'brand': '时尚风向'} # 构造一个查询向量
price_range_mix = (0, 2000)
results3 = query_products(text_query=text_q_mix, structured_query_data=struct_q_data_mix, price_range=price_range_mix)
for r in results3:
    print(f"ID: {r['id']}, 名称: {r['name']}, 描述: {r['description'][:30]}..., 类别: {r['category']}, 价格: {r['price']}")

# 示例4: 混合查询 (文本语义 + 结构化精确过滤)
print("n--- 示例4: 混合查询 (文本语义 + 结构化精确过滤) ---")
text_q_mix_exact = "适合学生用的便宜背包"
# 注意:这里 structured_query_data 仅用于生成结构化向量,精确过滤由 price_range 和 category 进行
struct_q_data_mix_exact = {'category': '背包', 'price': 300, 'material': '帆布', 'brand': '城市漫步'} 
price_range_mix_exact = (0, 500)
results4 = query_products(text_query=text_q_mix_exact, structured_query_data=struct_q_data_mix_exact, price_range=price_range_mix_exact, limit=2)
for r in results4:
    print(f"ID: {r['id']}, 名称: {r['name']}, 描述: {r['description'][:30]}..., 类别: {r['category']}, 价格: {r['price']}")

代码解析:

  • description_embedding <-> %spgvector 中用于计算L2距离的操作符。其他距离如余弦距离是 <=>
  • ORDER BY distance ASC 用于按相似度(距离越小越相似)排序。
  • 混合查询时,我们将文本查询向量和结构化查询向量同时传入,并对它们的距离进行加权求和(这里简单地直接求和)作为最终的排序依据。同时,结构化数据还可以进行精确的 WHERE 条件过滤。
  • 这种模式的挑战在于如何有效地融合多个向量的距离,以及如何将向量距离和精确过滤结合起来进行高效的查询。

3.2 模式2:分离式向量数据库与RDBMS同步 (PostgreSQL + Milvus/Pinecone/Qdrant)

这种模式是处理大规模、高并发向量检索的常见选择。它将结构化数据存储在关系型数据库中,而向量嵌入存储在专门的向量数据库中。

优点:

  • 各自发挥所长:RDBMS处理结构化数据和事务,向量数据库提供高性能向量检索。
  • 更好的可扩展性:可以独立扩展RDBMS和向量数据库。
  • 向量数据库通常提供更丰富的向量操作和更优化的ANN算法。

缺点:

  • 架构复杂:需要维护两个数据库系统。
  • 数据同步挑战:确保RDBMS和向量数据库之间的数据一致性(例如,当RDBMS中的商品被删除或更新时,向量数据库中的对应向量也需要更新)。
  • 查询协调:混合查询需要从两个数据库获取结果并进行融合。

数据模型:

  • RDBMS (PostgreSQL) products 表: 字段名 数据类型 说明
    id UUID 商品唯一标识符
    name TEXT 商品名称
    description TEXT 商品详细描述
    category VARCHAR(255) 商品分类
    price NUMERIC(10, 2) 商品价格
    material VARCHAR(255) 商品材质
    brand VARCHAR(255) 商品品牌
  • 向量数据库 (Milvus/Qdrant) – Collection 1: product_text_embeddings 字段名 数据类型 说明
    id UUID 对应 products.id
    embedding VECTOR(384) description 的文本向量嵌入
  • 向量数据库 (Milvus/Qdrant) – Collection 2: product_structured_embeddings 字段名 数据类型 说明
    id UUID 对应 products.id
    embedding VECTOR(15) 结构化数据(分类、价格等)的向量嵌入
    category VARCHAR(255) 可选:将部分结构化属性作为元数据存储,用于向量数据库内部过滤。
    price FLOAT 可选:同上。

代码示例:PostgreSQL + Qdrant

假设Qdrant服务器已运行。

import psycopg2
import numpy as np
from sentence_transformers import SentenceTransformer
from sklearn.preprocessing import StandardScaler, OneHotEncoder
from sklearn.compose import ColumnTransformer
from sklearn.pipeline import Pipeline
import pandas as pd
from qdrant_client import QdrantClient, models
from uuid import uuid4

# PostgreSQL连接信息
DB_CONFIG = {
    'host': 'localhost',
    'database': 'your_database',
    'user': 'your_user',
    'password': 'your_password'
}

# Qdrant客户端
QDRANT_CLIENT = QdrantClient(host="localhost", port=6333) # 假设Qdrant运行在本地6333端口

TEXT_EMBEDDING_DIM = 384
STRUCTURED_EMBEDDING_DIM = 15 # 需要根据实际preprocessor的输出维度调整

# 文本嵌入模型
text_model = SentenceTransformer('all-MiniLM-L6-v2')

# 结构化数据预处理器 (与之前代码相同,确保fit的数据足够泛化)
product_data_for_preprocessor_fit = pd.DataFrame([
    {'category': '背包', 'price': 899, 'material': '尼龙', 'brand': '户外之星'},
    {'category': '手提包', 'price': 1899, 'material': '真皮', 'brand': '时尚风向'},
    {'category': '鞋子', 'price': 699, 'material': '合成纤维', 'brand': '活力跑者'},
    {'category': '杯具', 'price': 159, 'material': '不锈钢', 'brand': '生活优品'},
    {'category': '背包', 'price': 399, 'material': '帆布', 'brand': '城市漫步'},
    {'category': '手提包', 'price': 1200, 'material': '皮革', 'brand': '经典'},
    {'category': '服装', 'price': 250, 'material': '棉', 'brand': '休闲'}
])

numerical_features = ['price']
numerical_transformer = StandardScaler()
categorical_features = ['category', 'material', 'brand']
categorical_transformer = OneHotEncoder(handle_unknown='ignore')

preprocessor = ColumnTransformer(
    transformers=[
        ('num', numerical_transformer, numerical_features),
        ('cat', categorical_transformer, categorical_features)
    ])
preprocessor.fit(product_data_for_preprocessor_fit)

def get_text_embedding(text: str) -> list:
    return text_model.encode(text, convert_to_numpy=True).tolist()

def get_structured_embedding(data_row: dict) -> list:
    df_row = pd.DataFrame([data_row])
    processed_features = preprocessor.transform(df_row)
    return processed_features.toarray().flatten().tolist()

# 确保Qdrant Collection存在
def setup_qdrant_collections():
    # 文本嵌入集合
    if not QDRANT_CLIENT.collection_exists(collection_name="product_text_embeddings"):
        QDRANT_CLIENT.create_collection(
            collection_name="product_text_embeddings",
            vectors_config=models.VectorParams(size=TEXT_EMBEDDING_DIM, distance=models.Distance.COSINE),
            # 可以添加索引,例如 HNSW
            hnsw_config=models.HnswConfigDiff(on_disk=True)
        )
        print("Created Qdrant collection: product_text_embeddings")

    # 结构化嵌入集合
    if not QDRANT_CLIENT.collection_exists(collection_name="product_structured_embeddings"):
        QDRANT_CLIENT.create_collection(
            collection_name="product_structured_embeddings",
            vectors_config=models.VectorParams(size=STRUCTURED_EMBEDDING_DIM, distance=models.Distance.COSINE),
            hnsw_config=models.HnswConfigDiff(on_disk=True)
        )
        # 可以为结构化数据中的某些字段添加payload索引,用于过滤
        QDRANT_CLIENT.create_payload_index(
            collection_name="product_structured_embeddings",
            field_name="category",
            field_schema=models.FieldSchema(field_type=models.FieldType.KEYWORD)
        )
        QDRANT_CLIENT.create_payload_index(
            collection_name="product_structured_embeddings",
            field_name="price",
            field_schema=models.FieldSchema(field_type=models.FieldType.FLOAT)
        )
        print("Created Qdrant collection: product_structured_embeddings")

# 插入数据函数
def insert_product_data(products):
    conn = None
    try:
        conn = psycopg2.connect(**DB_CONFIG)
        cur = conn.cursor()

        text_points = []
        structured_points = []

        for prod in products:
            product_id = str(uuid4()) # 生成一个UUID作为产品ID
            prod['id'] = product_id

            # 插入RDBMS
            cur.execute("""
                INSERT INTO products (id, name, description, category, price, material, brand)
                VALUES (%s, %s, %s, %s, %s, %s, %s)
            """, (
                prod['id'], prod['name'], prod['description'], prod['category'], prod['price'],
                prod['material'], prod['brand']
            ))

            # 生成并准备文本嵌入点
            text_embedding = get_text_embedding(prod['description'])
            text_points.append(models.PointStruct(
                id=product_id,
                vector=text_embedding,
                payload={'product_name': prod['name'], 'category': prod['category']} # 可以存储一些元数据
            ))

            # 生成并准备结构化嵌入点
            structured_embedding = get_structured_embedding({k: prod[k] for k in ['category', 'price', 'material', 'brand']})
            structured_points.append(models.PointStruct(
                id=product_id,
                vector=structured_embedding,
                payload={'category': prod['category'], 'price': prod['price']} # 存储用于过滤的元数据
            ))

        conn.commit()
        print(f"成功插入 {len(products)} 条产品数据到PostgreSQL。")

        # 批量插入Qdrant
        QDRANT_CLIENT.upsert(
            collection_name="product_text_embeddings",
            wait=True,
            points=text_points
        )
        print(f"成功插入 {len(text_points)} 条文本嵌入到Qdrant。")

        QDRANT_CLIENT.upsert(
            collection_name="product_structured_embeddings",
            wait=True,
            points=structured_points
        )
        print(f"成功插入 {len(structured_points)} 条结构化嵌入到Qdrant。")

    except Exception as e:
        print(f"插入数据时发生错误: {e}")
    finally:
        if conn:
            cur.close()
            conn.close()

# 混合查询函数
def hybrid_query_products(text_query: str = None, structured_query_data: dict = None, price_range: tuple = None, category_filter: str = None, limit: int = 5):

    product_ids_from_text = set()
    product_ids_from_structured = set()

    # 1. 文本语义搜索 (Qdrant)
    if text_query:
        query_text_vector = get_text_embedding(text_query)
        text_search_results = QDRANT_CLIENT.search(
            collection_name="product_text_embeddings",
            query_vector=query_text_vector,
            limit=limit * 2 # 稍微多取一些,以便后续过滤和融合
        )
        product_ids_from_text = {hit.id for hit in text_search_results}
        print(f"文本查询 '{text_query}' 获得 {len(product_ids_from_text)} 个结果。")

    # 2. 结构化数据语义搜索 (Qdrant,可能带Qdrant内部的payload过滤)
    structured_qdrant_filter = []
    if category_filter:
        structured_qdrant_filter.append(models.FieldCondition(
            key="category",
            match=models.MatchValue(value=category_filter)
        ))
    if price_range:
        structured_qdrant_filter.append(models.FieldCondition(
            key="price",
            range=models.Range(gte=price_range[0], lte=price_range[1])
        ))

    if structured_query_data:
        query_structured_vector = get_structured_embedding(structured_query_data)
        structured_search_results = QDRANT_CLIENT.search(
            collection_name="product_structured_embeddings",
            query_vector=query_structured_vector,
            query_filter=models.Filter(must=structured_qdrant_filter) if structured_qdrant_filter else None,
            limit=limit * 2
        )
        product_ids_from_structured = {hit.id for hit in structured_search_results}
        print(f"结构化查询获得 {len(product_ids_from_structured)} 个结果。")
    elif structured_qdrant_filter: # 如果没有语义查询,但有结构化过滤,直接在Qdrant中过滤
        # Qdrant的search API要求有query_vector,如果没有语义向量,可以考虑用point_lookup或filter_points
        # 这里简化处理,如果只有filter,就执行一个不带向量的filter操作 (Qdrant的scroll或search-without-vector)
        # 实际操作中,可以构造一个dummy vector或者用Qdrant的filter_points
        # For simplicity, if only filter is present, we will fetch all points matching the filter
        filtered_points = QDRANT_CLIENT.scroll(
            collection_name="product_structured_embeddings",
            scroll_filter=models.Filter(must=structured_qdrant_filter),
            limit=limit * 2,
            with_payload=False,
            with_vectors=False
        )
        product_ids_from_structured = {point.id for point, _ in filtered_points}
        print(f"结构化过滤获得 {len(product_ids_from_structured)} 个结果。")

    # 3. 结果融合 (这里使用简单的交集或并集,更复杂可使用RRF等)
    final_product_ids = set()
    if product_ids_from_text and product_ids_from_structured:
        final_product_ids = product_ids_from_text.intersection(product_ids_from_structured)
        print(f"文本和结构化查询交集获得 {len(final_product_ids)} 个结果。")
    elif product_ids_from_text:
        final_product_ids = product_ids_from_text
    elif product_ids_from_structured:
        final_product_ids = product_ids_from_structured
    else:
        return [] # 没有匹配结果

    if not final_product_ids:
        return []

    # 4. 从RDBMS获取详细数据
    conn = None
    try:
        conn = psycopg2.connect(**DB_CONFIG)
        cur = conn.cursor()

        # 将UUID列表转换为字符串列表,并用JOIN查询
        ids_str = tuple(str(uid) for uid in final_product_ids)

        # 排序策略:这里从Qdrant拿到的结果本身带有相似度排序,但交集后会打乱。
        # 最好的方法是使用RRF(Reciprocal Rank Fusion)等方法在Qdrant端融合排序,
        # 或者在Python代码中手动计算融合分数并重新排序。
        # 这里简化,直接从RDBMS按ID取出,不进行额外的排序。
        cur.execute(f"""
            SELECT id, name, description, category, price, material, brand
            FROM products
            WHERE id IN %s
            LIMIT %s
        """, (ids_str, limit)) # tuple(ids_str)

        results = cur.fetchall()
        columns = [desc[0] for desc in cur.description]
        return [dict(zip(columns, row)) for row in results]

    except Exception as e:
        print(f"从PostgreSQL获取数据时发生错误: {e}")
        return []
    finally:
        if conn:
            cur.close()
            conn.close()

# --- 执行示例 ---
# 确保Qdrant Collection存在
setup_qdrant_collections()

# 示例数据
products_to_insert = [
    {
        'name': '轻便防水徒步背包',
        'description': '这款背包采用高强度尼龙材料,轻便且完全防水,适合长途徒步和户外探险。多功能口袋设计,方便收纳。',
        'category': '背包', 'price': 899.00, 'material': '尼龙', 'brand': '户外之星'
    },
    {
        'name': '真皮商务手提包',
        'description': '优雅的真皮手提包,经典设计,内部空间宽敞,可容纳笔记本电脑,是商务通勤和日常使用的理想选择。',
        'category': '手提包', 'price': 1899.00, 'material': '真皮', 'brand': '时尚风向'
    },
    {
        'name': '高性能缓震跑鞋',
        'description': '专为跑步爱好者设计,提供卓越的缓震技术和稳定的支撑,轻量化设计,助你在赛道上轻松超越自我。',
        'category': '鞋子', 'price': 699.00, 'material': '合成纤维', 'brand': '活力跑者'
    },
    {
        'name': '不锈钢保温杯',
        'description': '高品质304不锈钢保温杯,双层真空隔热,持久保冷保热,是你户外活动、办公室的理想伴侣。',
        'category': '杯具', 'price': 159.00, 'material': '不锈钢', 'brand': '生活优品'
    },
    {
        'name': '帆布休闲背包',
        'description': '时尚简约的帆布背包,大容量设计,适合学生和城市日常出行,多色可选。',
        'category': '背包', 'price': 399.00, 'material': '帆布', 'brand': '城市漫步'
    }
]

# 插入数据
insert_product_data(products_to_insert)

# 示例1: 纯文本语义查询 (找适合户外探险的包)
print("n--- 示例1: 纯文本语义查询 ---")
text_q = "适合户外探险的轻便包"
results1 = hybrid_query_products(text_query=text_q)
for r in results1:
    print(f"ID: {r['id']}, 名称: {r['name']}, 描述: {r['description'][:30]}..., 价格: {r['price']}")

# 示例2: 纯结构化数据语义查询 (找便宜的尼龙包)
print("n--- 示例2: 纯结构化数据语义查询 ---")
# 这里的 structured_query_data 仅用于生成向量,精确过滤由 category_filter 和 price_range 控制
struct_q_data = {'category': '背包', 'price': 400, 'material': '尼龙', 'brand': '任意'}
results2 = hybrid_query_products(structured_query_data=struct_q_data, category_filter='背包', price_range=(0, 500))
for r in results2:
    print(f"ID: {r['id']}, 名称: {r['name']}, 类别: {r['category']}, 材质: {r['material']}, 价格: {r['price']}")

# 示例3: 混合查询 (找适合商务通勤的真皮包,价格不超过2000,类别为手提包)
print("n--- 示例3: 混合查询 (文本语义 + 结构化语义 + 结构化精确过滤) ---")
text_q_mix = "适合商务通勤的时尚包"
struct_q_data_mix = {'category': '手提包', 'price': 1500, 'material': '真皮', 'brand': '时尚风向'}
results3 = hybrid_query_products(text_query=text_q_mix, structured_query_data=struct_q_data_mix, 
                                 price_range=(0, 2000), category_filter='手提包')
for r in results3:
    print(f"ID: {r['id']}, 名称: {r['name']}, 描述: {r['description'][:30]}..., 类别: {r['category']}, 价格: {r['price']}")

代码解析:

  • 我们创建了两个Qdrant集合,分别用于存储文本嵌入和结构化嵌入,并通过 product_id 进行关联。
  • Qdrant支持payload(元数据)过滤,这使得我们可以在向量搜索的同时,对结构化属性进行精确过滤,例如 categoryprice
  • 混合查询时,我们分别向两个Qdrant集合发出向量搜索请求,获取各自的匹配 product_id 集合。
  • 然后,通过集合操作(如交集 intersection)来融合两个模态的结果。
  • 最后,使用这些过滤后的 product_id 从PostgreSQL中查询完整的商品详情。
  • 排序融合:这里只是简单地取交集,实际生产中,需要更复杂的排序融合策略,例如RRF (Reciprocal Rank Fusion),它能有效地结合来自不同搜索源的排名列表。

3.3 模式3:统一多模态向量数据库 (Weaviate/Qdrant Named Vectors)

一些新兴的向量数据库开始支持在单个集合/类中存储多个命名向量,或者提供更高级的多模态处理能力。这简化了架构,因为它将所有向量和元数据存储在一个系统中。

优点:

  • 架构简化:所有数据(包括多个向量)都在一个系统中。
  • 原生的多向量支持:数据库可能提供内置的多向量查询和融合机制。
  • 高性能:专为向量和混合查询设计。

缺点:

  • 相对较新,生态系统不如RDBMS成熟。
  • 可能存在供应商锁定。
  • 学习曲线:需要熟悉特定数据库的API和概念。

数据模型 (以Weaviate为例):

在Weaviate中,一个Class(相当于表)可以定义多个属性。每个Class默认可以有一个向量。通过 named_vectors 功能,可以为一个对象定义多个向量。

{
  "class": "Product",
  "description": "Represents a product item with text and structured attributes",
  "vectorizer": "none", // 不使用默认的全局向量化器,我们将手动处理向量
  "properties": [
    {
      "name": "id",
      "dataType": ["text"],
      "description": "Unique product identifier"
    },
    {
      "name": "name",
      "dataType": ["text"],
      "description": "Product name"
    },
    {
      "name": "description",
      "dataType": ["text"],
      "description": "Detailed product description"
    },
    {
      "name": "category",
      "dataType": ["text"],
      "description": "Product category"
    },
    {
      "name": "price",
      "dataType": ["number"],
      "description": "Product price"
    },
    {
      "name": "material",
      "dataType": ["text"],
      "description": "Product material"
    },
    {
      "name": "brand",
      "dataType": ["text"],
      "description": "Product brand"
    }
  ],
  "vectorConfig": {
    "text_embedding": { // 文本向量配置
      "vectorizer": {
        "text2vec-transformers": { // 使用Weaviate的text2vec模块进行向量化
          "vectorizeClassName": false,
          "properties": ["description"] // 指定哪些属性用于生成此向量
        }
      },
      "distance": "cosine"
    },
    "structured_embedding": { // 结构化向量配置
      "vectorizer": {
        "none": {} // 此向量不通过Weaviate的模块生成,而是我们手动生成并上传
      },
      "distance": "cosine"
    }
  }
}

代码示例:Weaviate (With Named Vectors)

假设Weaviate服务器已运行,并安装了 text2vec-transformers 模块。

import weaviate
from weaviate.util import get_valid_uuid
import numpy as np
from sentence_transformers import SentenceTransformer
from sklearn.preprocessing import StandardScaler, OneHotEncoder
from sklearn.compose import ColumnTransformer
from sklearn.pipeline import Pipeline
import pandas as pd
from uuid import uuid4

# Weaviate客户端
WEAVIATE_CLIENT = weaviate.Client("http://localhost:8080") # 假设Weaviate运行在本地8080端口

# 文本嵌入模型 (仅用于生成结构化数据查询向量,因为文本数据由Weaviate自身向量化)
text_model = SentenceTransformer('all-MiniLM-L6-v2')

TEXT_EMBEDDING_DIM = text_model.get_sentence_embedding_dimension() # 384 for all-MiniLM-L6-v2
STRUCTURED_EMBEDDING_DIM = 15 # 需要根据实际preprocessor的输出维度调整

# 结构化数据预处理器 (与之前代码相同,确保fit的数据足够泛化)
product_data_for_preprocessor_fit = pd.DataFrame([
    {'category': '背包', 'price': 899, 'material': '尼龙', 'brand': '户外之星'},
    {'category': '手提包', 'price': 1899, 'material': '真皮', 'brand': '时尚风向'},
    {'category': '鞋子', 'price': 699, 'material': '合成纤维', 'brand': '活力跑者'},
    {'category': '杯具', 'price': 159, 'material': '不锈钢', 'brand': '生活优品'},
    {'category': '背包', 'price': 399, 'material': '帆布', 'brand': '城市漫步'},
    {'category': '手提包', 'price': 1200, 'material': '皮革', 'brand': '经典'},
    {'category': '服装', 'price': 250, 'material': '棉', 'brand': '休闲'}
])

numerical_features = ['price']
numerical_transformer = StandardScaler()
categorical_features = ['category', 'material', 'brand']
categorical_transformer = OneHotEncoder(handle_unknown='ignore')

preprocessor = ColumnTransformer(
    transformers=[
        ('num', numerical_transformer, numerical_features),
        ('cat', categorical_transformer, categorical_features)
    ])
preprocessor.fit(product_data_for_preprocessor_fit)

def get_structured_embedding(data_row: dict) -> list:
    df_row = pd.DataFrame([data_row])
    processed_features = preprocessor.transform(df_row)
    return processed_features.toarray().flatten().tolist()

# 设置Weaviate schema
def setup_weaviate_schema():
    if WEAVIATE_CLIENT.schema.exists("Product"):
        WEAVIATE_CLIENT.schema.delete_class("Product")
        print("Deleted existing 'Product' class.")

    product_class_schema = {
      "class": "Product",
      "description": "Represents a product item with text and structured attributes",
      "vectorizer": "none", # 不使用默认的全局向量化器,我们将手动处理结构化向量
      "properties": [
        {"name": "name", "dataType": ["text"]},
        {"name": "description", "dataType": ["text"]},
        {"name": "category", "dataType": ["text"]},
        {"name": "price", "dataType": ["number"]},
        {"name": "material", "dataType": ["text"]},
        {"name": "brand", "dataType": ["text"]}
      ],
      "vectorConfig": {
        "text_embedding": { # 文本向量配置
          "vectorizer": {
            "text2vec-transformers": { # 使用Weaviate的text2vec模块进行向量化
              "vectorizeClassName": False,
              "properties": ["description"] # 指定哪些属性用于生成此向量
            }
          },
          "distance": "cosine"
        },
        "structured_embedding": { # 结构化向量配置
          "vectorizer": {
            "none": {} # 此向量不通过Weaviate的模块生成,而是我们手动生成并上传
          },
          "distance": "cosine"
        }
      }
    }
    WEAVIATE_CLIENT.schema.create_class(product_class_schema)
    print("Created 'Product' class schema in Weaviate with named vectors.")

# 插入数据函数
def insert_product_data_weaviate(products):
    with WEAVIATE_CLIENT.batch as batch:
        batch.batch_size = 100
        for prod in products:
            product_id = str(uuid4())

            # 准备结构化数据向量
            structured_embedding = get_structured_embedding({k: prod[k] for k in ['category', 'price', 'material', 'brand']})

            data_object = {
                "name": prod['name'],
                "description": prod['description'],
                "category": prod['category'],
                "price": prod['price'],
                "material": prod['material'],
                "brand": prod['brand']
            }

            # 使用named_vectors参数上传手动生成的向量
            batch.add_data_object(
                data_object=data_object,
                class_name="Product",
                uuid=product_id,
                vector={
                    "structured_embedding": structured_embedding # 手动生成的结构化向量
                }
                # text_embedding 会由Weaviate的text2vec-transformers模块自动生成
            )
    print(f"成功插入 {len(products)} 条产品数据到Weaviate。")

# 混合查询函数
def hybrid_query_products_weaviate(text_query: str = None, structured_query_data: dict = None, price_range: tuple = None, category_filter: str = None, limit: int = 5):

    query = WEAVIATE_CLIENT.query.get("Product", ["name", "description", "category", "price", "material", "brand"])

    # 构建Where过滤器 (结构化属性的精确过滤)
    where_filter_operands = []
    if category_filter:
        where_filter_operands.append({
            "path": ["category"],
            "operator": "Equal",
            "valueText": category_filter
        })
    if price_range:
        where_filter_operands.append({
            "path": ["price"],
            "operator": "GreaterThanEqual",
            "valueNumber": price_range[0]
        })
        where_filter_operands.append({
            "path": ["price"],
            "operator": "LessThanEqual",
            "valueNumber": price_range[1]
        })

    if where_filter_operands:
        query = query.with_where({"operator": "And", "operands": where_filter_operands})

    # 构建多向量查询
    vector_queries = []
    if text_query:
        # Weaviate可以直接对属性进行文本向量化查询
        vector_queries.append(
            models.MultiVector(
                vector_name="text_embedding",
                vector=text_model.encode(text_query, convert_to_numpy=True).tolist()
            )
        )

    if structured_query_data:
        structured_q_vector = get_structured_embedding(structured_query_data)
        vector_queries.append(
            models.MultiVector(
                vector_name="structured_embedding",
                vector=structured_q_vector
            )
        )

    if vector_queries:
        # Weaviate的near_vectors支持传入多个命名向量进行搜索和融合
        query = query.with_near_vectors(vector_queries)

    results = query.with_limit(limit).do()

    if "data" in results and "Get" in results["data"] and "Product" in results["data"]["Get"]:
        return results["data"]["Get"]["Product"]
    return []

# --- 执行示例 ---
# 确保Weaviate Schema存在
setup_weaviate_schema()

# 示例数据 (与之前相同)
products_to_insert = [
    {
        'name': '轻便防水徒步背包',
        'description': '这款背包采用高强度尼龙材料,轻便且完全防水,适合长途徒步和户外探险。多功能口袋设计,方便收纳。',
        'category': '背包', 'price': 899.00, 'material': '尼龙', 'brand': '户外之星'
    },
    {
        'name': '真皮商务手提包',
        'description': '优雅的真皮手提包,经典设计,内部空间宽敞,可容纳笔记本电脑,是商务通勤和日常使用的理想选择。',
        'category': '手提包', 'price': 1899.00, 'material': '真皮', 'brand': '时尚风向'
    },
    {
        'name': '高性能缓震跑鞋',
        'description': '专为跑步爱好者设计,提供卓越的缓震技术和稳定的支撑,轻量化设计,助你在赛道上轻松超越自我。',
        'category': '鞋子', 'price': 699.00, 'material': '合成纤维', 'brand': '活力跑者'
    },
    {
        'name': '不锈钢保温杯',
        'description': '高品质304不锈钢保温杯,双层真空隔热,持久保冷保热,是你户外活动、办公室的理想伴侣。',
        'category': '杯具', 'price': 159.00, 'material': '不锈钢', 'brand': '生活优品'
    },
    {
        'name': '帆布休闲背包',
        'description': '时尚简约的帆布背包,大容量设计,适合学生和城市日常出行,多色可选。',
        'category': '背包', 'price': 399.00, 'material': '帆布', 'brand': '城市漫步'
    }
]

# 插入数据
insert_product_data_weaviate(products_to_insert)

# 示例1: 纯文本语义查询 (找适合户外探险的包)
print("n--- 示例1: 纯文本语义查询 (Weaviate) ---")
text_q = "适合户外探险的轻便包"
results1 = hybrid_query_products_weaviate(text_query=text_q)
for r in results1:
    print(f"名称: {r['name']}, 描述: {r['description'][:30]}..., 价格: {r['price']}")

# 示例2: 纯结构化数据语义查询 (找便宜的尼龙包)
print("n--- 示例2: 纯结构化数据语义查询 (Weaviate) ---")
struct_q_data = {'category': '背包', 'price': 400, 'material': '尼龙', 'brand': '任意'}
results2 = hybrid_query_products_weaviate(structured_query_data=struct_q_data, category_filter='背包', price_range=(0, 500))
for r in results2:
    print(f"名称: {r['name']}, 类别: {r['category']}, 材质: {r['material']}, 价格: {r['price']}")

# 示例3: 混合查询 (找适合商务通勤的真皮包,价格不超过2000,类别为手提包)
print("n--- 示例3: 混合查询 (文本语义 + 结构化语义 + 结构化精确过滤 - Weaviate) ---")
text_q_mix = "适合商务通勤的时尚包"
struct_q_data_mix = {'category': '手提包', 'price': 1500, 'material': '真皮', 'brand': '时尚风向'}
results3 = hybrid_query_products_weaviate(text_query=text_q_mix, structured_query_data=struct_q_data_mix, 
                                          price_range=(0, 2000), category_filter='手提包')
for r in results3:
    print(f"名称: {r['name']}, 描述: {r['description'][:30]}..., 类别: {r['category']}, 价格: {r['price']}")

代码解析:

  • 我们定义了一个Weaviate Product 类,并配置了两个 vectorConfig,分别为 text_embeddingstructured_embedding
  • text_embedding 利用Weaviate内置的 text2vec-transformers 模块,自动从 description 属性生成向量。
  • structured_embedding 配置为 vectorizer: none,表示我们手动生成此向量并在上传数据时提供。
  • 插入数据时,我们为 description 属性赋值,Weaviate会自动处理其文本向量化。同时,我们手动生成 structured_embedding 并通过 vector 参数的 structured_embedding 键上传。
  • 查询时,Weaviate的 with_near_vectors 方法允许同时传入多个命名向量进行搜索。Weaviate会内部融合这些向量的相似度。
  • with_where 方法则用于结构化属性的精确过滤,它与向量搜索是并行且高效的。
  • 这种模式极大地简化了多模态查询的逻辑,将向量管理、融合和结构化过滤统一在数据库层进行。

4. 高级考量与最佳实践

4.1 嵌入模型选择与微调

  • 领域适应性: 通用嵌入模型(如all-MiniLM-L6-v2)在多数情况下表现良好,但对于特定领域(如医疗、法律、金融),微调(Fine-tuning)或使用领域特定的预训练模型能显著提升效果。
  • 维度与性能: 嵌入维度越高,通常能捕获更多信息,但存储和查询成本也越高。需要权衡精度和性能。
  • 更新策略: 随着数据变化,文本内容和结构化属性可能更新。需要有机制重新生成和更新对应的向量。

4.2 结构化数据向量化的细微之处

  • 高基数分类特征: 对于类别数量巨大的特征(如用户ID、产品型号),独热编码不再适用。可以考虑类别嵌入(Learning Embeddings for Categorical Features)或哈希编码。
  • 稀疏特征: 许多结构化特征可能是稀疏的。如何有效地将其编码到密集向量中是一个挑战。自编码器或深度学习模型对此有优势。
  • 特征工程: 在生成结构化向量之前,进行良好的特征工程(如组合特征、多项式特征)可以提升向量的表达能力。
  • 时间序列数据: 对于日期、时间戳等,可以提取年、月、日、星期几等作为分类或数值特征,或者使用专门的时间序列编码器。

4.3 权重与融合策略

当同时存在多个向量搜索结果(例如文本相似度、结构化相似度)和结构化精确过滤时,如何将它们的结果融合并排序,是影响检索质量的关键。

  • 简单加权求和: 对不同模态的相似度分数(或距离的倒数)进行加权求和。权重需要通过实验或机器学习进行调整。
  • RRF (Reciprocal Rank Fusion): 一种非参数的融合方法,它根据每个结果在不同搜索列表中的排名来计算一个融合得分,对排名靠前的结果给予更高的权重,对排名靠后的结果进行惩罚。它不需要调整权重,对不同搜索源的得分尺度不敏感。
  • 学习排序 (Learning to Rank): 最复杂但也最有效的方法。将所有特征(包括向量相似度、结构化属性匹配度、关键词匹配度等)作为输入,训练一个机器学习模型来预测最终的排序。

4.4 可扩展性与性能

  • 分片与分区: 对于海量数据,向量数据库通常支持数据分片和分区,以实现水平扩展。
  • 索引参数调优: ANN索引有许多参数(如HNSW的M、efConstruction、efSearch,IVFFlat的nlist、nprobe),需要根据数据集和查询需求进行细致调优。
  • 硬件选择: 向量计算通常受益于GPU加速(如果向量数据库支持)或高性能CPU。

4.5 维护与更新

  • 增量更新: 对于新增或修改的数据,需要及时更新RDBMS和向量数据库中的记录。可以利用消息队列(如Kafka)或CDC (Change Data Capture) 技术实现数据同步。
  • 模型迭代: 嵌入模型需要定期评估和更新,新的模型可能提供更好的语义理解能力。模型更新后,可能需要重新生成所有数据的向量。

4.6 混合搜索:向量与关键词的结合

纯向量搜索可能面临“召回不足”的问题,尤其是在关键词非常精确的场景。将向量搜索与传统的关键词搜索(如BM25)结合,可以实现“混合搜索”:

  • 并行搜索与融合: 同时执行向量搜索和关键词搜索,然后使用RRF等方法融合两个结果列表。
  • 预过滤: 先用关键词过滤一小部分数据,再对这部分数据进行向量搜索。

5. 展望未来

“Multi-vector Indexing”代表了现代信息检索系统的一个重要演进方向。它打破了传统数据模态的界限,使得系统能够以更智能、更符合人类认知的方式理解和检索数据。随着多模态AI模型(如CLIP、GPT-4V)的不断发展,我们将能够更轻松地从文本、图像、视频等多种源头生成统一或互补的向量表示,进一步丰富多向量索引的应用场景。从智能推荐系统到高级问答机器人,从个性化购物体验到复杂数据分析,多向量索引无疑将成为构建下一代智能应用的关键技术。其核心思想——将所有可量化的信息转化为向量,并以统一的数学形式进行存储、查询和融合,正引领我们走向一个更加语义化、更加智能的数据世界。

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注