探讨 ‘Vector Store as Memory’:将 LangGraph 的 Checkpoints 与向量数据库融合,打造真正的‘第二大脑’
各位同仁,各位对人工智能未来充满好奇的技术专家们,晚上好。
我们今天齐聚一堂,共同探讨一个令人兴奋且极具潜力的技术方向:如何通过将 LangGraph 的 Checkpoints 与向量数据库深度融合,构建一个真正具备“第二大脑”能力的智能系统。在当前大语言模型(LLM)驱动的应用浪潮中,我们面临的核心挑战之一是:如何让我们的AI应用拥有持续的、上下文感知的、能够从历史经验中学习和推理的“记忆”?
传统的RAG(Retrieval Augmented Generation)模式,虽然极大地扩展了LLM的知识边界,但其本质是无状态的,每次查询都是相对独立的。而LangGraph,作为LLM编排的利器,通过状态机和节点间跳转,赋予了应用流程和生命周期。它的Checkpoints机制,更是为我们提供了保存应用状态的强大能力。然而,仅仅保存状态是不够的,如果不能智能地检索和利用这些历史状态,它们就只是一堆沉睡的数据。
我们的目标,正是要唤醒这些沉睡的记忆。我们不仅仅要存储过去的执行路径和结果,更要让系统能够理解这些记忆的“含义”,并能在未来需要时,根据当前的上下文和意图,智能地召回最相关的历史经验。这,就是“Vector Store as Memory”的核心思想,也是我们构建AI“第二大脑”的关键一步。
一、LangGraph:智能应用的骨架与记忆的起点
在深入探讨记忆机制之前,我们必须先理解其承载者——LangGraph。LangGraph是LangChain家族中的一员,旨在解决构建复杂LLM应用时的编排难题。它将一个复杂的任务分解为一系列可执行的步骤(节点),并通过定义节点之间的转换(边)来构建一个有向图。每个节点可以是一个LLM调用、一个工具执行、一个数据处理函数,甚至是另一个子图。
1. LangGraph 的核心优势:
- 状态管理 (State Management): LangGraph的核心是其状态管理机制。它维护一个共享的、可变的状态对象,在图中的所有节点之间传递和更新。这使得应用程序能够记住过去的步骤和结果,并在后续步骤中利用这些信息。
- 节点与边 (Nodes and Edges): 通过定义节点(执行单元)和边(状态转换逻辑),我们可以直观地构建复杂的业务流程。
- 循环与条件逻辑 (Loops and Conditional Logic): LangGraph支持条件性地根据当前状态选择下一个节点,甚至可以构建循环,实现迭代或自我修正的行为。
- 工具集成 (Tool Integration): 轻松集成各种工具,如搜索、数据库查询、API调用等,扩展LLM的能力边界。
- Checkpointing (检查点): 这是我们今天讨论的重点。LangGraph提供了一种机制,可以将图的当前状态持久化存储。
2. LangGraph Checkpoints 的作用:
Checkpoints是LangGraph提供的一种将图的当前状态保存到持久化存储中的机制。它主要服务于以下目的:
- 容错与恢复: 当应用程序崩溃或中断时,可以从最近的检查点恢复执行,避免从头开始。
- 调试与回溯: 开发者可以检查任意历史检查点的状态,理解应用程序的执行路径和中间结果。
- 可解释性: 记录了每个步骤的输入、输出和状态变化,有助于理解AI的决策过程。
- 重放与分析: 可以基于历史检查点重放应用程序的执行,进行性能分析或行为模式研究。
- 多轮对话与会话管理: 对于需要维护长会话状态的应用(如聊天机器人),检查点是保存对话历史的关键。
3. LangGraph Checkpoint 的底层机制:
LangGraph通过 BaseCheckpointSaver 抽象接口来管理检查点的持久化。最常用的实现包括 MemorySaver(内存存储,非持久化)和 SQLSaver(SQLite或PostgreSQL等关系型数据库存储)。
一个检查点通常包含以下核心信息:
thread_id: 标识一个独立的执行会话或对话。checkpoint_id: 检查点自身的唯一标识,通常是一个UUID。parent_checkpoint_id: 指向前一个检查点,构建历史链。thread_ts: 检查点创建的时间戳。state: 序列化后的图的完整状态(例如,消息列表、变量等)。metadata: 其他附加元数据。
让我们看一个简单的LangGraph应用如何使用 SQLSaver 来保存检查点:
import os
from typing import Dict, TypedDict, List
from langchain_core.messages import BaseMessage, HumanMessage, AIMessage
from langchain_openai import ChatOpenAI
from langgraph.graph import StateGraph, END
from langgraph.checkpoint.sqlite import SQLiteSaver
# 确保设置了OpenAI API Key
# os.environ["OPENAI_API_KEY"] = "YOUR_OPENAI_API_KEY"
# 1. 定义图的状态
class AgentState(TypedDict):
messages: List[BaseMessage]
current_query: str
tool_output: str
# 2. 定义节点:这是一个简单的LLM调用节点
def call_llm(state: AgentState) -> AgentState:
messages = state["messages"]
model = ChatOpenAI(temperature=0)
response = model.invoke(messages)
return {"messages": messages + [response]}
# 3. 定义图
workflow = StateGraph(AgentState)
workflow.add_node("llm", call_llm)
workflow.set_entry_point("llm")
workflow.add_edge("llm", END) # 简单图,LLM调用后即结束
# 4. 配置检查点保存器
memory = SQLiteSaver.from_file("langgraph_checkpoints.sqlite")
# 5. 编译图并传入检查点保存器
app = workflow.compile(checkpointer=memory)
# 6. 运行图并观察检查点
config = {"configurable": {"thread_id": "user_123"}}
# 第一次运行
print("--- 第一次运行 ---")
input_messages = [HumanMessage(content="你好,我需要一些关于太阳系行星的信息。")]
final_state = app.invoke({"messages": input_messages}, config=config)
print(f"Final State (run 1): {final_state['messages'][-1].content}n")
# 第二次运行 (同一thread_id,会加载之前的状态)
print("--- 第二次运行 ---")
input_messages = [HumanMessage(content="具体来说,火星有什么独特的特征?")]
final_state_2 = app.invoke({"messages": input_messages}, config=config)
print(f"Final State (run 2): {final_state_2['messages'][-1].content}n")
# 加载历史检查点
print("--- 加载历史检查点 ---")
# 假设我们想看第一次运行结束时的状态
# 注意: app.invoke 会自动加载最新的检查点,如果想加载特定历史检查点,需要更复杂的逻辑
# 我们可以直接查询SQLite数据库来验证
import sqlite3
conn = sqlite3.connect("langgraph_checkpoints.sqlite")
cursor = conn.cursor()
cursor.execute("SELECT thread_id, checkpoint_id, parent_checkpoint_id, thread_ts, state FROM checkpoints WHERE thread_id = 'user_123' ORDER BY thread_ts DESC")
rows = cursor.fetchall()
print(f"Found {len(rows)} checkpoints for thread_id 'user_123':")
for i, row in enumerate(rows):
print(f"Checkpoint {i+1}: thread_id={row[0]}, checkpoint_id={row[1]}, thread_ts={row[3]}")
# print(f"State: {row[4][:200]}...") # 打印部分状态,避免过长
conn.close()
上面的例子展示了LangGraph如何通过 thread_id 简单地实现会话状态的延续。然而,这种延续是基于精确匹配 thread_id 的。如果用户开启了一个新的 thread_id,但提出的问题与过去某个 thread_id 中的某个关键时刻高度相关,当前的机制就无法提供帮助。我们缺乏一种“语义搜索”历史状态的能力。
二、缺失的环节:检查点的语义检索
当前的LangGraph检查点机制,虽然强大,但其检索方式是基于精确的 thread_id。这就像你的大脑里存储了无数的记忆片段,但你只能通过回忆起记忆发生的精确日期或某个特定的“文件名”才能找到它。这显然不是人类记忆的工作方式。
人类的记忆是联想式的、语义化的。当我问你“上次我们讨论人工智能伦理的时候,你有什么想法?”你不会去搜索精确到秒的时间戳,而是会根据“人工智能伦理”这个语义概念,快速调动相关的记忆。
1. 现有检查点检索的局限性:
- 仅限
thread_id: 只能通过唯一的thread_id来加载特定会话的最新或历史状态。无法跨会话共享或检索。 - 缺乏语义理解: 无法理解检查点内部存储的“内容”或“主题”。你无法问它:“之前某个用户问过关于退货政策,我们当时怎么处理的?”除非你恰好知道那个用户的
thread_id。 - 无法应对新情境: 当遇到一个全新的
thread_id但在语义上与历史某个thread_id中的特定环节高度相似时,无法主动召回相关经验。
2. 我们的愿景:语义化的检查点记忆
为了弥补这一缺陷,我们需要引入“语义检索”的能力。我们的目标是:
- 将每个检查点视为一个“记忆单元”: 不仅仅是数据,更是包含了特定上下文、用户意图、AI响应和工具执行的“事件”。
- 提取记忆的“精髓”并向量化: 从检查点的复杂状态中,抽取最能代表其核心内容和语义的文本信息,并将其转化为高维向量。
- 构建一个向量记忆库: 将这些向量及其原始检查点ID、元数据存储到一个向量数据库中。
- 动态、智能地召回: 在LangGraph执行的任何阶段,都可以根据当前的用户输入、任务目标或内部状态,查询向量记忆库,召回语义上最相关的历史检查点。
- 将召回的记忆注入上下文: 把召回的记忆(可能是摘要或关键片段)作为额外的上下文信息,提供给LLM或图中的其他节点,从而影响后续的决策和生成。
通过这种方式,我们的LangGraph应用将不再是每次都从零开始,或者仅仅依赖于当前 thread_id 的短暂记忆。它将拥有一个持续增长的、可检索的、能够跨会话学习的“第二大脑”。
三、架构设计:LangGraph 与向量存储的深度融合
要实现“Vector Store as Memory”,我们需要在LangGraph的检查点机制之上,构建一个更加智能的内存层。这不仅仅是简单地将检查点数据扔进向量数据库,而是要精心设计如何提取、存储和检索这些记忆。
1. 总体系统架构概览:
+---------------------+
| User/System |
| Input |
+----------+----------+
|
v
+---------------------+
| LangGraph Orchestrator |
| (Nodes, Edges, State) |
+----------+----------+
| (State Update)
v
+---------------------+
| Custom Checkpoint Saver |
| (Extends BaseCheckpointSaver) |
+----------+----------+
|
+---------------------------------+
| |
v v
+---------------------+ +---------------------+
| Raw Checkpoint | | Checkpoint Content |
| Persistence | | Extraction & |
| (e.g., SQLSaver) | | Summarization |
+---------------------+ +----------+----------+
|
v
+---------------------+
| Embedding Model |
| (e.g., OpenAI Embeddings) |
+----------+----------+
| (Vector)
v
+---------------------+
| Vector Database |
| (e.g., Chroma, Pinecone, Qdrant) |
| (Stores Embeddings + Metadata) |
+---------------------+
^
| (Retrieval Query)
|
+---------------------+ +----------+----------+
| Memory Retrieval | | Current User Query / |
| Node (LangGraph) |<------------| Graph Context |
+----------+----------+ +---------------------+
| (Retrieved Memories)
v
+---------------------+
| LLM Prompt / State |
| Augmentation |
+---------------------+
|
v
+---------------------+
| LangGraph Continue |
| (Informed Decision) |
+---------------------+
2. 核心组件与职责:
-
LangGraph Application:
- 定义图的节点、边和状态。
- 执行业务逻辑,处理用户输入,调用工具等。
- 利用定制的
CheckpointVectorSaver进行状态持久化。
-
Custom Checkpoint Saver (
CheckpointVectorSaver):- 继承自
langgraph.checkpoint.base.BaseCheckpointSaver。 - 在
put()方法被调用时,除了将原始检查点数据保存到传统的持久化存储(如SQLSaver)外,还会执行以下额外步骤:- 内容提取: 从LangGraph的状态对象中提取出有意义的文本内容,例如用户消息、AI回复、工具调用摘要等。
- 语义总结/表示: 将提取的内容进行总结,形成一个简洁且语义丰富的文本描述,作为向量化的输入。
- 向量化: 使用一个预训练的嵌入模型(如OpenAI Embeddings, Sentence Transformers等)将总结文本转换为高维向量。
- 向量数据库存储: 将生成的向量连同原始检查点ID、
thread_id、时间戳、摘要文本等元数据,一并存储到向量数据库中。
- 继承自
-
Embedding Service:
- 负责将文本转换为向量。可以是本地模型,也可以是API服务(如OpenAI)。
- 选择合适的Embedding模型对于检索质量至关重要。
-
Vector Database:
- 专门用于存储和检索高维向量。
- 支持高效的相似性搜索(例如,余弦相似度)。
- 存储每个检查点摘要的向量,并关联回原始检查点的唯一标识符。
- 需要能够存储额外的元数据,以便在检索时进行过滤和排序。
向量数据库条目结构示例:
字段名称 类型 描述 idstring向量数据库中的唯一ID,可以是 checkpoint_id的组合或新生成。vectorfloat[]检查点摘要的嵌入向量。 thread_idstring原始LangGraph会话的ID。 checkpoint_idstring原始LangGraph检查点的ID。 thread_tsdatetime检查点创建时间戳。 summary_textstring用于生成向量的文本摘要。 original_saver_keystring指向原始 BaseCheckpointSaver中存储的检查点记录的键(例如,SQLite中的checkpoint_id)。source_nodestring(可选) 触发此检查点存储的LangGraph节点名称。 user_feedbackstring(可选) 用户对此次交互的反馈,用于未来优化。 -
Memory Retrieval Node (LangGraph Node):
- 这是一个特殊的LangGraph节点,在图的适当位置被激活。
- 查询生成: 从当前的LangGraph状态中提取出用于查询记忆库的文本(例如,最新的用户消息、当前任务描述)。
- 向量数据库查询: 使用查询文本生成其嵌入向量,并在向量数据库中执行相似性搜索。
- 结果处理: 获取最相似的K个记忆条目。
- 记忆提取: 根据检索到的元数据(
original_saver_key),从原始的BaseCheckpointSaver中加载完整的历史检查点状态(如果需要)。 - 格式化与注入: 将检索到的记忆(可能是它们的摘要、关键对话片段或特定的变量值)格式化为LLM可以理解的文本,并将其注入到当前的LangGraph状态中,供后续的LLM调用或其他节点使用。
-
Context Injection Mechanism:
- 将检索到的记忆作为LLM提示的一部分,例如作为系统消息、few-shot示例,或直接作为额外的上下文。
- 也可以更新LangGraph的共享状态,让其他节点能够访问这些记忆。
四、实现细节与代码示例
现在,我们将深入到具体的代码实现,展示如何构建上述架构。
1. 准备工作:环境与依赖
我们需要安装LangGraph、LangChain相关库以及一个向量数据库客户端。这里我们以 ChromaDB 为例,因为它轻量且易于本地部署。
pip install -U langgraph langchain-openai "langchain[chroma]" "chromadb>=0.4.14"
2. 自定义 CheckpointVectorSaver
这是实现核心记忆功能的关键。我们将创建一个 VectorBackedCheckpointSaver,它将结合 SQLiteSaver 和 ChromaDB。
import uuid
import json
from datetime import datetime
from typing import Dict, Any, List, Optional, Union
from langgraph.checkpoint.base import BaseCheckpointSaver, Checkpoint, CheckpointTuple, CheckpointMetadata
from langgraph.checkpoint.sqlite import SQLiteSaver
from langchain_core.messages import BaseMessage, HumanMessage, AIMessage, ToolMessage, SystemMessage
from langchain_core.embeddings import Embeddings
from langchain_openai import OpenAIEmbeddings
from langchain_chroma import Chroma
from langchain_core.documents import Document
# 假设我们有一个LLM来总结记忆,这里我们直接从消息中提取
from langchain_openai import ChatOpenAI
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser
# 用于记忆总结的LLM
_llm_for_summary = ChatOpenAI(model="gpt-3.5-turbo", temperature=0)
_summary_prompt = ChatPromptTemplate.from_messages([
("system", "你是一个善于总结对话和任务的助手。请将以下对话或任务状态总结成一段简洁的描述,以便未来通过语义搜索来回忆。重点关注用户意图、关键信息和AI采取的行动。"),
("user", "{content}")
])
_summary_chain = _summary_prompt | _llm_for_summary | StrOutputParser()
class VectorBackedCheckpointSaver(BaseCheckpointSaver):
"""
一个结合了传统CheckpointSaver(如SQLiteSaver)和向量数据库的自定义Saver。
它在保存检查点的同时,会提取其关键信息,生成嵌入向量,并存储到向量数据库中。
"""
def __init__(
self,
wrapped_saver: BaseCheckpointSaver,
vectorstore: Chroma,
embedding_model: Embeddings,
summary_chain: Any, # Expects a LangChain Runnable for summarization
):
self.wrapped_saver = wrapped_saver
self.vectorstore = vectorstore
self.embedding_model = embedding_model
self.summary_chain = summary_chain
def _extract_and_summarize_state(self, state: Dict[str, Any]) -> str:
"""
从LangGraph的状态中提取关键信息并进行总结。
这里我们重点提取消息历史和当前的查询。
"""
messages_content = []
if "messages" in state and isinstance(state["messages"], list):
for msg in state["messages"]:
if isinstance(msg, HumanMessage):
messages_content.append(f"用户: {msg.content}")
elif isinstance(msg, AIMessage):
messages_content.append(f"AI: {msg.content}")
elif isinstance(msg, ToolMessage):
messages_content.append(f"工具({msg.name}): {msg.content}")
elif isinstance(msg, SystemMessage):
messages_content.append(f"系统: {msg.content}")
current_query = state.get("current_query", "")
tool_output = state.get("tool_output", "")
context_str = "n".join(messages_content)
if current_query:
context_str += f"n当前查询: {current_query}"
if tool_output:
context_str += f"n工具输出: {tool_output}"
if not context_str:
return "空状态或无法提取有效信息。"
# 使用LLM链进行总结
try:
summary = self.summary_chain.invoke({"content": context_str})
return summary
except Exception as e:
print(f"Error summarizing checkpoint state: {e}")
return f"未能总结状态,原始内容: {context_str[:200]}..." # 失败时返回部分原始内容
def put(self, config: Dict[str, Any], checkpoint: Checkpoint) -> CheckpointTuple:
"""
保存检查点,并将其摘要存储到向量数据库。
"""
# 1. 首先通过包装的saver保存原始检查点
checkpoint_tuple = self.wrapped_saver.put(config, checkpoint)
thread_id = config["configurable"]["thread_id"]
# checkpoint_id = checkpoint_tuple.checkpoint_id # 使用 wrapped_saver 返回的 checkpoint_id
checkpoint_id = checkpoint.id # checkpoint对象自带id
thread_ts = checkpoint.get("metadata", {}).get("thread_ts", datetime.now().isoformat())
# 2. 从检查点状态中提取并总结信息
state_for_embedding = self._extract_and_summarize_state(checkpoint["values"])
# 3. 生成嵌入向量
# embedding = self.embedding_model.embed_query(state_for_embedding) # embed_query 适用于单个字符串
# 4. 存储到向量数据库
doc_id = f"{thread_id}-{checkpoint_id}-{uuid.uuid4()}" # 确保在向量DB中是唯一的
document = Document(
page_content=state_for_embedding,
metadata={
"thread_id": thread_id,
"checkpoint_id": checkpoint_id,
"thread_ts": thread_ts,
"original_saver_key": checkpoint_id, # 存储原始saver中记录的id
}
)
self.vectorstore.add_documents([document], ids=[doc_id])
print(f"Saved checkpoint summary to vector DB. Doc ID: {doc_id}")
return checkpoint_tuple
def get(self, config: Dict[str, Any]) -> Optional[Checkpoint]:
"""
从包装的saver获取检查点。此方法不涉及向量数据库,
向量数据库用于检索,而不是直接加载检查点。
"""
return self.wrapped_saver.get(config)
def list(self, config: Dict[str, Any]) -> List[CheckpointMetadata]:
"""
从包装的saver列出检查点。
"""
return self.wrapped_saver.list(config)
# 确保实现了所有抽象方法
def __init_subclass__(cls) -> None:
super().__init_subclass__()
这段代码定义了一个 VectorBackedCheckpointSaver 类。它的核心逻辑在 put 方法中:
- 首先调用内部的
wrapped_saver(例如SQLiteSaver)来完成标准的检查点持久化。 - 然后,它会调用
_extract_and_summarize_state方法从当前LangGraph的状态中提取关键信息。这里我们简单地拼接了对话消息,但更复杂的场景可能需要解析工具调用、变量等,甚至使用一个小型LLM来生成更精炼的摘要。 - 使用
OpenAIEmbeddings将摘要文本转换为向量。 - 将这个向量连同原始
thread_id、checkpoint_id和摘要文本等元数据存储到Chroma向量数据库中。
3. 记忆检索节点 (MemoryRetrievalNode)
接下来,我们需要一个LangGraph节点,它能够查询向量数据库,召回相关的历史记忆。
from langgraph.graph import StateGraph, END, START
from langchain_core.runnables import RunnableLambda
# 假设AgentState是我们在LangGraph中使用的状态
# class AgentState(TypedDict):
# messages: List[BaseMessage]
# current_query: str
# tool_output: str
# retrieved_memories: List[str] # 新增字段用于存储检索到的记忆
def retrieve_memories(state: AgentState, vectorstore: Chroma) -> AgentState:
"""
根据当前的用户查询,从向量数据库中检索相关记忆。
"""
current_query = state.get("current_query")
if not current_query and state.get("messages"):
# 如果没有明确的current_query,尝试从最新的人类消息中获取
last_message = state["messages"][-1]
if isinstance(last_message, HumanMessage):
current_query = last_message.content
if not current_query:
print("No query to retrieve memories from.")
return {"retrieved_memories": []}
print(f"Retrieving memories for query: '{current_query}'")
# 执行相似性搜索
# 可以在这里添加过滤条件,例如只检索特定thread_id的记忆
# 但为了演示跨thread_id检索,我们暂时不加
# results = vectorstore.similarity_search_with_score(current_query, k=3, filter={"thread_id": state["config"]["configurable"]["thread_id"]})
results = vectorstore.similarity_search_with_score(current_query, k=3)
retrieved_memories_text = []
for doc, score in results:
# 过滤掉得分太低的记忆,或者与当前thread_id精确匹配的,除非特别需要
# 这里的过滤逻辑可以根据需求调整
# if score < 0.7: # 例如,只取相似度高于0.7的
# continue
# 避免检索到与当前thread_id完全一致的记忆,除非是历史版本
# if doc.metadata.get("thread_id") == state["config"]["configurable"]["thread_id"]:
# continue
memory_info = (
f"历史记忆 (相似度: {score:.2f}):n"
f" Thread ID: {doc.metadata.get('thread_id', 'N/A')}n"
f" Checkpoint ID: {doc.metadata.get('checkpoint_id', 'N/A')}n"
f" 时间: {doc.metadata.get('thread_ts', 'N/A')}n"
f" 内容: {doc.page_content}n"
)
retrieved_memories_text.append(memory_info)
if retrieved_memories_text:
print(f"Retrieved {len(retrieved_memories_text)} memories.")
return {"retrieved_memories": retrieved_memories_text}
else:
print("No relevant memories found.")
return {"retrieved_memories": []}
# 这个节点需要被包装成Runnable以便在LangGraph中使用
memory_retrieval_node = RunnableLambda(lambda state: retrieve_memories(state, vectorstore_client))
retrieve_memories 函数作为LangGraph的一个节点,它的职责是:
- 从当前
AgentState中获取用户最新的查询(或通过messages列表推断)。 - 使用这个查询文本对
Chroma向量数据库进行相似性搜索。 - 将检索到的相关记忆(及其元数据和相似度分数)格式化成一个字符串列表。
- 将这个列表作为
retrieved_memories字段添加到AgentState中。
4. 将记忆注入LLM提示
一旦记忆被检索到,我们需要将其注入到LLM的提示中,使其能够利用这些历史信息。这通常在一个LLM节点之前完成。
def format_memories_for_llm(state: AgentState) -> AgentState:
"""
将检索到的记忆格式化,并添加到LLM的系统消息中。
"""
retrieved_memories = state.get("retrieved_memories", [])
messages = state["messages"]
if not retrieved_memories:
return state # 没有记忆则直接返回
memory_prompt_part = "nn--- 历史相关记忆 ---n" + "n".join(retrieved_memories) + "n---------------------nn"
# 将记忆作为系统消息插入到对话历史的开头(或靠近用户查询的地方)
# 确保不要重复插入
if not messages or not (isinstance(messages[0], SystemMessage) and "历史相关记忆" in messages[0].content):
updated_messages = [SystemMessage(content=memory_prompt_part)] + messages
else:
# 如果已经有系统消息,可以考虑更新或追加
updated_messages = messages
if "历史相关记忆" not in updated_messages[0].content:
updated_messages[0].content += memory_prompt_part
print(f"Injected {len(retrieved_memories)} memories into LLM prompt.")
return {"messages": updated_messages}
memory_injector_node = RunnableLambda(format_memories_for_llm)
这个 format_memories_for_llm 函数接收包含 retrieved_memories 的状态,然后将这些记忆以结构化的方式插入到 messages 列表的开头,通常作为 SystemMessage。这样,LLM在处理当前用户查询时,就能看到这些历史上下文。
5. 组装 LangGraph
现在,我们将所有组件整合到一个LangGraph中。
# 确保设置了OpenAI API Key
os.environ["OPENAI_API_KEY"] = "YOUR_OPENAI_API_KEY"
# 初始化嵌入模型和向量数据库
embedding_model = OpenAIEmbeddings(model="text-embedding-ada-002")
vectorstore_client = Chroma(
collection_name="langgraph_memories",
embedding_function=embedding_model,
persist_directory="./chroma_db" # 持久化到本地文件
)
# 注意:在实际应用中,你可能需要根据实际情况初始化Chroma客户端
# 例如,如果Chroma服务器运行在远程,你需要配置其host和port
# 初始化传统的CheckpointSaver
base_saver = SQLiteSaver.from_file("langgraph_checkpoints_with_vector.sqlite")
# 初始化我们的VectorBackedCheckpointSaver
vector_backed_saver = VectorBackedCheckpointSaver(
wrapped_saver=base_saver,
vectorstore=vectorstore_client,
embedding_model=embedding_model,
summary_chain=_summary_chain
)
# 定义新的AgentState,包含retrieved_memories
class AgentState(TypedDict):
messages: List[BaseMessage]
current_query: str
tool_output: str
retrieved_memories: List[str] # 用于存储检索到的记忆
# 定义LLM节点 (与之前相同)
def call_llm(state: AgentState) -> AgentState:
messages = state["messages"]
model = ChatOpenAI(temperature=0)
response = model.invoke(messages)
return {"messages": messages + [response]}
# 组装LangGraph
workflow = StateGraph(AgentState)
# 1. 入口点:首先进行记忆检索
workflow.add_node("retrieve_memories", RunnableLambda(lambda state: retrieve_memories(state, vectorstore_client)))
# 2. 注入记忆到LLM提示
workflow.add_node("inject_memories", memory_injector_node)
# 3. 调用LLM
workflow.add_node("llm", call_llm)
# 4. 定义边
workflow.set_entry_point("retrieve_memories")
workflow.add_edge("retrieve_memories", "inject_memories")
workflow.add_edge("inject_memories", "llm")
workflow.add_edge("llm", END)
# 编译图并传入我们定制的saver
app_with_memory = workflow.compile(checkpointer=vector_backed_saver)
# 运行示例
print("n--- 第一次会话 (Thread 1) ---")
config_1 = {"configurable": {"thread_id": "customer_support_001"}}
response_1_1 = app_with_memory.invoke(
{"messages": [HumanMessage(content="我购买的XYZ产品在两天内就坏了,我该如何退货?")], "current_query": "XYZ产品退货流程"},
config=config_1
)
print(f"Agent (Thread 1, Round 1): {response_1_1['messages'][-1].content}n")
response_1_2 = app_with_memory.invoke(
{"messages": [HumanMessage(content="那如果我没有原始包装盒怎么办?")], "current_query": "没有原始包装盒退货"},
config=config_1
)
print(f"Agent (Thread 1, Round 2): {response_1_2['messages'][-1].content}n")
print("n--- 第二次会话 (Thread 2) ---")
config_2 = {"configurable": {"thread_id": "customer_support_002"}}
# 模拟一个新用户,但问题与之前会话的某个点相似
response_2_1 = app_with_memory.invoke(
{"messages": [HumanMessage(content="我收到的ABC产品是坏的,如何申请退款?")], "current_query": "ABC产品退款申请"},
config=config_2
)
print(f"Agent (Thread 2, Round 1): {response_2_1['messages'][-1].content}n")
# 清理ChromaDB (可选)
# vectorstore_client.delete_collection()
# print("ChromaDB collection cleared.")
在这个完整的示例中:
- 我们初始化了
OpenAIEmbeddings和Chroma向量数据库。 - 创建了
VectorBackedCheckpointSaver,将其作为checkpointer传入workflow.compile()。这意味着每次LangGraph状态更新并保存检查点时,其摘要也会被向量化并存储到Chroma中。 - 修改了
AgentState,增加了retrieved_memories字段。 - 在LangGraph中,我们新增了
retrieve_memories和inject_memories两个节点。retrieve_memories节点会在每次LLM调用前执行,根据当前的用户查询去向量数据库中搜索相关的历史记忆。inject_memories节点则负责将这些检索到的记忆格式化并插入到LLM的提示中。
- 我们运行了两个不同的
thread_id,但第二个thread_id的问题与第一个thread_id的部分内容在语义上是相似的。通过观察输出,我们可以看到retrieve_memories节点是否成功召回了跨thread_id的记忆。
五、高级考量与挑战
构建“第二大脑”并非一蹴而就,涉及多个复杂的设计决策和挑战。
1. 上下文窗口管理与记忆摘要:
- 挑战: 检索到的记忆可能非常多,或者单个记忆条目内容过长,容易超出LLM的上下文窗口限制。
- 解决方案:
- 限制检索数量 (k): 只检索最相关的K个记忆。
- 记忆摘要: 在将记忆注入LLM之前,对每个检索到的记忆进行二次摘要,提取其核心要点。这可以是一个小型LLM调用,或者基于规则的提取。
- 分层记忆: 建立不同粒度的记忆。例如,粗粒度的“会话摘要”和细粒度的“特定交互细节”。先检索粗粒度记忆,再根据需要深入检索细粒度记忆。
- 相关性截断: 根据相似度分数动态决定是否包含某个记忆。
2. 新颖性与相关性平衡 (Recency vs. Relevance):
- 挑战: 有时最新的记忆比非常久远但语义相似的记忆更重要。纯粹的语义搜索可能无法捕获这种时间维度上的偏好。
- 解决方案:
- 混合检索: 结合语义相似度和时间衰减因子。在向量数据库查询时,可以对时间较近的记忆给予更高的权重,或者在检索后根据时间戳对结果进行二次排序。
- 时间戳过滤: 仅检索在过去N天/小时内发生的记忆。
- “近邻”记忆: 在检索到某个核心记忆后,同时获取其前后一定时间范围内的其他检查点,以提供更完整的上下文。
3. 记忆的遗忘与修剪 (Forgetting and Pruning):
- 挑战: 向量数据库会不断增长,存储和检索成本会随之增加。某些记忆可能变得过时、不准确或不再有用。
- 解决方案:
- 基于时间衰减的删除: 定期删除超过一定时间阈值的记忆。
- 基于重要性/使用频率的删除: 标记那些从未被检索或使用过的记忆,并定期清理。
- 用户反馈驱动的删除: 允许用户标记“不相关”或“错误”的记忆,并将其从记忆库中移除。
- 记忆合并/压缩: 将多个连续的、语义相似的检查点合并成一个更简洁的摘要记忆。
4. 成本与性能:
- 挑战: 每次检查点保存都需要调用Embedding模型和向量数据库;每次检索也需要Embedding调用和向量数据库查询,这会增加延迟和API成本。
- 解决方案:
- 批量处理: 批量生成嵌入,减少API调用次数。
- 异步操作: 将检查点向量化和存储操作异步化,避免阻塞主LangGraph执行流。
- 局部Embedding模型: 对于对延迟和成本敏感的场景,可以考虑使用本地运行的Embedding模型。
- 优化向量数据库查询: 确保向量数据库索引优化,选择合适的硬件。
- 智能缓存: 缓存最近检索到的记忆,避免重复查询。
5. 状态序列化与提取的复杂性:
- 挑战: LangGraph的状态可以是任意Python对象,将其提取并总结成文本可能很复杂。
- 解决方案:
- 明确状态结构: 在
AgentState中尽量使用简单、可序列化的类型。 - 定制化提取逻辑: 根据每个应用的状态结构,编写专门的
_extract_and_summarize_state逻辑。 - LLM辅助总结: 利用LLM的强大理解和生成能力,对复杂状态进行总结,如我们在示例中所示。
- 明确状态结构: 在
6. 多租户与用户隔离:
- 挑战: 在多用户系统中,确保每个用户的记忆是私有的,不能被其他用户检索到。
- 解决方案:
- 向量数据库过滤: 在查询向量数据库时,始终添加
thread_id或user_id作为过滤条件。 - 独立的向量集合: 为每个用户或租户创建独立的向量集合。
- 向量数据库过滤: 在查询向量数据库时,始终添加
7. 评估与验证:
- 挑战: 如何衡量“第二大脑”的有效性?
- 解决方案:
- A/B测试: 对比有无记忆系统对任务完成率、用户满意度、LLM回复质量的影响。
- 人工评估: 专家对AI行为进行评估,看其是否展现出基于记忆的智能。
- 指标跟踪: 记录记忆检索命中率、检索到的记忆对最终决策的影响等。
六、未来展望
将LangGraph的检查点与向量数据库融合,仅仅是构建真正智能AI记忆系统的开始。未来,我们可以探索更多高级功能:
- 自省与主动学习: Agent不再是被动地等待用户查询记忆,而是能够主动地在决策过程中反思“我过去做过类似的事情吗?”,并主动检索相关经验。
- 情节记忆与语义记忆的结合: 区分特定事件(情节记忆)和泛化知识(语义记忆)。向量存储可以更好地处理语义记忆,而特定事件可能需要更精细的索引(如时间、关键实体)。
- 记忆编辑与修正: 允许Agent或人类用户对过去的记忆进行修正或补充,以纠正错误或更新信息。这对于Agent的持续学习和适应性至关重要。
- 图谱化记忆 (Knowledge Graph as Memory): 将记忆不仅仅存储为文本向量,而是构建成知识图谱。这样可以利用图谱的结构化查询能力和推理能力,实现更复杂、更精确的记忆检索和利用。例如,可以查询“谁在什么时候对哪个产品提出了退货请求,并且最终解决方案是什么?”
- 多模态记忆: 将视觉、听觉等非文本信息也纳入记忆范畴,通过多模态嵌入技术进行统一存储和检索。
结语
今天我们深入探讨了如何通过巧妙地融合LangGraph的检查点机制与向量数据库,为我们的AI应用构建一个功能强大的“第二大脑”。这不仅仅是数据持久化,更是将原始状态转化为可语义检索的智慧。通过定制 CheckpointVectorSaver 和 MemoryRetrievalNode,我们赋予了LangGraph应用跨会话、跨情境学习和推理的能力,使其能够真正从历史经验中受益,并做出更明智、更一致的决策。
从简单的状态机到具备丰富记忆的智能体,我们正一步步迈向更高级的通用人工智能。这个“第二大脑”的构建,是这一进程中不可或缺的关键里程碑。