深入 ‘Long-term Relationship Modeling’:利用 LangGraph 实现具备‘用户偏好演进轨迹’的持续记忆系统

尊敬的各位技术同仁,

欢迎来到今天的讲座。我们今天将深入探讨一个极具挑战性也充满机遇的话题:如何构建一个能够理解并适应用户长期关系的智能系统,特别是如何利用 LangGraph 框架实现一个具备“用户偏好演进轨迹”的持续记忆系统。

在当今高度个性化的数字时代,仅仅停留在对用户当前意图的理解是远远不够的。一个真正智能的系统,应该能够像一个经验丰富的朋友或顾问一样,记住用户的喜好、习惯,更重要的是,能够感知这些偏好是如何随着时间、情境和互动而悄然变化的。这不仅是提升用户体验的关键,更是构建持久用户关系、实现前瞻性服务的基础。

我们将从理论到实践,逐步解构这一复杂系统。首先,我们将审视传统 AI 系统在记忆和长期关系建模上的局限性。接着,我们将深入 LangGraph 的核心机制,理解它如何为我们构建这样的系统提供了强大的工具。随后,我们将设计一个具体的架构,并辅以详尽的代码示例,展示如何将用户偏好演进轨迹的概念落地。最后,我们将探讨优化策略和未来展望。


第一部分:理解长期关系建模的必要性与挑战

在人工智能领域,我们已经取得了长足的进步,尤其是在自然语言处理和短期任务完成方面。然而,当涉及到需要“记忆”和“理解”用户长期行为模式的场景时,大多数系统仍显得力不从心。

1.1 传统 AI 系统的记忆困境

传统的基于会话的 AI 系统,无论是聊天机器人还是推荐引擎,通常面临以下记忆挑战:

  • 上下文窗口限制: 大型语言模型(LLMs)虽然拥有巨大的上下文窗口,但它们并非无限。长时间的对话或大量的历史数据很快就会超出这些限制,导致模型“遗忘”早期的互动。
  • 无状态性: 许多系统本质上是无状态的。每次用户请求都被视为独立事件,缺乏将当前请求与过去行为关联起来的内在机制。即使有数据库存储,系统也往往需要明确的指令去查询,而非自主地整合信息。
  • 缺乏个性化演进: 即使系统能记住历史数据,也很难主动地从这些数据中提炼出用户偏好的动态变化。它们可能只是简单地重复过去的推荐,而忽略了用户品味可能已经发生了转变。
  • “冷启动”与“遗忘”问题: 对于新用户,系统缺乏足够数据进行个性化;对于老用户,若长时间未互动,系统可能无法有效更新其偏好,导致推荐质量下降。

1.2 长期关系建模的需求

长期关系建模旨在超越短时记忆,构建对用户更深层次、更动态的理解。其核心需求包括:

  • 持续个性化: 无论用户何时回归,系统都能基于其最新的偏好和历史行为提供高度个性化的体验。
  • 前瞻性推荐: 系统不仅能响应用户当前的显式需求,还能预测其潜在需求,甚至在用户意识到之前就提供建议。
  • 动态适应: 用户的偏好不是一成不变的,它们会随着年龄、生活阶段、外部事件、甚至心情而变化。系统必须能够捕捉并适应这些变化。
  • 用户旅程理解: 将用户的每一次互动串联起来,形成一个完整的“用户旅程”,从而理解用户行为背后的动机和目标。

1.3 用户偏好演进的复杂性

捕捉用户偏好的演进轨迹是一个复杂的过程,涉及多个层面:

  • 偏好的非静态性: 用户对产品、内容、服务乃至交互方式的偏好并非固定不变。例如,一个用户可能在年轻时偏爱快节奏的动作电影,随着年龄增长转而喜欢剧情片。
  • 显式与隐式反馈: 显式反馈(如点赞、收藏、评分)直接反映了用户偏好,但往往稀疏且滞后。隐式反馈(如点击、停留时间、购买行为、浏览路径)更丰富,但需要复杂的推理才能提炼出偏好。
  • 上下文依赖性: 同一个用户在不同情境下可能表现出不同的偏好。例如,工作日可能偏爱商务资讯,周末则倾向于娱乐内容。
  • 渐进式转变与突然变化: 偏好演进可能是一个缓慢渐进的过程,也可能因为某个重大事件(如搬家、职业变动)而突然发生转变。系统需要能够识别这两种模式。
  • 多维度偏好: 偏好往往是多维度的,例如,用户可能喜欢某个类型的电影,但对特定导演或演员也有偏好,同时对观看时长、语言等也有要求。

为了应对这些挑战,我们需要一个强大的框架来协调多个智能组件,并管理复杂的状态。LangGraph 正是为此而生。


第二部分:LangGraph 核心概念解析

LangGraph 是 LangChain 生态系统中的一个高级库,它允许我们通过构建有向无环图(DAG)或循环图来编排复杂的 Agent 工作流。它提供了一种声明式的方式来定义 Agent 之间的交互、工具使用以及状态管理,这对于实现具备长期记忆和偏好演进能力的系统至关重要。

2.1 Agentic Workflows 与状态管理

LangGraph 的核心思想是将复杂的任务分解为一系列相互协作的“Agent”(代理)和“Tool”(工具),并通过一个共享的“Graph State”(图状态)来协调它们。

  • Agent: 通常是一个由大型语言模型(LLM)驱动的决策单元,它可以接收输入、思考、决定执行哪个工具,并生成输出。
  • Tool: 外部功能或 API 接口,Agent 可以调用它们来获取信息或执行操作,例如查询数据库、发送邮件、调用外部 API 等。
  • Graph State: 这是 LangGraph 最关键的特性之一。它是一个可变的数据结构,在整个图的执行过程中共享。每个节点(Agent 或工具)都可以读取和更新这个状态。这意味着 Agent 之间可以传递信息,并且整个系统能够维护一个持久的上下文。

2.2 节点与边:构建流程

