探讨 ‘Vector Store as Memory’:将 LangGraph 的 Checkpoints 与向量数据库融合,打造真正的‘第二大脑’

探讨 ‘Vector Store as Memory’:将 LangGraph 的 Checkpoints 与向量数据库融合,打造真正的‘第二大脑’

各位同仁,各位对人工智能未来充满好奇的技术专家们,晚上好。

我们今天齐聚一堂,共同探讨一个令人兴奋且极具潜力的技术方向:如何通过将 LangGraph 的 Checkpoints 与向量数据库深度融合,构建一个真正具备“第二大脑”能力的智能系统。在当前大语言模型(LLM)驱动的应用浪潮中,我们面临的核心挑战之一是:如何让我们的AI应用拥有持续的、上下文感知的、能够从历史经验中学习和推理的“记忆”?

传统的RAG(Retrieval Augmented Generation)模式,虽然极大地扩展了LLM的知识边界,但其本质是无状态的,每次查询都是相对独立的。而LangGraph,作为LLM编排的利器,通过状态机和节点间跳转,赋予了应用流程和生命周期。它的Checkpoints机制,更是为我们提供了保存应用状态的强大能力。然而,仅仅保存状态是不够的,如果不能智能地检索和利用这些历史状态,它们就只是一堆沉睡的数据。

我们的目标,正是要唤醒这些沉睡的记忆。我们不仅仅要存储过去的执行路径和结果,更要让系统能够理解这些记忆的“含义”,并能在未来需要时,根据当前的上下文和意图,智能地召回最相关的历史经验。这,就是“Vector Store as Memory”的核心思想,也是我们构建AI“第二大脑”的关键一步。

一、LangGraph:智能应用的骨架与记忆的起点

在深入探讨记忆机制之前,我们必须先理解其承载者——LangGraph。LangGraph是LangChain家族中的一员,旨在解决构建复杂LLM应用时的编排难题。它将一个复杂的任务分解为一系列可执行的步骤(节点),并通过定义节点之间的转换(边)来构建一个有向图。每个节点可以是一个LLM调用、一个工具执行、一个数据处理函数,甚至是另一个子图。

1. LangGraph 的核心优势:

  • 状态管理 (State Management): LangGraph的核心是其状态管理机制。它维护一个共享的、可变的状态对象,在图中的所有节点之间传递和更新。这使得应用程序能够记住过去的步骤和结果,并在后续步骤中利用这些信息。
  • 节点与边 (Nodes and Edges): 通过定义节点(执行单元)和边(状态转换逻辑),我们可以直观地构建复杂的业务流程。
  • 循环与条件逻辑 (Loops and Conditional Logic): LangGraph支持条件性地根据当前状态选择下一个节点,甚至可以构建循环,实现迭代或自我修正的行为。
  • 工具集成 (Tool Integration): 轻松集成各种工具,如搜索、数据库查询、API调用等,扩展LLM的能力边界。
  • Checkpointing (检查点): 这是我们今天讨论的重点。LangGraph提供了一种机制,可以将图的当前状态持久化存储。

2. LangGraph Checkpoints 的作用:

Checkpoints是LangGraph提供的一种将图的当前状态保存到持久化存储中的机制。它主要服务于以下目的:

  • 容错与恢复: 当应用程序崩溃或中断时,可以从最近的检查点恢复执行,避免从头开始。
  • 调试与回溯: 开发者可以检查任意历史检查点的状态,理解应用程序的执行路径和中间结果。
  • 可解释性: 记录了每个步骤的输入、输出和状态变化,有助于理解AI的决策过程。
  • 重放与分析: 可以基于历史检查点重放应用程序的执行,进行性能分析或行为模式研究。
  • 多轮对话与会话管理: 对于需要维护长会话状态的应用(如聊天机器人),检查点是保存对话历史的关键。

3. LangGraph Checkpoint 的底层机制:

LangGraph通过 BaseCheckpointSaver 抽象接口来管理检查点的持久化。最常用的实现包括 MemorySaver(内存存储,非持久化)和 SQLSaver(SQLite或PostgreSQL等关系型数据库存储)。

一个检查点通常包含以下核心信息:

  • thread_id: 标识一个独立的执行会话或对话。
  • checkpoint_id: 检查点自身的唯一标识,通常是一个UUID。
  • parent_checkpoint_id: 指向前一个检查点,构建历史链。
  • thread_ts: 检查点创建的时间戳。
  • state: 序列化后的图的完整状态(例如,消息列表、变量等)。
  • metadata: 其他附加元数据。

让我们看一个简单的LangGraph应用如何使用 SQLSaver 来保存检查点:

import os
from typing import Dict, TypedDict, List
from langchain_core.messages import BaseMessage, HumanMessage, AIMessage
from langchain_openai import ChatOpenAI
from langgraph.graph import StateGraph, END
from langgraph.checkpoint.sqlite import SQLiteSaver

