各位同仁,
今天我们将深入探讨一个在现代数据处理领域越来越关键的话题:Vector-Relational Hybrid Memory。在当前信息爆炸的时代,我们面临着一个两难局面:一方面,业务对结构化数据的精准查询和事务完整性有着不可妥协的要求;另一方面,海量的非结构化文本、图像、音频数据,以及用户对语义理解、模糊匹配的需求日益增长。传统的解决方案,无论是纯关系型数据库还是新兴的向量数据库,都无法单独满足这两种截然不同的需求。
因此,我们提出并构建一套既能做语义模糊匹配、又能做精准 SQL 查询的混合状态层。这不仅仅是将两种技术简单地堆叠在一起,而是一种深思熟虑的架构整合,旨在发挥各自所长,弥补彼此短板,最终为应用程序提供一个统一、强大且灵活的数据访问接口。
1. 为什么需要混合内存?
在深入技术细节之前,我们首先明确问题的根源。
关系型数据库 (RDBMS) 的优势与局限:
RDBMS,如PostgreSQL、MySQL、Oracle,是结构化数据管理的基石。它们提供:
- 强一致性 (ACID): 事务的原子性、一致性、隔离性和持久性保证了数据可靠性。
- 严格的模式 (Schema): 定义了数据结构,有助于数据完整性。
- 强大的查询能力 (SQL): 复杂的联接、聚合、过滤,能够进行极其精确的数据检索。
- 数据完整性约束: 外键、唯一约束等确保数据质量。
然而,RDBMS 在处理非结构化数据和语义查询时力不从心:
- 语义鸿沟: RDBMS 不理解文本、图像内容的“含义”,只能进行关键字匹配,无法理解“相似性”或“相关性”。
- 模糊查询弱: 对于“找到与‘高效节能’相关的产品”,RDBMS 难以直接满足,需要复杂的全文检索(如
LIKE或全文索引),且效果有限。 - 性能瓶颈: 存储和索引大量非结构化数据成本高昂,且查询效率低下。
向量数据库 (Vector Database) 的优势与局限:
向量数据库,如Milvus、Pinecone、Weaviate,是为高效存储和查询高维向量而生。它们的核心在于:
- 语义表示: 通过深度学习模型将文本、图像、音频等非结构化数据转换为高维向量(Embedding),这些向量捕捉了数据的语义信息。
- 相似性搜索: 在向量空间中,通过计算向量之间的距离(如余弦相似度、欧氏距离),快速找到语义上相似的数据点。
- 高效 ANN (Approximate Nearest Neighbor) 算法: HNSW、IVF_FLAT等算法能在海量向量中实现近实时查询。
尽管向量数据库在语义搜索方面表现卓越,但它们也存在局限性:
- 缺乏结构化查询能力: 它们通常不擅长处理复杂的结构化查询,如按日期范围过滤、多表联接等。
- 弱事务保证: 大多数向量数据库不提供 RDBMS 级别的 ACID 事务保证。
- 数据模型简单: 通常只关注向量本身及其元数据,对复杂实体关系的支持不足。
混合的必要性:
设想一个电商平台,用户可能搜索“蓝色运动鞋,价格低于100美元,且评价中提到‘舒适’”。
- “蓝色运动鞋”:可能需要语义匹配(用户可能输入“天蓝色跑鞋”)。
- “价格低于100美元”:精准的结构化过滤。
- “评价中提到‘舒适’”:需要对评论文本进行语义分析。
纯 RDBMS 难以处理“舒适”的语义匹配,纯向量数据库难以处理“价格低于100美元”的精准过滤和多条件组合。因此,一个能够同时处理这两种需求,并在两者之间无缝切换的混合状态层,成为了现代智能应用的必然选择。
2. 混合状态层的核心架构设计
我们的目标是构建一个能够将结构化数据(及其关系)与非结构化数据的语义表示(向量)统一起来的系统。这并非简单地在应用层调用两个独立的数据库,而是在数据模型、数据流和查询引擎层面进行深度整合。
2.1 架构概览
一个典型的Vector-Relational Hybrid Memory架构通常包含以下核心组件:
| 组件类型 | 核心功能 | 典型技术选型 |
|---|---|---|
| 关系型存储 (RDBMS) | 存储结构化数据、元数据、实体关系、事务保障。 | PostgreSQL, MySQL, SQL Server |
| 向量存储 (Vector DB) | 存储高维向量、支持高效相似性搜索。 | Milvus, Pinecone, Weaviate, Qdrant |
| 数据摄入与嵌入管线 | 负责数据清洗、切块、生成向量并同步到双存储。 | Python (Langchain, Transformers), Kafka |
| 嵌入模型服务 | 提供将文本/图像等转换为向量的API。 | Sentence-Transformers, OpenAI API, Hugging Face APIs |
| 混合查询引擎 | 协调跨RDBMS和Vector DB的查询,整合结果。 | 自定义Python/Go服务,API Gateway |
| 应用层接口 | 暴露统一的API供上层应用调用。 | REST API, GraphQL |
2.2 数据模型策略:桥接关系与向量
关键在于如何在关系型数据和向量数据之间建立起稳固的桥梁。我们采取的核心策略是:在关系型数据库中存储基础的结构化信息和指向向量的引用,而在向量数据库中存储实际的向量及其必要的关联ID。
考虑一个文档管理系统作为示例:
关系型数据库中的表结构 (PostgreSQL):
documents 表用于存储文档的结构化元数据和内容摘要。
CREATE TABLE documents (
id SERIAL PRIMARY KEY,
title VARCHAR(255) NOT NULL,
author VARCHAR(255),
category VARCHAR(100),
created_at TIMESTAMP WITH TIME ZONE DEFAULT CURRENT_TIMESTAMP,
content_summary TEXT, -- 存储文档的摘要或部分内容
vector_id VARCHAR(255) UNIQUE NOT NULL -- 关联到向量数据库中的唯一ID
);
CREATE INDEX idx_documents_category ON documents (category);
CREATE INDEX idx_documents_author ON documents (author);
-- 为created_at创建索引以支持时间范围查询
CREATE INDEX idx_documents_created_at ON documents (created_at);
向量数据库中的数据模型 (概念性 Milvus/Pinecone Collection):
document_vectors 集合 (Collection) 存储文档内容的向量表示。
Collection: document_vectors
Fields:
- id (Primary Key, VARCHAR/Int64): 对应RDBMS中的 `documents.vector_id`。
- embedding (Vector, FLOAT_VECTOR): 存储文档内容的嵌入向量。
- metadata (Optional, JSON/key-value): 可以存储一些轻量级、高频过滤的元数据(如 `category`),用于在向量搜索阶段进行前置过滤。
vector_id 是连接两个世界的关键。它在 RDBMS 中作为外键(逻辑上的,因为不是直接的数据库外键约束,而是应用层面的关联),指向向量数据库中的对应向量。
数据流示意:
- 原始数据 (Raw Data): 文档的完整文本、标题、作者、创建日期等。
- 数据清洗与切块 (Preprocessing & Chunking): 对原始文本进行清洗,并根据需要分割成更小的、适合嵌入的逻辑单元。
- 向量生成 (Embedding Generation): 使用预训练的语言模型(如 BERT, OpenAI Embeddings)将文本切块转换为高维向量。
- 双存储写入 (Dual Storage Write):
- 将文档的结构化元数据(
id,title,author,category,created_at,content_summary)写入 PostgreSQL 的documents表。 - 将生成的向量和对应的
vector_id写入向量数据库的document_vectors集合。 - 确保
vector_id在两个系统中保持一致。
- 将文档的结构化元数据(
3. 详细实现:构建混合内存的核心模块
接下来,我们将通过 Python 代码示例,逐步构建这个混合状态层的关键模块。
3.1 环境准备与依赖
我们将使用 Python 作为实现语言。需要安装以下库:
psycopg2-binary: PostgreSQL 客户端。sentence-transformers: 用于生成文本嵌入。langchain: 用于文本切块。faiss-cpu: 一个强大的向量搜索库,用于演示目的,替代生产级向量数据库。在生产环境中,通常会使用 Milvus、Pinecone 等。
pip install psycopg2-binary sentence-transformers langchain faiss-cpu
3.2 关系型数据库初始化
首先,创建 PostgreSQL 数据库和 documents 表。
import psycopg2
from psycopg2 import sql
class PGClient:
def __init__(self, dbname, user, password, host="localhost", port="5432"):
self.conn_params = {
"dbname": dbname,
"user": user,
"password": password,
"host": host,
"port": port
}
self.conn = None
def connect(self):
try:
self.conn = psycopg2.connect(**self.conn_params)
self.conn.autocommit = True # For simplicity in this demo
print("Connected to PostgreSQL successfully.")
except psycopg2.Error as e:
print(f"Error connecting to PostgreSQL: {e}")
self.conn = None
def close(self):
if self.conn:
self.conn.close()
print("PostgreSQL connection closed.")
def execute_query(self, query, params=None, fetch_results=False):
if not self.conn:
self.connect()
if not self.conn:
return None
try:
with self.conn.cursor() as cur:
cur.execute(query, params)
if fetch_results:
return cur.fetchall()
return True
except psycopg2.Error as e:
print(f"Error executing query: {e}")
return False
# 初始化 PostgreSQL 客户端
pg_client = PGClient(dbname="hybrid_db", user="your_user", password="your_password")
pg_client.connect()
# 创建 documents 表
create_table_query = """
CREATE TABLE IF NOT EXISTS documents (
id SERIAL PRIMARY KEY,
title VARCHAR(255) NOT NULL,
author VARCHAR(255),
category VARCHAR(100),
created_at TIMESTAMP WITH TIME ZONE DEFAULT CURRENT_TIMESTAMP,
content_summary TEXT,
vector_id VARCHAR(255) UNIQUE NOT NULL
);
CREATE INDEX IF NOT EXISTS idx_documents_category ON documents (category);
CREATE INDEX IF NOT EXISTS idx_documents_author ON documents (author);
CREATE INDEX IF NOT EXISTS idx_documents_created_at ON documents (created_at);
"""
pg_client.execute_query(create_table_query)
print("PostgreSQL documents table ensured to exist.")
3.3 向量存储初始化 (FAISS 演示)
为了简化,我们使用 faiss-cpu 在内存中模拟向量存储。在生产环境中,这将被替换为 Milvus、Pinecone 等专门的向量数据库客户端。
import faiss
import numpy as np
import uuid
class VectorDBClient:
def __init__(self, dim):
self.dim = dim
self.index = faiss.IndexFlatL2(dim) # L2 distance for similarity
self.id_to_idx = {} # Map vector_id (string) to FAISS internal index (int)
self.idx_counter = 0
def insert(self, vector_id: str, embedding: np.ndarray):
if vector_id in self.id_to_idx:
# For simplicity, we'll just ignore if already exists, or raise error
print(f"Warning: vector_id {vector_id} already exists. Skipping insertion.")
return
embedding = embedding.astype('float32').reshape(1, -1)
if embedding.shape[1] != self.dim:
raise ValueError(f"Embedding dimension mismatch. Expected {self.dim}, got {embedding.shape[1]}")
self.index.add(embedding)
self.id_to_idx[vector_id] = self.idx_counter
self.idx_counter += 1
return True
def search(self, query_embedding: np.ndarray, k: int = 5):
query_embedding = query_embedding.astype('float32').reshape(1, -1)
if query_embedding.shape[1] != self.dim:
raise ValueError(f"Query embedding dimension mismatch. Expected {self.dim}, got {query_embedding.shape[1]}")
distances, faiss_indices = self.index.search(query_embedding, k)
# Map FAISS internal indices back to our vector_ids
results = []
idx_to_id = {v: k for k, v in self.id_to_idx.items()}
for i, dist in zip(faiss_indices[0], distances[0]):
if i != -1: # FAISS returns -1 for empty slots
results.append({"vector_id": idx_to_id[i], "distance": dist})
return results
def get_vector_ids(self):
return list(self.id_to_idx.keys())
# 嵌入模型的维度,例如 sentence-transformers/all-MiniLM-L6-v2 的维度是 384
EMBEDDING_DIM = 384
vector_db_client = VectorDBClient(dim=EMBEDDING_DIM)
print(f"FAISS vector store initialized with dimension {EMBEDDING_DIM}.")
3.4 嵌入模型和文本切块
使用 sentence-transformers 加载预训练模型,并使用 langchain 进行文本切块。
from sentence_transformers import SentenceTransformer
from langchain.text_splitter import RecursiveCharacterTextSplitter
# 加载嵌入模型
embedding_model = SentenceTransformer('all-MiniLM-L6-v2')
print("Embedding model 'all-MiniLM-L6-v2' loaded.")
# 初始化文本切块器
text_splitter = RecursiveCharacterTextSplitter(
chunk_size=500, # 每个块的最大字符数
chunk_overlap=100, # 块之间的重叠字符数
length_function=len,
add_start_index=True,
)
# 辅助函数:生成嵌入
def get_embedding(text: str) -> np.ndarray:
return embedding_model.encode(text)
print("Text splitter initialized.")
3.5 数据摄入管线
现在,我们构建一个函数来模拟数据摄入:读取文档,切块,生成嵌入,然后将元数据和向量分别存入 PostgreSQL 和 FAISS。
from datetime import datetime
class Document:
def __init__(self, id: int, title: str, author: str, category: str, content: str, created_at: datetime = None):
self.id = id
self.title = title
self.author = author
self.category = category
self.content = content
self.created_at = created_at if created_at else datetime.now()
def ingest_document(doc: Document):
# 1. 文本切块
chunks = text_splitter.split_text(doc.content)
# 为了简化,我们只对第一个或最具代表性的块生成向量,或所有块的平均向量
# 生产环境中可能需要为每个块生成一个向量,并管理其与原始文档的关联
if not chunks:
print(f"No content chunks generated for document ID: {doc.id}")
return False
# 这里我们只取第一个块作为文档的代表性摘要和嵌入源
content_summary_for_embedding = chunks[0]
# 2. 生成嵌入
embedding = get_embedding(content_summary_for_embedding)
# 3. 生成唯一的 vector_id
vector_id = str(uuid.uuid4())
# 4. 插入到 PostgreSQL
insert_pg_query = """
INSERT INTO documents (id, title, author, category, created_at, content_summary, vector_id)
VALUES (%s, %s, %s, %s, %s, %s, %s)
ON CONFLICT (id) DO UPDATE SET
title = EXCLUDED.title,
author = EXCLUDED.author,
category = EXCLUDED.category,
created_at = EXCLUDED.created_at,
content_summary = EXCLUDED.content_summary,
vector_id = EXCLUDED.vector_id;
"""
pg_success = pg_client.execute_query(
insert_pg_query,
(doc.id, doc.title, doc.author, doc.category, doc.created_at, content_summary_for_embedding, vector_id)
)
if not pg_success:
print(f"Failed to ingest document {doc.id} into PostgreSQL.")
return False
# 5. 插入到向量数据库
vector_db_success = vector_db_client.insert(vector_id, embedding)
if not vector_db_success:
print(f"Failed to ingest vector for document {doc.id} into vector store.")
# 实际生产中,这里可能需要回滚PostgreSQL的插入,或标记为待处理
return False
print(f"Successfully ingested document {doc.id} with vector_id {vector_id}.")
return True
# 模拟一些文档数据
sample_docs = [
Document(1, "The Future of Artificial Intelligence", "Dr. Alan Turing", "AI",
"Artificial intelligence is rapidly evolving, promising to revolutionize various industries. From machine learning advancements to natural language processing breakthroughs, AI's impact is profound. Ethical considerations and the development of responsible AI are paramount for its sustainable growth.",
datetime(2023, 1, 15)),
Document(2, "Quantum Computing Explained", "Alice Smith", "Physics",
"Quantum computing harnesses quantum-mechanical phenomena such as superposition and entanglement to perform computations. Unlike classical computers, quantum computers can process complex problems faster, opening new frontiers in drug discovery and materials science.",
datetime(2022, 11, 1)),
Document(3, "Understanding Blockchain Technology", "Bob Johnson", "Fintech",
"Blockchain is a decentralized, distributed ledger technology that records transactions across many computers. It's the underlying technology for cryptocurrencies like Bitcoin, offering transparency, security, and immutability.",
datetime(2023, 3, 20)),
Document(4, "AI Ethics and Governance", "Dr. Alan Turing", "AI",
"The ethical implications of artificial intelligence are a growing concern. Discussions around bias, fairness, privacy, and accountability are crucial for developing AI systems that benefit society while mitigating potential harms. Governance frameworks are being developed globally.",
datetime(2023, 5, 10)),
Document(5, "Sustainable Energy Solutions", "Dr. Emily White", "Environment",
"Exploring renewable energy sources like solar, wind, and hydropower is vital for combating climate change. Innovations in battery storage and smart grids are accelerating the transition to a sustainable energy future, reducing reliance on fossil fuels.",
datetime(2024, 1, 5)),
]
print("n--- Ingesting sample documents ---")
for doc in sample_docs:
ingest_document(doc)
print("--- Document ingestion complete ---")
3.6 混合查询引擎设计与实现
这是混合状态层的核心,它负责根据查询的类型和条件,智能地将查询路由到 RDBMS 或向量数据库,或两者协同工作。
3.6.1 纯语义模糊匹配 (Vector-First)
当用户只关心语义相似性时,我们首先查询向量数据库,然后根据返回的 vector_id 到 PostgreSQL 中获取完整的元数据。
def semantic_search(query_text: str, k: int = 5):
query_embedding = get_embedding(query_text)
vector_results = vector_db_client.search(query_embedding, k=k)
if not vector_results:
return []
# 提取 vector_ids
vector_ids = [res["vector_id"] for res in vector_results]
# 从 PostgreSQL 获取对应的文档元数据
# 使用 IN 子句来批量查询,提高效率
pg_query = f"""
SELECT id, title, author, category, created_at, content_summary, vector_id
FROM documents
WHERE vector_id IN ({', '.join(['%s'] * len(vector_ids))});
"""
pg_docs_raw = pg_client.execute_query(pg_query, tuple(vector_ids), fetch_results=True)
if not pg_docs_raw:
return []
# 将 PostgreSQL 结果与向量搜索结果合并,并按相似度排序
pg_docs_map = {doc[6]: doc for doc in pg_docs_raw} # Map vector_id to doc data
final_results = []
for vec_res in vector_results:
doc_data = pg_docs_map.get(vec_res["vector_id"])
if doc_data:
final_results.append({
"id": doc_data[0],
"title": doc_data[1],
"author": doc_data[2],
"category": doc_data[3],
"created_at": doc_data[4],
"content_summary": doc_data[5],
"vector_id": doc_data[6],
"distance": vec_res["distance"] # 包含相似度距离
})
return final_results
print("n--- Semantic Search Example ---")
query = "latest advances in machine learning and AI ethics"
semantic_results = semantic_search(query, k=3)
for i, res in enumerate(semantic_results):
print(f"{i+1}. Title: {res['title']}, Author: {res['author']}, Category: {res['category']}, Distance: {res['distance']:.4f}")
输出示例 (可能因模型和数据略有不同):
--- Semantic Search Example ---
1. Title: The Future of Artificial Intelligence, Author: Dr. Alan Turing, Category: AI, Distance: 0.8123
2. Title: AI Ethics and Governance, Author: Dr. Alan Turing, Category: AI, Distance: 0.8541
3. Title: Quantum Computing Explained, Author: Alice Smith, Category: Physics, Distance: 1.2345
(注意:FAISS IndexFlatL2 使用 L2 距离,距离越小表示越相似。余弦距离是越大越相似。)
3.6.2 精准 SQL 查询 (Relational-First)
当查询完全基于结构化条件时,直接查询 PostgreSQL。
def precise_sql_query(conditions: dict, k: int = 5):
where_clauses = []
params = []
if "author" in conditions:
where_clauses.append("author ILIKE %s") # ILIKE for case-insensitive
params.append(f"%{conditions['author']}%")
if "category" in conditions:
where_clauses.append("category = %s")
params.append(conditions['category'])
if "start_date" in conditions:
where_clauses.append("created_at >= %s")
params.append(conditions['start_date'])
if "end_date" in conditions:
where_clauses.append("created_at <= %s")
params.append(conditions['end_date'])
where_str = "WHERE " + " AND ".join(where_clauses) if where_clauses else ""
pg_query = f"""
SELECT id, title, author, category, created_at, content_summary, vector_id
FROM documents
{where_str}
ORDER BY created_at DESC
LIMIT %s;
"""
params.append(k)
pg_docs_raw = pg_client.execute_query(pg_query, tuple(params), fetch_results=True)
results = []
if pg_docs_raw:
for doc_data in pg_docs_raw:
results.append({
"id": doc_data[0],
"title": doc_data[1],
"author": doc_data[2],
"category": doc_data[3],
"created_at": doc_data[4],
"content_summary": doc_data[5],
"vector_id": doc_data[6],
})
return results
print("n--- Precise SQL Query Example ---")
sql_query_conditions = {
"author": "turing",
"category": "AI",
"start_date": datetime(2023, 1, 1)
}
precise_results = precise_sql_query(sql_query_conditions, k=2)
for i, res in enumerate(precise_results):
print(f"{i+1}. Title: {res['title']}, Author: {res['author']}, Category: {res['category']}, Created At: {res['created_at'].strftime('%Y-%m-%d')}")
输出示例:
--- Precise SQL Query Example ---
1. Title: AI Ethics and Governance, Author: Dr. Alan Turing, Category: AI, Created At: 2023-05-10
2. Title: The Future of Artificial Intelligence, Author: Dr. Alan Turing, Category: AI, Created At: 2023-01-15
3.6.3 混合查询:语义搜索 + 关系型过滤
这是最常见的混合查询模式。用户提供一个语义查询字符串,同时附带结构化过滤条件。
流程:
- 首先,根据结构化过滤条件在 PostgreSQL 中预过滤文档,获取符合条件的
vector_id列表。 - 然后,使用用户查询字符串生成嵌入,并在向量数据库中进行相似性搜索,但仅限于步骤1中获取的
vector_id集合。 - 最后,将向量搜索结果与 PostgreSQL 中的完整元数据联接,并按相似度排序返回。
def hybrid_search_semantic_with_filters(query_text: str, filters: dict, k: int = 5):
# 1. 构建 SQL 查询以预过滤文档,获取 vector_ids
where_clauses = []
params = []
if "author" in filters:
where_clauses.append("author ILIKE %s")
params.append(f"%{filters['author']}%")
if "category" in filters:
where_clauses.append("category = %s")
params.append(filters['category'])
if "start_date" in filters:
where_clauses.append("created_at >= %s")
params.append(filters['start_date'])
if "end_date" in filters:
where_clauses.append("created_at <= %s")
params.append(filters['end_date'])
where_str = "WHERE " + " AND ".join(where_clauses) if where_clauses else ""
# 仅查询 vector_id 和 id
pg_pre_filter_query = f"""
SELECT vector_id, id
FROM documents
{where_str};
"""
pg_filtered_vector_ids_raw = pg_client.execute_query(pg_pre_filter_query, tuple(params), fetch_results=True)
if not pg_filtered_vector_ids_raw:
print("No documents found after applying relational filters.")
return []
# 提取符合条件的 vector_ids
allowed_vector_ids = {row[0] for row in pg_filtered_vector_ids_raw}
# 2. 生成查询嵌入
query_embedding = get_embedding(query_text)
# 3. 在向量数据库中进行相似性搜索,并过滤结果
all_vector_results = vector_db_client.search(query_embedding, k=len(vector_db_client.id_to_idx)) # search all, then filter
filtered_vector_results = []
for res in all_vector_results:
if res["vector_id"] in allowed_vector_ids:
filtered_vector_results.append(res)
if len(filtered_vector_results) >= k: # Stop if we found enough
break
if not filtered_vector_results:
return []
# 4. 从 PostgreSQL 获取最终的文档元数据
final_vector_ids = [res["vector_id"] for res in filtered_vector_results]
pg_final_query = f"""
SELECT id, title, author, category, created_at, content_summary, vector_id
FROM documents
WHERE vector_id IN ({', '.join(['%s'] * len(final_vector_ids))});
"""
pg_docs_raw = pg_client.execute_query(pg_final_query, tuple(final_vector_ids), fetch_results=True)
if not pg_docs_raw:
return []
pg_docs_map = {doc[6]: doc for doc in pg_docs_raw}
final_results = []
for vec_res in filtered_vector_results:
doc_data = pg_docs_map.get(vec_res["vector_id"])
if doc_data:
final_results.append({
"id": doc_data[0],
"title": doc_data[1],
"author": doc_data[2],
"category": doc_data[3],
"created_at": doc_data[4],
"content_summary": doc_data[5],
"vector_id": doc_data[6],
"distance": vec_res["distance"]
})
return final_results
print("n--- Hybrid Search (Semantic + Relational Filters) Example ---")
hybrid_query_text = "ethical considerations in AI"
hybrid_filters = {
"author": "turing",
"category": "AI",
"start_date": datetime(2023, 1, 1)
}
hybrid_results = hybrid_search_semantic_with_filters(hybrid_query_text, hybrid_filters, k=2)
for i, res in enumerate(hybrid_results):
print(f"{i+1}. Title: {res['title']}, Author: {res['author']}, Category: {res['category']}, Created At: {res['created_at'].strftime('%Y-%m-%d')}, Distance: {res['distance']:.4f}")
输出示例:
--- Hybrid Search (Semantic + Relational Filters) Example ---
1. Title: AI Ethics and Governance, Author: Dr. Alan Turing, Category: AI, Created At: 2023-05-10, Distance: 0.8541
2. Title: The Future of Artificial Intelligence, Author: Dr. Alan Turing, Category: AI, Created At: 2023-01-15, Distance: 0.9123
这个例子展示了如何先通过关系型数据库进行精确过滤,缩小向量搜索的范围,再进行语义匹配。在真实的向量数据库中,通常会支持在向量搜索时直接传入 filter 条件,这样可以在向量数据库层面完成过滤,效率更高。
3.6.4 混合查询:关系型查询 + 语义重排序 (Relational-First, Semantic Re-ranking)
这种模式适用于用户首先关注结构化条件,但希望结果能根据某个语义进行二次排序的场景。
流程:
- 首先,在 PostgreSQL 中执行精确的结构化查询。
- 获取所有符合条件的文档。
- 对于每个文档,获取其
content_summary或完整内容(如果存储在 RDBMS 中),并与用户提供的语义查询进行相似度计算。 - 根据相似度对 PostgreSQL 的结果进行重排序。
def hybrid_search_relational_with_semantic_reranking(sql_filters: dict, semantic_query_text: str, k: int = 5):
# 1. 首先在 PostgreSQL 中执行精确查询
initial_pg_results = precise_sql_query(sql_filters, k=100) # Fetch more than k to allow for re-ranking
if not initial_pg_results:
print("No documents found with initial relational filters.")
return []
# 2. 生成语义查询的嵌入
query_embedding = get_embedding(semantic_query_text)
# 3. 对每个结果进行语义相似度计算和重排序
reranked_results = []
for doc in initial_pg_results:
doc_content_embedding = get_embedding(doc["content_summary"]) # Use content_summary for embedding
# 计算 L2 距离,距离越小越相似
distance = np.linalg.norm(query_embedding - doc_content_embedding)
reranked_results.append({**doc, "distance": distance})
# 按距离(相似度)升序排序
reranked_results.sort(key=lambda x: x["distance"])
return reranked_results[:k]
print("n--- Hybrid Search (Relational First, Semantic Re-ranking) Example ---")
rerank_sql_filters = {
"author": "Dr. Alan Turing"
}
rerank_semantic_query = "ethical considerations for AI development"
rerank_results = hybrid_search_relational_with_semantic_reranking(rerank_sql_filters, rerank_semantic_query, k=2)
for i, res in enumerate(rerank_results):
print(f"{i+1}. Title: {res['title']}, Author: {res['author']}, Category: {res['category']}, Distance: {res['distance']:.4f}")
pg_client.close() # Close PostgreSQL connection
输出示例:
--- Hybrid Search (Relational First, Semantic Re-ranking) Example ---
1. Title: AI Ethics and Governance, Author: Dr. Alan Turing, Category: AI, Distance: 0.7890
2. Title: The Future of Artificial Intelligence, Author: Dr. Alan Turing, Category: AI, Distance: 0.9567
3.7 实践考量与优化
构建这样的混合系统,除了核心逻辑,还需要考虑一系列工程和运维问题:
-
数据一致性: RDBMS 和 Vector DB 之间的数据同步是关键。
- 事务性更新: 理想情况下,对文档的增删改是原子性的。但由于涉及两个独立系统,这很难实现。
- 事件驱动架构: 使用 Kafka 或 RabbitMQ 等消息队列,将数据变更事件发送给处理器,由处理器负责更新两个数据库。这增加了最终一致性(Eventual Consistency),但更具弹性。
- CDC (Change Data Capture): 监控 RDBMS 的事务日志,捕获变更并同步到向量数据库。
- 批处理与重试机制: 对于失败的同步操作,需要有重试和回滚机制。
-
性能优化:
- RDBMS 索引: 确保所有查询条件字段都有适当索引。
- 向量索引: 针对向量数据库,选择合适的 ANN 算法(HNSW、IVF_FLAT)及其参数调优(如 HNSW 的
M和efConstruction,IVF 的nlist和nprobe)。 - 查询下推: 尽可能将过滤条件推送到数据存储层执行,减少数据传输和处理量。例如,在向量数据库中利用其
metadata过滤能力。 - 缓存: 对频繁查询的结果进行缓存。
-
可伸缩性:
- RDBMS 扩展: 读写分离、分库分表。
- 向量数据库扩展: 大多数生产级向量数据库本身就是分布式系统,支持水平扩展。
- 服务解耦: 将数据摄入、嵌入生成、查询引擎等拆分为独立微服务,便于独立扩展。
-
嵌入模型管理:
- 模型版本控制: 不同的嵌入模型会生成不同的向量空间。
- 重嵌入策略: 当更新嵌入模型时,需要重新生成所有文档的向量。这通常是一个资源密集型操作,需要精心规划。
-
数据安全与访问控制: 确保两个数据库的数据都得到适当的保护和权限管理。
4. 典型应用场景
这种Vector-Relational Hybrid Memory架构在许多领域都有巨大的应用潜力:
- 智能企业知识库/文档搜索: 用户可以通过自然语言(语义)搜索文档,同时可以根据作者、部门、日期、文档类型等结构化元数据进行精确过滤。例如,“找到2023年关于人工智能伦理的内部报告”。
- 电商产品推荐与搜索: 用户可以搜索“与这款衬衫风格相似但价格低于50美元的商品”,同时结合产品描述的语义和价格、品牌等结构化属性。
- 客户支持与智能客服: 智能客服可以理解用户问题的语义,匹配到最相关的 FAQ 或知识库文章,同时可以根据客户ID、问题类型、产品型号等信息进行过滤。
- 法律与合规性检索: 律师可以搜索“与此案件判例语义相似的所有法律文件”,同时限定在特定法院、日期范围或案件类型。
- 内容审核与管理: 识别潜在有害内容时,既可以利用文本内容的语义相似性,也可以结合发布者、发布时间、关联标签等结构化信息进行高效筛选。
5. 挑战与未来展望
尽管混合内存提供了强大的能力,但也面临一些挑战:
- 复杂性管理: 维护两个不同类型的数据存储,以及它们之间的数据一致性和查询协调,增加了系统的复杂性和运维成本。
- 缺乏标准化: 目前没有统一的“混合数据库”标准,大多数解决方案都是定制化的集成。
- 成本考量: 运行两个数据库实例、嵌入模型服务以及协调逻辑,可能比单一数据库系统成本更高。
- 数据同步延迟: 实时性要求高的场景下,如何保证RDBMS和VDB的极低延迟同步是一个挑战。
然而,未来发展也充满希望:
- 数据库融合: 现有 RDBMS 正在积极集成向量能力(如 PostgreSQL 的
pgvector扩展),未来可能会出现更原生的混合数据库,将向量和结构化数据存储在同一系统中,大大简化架构。 - 更强大的嵌入模型: 多模态(文本、图像、音频)嵌入将使混合内存能够处理更丰富的非结构化数据类型。
- AI-Native Databases: 新一代数据库可能会将 AI 推理和向量处理作为其核心能力,提供更智能的数据管理和查询优化。
结语
Vector-Relational Hybrid Memory 代表了数据存储和查询范式的重要演进。它通过巧妙地结合关系型数据库的精准与向量数据库的语义能力,为构建能够真正理解和响应复杂用户意图的智能应用奠定了基础。虽然工程实现上存在挑战,但其带来的巨大价值和广阔的应用前景,无疑使其成为未来数据架构中不可或缺的一环。随着技术的不断成熟和融合,我们期待看到更加无缝、高效的混合数据解决方案的出现。