LangGraph 的工作流是通过定义节点(Nodes)和边(Edges)来构建的。

  • 节点 (Nodes):
    • Agent 节点: 封装了一个 Agent 的逻辑,通常包括一个 LLM 和一套工具。
    • Tool 节点: 封装了单个工具的执行逻辑。
    • 函数节点: 可以是任何 Python 函数,用于执行特定的逻辑,如数据清洗、格式转换、条件判断等。
  • 边 (Edges):
    • 普通边 (Standard Edges): 从一个节点直接连接到另一个节点,表示顺序执行。
    • 条件边 (Conditional Edges): 这是 LangGraph 的强大之处。它允许根据当前 Graph State 的内容,动态地决定下一个要执行的节点。例如,一个 Agent 决定是否需要调用某个工具,或者根据用户的偏好选择不同的后续处理路径。

2.3 工具使用:扩展能力

Agent 的智能离不开工具的辅助。LangGraph 能够无缝集成 LangChain 提供的各种工具,使 Agent 能够:

  • 查询外部知识: 通过 RAG (Retrieval Augmented Generation) 工具查询向量数据库、传统数据库或知识图谱。
  • 执行操作: 调用 APIs 执行实际操作,如更新用户档案、发送消息、创建订单等。
  • 数据处理: 使用自定义工具处理数据,例如解析 JSON、计算统计数据等。

2.4 持久化记忆与 Checkpointers

对于长期关系建模,持久化记忆是不可或缺的。LangGraph 通过 Checkpointers 机制提供了强大的持久化能力。

  • Checkpointer: 它允许我们将 LangGraph 的整个 Graph State 存储到持久化存储中(如数据库、文件系统)。这意味着,即使系统重启,或者一个会话中断后重新开始,Graph State 也能被完全恢复,从而实现跨会话的记忆。
  • 使用场景:
    • 会话恢复: 用户中断对话后,可以从上次中断的地方继续。
    • 长期记忆: 存储用户在多次互动中积累的偏好、上下文信息。
    • 故障恢复: 系统崩溃后,可以从最近的检查点恢复状态。

LangGraph Checkpointers 的实现通常是基于键值存储,例如 SQLite、Redis 或自定义数据库。它以每次图执行的输入为键,存储图的完整状态。

LangGraph 核心概念总结

概念 描述 作用
Graph State 整个图共享的可变数据结构 维护系统上下文,Agent 间信息传递,实现状态持久化
Node 图中的执行单元,可以是 Agent、Tool 或普通函数 封装特定逻辑,构成工作流步骤
Edge 连接节点,定义执行顺序 控制流程,条件边实现动态决策
Agent 由 LLM 驱动的智能决策单元 理解用户意图,选择工具,生成响应,更新状态
Tool 外部功能接口(如数据库查询、API 调用) 扩展 Agent 能力,获取外部信息,执行实际操作
Checkpointer 将 Graph State 持久化到存储中 实现跨会话记忆,支持会话恢复和长期状态管理

通过这些核心概念,LangGraph 为我们构建一个能够理解和适应用户偏好演进的持续记忆系统奠定了坚实的基础。


第三部分:架构设计:用户偏好演进记忆系统

现在,我们将设计一个具体的系统架构,利用 LangGraph 来实现用户偏好演进轨迹的追踪。这个系统将模拟一个智能助手,它不仅能响应用户当前的请求,还能根据历史互动动态更新其对用户偏好的理解。

3.1 高层架构概览

我们的系统将由多个智能 Agent 和一系列工具组成,它们在一个 LangGraph 工作流中协同工作。其核心思想是:每次用户互动后,系统都会分析这次互动,提炼出潜在的偏好信息,并将其整合到用户的长期档案中。

+----------------+       +-------------------+       +--------------------+
|  用户输入/互动  | ----> |  LangGraph 核心流程  | <---- |    持久化存储       |
| (User Input/Interaction) |       | (LangGraph Core Workflow) |       | (Persistence Layer) |
+----------------+       +-------------------+       +--------------------+
                              ^       |                     ^     |
                              |       v                     |     |
                              |   +-----------------+       |     |
                              +---| 会话分析器 Agent |-------+     |
                                  | (Session Analyzer)|           |
                                  +-----------------+           |
                                            |                     |
                                            v                     |
                                  +-------------------+           |
                                  | 偏好演进器 Agent  |-----------+
                                  | (Preference Evolver)|
                                  +-------------------+
                                            |                     |
                                            v                     |
                                  +-------------------+           |
                                  | 策略生成器 Agent  |-----------+
                                  | (Strategy Generator)|
                                  +-------------------+
                                            |
                                            v
                                  +-----------------+
                                  | 系统响应/推荐   |
                                  | (System Response/Recommendation) |
                                  +-----------------+

3.2 核心组件与数据模型

为了实现上述架构,我们需要定义以下核心组件和数据模型:

  1. 用户档案 (UserProfile):

    • 这是用户长期偏好的核心存储。它会随着每次互动而更新。
    • 可以包含结构化数据(如年龄、性别、注册日期)和非结构化数据(如兴趣标签、偏好描述、近期活动摘要)。
    • 我们使用一个结构化的 JSON 或字典来表示,方便 LLM 进行解析和更新。

    UserProfile 数据模型示例

    字段 类型 描述
    user_id String 用户唯一标识符
    name String 用户名
    age_group String 年龄段(例如:“20-30岁”、“30-40岁”)
    location String 用户所在地
    interests List[String] 显式或隐式提取的兴趣标签(例如:["科幻电影", "户外运动", "编程", "历史小说"])
    preferred_genres Dict[String, int] 对不同内容类型的偏好程度(例如:{"电影": 5, "书籍": 4, "音乐": 3})
    recent_activity_summary String LLM 对近期用户行为的摘要,反映短期趋势
    preference_history List[Dict] 偏好演变的历史记录,包含时间戳和偏好快照,用于轨迹分析
    last_updated Timestamp 最后更新时间
  2. 会话事件 (SessionEvent):

    • 记录用户与系统每一次互动的详细信息,作为偏好提取的原始数据。
    • 可以存储在一个简单的日志文件、关系数据库或向量数据库中。

    SessionEvent 数据模型示例

    字段 类型 描述
    event_id String 事件唯一标识符
    user_id String 关联用户
    timestamp Timestamp 事件发生时间
    event_type String 事件类型(例如:“query”、“click”、“purchase”、“rating”)
    content String 用户输入、点击的项目名称、购买的商品描述等
    system_response String 系统对用户输入的回复或推荐的内容
    feedback String 用户对系统响应的显式或隐式反馈(例如:“满意”、“不满意”、“停留时间”)
  3. 持久化存储 (Persistence Layer):

    • UserProfile 存储: 关系型数据库(如 PostgreSQL)或文档型数据库(如 MongoDB)非常适合存储结构化的 UserProfile
    • SessionEvent 存储: 也可以是关系型数据库,或者为了更高效的语义搜索,可以使用向量数据库(如 ChromaDB, Weaviate)来存储事件的嵌入向量。
    • LangGraph Checkpointer: 用于持久化 LangGraph 的 Graph State,确保会话的连续性。通常可以是 SQLite、Redis 或自定义的数据库实现。