# 确保设置了OpenAI API Key
# os.environ["OPENAI_API_KEY"] = "YOUR_OPENAI_API_KEY"

# 1. 定义图的状态
class AgentState(TypedDict):
    messages: List[BaseMessage]
    current_query: str
    tool_output: str

# 2. 定义节点:这是一个简单的LLM调用节点
def call_llm(state: AgentState) -> AgentState:
    messages = state["messages"]
    model = ChatOpenAI(temperature=0)
    response = model.invoke(messages)
    return {"messages": messages + [response]}

# 3. 定义图
workflow = StateGraph(AgentState)
workflow.add_node("llm", call_llm)
workflow.set_entry_point("llm")
workflow.add_edge("llm", END) # 简单图,LLM调用后即结束

# 4. 配置检查点保存器
memory = SQLiteSaver.from_file("langgraph_checkpoints.sqlite")

# 5. 编译图并传入检查点保存器
app = workflow.compile(checkpointer=memory)

# 6. 运行图并观察检查点
config = {"configurable": {"thread_id": "user_123"}}

# 第一次运行
print("--- 第一次运行 ---")
input_messages = [HumanMessage(content="你好,我需要一些关于太阳系行星的信息。")]
final_state = app.invoke({"messages": input_messages}, config=config)
print(f"Final State (run 1): {final_state['messages'][-1].content}n")

# 第二次运行 (同一thread_id,会加载之前的状态)
print("--- 第二次运行 ---")
input_messages = [HumanMessage(content="具体来说,火星有什么独特的特征?")]
final_state_2 = app.invoke({"messages": input_messages}, config=config)
print(f"Final State (run 2): {final_state_2['messages'][-1].content}n")

# 加载历史检查点
print("--- 加载历史检查点 ---")
# 假设我们想看第一次运行结束时的状态
# 注意: app.invoke 会自动加载最新的检查点,如果想加载特定历史检查点,需要更复杂的逻辑
# 我们可以直接查询SQLite数据库来验证
import sqlite3
conn = sqlite3.connect("langgraph_checkpoints.sqlite")
cursor = conn.cursor()
cursor.execute("SELECT thread_id, checkpoint_id, parent_checkpoint_id, thread_ts, state FROM checkpoints WHERE thread_id = 'user_123' ORDER BY thread_ts DESC")
rows = cursor.fetchall()

print(f"Found {len(rows)} checkpoints for thread_id 'user_123':")
for i, row in enumerate(rows):
    print(f"Checkpoint {i+1}: thread_id={row[0]}, checkpoint_id={row[1]}, thread_ts={row[3]}")
    # print(f"State: {row[4][:200]}...") # 打印部分状态,避免过长
conn.close()

上面的例子展示了LangGraph如何通过 thread_id 简单地实现会话状态的延续。然而,这种延续是基于精确匹配 thread_id 的。如果用户开启了一个新的 thread_id,但提出的问题与过去某个 thread_id 中的某个关键时刻高度相关,当前的机制就无法提供帮助。我们缺乏一种“语义搜索”历史状态的能力。

二、缺失的环节:检查点的语义检索

当前的LangGraph检查点机制,虽然强大,但其检索方式是基于精确的 thread_id。这就像你的大脑里存储了无数的记忆片段,但你只能通过回忆起记忆发生的精确日期或某个特定的“文件名”才能找到它。这显然不是人类记忆的工作方式。

人类的记忆是联想式的、语义化的。当我问你“上次我们讨论人工智能伦理的时候,你有什么想法?”你不会去搜索精确到秒的时间戳,而是会根据“人工智能伦理”这个语义概念,快速调动相关的记忆。

1. 现有检查点检索的局限性:

  • 仅限 thread_id: 只能通过唯一的 thread_id 来加载特定会话的最新或历史状态。无法跨会话共享或检索。
  • 缺乏语义理解: 无法理解检查点内部存储的“内容”或“主题”。你无法问它:“之前某个用户问过关于退货政策,我们当时怎么处理的?”除非你恰好知道那个用户的 thread_id
  • 无法应对新情境: 当遇到一个全新的 thread_id 但在语义上与历史某个 thread_id 中的特定环节高度相似时,无法主动召回相关经验。

2. 我们的愿景:语义化的检查点记忆