3.3 LangGraph 工作流设计

我们的 LangGraph 工作流将包含以下核心 Agent:

  1. 会话分析器 Agent (Session Analyzer Agent):

    • 职责: 在每次用户互动后,分析当前的 SessionEvent 和最近的 SessionHistory,从中提取出与用户偏好相关的新信息。这可能包括新的兴趣关键词、对某种内容类型的明确或隐式偏好、情绪变化等。
    • 工具:
      • query_session_history_tool: 查询最近的会话历史。
      • record_session_event_tool: 将当前事件记录到会话历史中。
    • 输出: 提取出的偏好洞察(如新的兴趣标签、对特定内容的积极/消极反馈)。
  2. 偏好演进器 Agent (Preference Evolver Agent):

    • 职责: 接收来自 Session Analyzer 的偏好洞察,结合当前的 UserProfilePreferenceHistory,决定如何更新用户的长期偏好。这可能涉及添加新兴趣、调整现有兴趣的权重、识别偏好的转变趋势,甚至清理过时的偏好。
    • 工具:
      • get_user_profile_tool: 获取当前用户的完整档案。
      • update_user_profile_tool: 更新用户的档案,包括兴趣、偏好强度和偏好历史记录。
    • 输出: 更新后的 UserProfile
  3. 策略生成器 Agent (Strategy Generator Agent):

    • 职责: 基于最新的 UserProfile 和当前的会话上下文,生成下一步的系统响应、推荐或交互策略。这可以是直接的答案,也可以是更具前瞻性的内容推荐。
    • 工具:
      • get_user_profile_tool: 获取最新的用户档案。
      • retrieve_recommendations_tool: 从内容库中检索符合用户偏好的内容。
    • 输出: 最终的用户响应或推荐。

LangGraph 工作流节点与边定义

节点名称 类型 职责 输入 输出 关联 Agent/Tool
start_node __start__ 流程起始,接收用户输入 user_input, user_id user_input, user_id N/A
analyze_session Agent 分析当前会话,提取偏好洞察 user_input, user_id, session_history preference_insights SessionAnalyzerAgent, query_session_history_tool, record_session_event_tool
evolve_preferences Agent 根据洞察更新用户长期偏好档案 user_id, preference_insights, user_profile updated_user_profile PreferenceEvolverAgent, get_user_profile_tool, update_user_profile_tool
generate_strategy Agent 基于更新的偏好生成系统响应/推荐 user_id, updated_user_profile, user_input system_response StrategyGeneratorAgent, get_user_profile_tool, retrieve_recommendations_tool
end_node __end__ 流程结束,返回系统响应 system_response system_response N/A

工作流示意图(LangGraph 视角,文本表示)

+----------------+
|    __start__   |
+----------------+
        |
        v
+-------------------+
|  analyze_session  |  (Session Analyzer Agent)
+-------------------+
        |
        v
+---------------------+
|  evolve_preferences |  (Preference Evolver Agent)
+---------------------+
        |
        v
+---------------------+
|  generate_strategy  |  (Strategy Generator Agent)
+---------------------+
        |
        v
+----------------+
|     __end__    |
+----------------+

这是一个线性的简化流程。在实际应用中,generate_strategy 之后可能还有条件边,例如,如果用户请求更详细的信息,可以再次回到 analyze_session 或其他 Agent。此外,evolve_preferences 也可以是条件性的,例如,只有当 preference_insights 足够重要时才触发更新。


第四部分:代码实现:关键模块与 LangGraph 实践

现在,让我们通过具体的代码示例来构建上述系统。我们将使用 Python、LangChain 和 LangGraph。

4.1 环境准备与依赖

首先,确保安装了必要的库:

pip install langchain langchain-openai langgraph pydantic

你还需要设置 OpenAI API 密钥:

import os
os.environ["OPENAI_API_KEY"] = "YOUR_OPENAI_API_KEY"

4.2 数据模型定义

我们使用 Pydantic 来定义数据模型,这有助于 LLM 生成结构化的输出。

from pydantic import BaseModel, Field
from typing import List, Dict, Optional
from datetime import datetime

class UserProfile(BaseModel):
    user_id: str
    name: str = "Unknown"
    age_group: Optional[str] = None
    location: Optional[str] = None
    interests: List[str] = Field(default_factory=list) # 泛兴趣标签
    preferred_genres: Dict[str, int] = Field(default_factory=dict) # 细分内容偏好,权重
    recent_activity_summary: str = "" # LLM总结的近期活动
    preference_history: List[Dict] = Field(default_factory=list) # 历史偏好快照
    last_updated: datetime = Field(default_factory=datetime.now)

class SessionEvent(BaseModel):
    event_id: str
    user_id: str
    timestamp: datetime = Field(default_factory=datetime.now)
    event_type: str # e.g., "query", "click", "purchase", "feedback"
    content: str # User input or item interacted with
    system_response: Optional[str] = None
    feedback: Optional[str] = None # e.g., "positive", "negative", "neutral" or raw text