为了弥补这一缺陷,我们需要引入“语义检索”的能力。我们的目标是:

  • 将每个检查点视为一个“记忆单元”: 不仅仅是数据,更是包含了特定上下文、用户意图、AI响应和工具执行的“事件”。
  • 提取记忆的“精髓”并向量化: 从检查点的复杂状态中,抽取最能代表其核心内容和语义的文本信息,并将其转化为高维向量。
  • 构建一个向量记忆库: 将这些向量及其原始检查点ID、元数据存储到一个向量数据库中。
  • 动态、智能地召回: 在LangGraph执行的任何阶段,都可以根据当前的用户输入、任务目标或内部状态,查询向量记忆库,召回语义上最相关的历史检查点。
  • 将召回的记忆注入上下文: 把召回的记忆(可能是摘要或关键片段)作为额外的上下文信息,提供给LLM或图中的其他节点,从而影响后续的决策和生成。

通过这种方式,我们的LangGraph应用将不再是每次都从零开始,或者仅仅依赖于当前 thread_id 的短暂记忆。它将拥有一个持续增长的、可检索的、能够跨会话学习的“第二大脑”。

三、架构设计:LangGraph 与向量存储的深度融合

要实现“Vector Store as Memory”,我们需要在LangGraph的检查点机制之上,构建一个更加智能的内存层。这不仅仅是简单地将检查点数据扔进向量数据库,而是要精心设计如何提取、存储和检索这些记忆。

1. 总体系统架构概览:

+---------------------+
|     User/System     |
|       Input         |
+----------+----------+
           |
           v
+---------------------+
|   LangGraph Orchestrator    |
|   (Nodes, Edges, State)     |
+----------+----------+
           | (State Update)
           v
+---------------------+
|  Custom Checkpoint Saver    |
|  (Extends BaseCheckpointSaver) |
+----------+----------+
           |
           +---------------------------------+
           |                                 |
           v                                 v
+---------------------+             +---------------------+
|   Raw Checkpoint    |             |  Checkpoint Content |
|   Persistence       |             |   Extraction &      |
| (e.g., SQLSaver)    |             |     Summarization   |
+---------------------+             +----------+----------+
                                               |
                                               v
                                      +---------------------+
                                      |    Embedding Model  |
                                      | (e.g., OpenAI Embeddings) |
                                      +----------+----------+
                                                 | (Vector)
                                                 v
                                      +---------------------+
                                      |   Vector Database   |
                                      | (e.g., Chroma, Pinecone, Qdrant) |
                                      | (Stores Embeddings + Metadata)  |
                                      +---------------------+
                                                 ^
                                                 | (Retrieval Query)
                                                 |
+---------------------+             +----------+----------+
|  Memory Retrieval   |             |  Current User Query / |
|   Node (LangGraph)  |<------------|  Graph Context        |
+----------+----------+             +---------------------+
           | (Retrieved Memories)
           v
+---------------------+
|  LLM Prompt / State |
|    Augmentation     |
+---------------------+
           |
           v
+---------------------+
|   LangGraph Continue |
|   (Informed Decision) |
+---------------------+

2. 核心组件与职责:

  1. LangGraph Application:

    • 定义图的节点、边和状态。
    • 执行业务逻辑,处理用户输入,调用工具等。
    • 利用定制的 CheckpointVectorSaver 进行状态持久化。
  2. Custom Checkpoint Saver (CheckpointVectorSaver):

    • 继承自 langgraph.checkpoint.base.BaseCheckpointSaver
    • put() 方法被调用时,除了将原始检查点数据保存到传统的持久化存储(如SQLSaver)外,还会执行以下额外步骤:
      • 内容提取: 从LangGraph的状态对象中提取出有意义的文本内容,例如用户消息、AI回复、工具调用摘要等。
      • 语义总结/表示: 将提取的内容进行总结,形成一个简洁且语义丰富的文本描述,作为向量化的输入。
      • 向量化: 使用一个预训练的嵌入模型(如OpenAI Embeddings, Sentence Transformers等)将总结文本转换为高维向量。
      • 向量数据库存储: 将生成的向量连同原始检查点ID、thread_id、时间戳、摘要文本等元数据,一并存储到向量数据库中。
  3. Embedding Service:

    • 负责将文本转换为向量。可以是本地模型,也可以是API服务(如OpenAI)。
    • 选择合适的Embedding模型对于检索质量至关重要。
  4. Vector Database:

    • 专门用于存储和检索高维向量。
    • 支持高效的相似性搜索(例如,余弦相似度)。
    • 存储每个检查点摘要的向量,并关联回原始检查点的唯一标识符。
    • 需要能够存储额外的元数据,以便在检索时进行过滤和排序。

    向量数据库条目结构示例:

    字段名称 类型 描述
    id string 向量数据库中的唯一ID,可以是 checkpoint_id 的组合或新生成。
    vector float[] 检查点摘要的嵌入向量。
    thread_id string 原始LangGraph会话的ID。
    checkpoint_id string 原始LangGraph检查点的ID。
    thread_ts datetime 检查点创建时间戳。
    summary_text string 用于生成向量的文本摘要。
    original_saver_key string 指向原始 BaseCheckpointSaver 中存储的检查点记录的键(例如,SQLite中的checkpoint_id)。
    source_node string (可选) 触发此检查点存储的LangGraph节点名称。
    user_feedback string (可选) 用户对此次交互的反馈,用于未来优化。
  5. Memory Retrieval Node (LangGraph Node):

    • 这是一个特殊的LangGraph节点,在图的适当位置被激活。
    • 查询生成: 从当前的LangGraph状态中提取出用于查询记忆库的文本(例如,最新的用户消息、当前任务描述)。
    • 向量数据库查询: 使用查询文本生成其嵌入向量,并在向量数据库中执行相似性搜索。
    • 结果处理: 获取最相似的K个记忆条目。
    • 记忆提取: 根据检索到的元数据(original_saver_key),从原始的 BaseCheckpointSaver 中加载完整的历史检查点状态(如果需要)。
    • 格式化与注入: 将检索到的记忆(可能是它们的摘要、关键对话片段或特定的变量值)格式化为LLM可以理解的文本,并将其注入到当前的LangGraph状态中,供后续的LLM调用或其他节点使用。
  6. Context Injection Mechanism:

    • 将检索到的记忆作为LLM提示的一部分,例如作为系统消息、few-shot示例,或直接作为额外的上下文。
    • 也可以更新LangGraph的共享状态,让其他节点能够访问这些记忆。