class PreferenceInsights(BaseModel):
    new_interests: List[str] = Field(default_factory=list, description="新发现的用户兴趣标签")
    genre_adjustments: Dict[str, int] = Field(default_factory=dict, description="需要调整的细分偏好及其权重变化")
    summary_update: Optional[str] = Field(None, description="需要更新的用户近期活动总结")
    sentiment: Optional[str] = Field(None, description="用户在本次互动中的情绪倾向")

class SystemResponse(BaseModel):
    response_text: str
    recommendations: List[str] = Field(default_factory=list)
    strategy_summary: str = "" # LLM解释其策略

4.3 持久化层模拟 (简化版)

为了演示,我们将使用简单的内存字典来模拟数据库。在生产环境中,这将被替换为实际的数据库操作。

# 模拟数据库
_user_profiles_db: Dict[str, UserProfile] = {}
_session_history_db: Dict[str, List[SessionEvent]] = {}
_content_library_db: Dict[str, List[str]] = {
    "科幻电影": ["星际穿越", "沙丘", "流浪地球"],
    "历史小说": ["三体", "平凡的世界", "活着"], # 示例,实际应更精确
    "户外运动": ["登山装备推荐", "徒步路线攻略", "露营技巧"],
    "编程": ["Python进阶", "LangChain教程", "深度学习入门"],
    "美食": ["各地特色小吃", "家常菜谱", "米其林餐厅指南"]
}

def get_user_profile_from_db(user_id: str) -> Optional[UserProfile]:
    return _user_profiles_db.get(user_id)

def update_user_profile_in_db(user_id: str, profile: UserProfile):
    profile.last_updated = datetime.now()
    _user_profiles_db[user_id] = profile
    print(f"--- User profile for {user_id} updated: {profile.dict()}")

def record_session_event_to_db(event: SessionEvent):
    if event.user_id not in _session_history_db:
        _session_history_db[event.user_id] = []
    _session_history_db[event.user_id].append(event)
    print(f"--- Session event recorded for {event.user_id}: {event.content}")

def get_session_history_from_db(user_id: str, limit: int = 5) -> List[SessionEvent]:
    return _session_history_db.get(user_id, [])[-limit:]

def get_recommendations_from_content_library(interests: List[str], preferred_genres: Dict[str, int], limit: int = 3) -> List[str]:
    recs = []
    # 优先根据权重高的细分偏好推荐
    sorted_genres = sorted(preferred_genres.items(), key=lambda item: item[1], reverse=True)
    for genre, _ in sorted_genres:
        if genre in _content_library_db:
            recs.extend(_content_library_db[genre])
        if len(recs) >= limit:
            return recs[:limit]

    # 然后根据泛兴趣标签推荐
    for interest in interests:
        if interest in _content_library_db:
            recs.extend(_content_library_db[interest])
        if len(recs) >= limit:
            return recs[:limit]
    return list(set(recs))[:limit] # 去重并限制数量

# 初始化一些测试用户数据
_user_profiles_db["user_123"] = UserProfile(
    user_id="user_123",
    name="Alice",
    age_group="20-30岁",
    interests=["科幻电影", "美食"],
    preferred_genres={"电影": 5, "美食": 3}
)

4.4 工具函数实现

这些工具将封装与“数据库”交互的逻辑,供 LangGraph 中的 Agent 调用。

from langchain.tools import tool

@tool
def get_user_profile_tool(user_id: str) -> str:
    """获取指定用户的完整档案信息。输入为用户ID。"""
    profile = get_user_profile_from_db(user_id)
    return profile.json() if profile else "User profile not found."

@tool
def update_user_profile_tool(user_id: str, profile_json: str) -> str:
    """更新指定用户的档案信息。输入为用户ID和新的UserProfile的JSON字符串。"""
    try:
        updated_profile = UserProfile.parse_raw(profile_json)
        if updated_profile.user_id != user_id:
            return "Error: User ID in profile_json does not match provided user_id."
        update_user_profile_in_db(user_id, updated_profile)
        return "User profile updated successfully."
    except Exception as e:
        return f"Error updating user profile: {e}"

@tool
def record_session_event_tool(event_json: str) -> str:
    """记录一次会话事件。输入为SessionEvent的JSON字符串。"""
    try:
        event = SessionEvent.parse_raw(event_json)
        record_session_event_to_db(event)
        return "Session event recorded."
    except Exception as e:
        return f"Error recording session event: {e}"

@tool
def get_recent_session_history_tool(user_id: str, limit: int = 5) -> str:
    """获取用户最近的会话历史记录。输入为用户ID和限制条数。"""
    history = get_session_history_from_db(user_id, limit)
    return [event.dict() for event in history]

@tool
def get_recommendations_tool(user_id: str, query: str = "") -> str:
    """根据用户ID和可选查询,生成个性化推荐。此工具会查询用户档案。"""
    profile = get_user_profile_from_db(user_id)
    if not profile:
        return "User profile not found for recommendations."

    # 简单地将用户的兴趣和偏好转换为推荐查询
    all_interests = list(set(profile.interests + list(profile.preferred_genres.keys())))

    # 结合query进行更精确的搜索 (这里简化,实际可能需要更复杂的RAG)
    if query:
        # 尝试从query中提取新的兴趣
        temp_llm = ChatOpenAI(temperature=0)
        prompt = f"从用户查询 '{query}' 中提取与内容推荐相关的关键词或主题,用逗号分隔。如果没有,返回空字符串。"
        extracted_keywords = temp_llm.invoke(prompt).content.strip()
        if extracted_keywords:
            all_interests.extend([k.strip() for k in extracted_keywords.split(',') if k.strip()])
            all_interests = list(set(all_interests)) # 去重

    recs = get_recommendations_from_content_library(all_interests, profile.preferred_genres)
    return f"根据您的偏好,我推荐:{', '.join(recs)}" if recs else "暂时没有找到合适的推荐。"

4.5 LangGraph Agent 定义

我们将使用 OpenAI 的函数调用模型来驱动 Agent。

from langchain_openai import ChatOpenAI
from langchain.prompts import ChatPromptTemplate
from langchain.schema import AIMessage, HumanMessage
from langgraph.prebuilt import ToolExecutor, ToolNode