四、实现细节与代码示例

现在,我们将深入到具体的代码实现,展示如何构建上述架构。

1. 准备工作:环境与依赖

我们需要安装LangGraph、LangChain相关库以及一个向量数据库客户端。这里我们以 ChromaDB 为例,因为它轻量且易于本地部署。

pip install -U langgraph langchain-openai "langchain[chroma]" "chromadb>=0.4.14"

2. 自定义 CheckpointVectorSaver

这是实现核心记忆功能的关键。我们将创建一个 VectorBackedCheckpointSaver,它将结合 SQLiteSaverChromaDB

import uuid
import json
from datetime import datetime
from typing import Dict, Any, List, Optional, Union
from langgraph.checkpoint.base import BaseCheckpointSaver, Checkpoint, CheckpointTuple, CheckpointMetadata
from langgraph.checkpoint.sqlite import SQLiteSaver
from langchain_core.messages import BaseMessage, HumanMessage, AIMessage, ToolMessage, SystemMessage
from langchain_core.embeddings import Embeddings
from langchain_openai import OpenAIEmbeddings
from langchain_chroma import Chroma
from langchain_core.documents import Document

# 假设我们有一个LLM来总结记忆,这里我们直接从消息中提取
from langchain_openai import ChatOpenAI
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser

# 用于记忆总结的LLM
_llm_for_summary = ChatOpenAI(model="gpt-3.5-turbo", temperature=0)
_summary_prompt = ChatPromptTemplate.from_messages([
    ("system", "你是一个善于总结对话和任务的助手。请将以下对话或任务状态总结成一段简洁的描述,以便未来通过语义搜索来回忆。重点关注用户意图、关键信息和AI采取的行动。"),
    ("user", "{content}")
])
_summary_chain = _summary_prompt | _llm_for_summary | StrOutputParser()