# 初始化 LLM
llm = ChatOpenAI(temperature=0, model="gpt-4o") # 推荐使用gpt-4o或更高的模型以获得更好的推理能力

# 组合所有工具
tools = [
    get_user_profile_tool,
    update_user_profile_tool,
    record_session_event_tool,
    get_recent_session_history_tool,
    get_recommendations_tool
]
tool_executor = ToolExecutor(tools)

# 定义 LangGraph 的图状态
class GraphState(BaseModel):
    user_id: str
    user_input: str
    session_event_id: str # 用于记录当前会话事件
    chat_history: List[Dict] = Field(default_factory=list) # 存储Agent间的对话历史
    preference_insights: Optional[PreferenceInsights] = None
    user_profile: Optional[UserProfile] = None
    system_response: Optional[SystemResponse] = None

# Agent 定义
# ----------------------------------------------------
# 会话分析器 Agent
class SessionAnalyzerAgent:
    def __init__(self, llm: ChatOpenAI, tools: list):
        self.llm = llm
        self.tools = tools
        self.prompt = ChatPromptTemplate.from_messages([
            ("system", "你是一个会话分析专家。你的任务是从用户的最新输入和最近的会话历史中提取关键信息和用户偏好洞察。"),
            ("system", "请特别关注用户提到或暗示的兴趣、对内容的情绪、以及可能导致偏好转变的迹象。"),
            ("system", "你必须使用提供的工具来获取历史数据,并以JSON格式输出偏好洞察,遵循PreferenceInsights Pydantic模型。"),
            ("user", "用户ID: {user_id}n最新输入: {user_input}n最近会话历史: {recent_history}n请分析并提取用户偏好洞察。"),
        ]).partial(
            tools=self.tools,
            tool_names=", ".join([t.name for t in self.tools]),
            format_instructions=PreferenceInsights.schema_json()
        )

    def run(self, state: GraphState) -> GraphState:
        print("n--- Running SessionAnalyzerAgent ---")
        # 记录当前事件
        current_event = SessionEvent(
            event_id=state.session_event_id,
            user_id=state.user_id,
            event_type="query",
            content=state.user_input
        )
        record_session_event_to_db(current_event) # 直接调用模拟DB函数,也可以通过tool

        # 获取历史,通过tool_executor调用
        recent_history_str = tool_executor.invoke({"tool_name": "get_recent_session_history_tool", "tool_input": {"user_id": state.user_id}})

        # LLM分析
        analysis_prompt = self.prompt.format_messages(
            user_id=state.user_id,
            user_input=state.user_input,
            recent_history=recent_history_str
        )

        response = self.llm.invoke(analysis_prompt)

        # 尝试解析LLM输出为PreferenceInsights
        try:
            insights = PreferenceInsights.parse_raw(response.content)
            state.preference_insights = insights
            print(f"Extracted Preference Insights: {insights.dict()}")
        except Exception as e:
            print(f"Warning: Could not parse preference insights: {e}. Raw LLM output: {response.content}")
            state.preference_insights = PreferenceInsights() # 确保状态不为空

        state.chat_history.append({"role": "assistant", "content": response.content})
        return state

# 偏好演进器 Agent
class PreferenceEvolverAgent:
    def __init__(self, llm: ChatOpenAI, tools: list):
        self.llm = llm
        self.tools = tools
        self.prompt = ChatPromptTemplate.from_messages([
            ("system", "你是一个用户偏好演进专家。你的任务是根据会话分析器提取的最新偏好洞察,结合用户当前的长期档案,更新用户的偏好轨迹。"),
            ("system", "请仔细考虑如何整合新的兴趣、调整细分偏好权重、更新近期活动总结,并记录偏好历史快照。"),
            ("system", "你必须使用提供的工具来获取和更新用户档案。输出应是一个完整的、更新后的UserProfile的JSON字符串。"),
            ("user", "用户ID: {user_id}n当前用户档案: {current_profile}n最新偏好洞察: {preference_insights}n请更新用户档案。"),
        ]).partial(
            tools=self.tools,
            tool_names=", ".join([t.name for t in self.tools]),
            format_instructions=UserProfile.schema_json()
        )

    def run(self, state: GraphState) -> GraphState:
        print("n--- Running PreferenceEvolverAgent ---")
        if not state.preference_insights:
            print("No preference insights to evolve. Skipping.")
            return state

        # 获取当前用户档案
        current_profile_str = tool_executor.invoke({"tool_name": "get_user_profile_tool", "tool_input": {"user_id": state.user_id}})
        current_profile = UserProfile.parse_raw(current_profile_str) if current_profile_str != "User profile not found." else UserProfile(user_id=state.user_id)

        # LLM更新档案
        evolve_prompt = self.prompt.format_messages(
            user_id=state.user_id,
            current_profile=current_profile.json(),
            preference_insights=state.preference_insights.json()
        )
        response = self.llm.invoke(evolve_prompt)

        try:
            updated_profile = UserProfile.parse_raw(response.content)

            # 记录历史快照
            current_profile.preference_history.append({
                "timestamp": datetime.now().isoformat(),
                "snapshot": {
                    "interests": current_profile.interests,
                    "preferred_genres": current_profile.preferred_genres,
                    "recent_activity_summary": current_profile.recent_activity_summary
                }
            })
            # 确保只保留N条历史记录
            max_history = 5
            updated_profile.preference_history = current_profile.preference_history[-max_history:] + updated_profile.preference_history[-max_history:]

            # 更新到数据库
            update_user_profile_in_db(state.user_id, updated_profile) # 直接调用模拟DB函数
            state.user_profile = updated_profile
            print(f"User profile evolved. Current interests: {updated_profile.interests}, genres: {updated_profile.preferred_genres}")
        except Exception as e:
            print(f"Warning: Could not parse updated user profile: {e}. Raw LLM output: {response.content}")
            state.user_profile = current_profile # 失败则回滚到当前档案

        state.chat_history.append({"role": "assistant", "content": response.content})
        return state