class VectorBackedCheckpointSaver(BaseCheckpointSaver):
    """
    一个结合了传统CheckpointSaver(如SQLiteSaver)和向量数据库的自定义Saver。
    它在保存检查点的同时,会提取其关键信息,生成嵌入向量,并存储到向量数据库中。
    """
    def __init__(
        self,
        wrapped_saver: BaseCheckpointSaver,
        vectorstore: Chroma,
        embedding_model: Embeddings,
        summary_chain: Any, # Expects a LangChain Runnable for summarization
    ):
        self.wrapped_saver = wrapped_saver
        self.vectorstore = vectorstore
        self.embedding_model = embedding_model
        self.summary_chain = summary_chain

    def _extract_and_summarize_state(self, state: Dict[str, Any]) -> str:
        """
        从LangGraph的状态中提取关键信息并进行总结。
        这里我们重点提取消息历史和当前的查询。
        """
        messages_content = []
        if "messages" in state and isinstance(state["messages"], list):
            for msg in state["messages"]:
                if isinstance(msg, HumanMessage):
                    messages_content.append(f"用户: {msg.content}")
                elif isinstance(msg, AIMessage):
                    messages_content.append(f"AI: {msg.content}")
                elif isinstance(msg, ToolMessage):
                    messages_content.append(f"工具({msg.name}): {msg.content}")
                elif isinstance(msg, SystemMessage):
                    messages_content.append(f"系统: {msg.content}")

        current_query = state.get("current_query", "")
        tool_output = state.get("tool_output", "")

        context_str = "n".join(messages_content)
        if current_query:
            context_str += f"n当前查询: {current_query}"
        if tool_output:
            context_str += f"n工具输出: {tool_output}"

        if not context_str:
            return "空状态或无法提取有效信息。"

        # 使用LLM链进行总结
        try:
            summary = self.summary_chain.invoke({"content": context_str})
            return summary
        except Exception as e:
            print(f"Error summarizing checkpoint state: {e}")
            return f"未能总结状态,原始内容: {context_str[:200]}..." # 失败时返回部分原始内容

    def put(self, config: Dict[str, Any], checkpoint: Checkpoint) -> CheckpointTuple:
        """
        保存检查点,并将其摘要存储到向量数据库。
        """
        # 1. 首先通过包装的saver保存原始检查点
        checkpoint_tuple = self.wrapped_saver.put(config, checkpoint)

        thread_id = config["configurable"]["thread_id"]
        # checkpoint_id = checkpoint_tuple.checkpoint_id # 使用 wrapped_saver 返回的 checkpoint_id
        checkpoint_id = checkpoint.id # checkpoint对象自带id
        thread_ts = checkpoint.get("metadata", {}).get("thread_ts", datetime.now().isoformat())

        # 2. 从检查点状态中提取并总结信息
        state_for_embedding = self._extract_and_summarize_state(checkpoint["values"])

        # 3. 生成嵌入向量
        # embedding = self.embedding_model.embed_query(state_for_embedding) # embed_query 适用于单个字符串

        # 4. 存储到向量数据库
        doc_id = f"{thread_id}-{checkpoint_id}-{uuid.uuid4()}" # 确保在向量DB中是唯一的
        document = Document(
            page_content=state_for_embedding,
            metadata={
                "thread_id": thread_id,
                "checkpoint_id": checkpoint_id,
                "thread_ts": thread_ts,
                "original_saver_key": checkpoint_id, # 存储原始saver中记录的id
            }
        )
        self.vectorstore.add_documents([document], ids=[doc_id])
        print(f"Saved checkpoint summary to vector DB. Doc ID: {doc_id}")

        return checkpoint_tuple

    def get(self, config: Dict[str, Any]) -> Optional[Checkpoint]:
        """
        从包装的saver获取检查点。此方法不涉及向量数据库,
        向量数据库用于检索,而不是直接加载检查点。
        """
        return self.wrapped_saver.get(config)

    def list(self, config: Dict[str, Any]) -> List[CheckpointMetadata]:
        """
        从包装的saver列出检查点。
        """
        return self.wrapped_saver.list(config)

    # 确保实现了所有抽象方法
    def __init_subclass__(cls) -> None:
        super().__init_subclass__()

这段代码定义了一个 VectorBackedCheckpointSaver 类。它的核心逻辑在 put 方法中:

  1. 首先调用内部的 wrapped_saver(例如 SQLiteSaver)来完成标准的检查点持久化。
  2. 然后,它会调用 _extract_and_summarize_state 方法从当前LangGraph的状态中提取关键信息。这里我们简单地拼接了对话消息,但更复杂的场景可能需要解析工具调用、变量等,甚至使用一个小型LLM来生成更精炼的摘要。
  3. 使用 OpenAIEmbeddings 将摘要文本转换为向量。
  4. 将这个向量连同原始 thread_idcheckpoint_id 和摘要文本等元数据存储到 Chroma 向量数据库中。

3. 记忆检索节点 (MemoryRetrievalNode)

接下来,我们需要一个LangGraph节点,它能够查询向量数据库,召回相关的历史记忆。

from langgraph.graph import StateGraph, END, START
from langchain_core.runnables import RunnableLambda

# 假设AgentState是我们在LangGraph中使用的状态
# class AgentState(TypedDict):
#     messages: List[BaseMessage]
#     current_query: str
#     tool_output: str
#     retrieved_memories: List[str] # 新增字段用于存储检索到的记忆