# 策略生成器 Agent
class StrategyGeneratorAgent:
    def __init__(self, llm: ChatOpenAI, tools: list):
        self.llm = llm
        self.tools = tools
        self.prompt = ChatPromptTemplate.from_messages([
            ("system", "你是一个智能推荐和交互策略生成器。根据用户最新的档案和当前输入,生成最合适的系统响应和推荐。"),
            ("system", "请利用用户档案中的兴趣和偏好来提供个性化推荐。如果用户明确寻求信息,优先回答问题。"),
            ("system", "你必须使用提供的工具来获取用户档案和生成推荐。输出应遵循SystemResponse Pydantic模型。"),
            ("user", "用户ID: {user_id}n最新用户档案: {user_profile}n用户输入: {user_input}n请生成系统响应和推荐。"),
        ]).partial(
            tools=self.tools,
            tool_names=", ".join([t.name for t in self.tools]),
            format_instructions=SystemResponse.schema_json()
        )

    def run(self, state: GraphState) -> GraphState:
        print("n--- Running StrategyGeneratorAgent ---")
        # 确保有最新的用户档案
        if not state.user_profile:
            profile_str = tool_executor.invoke({"tool_name": "get_user_profile_tool", "tool_input": {"user_id": state.user_id}})
            state.user_profile = UserProfile.parse_raw(profile_str) if profile_str != "User profile not found." else UserProfile(user_id=state.user_id)

        # LLM生成策略和响应
        strategy_prompt = self.prompt.format_messages(
            user_id=state.user_id,
            user_profile=state.user_profile.json(),
            user_input=state.user_input
        )
        response = self.llm.invoke(strategy_prompt)

        try:
            system_response = SystemResponse.parse_raw(response.content)
            # 如果LLM没有通过工具生成推荐,这里可以强制调用推荐工具
            if not system_response.recommendations:
                # 尝试用LLM的文本响应来生成推荐,或者直接调用推荐工具
                recs_from_tool_str = tool_executor.invoke({"tool_name": "get_recommendations_tool", "tool_input": {"user_id": state.user_id, "query": state.user_input}})
                # 简单解析工具输出,实际可能需要更智能的解析
                if "我推荐:" in recs_from_tool_str:
                    system_response.recommendations = [r.strip() for r in recs_from_tool_str.split("我推荐:")[1].split(',') if r.strip()]
                else:
                    system_response.recommendations = [recs_from_tool_str] # 如果是错误信息或无法解析,直接作为推荐

            state.system_response = system_response
            print(f"Generated System Response: {system_response.response_text}, Recommendations: {system_response.recommendations}")
        except Exception as e:
            print(f"Warning: Could not parse system response: {e}. Raw LLM output: {response.content}")
            state.system_response = SystemResponse(response_text="抱歉,我暂时无法生成有效的回复。", recommendations=[])

        state.chat_history.append({"role": "assistant", "content": response.content})
        return state

4.6 构建 LangGraph 流程

现在我们将这些 Agent 组织成一个 LangGraph 流程。

from langgraph.graph import StateGraph, END
from uuid import uuid4

# 初始化 Agents
session_analyzer_agent = SessionAnalyzerAgent(llm, tools)
preference_evolver_agent = PreferenceEvolverAgent(llm, tools)
strategy_generator_agent = StrategyGeneratorAgent(llm, tools)

# 构建图
workflow = StateGraph(GraphState)

# 添加节点
workflow.add_node("analyze_session", session_analyzer_agent.run)
workflow.add_node("evolve_preferences", preference_evolver_agent.run)
workflow.add_node("generate_strategy", strategy_generator_agent.run)

# 设置入口点
workflow.set_entry_point("analyze_session")

# 添加边
workflow.add_edge("analyze_session", "evolve_preferences")
workflow.add_edge("evolve_preferences", "generate_strategy")
workflow.add_edge("generate_strategy", END)

# 编译图
app = workflow.compile()

print("LangGraph workflow compiled successfully.")

4.7 运行与测试

让我们模拟用户互动,观察偏好如何演进。

# 模拟用户互动
user_id = "user_123"

def run_interaction(user_input: str):
    session_event_id = str(uuid4()) # 为每次互动生成唯一事件ID
    initial_state = GraphState(
        user_id=user_id,
        user_input=user_input,
        session_event_id=session_event_id
    )

    # 获取初始用户档案,如果不存在则创建
    if user_id not in _user_profiles_db:
        _user_profiles_db[user_id] = UserProfile(user_id=user_id, name="New User")
        print(f"--- Created new profile for {user_id} ---")

    # 运行图
    final_state = app.invoke(initial_state)

    # 打印最终响应
    if final_state.system_response:
        print(f"nSystem Response: {final_state.system_response.response_text}")
        if final_state.system_response.recommendations:
            print(f"Recommendations: {', '.join(final_state.system_response.recommendations)}")
        print(f"Strategy Summary: {final_state.system_response.strategy_summary}")
    else:
        print("nSystem did not generate a response.")

    # 打印当前用户档案,以观察演进
    current_profile = get_user_profile_from_db(user_id)
    if current_profile:
        print(f"n--- User {user_id} Current Profile ---")
        print(f"Interests: {current_profile.interests}")
        print(f"Preferred Genres: {current_profile.preferred_genres}")
        print(f"Recent Activity Summary: {current_profile.recent_activity_summary}")
        print(f"Preference History Length: {len(current_profile.preference_history)}")
    print("-" * 50)

# 第一次互动:用户表达对科幻电影的兴趣
run_interaction("我最近想看一些精彩的科幻电影,有什么推荐吗?")

# 第二次互动:用户提到对历史小说的兴趣
run_interaction("除了科幻,我还挺喜欢看历史题材的小说,有什么好书推荐?")

# 第三次互动:用户询问户外运动相关
run_interaction("我想周末去户外活动,有什么推荐的装备或者路线吗?")

# 第四次互动:用户再次提到电影,但语气积极
run_interaction("上次推荐的科幻电影很棒!还有类似的吗?")