def retrieve_memories(state: AgentState, vectorstore: Chroma) -> AgentState:
    """
    根据当前的用户查询,从向量数据库中检索相关记忆。
    """
    current_query = state.get("current_query")
    if not current_query and state.get("messages"):
        # 如果没有明确的current_query,尝试从最新的人类消息中获取
        last_message = state["messages"][-1]
        if isinstance(last_message, HumanMessage):
            current_query = last_message.content

    if not current_query:
        print("No query to retrieve memories from.")
        return {"retrieved_memories": []}

    print(f"Retrieving memories for query: '{current_query}'")

    # 执行相似性搜索
    # 可以在这里添加过滤条件,例如只检索特定thread_id的记忆
    # 但为了演示跨thread_id检索,我们暂时不加
    # results = vectorstore.similarity_search_with_score(current_query, k=3, filter={"thread_id": state["config"]["configurable"]["thread_id"]})
    results = vectorstore.similarity_search_with_score(current_query, k=3)

    retrieved_memories_text = []
    for doc, score in results:
        # 过滤掉得分太低的记忆,或者与当前thread_id精确匹配的,除非特别需要
        # 这里的过滤逻辑可以根据需求调整
        # if score < 0.7: # 例如,只取相似度高于0.7的
        #     continue

        # 避免检索到与当前thread_id完全一致的记忆,除非是历史版本
        # if doc.metadata.get("thread_id") == state["config"]["configurable"]["thread_id"]:
        #     continue

        memory_info = (
            f"历史记忆 (相似度: {score:.2f}):n"
            f"  Thread ID: {doc.metadata.get('thread_id', 'N/A')}n"
            f"  Checkpoint ID: {doc.metadata.get('checkpoint_id', 'N/A')}n"
            f"  时间: {doc.metadata.get('thread_ts', 'N/A')}n"
            f"  内容: {doc.page_content}n"
        )
        retrieved_memories_text.append(memory_info)

    if retrieved_memories_text:
        print(f"Retrieved {len(retrieved_memories_text)} memories.")
        return {"retrieved_memories": retrieved_memories_text}
    else:
        print("No relevant memories found.")
        return {"retrieved_memories": []}

# 这个节点需要被包装成Runnable以便在LangGraph中使用
memory_retrieval_node = RunnableLambda(lambda state: retrieve_memories(state, vectorstore_client))

retrieve_memories 函数作为LangGraph的一个节点,它的职责是:

  1. 从当前 AgentState 中获取用户最新的查询(或通过 messages 列表推断)。
  2. 使用这个查询文本对 Chroma 向量数据库进行相似性搜索。
  3. 将检索到的相关记忆(及其元数据和相似度分数)格式化成一个字符串列表。
  4. 将这个列表作为 retrieved_memories 字段添加到 AgentState 中。

4. 将记忆注入LLM提示

一旦记忆被检索到,我们需要将其注入到LLM的提示中,使其能够利用这些历史信息。这通常在一个LLM节点之前完成。

def format_memories_for_llm(state: AgentState) -> AgentState:
    """
    将检索到的记忆格式化,并添加到LLM的系统消息中。
    """
    retrieved_memories = state.get("retrieved_memories", [])
    messages = state["messages"]

    if not retrieved_memories:
        return state # 没有记忆则直接返回

    memory_prompt_part = "nn--- 历史相关记忆 ---n" + "n".join(retrieved_memories) + "n---------------------nn"

    # 将记忆作为系统消息插入到对话历史的开头(或靠近用户查询的地方)
    # 确保不要重复插入
    if not messages or not (isinstance(messages[0], SystemMessage) and "历史相关记忆" in messages[0].content):
        updated_messages = [SystemMessage(content=memory_prompt_part)] + messages
    else:
        # 如果已经有系统消息,可以考虑更新或追加
        updated_messages = messages
        if "历史相关记忆" not in updated_messages[0].content:
            updated_messages[0].content += memory_prompt_part

    print(f"Injected {len(retrieved_memories)} memories into LLM prompt.")
    return {"messages": updated_messages}

memory_injector_node = RunnableLambda(format_memories_for_llm)

这个 format_memories_for_llm 函数接收包含 retrieved_memories 的状态,然后将这些记忆以结构化的方式插入到 messages 列表的开头,通常作为 SystemMessage。这样,LLM在处理当前用户查询时,就能看到这些历史上下文。

5. 组装 LangGraph

现在,我们将所有组件整合到一个LangGraph中。

# 确保设置了OpenAI API Key
os.environ["OPENAI_API_KEY"] = "YOUR_OPENAI_API_KEY"

# 初始化嵌入模型和向量数据库
embedding_model = OpenAIEmbeddings(model="text-embedding-ada-002")
vectorstore_client = Chroma(
    collection_name="langgraph_memories",
    embedding_function=embedding_model,
    persist_directory="./chroma_db" # 持久化到本地文件
)
# 注意:在实际应用中,你可能需要根据实际情况初始化Chroma客户端
# 例如,如果Chroma服务器运行在远程,你需要配置其host和port

# 初始化传统的CheckpointSaver
base_saver = SQLiteSaver.from_file("langgraph_checkpoints_with_vector.sqlite")

# 初始化我们的VectorBackedCheckpointSaver
vector_backed_saver = VectorBackedCheckpointSaver(
    wrapped_saver=base_saver,
    vectorstore=vectorstore_client,
    embedding_model=embedding_model,
    summary_chain=_summary_chain
)

# 定义新的AgentState,包含retrieved_memories
class AgentState(TypedDict):
    messages: List[BaseMessage]
    current_query: str
    tool_output: str
    retrieved_memories: List[str] # 用于存储检索到的记忆

# 定义LLM节点 (与之前相同)
def call_llm(state: AgentState) -> AgentState:
    messages = state["messages"]
    model = ChatOpenAI(temperature=0)
    response = model.invoke(messages)
    return {"messages": messages + [response]}

# 组装LangGraph
workflow = StateGraph(AgentState)

# 1. 入口点:首先进行记忆检索
workflow.add_node("retrieve_memories", RunnableLambda(lambda state: retrieve_memories(state, vectorstore_client)))

# 2. 注入记忆到LLM提示
workflow.add_node("inject_memories", memory_injector_node)

# 3. 调用LLM
workflow.add_node("llm", call_llm)

# 4. 定义边
workflow.set_entry_point("retrieve_memories")
workflow.add_edge("retrieve_memories", "inject_memories")
workflow.add_edge("inject_memories", "llm")
workflow.add_edge("llm", END)

# 编译图并传入我们定制的saver
app_with_memory = workflow.compile(checkpointer=vector_backed_saver)

# 运行示例
print("n--- 第一次会话 (Thread 1) ---")
config_1 = {"configurable": {"thread_id": "customer_support_001"}}
response_1_1 = app_with_memory.invoke(
    {"messages": [HumanMessage(content="我购买的XYZ产品在两天内就坏了,我该如何退货?")], "current_query": "XYZ产品退货流程"},
    config=config_1
)
print(f"Agent (Thread 1, Round 1): {response_1_1['messages'][-1].content}n")

response_1_2 = app_with_memory.invoke(
    {"messages": [HumanMessage(content="那如果我没有原始包装盒怎么办?")], "current_query": "没有原始包装盒退货"},
    config=config_1
)
print(f"Agent (Thread 1, Round 2): {response_1_2['messages'][-1].content}n")

print("n--- 第二次会话 (Thread 2) ---")
config_2 = {"configurable": {"thread_id": "customer_support_002"}}
# 模拟一个新用户,但问题与之前会话的某个点相似
response_2_1 = app_with_memory.invoke(
    {"messages": [HumanMessage(content="我收到的ABC产品是坏的,如何申请退款?")], "current_query": "ABC产品退款申请"},
    config=config_2
)
print(f"Agent (Thread 2, Round 1): {response_2_1['messages'][-1].content}n")

# 清理ChromaDB (可选)
# vectorstore_client.delete_collection()
# print("ChromaDB collection cleared.")

在这个完整的示例中:

  1. 我们初始化了 OpenAIEmbeddingsChroma 向量数据库。
  2. 创建了 VectorBackedCheckpointSaver,将其作为 checkpointer 传入 workflow.compile()。这意味着每次LangGraph状态更新并保存检查点时,其摘要也会被向量化并存储到Chroma中。
  3. 修改了 AgentState,增加了 retrieved_memories 字段。
  4. 在LangGraph中,我们新增了 retrieve_memoriesinject_memories 两个节点。
    • retrieve_memories 节点会在每次LLM调用前执行,根据当前的用户查询去向量数据库中搜索相关的历史记忆。
    • inject_memories 节点则负责将这些检索到的记忆格式化并插入到LLM的提示中。
  5. 我们运行了两个不同的 thread_id,但第二个 thread_id 的问题与第一个 thread_id 的部分内容在语义上是相似的。通过观察输出,我们可以看到 retrieve_memories 节点是否成功召回了跨 thread_id 的记忆。

五、高级考量与挑战

构建“第二大脑”并非一蹴而就,涉及多个复杂的设计决策和挑战。

1. 上下文窗口管理与记忆摘要:

  • 挑战: 检索到的记忆可能非常多,或者单个记忆条目内容过长,容易超出LLM的上下文窗口限制。
  • 解决方案:
    • 限制检索数量 (k): 只检索最相关的K个记忆。
    • 记忆摘要: 在将记忆注入LLM之前,对每个检索到的记忆进行二次摘要,提取其核心要点。这可以是一个小型LLM调用,或者基于规则的提取。
    • 分层记忆: 建立不同粒度的记忆。例如,粗粒度的“会话摘要”和细粒度的“特定交互细节”。先检索粗粒度记忆,再根据需要深入检索细粒度记忆。
    • 相关性截断: 根据相似度分数动态决定是否包含某个记忆。

2. 新颖性与相关性平衡 (Recency vs. Relevance):

  • 挑战: 有时最新的记忆比非常久远但语义相似的记忆更重要。纯粹的语义搜索可能无法捕获这种时间维度上的偏好。
  • 解决方案:
    • 混合检索: 结合语义相似度和时间衰减因子。在向量数据库查询时,可以对时间较近的记忆给予更高的权重,或者在检索后根据时间戳对结果进行二次排序。
    • 时间戳过滤: 仅检索在过去N天/小时内发生的记忆。
    • “近邻”记忆: 在检索到某个核心记忆后,同时获取其前后一定时间范围内的其他检查点,以提供更完整的上下文。