# 第五次互动:用户表达对编程的兴趣
run_interaction("最近对学习编程很感兴趣,尤其是Python和AI,有什么好的学习资料?")

预期输出(部分模拟,实际LLM输出会有所不同):

--- Running SessionAnalyzerAgent ---
--- Session event recorded for user_123: 我最近想看一些精彩的科幻电影,有什么推荐吗?
Extracted Preference Insights: {'new_interests': ['科幻电影'], 'genre_adjustments': {'电影': 1}, 'summary_update': '用户对科幻电影表现出强烈兴趣', 'sentiment': 'positive'}

--- Running PreferenceEvolverAgent ---
--- User profile for user_123 updated: {'user_id': 'user_123', 'name': 'Alice', 'age_group': '20-30岁', 'location': None, 'interests': ['科幻电影', '美食'], 'preferred_genres': {'电影': 6, '美食': 3}, 'recent_activity_summary': '用户对科幻电影表现出强烈兴趣', 'preference_history': [...], 'last_updated': '...'}
User profile evolved. Current interests: ['科幻电影', '美食'], genres: {'电影': 6, '美食': 3}

--- Running StrategyGeneratorAgent ---
Generated System Response: 好的,根据您对科幻电影的兴趣,我为您推荐...
Recommendations: ['星际穿越', '沙丘', '流浪地球']
Strategy Summary: 根据用户对科幻电影的强烈偏好,提供了相关推荐。
--------------------------------------------------

--- Running SessionAnalyzerAgent ---
--- Session event recorded for user_123: 除了科幻,我还挺喜欢看历史题材的小说,有什么好书推荐?
Extracted Preference Insights: {'new_interests': ['历史小说'], 'genre_adjustments': {'书籍': 1}, 'summary_update': '用户对历史小说表现出兴趣', 'sentiment': 'positive'}

--- Running PreferenceEvolverAgent ---
--- User profile for user_123 updated: {'user_id': 'user_123', 'name': 'Alice', 'age_group': '20-30岁', 'location': None, 'interests': ['科幻电影', '美食', '历史小说'], 'preferred_genres': {'电影': 6, '美食': 3, '书籍': 4}, 'recent_activity_summary': '用户对历史小说表现出兴趣', 'preference_history': [...], 'last_updated': '...'}
User profile evolved. Current interests: ['科幻电影', '美食', '历史小说'], genres: {'电影': 6, '美食': 3, '书籍': 4}

--- Running StrategyGeneratorAgent ---
Generated System Response: 很高兴您喜欢历史小说!根据您的偏好,我推荐...
Recommendations: ['三体', '平凡的世界', '活着'] # 注意这里是模拟,'三体'可能是科幻,实际LLM会更准确
Strategy Summary: 结合用户对历史小说的兴趣,提供了相应书籍推荐。
--------------------------------------------------

... (后续互动类似)

通过这个流程,我们可以看到:

  1. SessionAnalyzerAgent 从用户输入中提取了新的兴趣和情绪。
  2. PreferenceEvolverAgent 根据这些洞察,更新了 UserProfile 中的 interestspreferred_genres,并记录了偏好快照。
  3. StrategyGeneratorAgent 利用更新后的 UserProfile 来生成更贴近用户当前和演进偏好的推荐。

这个系统通过 LangGraph 实现了:

  • 状态管理: GraphState 在 Agent 之间传递信息。
  • 工具使用: Agent 调用工具与模拟数据库交互,实现数据持久化。
  • Agent 协作: 各 Agent 承担不同职责,协同完成用户偏好演进的整个流程。
  • 持续记忆: UserProfile 的更新和 SessionEvent 的记录构成了系统的长期记忆。

4.8 添加 Checkpointer (可选但推荐)

为了实现真正的跨会话记忆,我们需要为 LangGraph 添加 Checkpointer。

from langgraph.checkpoint.sqlite import SqliteSaver

# 初始化 Checkpointer
memory = SqliteSaver.from_conn_string(":memory:") # 使用内存SQLite,生产环境请用文件路径或真实数据库

# 编译图时传入 Checkpointer
app_with_memory = workflow.compile(checkpointer=memory)

# 运行带有记忆的交互
def run_interaction_with_memory(user_id: str, user_input: str):
    session_event_id = str(uuid4())
    initial_state = GraphState(
        user_id=user_id,
        user_input=user_input,
        session_event_id=session_event_id
    )

    # LangGraph 会自动根据 config 中的 user_id 加载或创建检查点
    config = {"configurable": {"thread_id": user_id}} 

    # 获取初始用户档案,如果不存在则创建
    if user_id not in _user_profiles_db:
        _user_profiles_db[user_id] = UserProfile(user_id=user_id, name="New User")
        print(f"--- Created new profile for {user_id} ---")

    final_state = app_with_memory.invoke(initial_state, config=config)

    if final_state.system_response:
        print(f"nSystem Response: {final_state.system_response.response_text}")
        if final_state.system_response.recommendations:
            print(f"Recommendations: {', '.join(final_state.system_response.recommendations)}")
        print(f"Strategy Summary: {final_state.system_response.strategy_summary}")
    else:
        print("nSystem did not generate a response.")

    current_profile = get_user_profile_from_db(user_id)
    if current_profile:
        print(f"n--- User {user_id} Current Profile ---")
        print(f"Interests: {current_profile.interests}")
        print(f"Preferred Genres: {current_profile.preferred_genres}")
        print(f"Recent Activity Summary: {current_profile.recent_activity_summary}")
        print(f"Preference History Length: {len(current_profile.preference_history)}")
    print("-" * 50)

print("n--- Running Interactions with Checkpointer ---")
run_interaction_with_memory("user_456", "我是一个新用户,很喜欢看电影,特别是动作片。")
run_interaction_with_memory("user_456", "有没有什么好看的科幻电影推荐?") # 这次互动会基于上次的记忆
run_interaction_with_memory("user_456", "最近工作压力大,想读点轻松的小说。") # 偏好开始演进

通过 Checkpointer,即使应用程序重启,只要 thread_id 相同,LangGraph 就能恢复之前的状态,使得 Agent 能够基于历史状态继续工作,从而真正实现了持续记忆。