3. 记忆的遗忘与修剪 (Forgetting and Pruning):

  • 挑战: 向量数据库会不断增长,存储和检索成本会随之增加。某些记忆可能变得过时、不准确或不再有用。
  • 解决方案:
    • 基于时间衰减的删除: 定期删除超过一定时间阈值的记忆。
    • 基于重要性/使用频率的删除: 标记那些从未被检索或使用过的记忆,并定期清理。
    • 用户反馈驱动的删除: 允许用户标记“不相关”或“错误”的记忆,并将其从记忆库中移除。
    • 记忆合并/压缩: 将多个连续的、语义相似的检查点合并成一个更简洁的摘要记忆。

4. 成本与性能:

  • 挑战: 每次检查点保存都需要调用Embedding模型和向量数据库;每次检索也需要Embedding调用和向量数据库查询,这会增加延迟和API成本。
  • 解决方案:
    • 批量处理: 批量生成嵌入,减少API调用次数。
    • 异步操作: 将检查点向量化和存储操作异步化,避免阻塞主LangGraph执行流。
    • 局部Embedding模型: 对于对延迟和成本敏感的场景,可以考虑使用本地运行的Embedding模型。
    • 优化向量数据库查询: 确保向量数据库索引优化,选择合适的硬件。
    • 智能缓存: 缓存最近检索到的记忆,避免重复查询。

5. 状态序列化与提取的复杂性:

  • 挑战: LangGraph的状态可以是任意Python对象,将其提取并总结成文本可能很复杂。
  • 解决方案:
    • 明确状态结构:AgentState 中尽量使用简单、可序列化的类型。
    • 定制化提取逻辑: 根据每个应用的状态结构,编写专门的 _extract_and_summarize_state 逻辑。
    • LLM辅助总结: 利用LLM的强大理解和生成能力,对复杂状态进行总结,如我们在示例中所示。

6. 多租户与用户隔离:

  • 挑战: 在多用户系统中,确保每个用户的记忆是私有的,不能被其他用户检索到。
  • 解决方案:
    • 向量数据库过滤: 在查询向量数据库时,始终添加 thread_iduser_id 作为过滤条件。
    • 独立的向量集合: 为每个用户或租户创建独立的向量集合。

7. 评估与验证:

  • 挑战: 如何衡量“第二大脑”的有效性?
  • 解决方案:
    • A/B测试: 对比有无记忆系统对任务完成率、用户满意度、LLM回复质量的影响。
    • 人工评估: 专家对AI行为进行评估,看其是否展现出基于记忆的智能。
    • 指标跟踪: 记录记忆检索命中率、检索到的记忆对最终决策的影响等。

六、未来展望

将LangGraph的检查点与向量数据库融合,仅仅是构建真正智能AI记忆系统的开始。未来,我们可以探索更多高级功能:

  • 自省与主动学习: Agent不再是被动地等待用户查询记忆,而是能够主动地在决策过程中反思“我过去做过类似的事情吗?”,并主动检索相关经验。
  • 情节记忆与语义记忆的结合: 区分特定事件(情节记忆)和泛化知识(语义记忆)。向量存储可以更好地处理语义记忆,而特定事件可能需要更精细的索引(如时间、关键实体)。
  • 记忆编辑与修正: 允许Agent或人类用户对过去的记忆进行修正或补充,以纠正错误或更新信息。这对于Agent的持续学习和适应性至关重要。
  • 图谱化记忆 (Knowledge Graph as Memory): 将记忆不仅仅存储为文本向量,而是构建成知识图谱。这样可以利用图谱的结构化查询能力和推理能力,实现更复杂、更精确的记忆检索和利用。例如,可以查询“谁在什么时候对哪个产品提出了退货请求,并且最终解决方案是什么?”
  • 多模态记忆: 将视觉、听觉等非文本信息也纳入记忆范畴,通过多模态嵌入技术进行统一存储和检索。

结语

今天我们深入探讨了如何通过巧妙地融合LangGraph的检查点机制与向量数据库,为我们的AI应用构建一个功能强大的“第二大脑”。这不仅仅是数据持久化,更是将原始状态转化为可语义检索的智慧。通过定制 CheckpointVectorSaverMemoryRetrievalNode,我们赋予了LangGraph应用跨会话、跨情境学习和推理的能力,使其能够真正从历史经验中受益,并做出更明智、更一致的决策。

从简单的状态机到具备丰富记忆的智能体,我们正一步步迈向更高级的通用人工智能。这个“第二大脑”的构建,是这一进程中不可或缺的关键里程碑。

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注