第五部分:优化与高级话题

我们已经构建了一个基础的用户偏好演进记忆系统。为了使其在实际场景中更健壮、更智能,我们还需要考虑以下优化和高级话题。

5.1 增量学习与遗忘机制

  • 增量学习: 偏好不应一次性剧烈改变,而是根据新的证据逐步调整。这可以通过在 PreferenceEvolverAgent 中为每次偏好调整设置一个“学习率”或“置信度”来实现,新的洞察只对现有偏好产生一定比例的影响。
  • 遗忘机制/偏好衰减: 用户的旧偏好会逐渐变得不那么重要。可以引入时间衰减因子,例如,N个月前的互动对当前偏好的影响权重更低。对于长时间未提及或互动的内容,其偏好权重可以缓慢降低。这可以防止用户档案无限膨胀,并保持其相关性。
  • 突变检测: 某些重大事件(如用户明确表示“我不再喜欢X了”)可能导致偏好突然改变。SessionAnalyzerAgent 可以被训练来识别这些“突变信号”,并通知 PreferenceEvolverAgent 进行更大幅度的调整。

5.2 多模态偏好

目前我们的系统主要处理文本输入。未来可以扩展到支持多模态偏好:

  • 图像/视频: 分析用户观看的视频内容、点赞的图片风格等,提取视觉偏好。
  • 语音: 分析用户语音中的情感、语速、语调,作为偏好洞察的补充。
  • 行为模式: 分析用户在应用内的点击路径、停留时间、滑动习惯等非文本行为,这些是重要的隐式偏好信号。

这需要集成额外的工具和 Agent,例如图像识别模型、语音情感分析模型,并将这些多模态信息转化为 PreferenceInsights

5.3 可解释性与透明度

在智能系统中,用户往往希望了解为什么系统会给出某个推荐或响应。

  • Agent 思想链: LangGraph 的 Agent 在做决策时通常会生成一个“思考过程”(thought process)。我们可以将这个过程作为 SystemResponse 的一部分,向用户解释推荐的依据(例如:“根据您过去对科幻电影和历史小说的兴趣,我推荐了…”)。
  • 偏好溯源: 提供一个界面,让用户查看其 UserProfile 中的关键偏好,以及这些偏好是如何从历史互动中演变而来的(通过 preference_history 字段)。

5.4 安全性与隐私

用户数据,尤其是偏好数据,涉及个人隐私。

  • 数据匿名化与假名化: 在可能的情况下,对敏感数据进行匿名化或假名化处理。
  • 访问控制: 严格限制对用户档案数据的访问权限。
  • 数据最小化: 只收集和存储必要的偏好数据。
  • 用户控制: 允许用户查看、修改或删除其偏好数据。

5.5 评估指标

如何衡量一个长期关系建模系统的成功?

  • 用户满意度: 最直接的指标,可以通过问卷、评分等方式获取。
  • 推荐准确率/相关性: 衡量推荐内容与用户实际兴趣的匹配程度。
  • 点击率 (CTR) / 转化率: 推荐的有效性。
  • 用户留存率 / 互动时长: 长期关系的体现。
  • 偏好更新频率与准确性: 衡量系统捕捉偏好演进的能力。

5.6 异步处理与扩展性

在生产环境中,系统需要处理大量并发请求。

  • 异步 LangGraph: LangGraph 支持异步执行,可以使用 asyncio 来构建异步 Agent 和工具,提高吞吐量。
  • 分布式存储: 将用户档案和会话历史存储在分布式数据库中,如 Cassandra、DynamoDB 等,以支持高并发读写。
  • Agent 服务的微服务化: 将每个 Agent 部署为独立的微服务,便于扩展和管理。

5.7 Agent 协作的复杂性

随着 Agent 数量和复杂度的增加,可能会出现:

  • 循环依赖: Agent 之间不恰当的条件边可能导致无限循环。
  • 状态冲突: 多个 Agent 同时尝试修改 GraphState 的同一部分,可能导致不一致。
  • 决策僵局: Agent 无法决定下一步行动,或陷入低效的决策循环。

这些问题需要通过精心设计的图结构、明确的 Agent 职责、以及在必要时引入仲裁 Agent 来解决。


第六部分:展望未来:智能系统的进化之路

我们今天探讨的“用户偏好演进轨迹”的持续记忆系统,仅仅是智能系统迈向真正智能化的一个缩影。将 LangGraph 这样的框架与 LLMs 结合,为我们构建更复杂、更动态、更具适应性的 Agentic 系统打开了大门。

未来,我们可以预见:

  1. 更深层次的语义理解: LLMs 将继续在理解复杂语境、隐含意图和细微情感方面取得突破,使偏好提取更加精准。
  2. 自适应学习循环: 系统将不仅能学习用户偏好,还能学习如何更有效地学习偏好,甚至调整自身的 Agent 结构以优化学习过程。
  3. 多 Agent 协作的范式转变: 像 LangGraph 这样的框架将成为构建复杂 AI 系统的标准工具,Agent 之间将形成更复杂、更智能的协作网络。
  4. 实时个性化与预测: 系统能够在毫秒级内响应用户,并基于其演进的偏好,主动预测用户未来的需求和行为。

构建一个能够理解并适应用户长期关系的智能系统,无疑是一项长期而激动人心的工程。它要求我们在技术、架构和伦理层面持续探索和创新。LangGraph 为我们提供了一个强大的起点,让我们能够将这些复杂的概念付诸实践,向着更智能、更人性化的 AI 体验迈进。


通过今天的讲座,我们深入探讨了如何利用 LangGraph 框架,构建一个能够追踪用户偏好演进轨迹的持续记忆系统。我们剖析了传统 AI 系统的记忆局限,介绍了 LangGraph 的核心能力,并设计了一个具体的 Agent 协作架构,辅以详细的代码示例,展示了如何从用户互动中提取、演进和利用个性化偏好。这不仅提升了系统的智能化水平,也为未来更高级的个性化服务奠定了坚实基础。

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注