各位来宾,各位技术同仁,大家下午好!
今天,我们齐聚一堂,共同探讨一个在现代AI系统,尤其是智能体(Agentic Systems)构建中日益凸显的关键概念:非有向无环图工作流(Non-DAG Workflows)。具体来说,我们将深入解析LangGraph这一新兴框架,是如何在数学本质上处理这些带有反馈环的复杂系统。我将以一名编程专家的视角,为大家带来一场关于LangGraph中非DAG工作流的深度剖析。
第一章:有向无环图(DAG)的局限性及其在AI工作流中的挑战
在计算机科学和工程领域,有向无环图(Directed Acyclic Graph, DAG)是一种极其常见且强大的数据结构。它由一组节点和有方向的边组成,并且图中不包含任何循环。这意味着从任何一个节点出发,你都不可能通过沿着边移动回到该节点。
DAG的优势显而易见:
- 确定性与可预测性: 由于没有循环,任务的执行顺序是确定的,不会出现无限循环。
- 并行性: 独立或不依赖于其他任务的任务可以并行执行,这在数据处理管道(如Apache Airflow, Apache Spark的执行计划)和构建系统(如Makefiles, Bazel)中非常有用。
- 易于理解与调试: 流程清晰,数据流向单一,更容易跟踪和排查问题。
然而,当我们将目光投向构建真正智能、自适应的AI系统,特别是基于大型语言模型(LLM)的智能体时,DAG的局限性便浮现出来。这些智能体往往需要:
- 迭代与修正: 智能体需要根据执行结果进行自我评估,并决定是否需要重新规划、重新尝试或修正先前的行动。
- 反馈循环: 智能体可能需要从环境、用户或自身内部状态获取反馈,并据此调整其行为。例如,一个RAG(检索增强生成)系统可能需要评估检索到的文档是否相关,如果否,则重新检索;或者评估生成的答案是否满意,如果否,则重新生成或寻求更多信息。
- 多轮对话与规划: 在多轮对话中,每一轮的回复都依赖于历史对话状态;在复杂的规划任务中,智能体可能需要尝试多种方案,评估其可行性,然后选择或调整方案。
所有这些场景都天然地包含了循环(Cycles)。智能体在执行某个动作后,会再次回到一个决策点,根据新的状态和反馈来决定下一步,这正是非DAG工作流的核心特征。传统的DAG框架在处理这种“感知-思考-行动-感知”的循环模式时显得力不从心,因为它们无法直接建模这种需要反复访问相同处理阶段的流程。
第二章:非DAG工作流:概念与LangGraph的诞生
2.1 非DAG工作流的定义与必要性
顾名思义,非DAG工作流是指那些包含一个或多个循环的图结构。在这些图中,你可以从某个节点出发,通过一系列有向边,最终回到该节点。
为什么非DAG工作流在AI领域如此重要?
它提供了一种构建具有以下能力的智能体的方法:
- 自适应性: 能够根据运行时条件动态调整行为。
- 鲁棒性: 能够从错误中恢复,进行自我修正。
- 复杂推理: 能够进行迭代式思考、多步骤规划和深度探索。
- 交互性: 能够与用户或环境进行多轮、有状态的交互。
例如,一个复杂的LLM代理可能需要:
- 规划 (Plan): 根据用户请求生成一个初始计划。
- 执行 (Execute): 按照计划执行一个或多个工具调用。
- 观察 (Observe): 获取工具调用的结果。
- 反思 (Reflect): 评估执行结果是否符合预期,是否需要修正计划。
- 循环: 如果需要修正,则回到“规划”步骤,更新计划;如果完成,则退出。
这显然是一个循环。为了有效地建模和执行这种流程,我们需要一个能够原生支持非DAG结构的工作流框架。
2.2 LangGraph的诞生:连接LLM与图结构
LangGraph正是为了解决这一痛点而诞生的。它是LangChain生态系统的一部分,旨在帮助开发者构建具有高度自主性和复杂行为的LLM驱动智能体。LangGraph的核心思想是将智能体的行为建模为一个状态图(State Graph)。
与传统的DAG不同,LangGraph中的图具有:
- 共享的可变状态(Shared Mutable State): 节点之间不只是传递数据,而是共享和更新一个全局状态。
- 条件性边(Conditional Edges): 边的选择不再是固定的,而是根据当前状态动态决定的,这正是实现循环的关键。
通过这两种机制,LangGraph使得构建能够“感知-思考-行动-反思”的智能体成为可能,而这些智能体的工作流往往是非DAG的。
第三章:LangGraph的核心机制:状态、节点与条件边
要理解LangGraph如何处理非DAG工作流,我们必须深入其核心构建块:状态、节点和条件边。
3.1 状态 (State)
在LangGraph中,状态是整个工作流的“记忆”和“上下文”。它是一个Python字典或TypedDict的实例,包含了智能体在当前时刻的所有相关信息。每个节点执行后,都可以对这个状态进行更新。
数学本质: 状态可以被视为一个状态空间(State Space)S中的一个点。工作流的每一步操作,都是将当前状态从s_k转换为s_{k+1}。
LangGraph通过StateGraph类来定义状态。你需要指定一个TypedDict来表示你的状态结构,并且最重要的是,你需要为每个状态字段定义一个reducer(合并函数)。当一个节点返回一个部分状态更新时,这个reducer函数决定了如何将这个部分更新与现有状态合并。
示例:定义一个RAG代理的状态
from typing import TypedDict, List, Optional
import operator
# 定义一个简单的AgentState,用于RAG工作流
class AgentState(TypedDict):
"""
RAG代理的共享状态。
Attributes:
question: 用户提出的问题。
documents: 从检索器获取的文档列表。
generation: LLM生成的答案。
feedback: 用户或系统对生成答案的反馈。
iteration_count: 记录迭代次数,用于控制循环。
need_re_retrieval: 是否需要重新检索。
need_re_generation: 是否需要重新生成。
"""
question: str
documents: List[str]
generation: Optional[str]
feedback: Optional[str]
iteration_count: int
need_re_retrieval: bool
need_re_generation: bool
# 定义状态的reducer函数
# LangGraph使用这些函数来合并节点返回的部分状态更新
# operator.add 用于列表的拼接,operator.or 用于字典的合并(这里简化处理)
# 对于更复杂的类型,需要自定义合并逻辑
def reduce_state(current_state: AgentState, new_update: dict) -> AgentState:
"""
一个简单的状态合并函数,用于演示。
LangGraph通常会为TypedDict的每个字段自动生成合并逻辑,
或者我们可以通过StateGraph的`.add_node`或`.add_edge`来指定。
对于列表,通常是追加;对于简单字段,通常是覆盖。
"""
merged_state = current_state.copy()
for key, value in new_update.items():
if key == 'documents' and isinstance(value, list) and isinstance(merged_state.get(key), list):
# 对于文档列表,通常是追加或者完全替换,这里我们选择替换
merged_state[key] = value
elif key == 'iteration_count' and isinstance(value, int) and isinstance(merged_state.get(key), int):
merged_state[key] = merged_state.get(key, 0) + value
else:
merged_state[key] = value
return merged_state
# 在实际LangGraph中,我们会这样定义reducer
# 例如,对于列表,我们可能希望追加;对于字符串,我们希望覆盖
# StateGraph的构造函数或者add_node/add_edge方法允许你为每个key指定reducer
# 例如:StateGraph(AgentState).add_node("retrieve", retrieve_node, reducer={"documents": lambda x, y: x + y})
# 但更常见的是,LangGraph会根据TypedDict的定义和默认行为进行合并。
# 对于TypedDict,默认行为通常是:
# - 如果键是列表,使用 operator.add (列表拼接)
# - 如果键是字典,使用 operator.or (字典合并)
# - 否则,新值覆盖旧值
# 让我们假设我们使用LangGraph的默认reducer行为,
# 或者在构建图时明确指定它们。
# 例如,对于`documents`字段,我们可能希望每次检索都是一个新的集合,所以会覆盖旧的。
# 对于`iteration_count`,我们希望是累加。
# 对于`generation`和`feedback`,我们希望是覆盖。
3.2 节点 (Nodes)
节点是工作流中的基本计算单元。每个节点都是一个Python函数或可调用对象,它接收当前状态作为输入,执行一些操作(例如调用LLM、使用工具、执行业务逻辑),然后返回一个部分状态更新(一个字典),这个更新将按照前面定义的reducer规则合并到全局状态中。
数学本质: 每个节点n都可以被看作一个状态转换函数f_n: S -> S',它将当前状态s映射到一个新的部分状态s'。当s'与原状态合并后,得到新的全局状态s_{k+1}。
from langchain_core.messages import HumanMessage
from langchain_core.runnables import RunnableConfig
from langchain_openai import ChatOpenAI
from langchain_core.output_parsers import StrOutputParser
# 假设已经配置了OpenAI API Key
# os.environ["OPENAI_API_KEY"] = "YOUR_API_KEY"
llm = ChatOpenAI(model="gpt-4o", temperature=0)
# 模拟一个检索器
def retrieve(state: AgentState, config: Optional[RunnableConfig] = None) -> dict:
print("---NODE: RETRIEVE---")
question = state["question"]
# 模拟从数据库或API检索文档
if "LangGraph" in question:
docs = ["LangGraph是LangChain生态系统的一部分,用于构建有状态的、自适应的LLM代理。",
"它允许你定义图结构,其中包含节点和条件边,以处理复杂的逻辑和反馈循环。",
"LangGraph的核心在于其对共享状态和非DAG工作流的支持。"]
elif "Python" in question:
docs = ["Python是一种高级的、解释型的、通用的编程语言。",
"它以其简洁的语法和强大的库生态系统而闻名。"]
else:
docs = ["没有找到相关文档。"]
return {"documents": docs, "need_re_retrieval": False} # 假设这次检索是成功的
# 模拟一个文档评分器
def grade_documents(state: AgentState, config: Optional[RunnableConfig] = None) -> dict:
print("---NODE: GRADE DOCUMENTS---")
question = state["question"]
documents = state["documents"]
# 模拟LLM调用来判断文档相关性
prompt = f"""
鉴于以下用户问题和检索到的文档,请判断文档是否与问题相关。
如果相关,返回 'relevant'。如果不相关,返回 'not_relevant'。
问题: {question}
文档: {' '.join(documents)}
"""
response = llm.invoke([HumanMessage(content=prompt)]).content
if "relevant" in response.lower():
print("---文档相关---")
return {"need_re_retrieval": False}
else:
print("---文档不相关,需要重新检索---")
return {"need_re_retrieval": True, "documents": []} # 清空文档,要求重新检索
# 模拟一个答案生成器
def generate(state: AgentState, config: Optional[RunnableConfig] = None) -> dict:
print("---NODE: GENERATE---")
question = state["question"]
documents = state["documents"]
if not documents:
return {"generation": "对不起,我没有找到足够的信息来回答这个问题。", "need_re_generation": False}
prompt = f"""
基于以下文档,请清晰、简洁地回答用户问题。
问题: {question}
文档: {' '.join(documents)}
答案:
"""
response = llm.invoke([HumanMessage(content=prompt)]).content
return {"generation": response, "need_re_generation": False}
# 模拟一个答案质量评估器 (基于文档)
def grade_generation_with_documents(state: AgentState, config: Optional[RunnableConfig] = None) -> dict:
print("---NODE: GRADE GENERATION WITH DOCUMENTS---")
question = state["question"]
documents = state["documents"]
generation = state["generation"]
if not generation or "对不起" in generation: # 如果生成失败,直接标记为不满意
return {"need_re_generation": True}
# 模拟LLM调用来判断生成答案是否满意
prompt = f"""
鉴于以下用户问题、检索到的文档和生成的答案,请判断生成的答案是否充分回答了问题,并且与文档内容一致。
如果满意,返回 'satisfactory'。如果不满意,返回 'not_satisfactory'。
问题: {question}
文档: {' '.join(documents)}
答案: {generation}
"""
response = llm.invoke([HumanMessage(content=prompt)]).content
if "satisfactory" in response.lower():
print("---答案生成满意---")
return {"need_re_generation": False}
else:
print("---答案生成不满意,需要重新生成---")
return {"need_re_generation": True}
# 模拟一个用户反馈节点 (或者更复杂的系统反馈)
def get_user_feedback(state: AgentState, config: Optional[RunnableConfig] = None) -> dict:
print("---NODE: GET USER FEEDBACK---")
current_generation = state["generation"]
question = state["question"]
# 真实场景中,这里会等待用户输入或从其他系统获取反馈
# 为了演示,我们模拟一个简单的反馈逻辑
if "LangGraph" in question and "LangGraph" in current_generation:
print("---模拟用户反馈:满意---")
return {"feedback": "satisfactory"}
else:
print("---模拟用户反馈:不满意,答案不够全面---")
return {"feedback": "not_satisfactory"}
3.3 条件边 (Conditional Edges)
条件边是实现非DAG工作流,即引入循环的关键。它不是简单地从节点A到节点B,而是根据当前状态中的某个或某些值,动态地决定接下来应该跳转到哪个节点。
数学本质: 条件边可以被看作一个决策函数g: S -> N,它接收当前状态s作为输入,并输出下一个要执行的节点n(或者一组节点,或者一个特殊指令如__end__表示终止)。这个函数决定了在状态空间中的下一步路径。
LangGraph通过add_conditional_edges方法来实现。你需要提供:
- 源节点: 决策发生的地方。
- 条件函数: 一个接收当前状态作为输入,并返回一个字符串(表示下一个节点名)的函数。
- 分支映射: 一个字典,将条件函数返回的字符串映射到实际的目标节点。
示例:一个循环决策节点
假设我们有一个decide_to_continue节点,它根据feedback和iteration_count来决定下一步。
# 定义条件函数
def decide_to_continue(state: AgentState, config: Optional[RunnableConfig] = None) -> str:
print("---DECIDE TO CONTINUE---")
iteration_count = state.get("iteration_count", 0)
# 增加迭代次数
state["iteration_count"] = iteration_count + 1 # 直接修改state,或者通过返回字典
# 简单的循环控制,防止无限循环
if state["iteration_count"] > 3:
print("---达到最大迭代次数,终止---")
return "end"
if state["feedback"] == "satisfactory" and not state["need_re_generation"]:
print("---所有条件满意,流程结束---")
return "end"
if state["need_re_retrieval"]:
print("---需要重新检索---")
return "retrieve" # 返回节点名
if state["need_re_generation"]:
print("---需要重新生成---")
return "generate" # 返回节点名
# 如果既不需要重新检索也不需要重新生成,但用户不满意,
# 可能是文档不够,或者生成不够好,这里我们简化,再次尝试生成
if state["feedback"] == "not_satisfactory":
print("---用户不满意,尝试重新生成---")
return "generate" # 尝试重新生成
print("---未知情况,终止---") # 兜底,防止意外无限循环
return "end"
通过add_conditional_edges,我们可以将decide_to_continue节点的输出映射到retrieve、generate或__end__。
第四章:非DAG工作流的数学本质:状态机、不动点与图论
LangGraph处理非DAG工作流的能力,并非空中楼阁,其背后蕴含着深厚的数学和计算机科学原理。我们可以从几个不同的角度来理解其数学本质。
4.1 有限状态机(FSM)的拓展
经典FSM:
一个经典的有限状态机定义为五元组 (Q, Σ, δ, q₀, F):
Q: 有限状态集合。Σ: 输入字母表。δ: 状态转移函数Q × Σ → Q。q₀: 初始状态。F: 终止状态集合。
FSM的核心在于:在任何给定时刻,系统都处于一个确定的状态,并根据接收到的输入,通过状态转移函数决定下一个状态。
LangGraph作为广义FSM:
LangGraph可以被视为一个高度泛化和扩展的FSM:
- 状态
Q(LangGraph的AgentState): 不再是简单的枚举值,而是一个结构化的、可变的字典(TypedDict)。这使得状态能够承载丰富的信息,如文档列表、生成文本、迭代计数等。这实际上是将FSM中的“状态”概念从离散的、原子性的状态扩展到了一个连续的、高维的状态空间。 - 输入
Σ(LangGraph的节点输出与内部逻辑): 并非外部输入事件,而是由图中的节点(Actions)执行计算后产生的部分状态更新。每个节点都是一个局部的状态转换器。 - 状态转移函数
δ(LangGraph的reducer和条件边):reducer: 定义了如何将节点返回的部分状态更新合并到全局状态中。这可以看作是状态空间中的一个“局部步进”操作。- 条件边函数: 这是最关键的部分。它扮演了FSM中
δ的核心决策角色,但不是基于外部输入,而是完全基于当前的内部状态s来决定下一步要执行的节点(下一个状态)。这可以被形式化为g: S → N,其中N是节点集合。
通过这种扩展,LangGraph允许我们在一个巨大的、细粒度的状态空间中进行导航,并根据复杂的状态逻辑动态地选择路径,从而实现循环。
4.2 不动点(Fixed Point)迭代与收敛
在许多迭代算法中,我们寻求一个“不动点”——即经过一次或多次变换后,结果不再发生变化的状态。虽然LangGraph不强制实现不动点,但其循环结构与不动点迭代有着深刻的联系。
数学形式:
考虑一个工作流循环,它从一个状态 s_k 开始,经过一系列节点和条件边的处理,最终返回到一个决策点,产生新的状态 s_{k+1}。我们可以将整个循环过程抽象为一个复合的状态转换算子 T:
s_{k+1} = T(s_k)
这里的T是一个由多个节点函数f_n和条件决策函数g组合而成的复杂映射。
循环终止:
一个非DAG工作流的循环必须有终止条件,否则会陷入无限循环。在LangGraph中,终止条件通常由条件边函数g来决定,当g(s_k)返回一个特殊的终止指令(例如__end__)时,循环结束。
这意味着我们正在寻找一个状态s*,使得在s*上执行T后,条件函数g会引导我们退出循环。从某种意义上说,我们期望系统能够达到一个“稳定”或“满意”的状态,即一个“可接受的不动点”,在这个点上,进一步的迭代不再必要或不再有益。
在RAG自修正的例子中:
T可能包括检索、评估文档、生成答案、评估答案、获取反馈等一系列步骤。- 循环的目的是让
feedback变为satisfactory且need_re_generation为False。 - 当达到这个状态时,
decide_to_continue函数会返回"end",从而终止循环。
因此,LangGraph的执行过程可以看作是一个在状态空间中寻找特定终止条件的迭代搜索过程。开发者需要精心设计条件边函数和节点逻辑,以确保系统能够:
- 收敛: 状态能够逐步向终止条件靠拢。
- 终止: 最终能够满足某个条件并退出循环,避免无限执行。
这通常通过引入迭代计数限制、状态变化检测或明确的成功/失败标志来实现。
4.3 图论视角
从纯粹的图论角度看,LangGraph允许我们定义一个包含有向环的图。
- 节点: 图的顶点,代表计算步骤。
- 边: 图的边,代表控制流。
传统的图遍历算法(如深度优先搜索DFS、广度优先搜索BFS)通常用于DAG。在有环图中,它们需要额外的机制(如访问标记)来避免无限循环。
LangGraph的StateGraph在运行时,实际上是在这个有环的定义图上执行一系列有状态的路径遍历。对于单次执行而言,从起点到终点的实际执行路径(trace)仍然是一个DAG,因为它记录了每次访问节点时的特定状态和决策。但底层的图结构(schema)本身是允许循环的。
通过将控制流逻辑内嵌到状态和条件函数中,LangGraph有效地将传统的静态图结构转换为了一个动态的、行为驱动的系统。每当决策函数被调用时,它实际上是根据当前AgentState这个高维向量,在图的邻接矩阵或邻接表中进行动态查找,决定下一跳。
总结表格:
| 特性 | DAG工作流 | 非DAG工作流(LangGraph) | 数学/CS 对应关系 |
|---|---|---|---|
| 结构 | 无循环的有向图 | 包含循环的有向图 | 图论:无环图 vs. 有环图 |
| 数据流 | 单向,数据传递 | 共享可变状态,数据修改与传递并存 | 状态空间中的点与局部转换 |
| 控制流 | 静态,预定义 | 动态,基于运行时状态的条件判断 | 广义FSM的转移函数、决策函数 g: S → N |
| 执行模式 | 线性或分支,一次性完成 | 迭代,可重复访问相同节点 | 迭代算法,寻找不动点或终止条件 |
| 复杂性 | 易于理解,可并行化 | 建模复杂交互与自适应行为,终止需谨慎 | 状态空间搜索,收敛性与终止性 |
| AI应用 | 数据处理、特征工程 | 智能体、自修正RAG、多轮对话、规划 | 行为驱动、自主决策的系统 |
第五章:代码实践:构建一个带反馈环的RAG代理
现在,让我们将这些理论付诸实践,构建一个更复杂的RAG代理。这个代理将包含多个反馈循环:
- 文档相关性检查: 如果检索到的文档与问题不相关,代理会尝试重新检索。
- 答案质量检查: 如果生成的答案质量不佳(与文档不符),代理会尝试重新生成。
- 用户反馈循环: 代理会模拟获取用户反馈,如果用户不满意,代理会尝试改进答案(例如,重新生成)。
- 迭代次数限制: 防止无限循环。
这个例子将充分展示LangGraph如何利用状态和条件边来构建一个非DAG工作流。
import os
from typing import TypedDict, List, Optional
from langchain_core.messages import HumanMessage
from langchain_core.runnables import RunnableConfig
from langchain_openai import ChatOpenAI
from langgraph.graph import StateGraph, END # 导入END来表示终止
# 假设已经配置了OpenAI API Key
# os.environ["OPENAI_API_KEY"] = "YOUR_API_KEY"
llm = ChatOpenAI(model="gpt-4o", temperature=0)
# 1. 定义 AgentState
class AgentState(TypedDict):
question: str
documents: List[str]
generation: Optional[str]
feedback: Optional[str]
iteration_count: int
need_re_retrieval: bool # 是否需要重新检索
need_re_generation: bool # 是否需要重新生成
# 2. 定义 Node 函数
# 这些函数在前面已经定义,这里为了完整性再次列出,并稍作调整以适应AgentState的变更
# 节点函数应该返回一个字典,其键是AgentState的键,值是需要更新的部分状态。
# 检索器节点
def retrieve(state: AgentState, config: Optional[RunnableConfig] = None) -> dict:
print(f"---NODE: RETRIEVE (Iteration: {state['iteration_count']})---")
question = state["question"]
# 模拟从数据库或API检索文档
# 为了演示,根据问题内容返回不同的文档
if "LangGraph" in question:
docs = ["LangGraph是LangChain生态系统的一部分,用于构建有状态的、自适应的LLM代理。",
"它允许你定义图结构,其中包含节点和条件边,以处理复杂的逻辑和反馈循环。",
"LangGraph的核心在于其对共享状态和非DAG工作流的支持。"]
elif "Python" in question:
docs = ["Python是一种高级的、解释型的、通用的编程语言。",
"它以其简洁的语法和强大的库生态系统而闻名。"]
else:
docs = ["没有找到相关文档。"]
print(f" 检索到文档: {docs[:1]}...")
return {"documents": docs, "need_re_retrieval": False} # 假设检索完成,不需要重新检索
# 文档评分器节点
def grade_documents(state: AgentState, config: Optional[RunnableConfig] = None) -> dict:
print(f"---NODE: GRADE DOCUMENTS (Iteration: {state['iteration_count']})---")
question = state["question"]
documents = state["documents"]
if not documents:
print(" 没有文档可评分,标记为不相关。")
return {"need_re_retrieval": True} # 没有文档,直接标记为不相关,需要重新检索
# 模拟LLM调用来判断文档相关性
prompt = f"""
鉴于以下用户问题和检索到的文档,请判断文档是否与问题相关。
如果相关,返回 'relevant'。如果不相关,返回 'not_relevant'。
问题: {question}
文档: {' '.join(documents)}
"""
response = llm.invoke([HumanMessage(content=prompt)]).content
if "relevant" in response.lower():
print(" 文档被评定为相关。")
return {"need_re_retrieval": False}
else:
print(" 文档被评定为不相关,需要重新检索。")
return {"need_re_retrieval": True, "documents": []} # 清空文档,要求重新检索
# 答案生成器节点
def generate(state: AgentState, config: Optional[RunnableConfig] = None) -> dict:
print(f"---NODE: GENERATE (Iteration: {state['iteration_count']})---")
question = state["question"]
documents = state["documents"]
if not documents:
print(" 没有文档,无法生成答案。")
return {"generation": "对不起,我没有找到足够的信息来回答这个问题。", "need_re_generation": False}
prompt = f"""
基于以下文档,请清晰、简洁地回答用户问题。
问题: {question}
文档: {' '.join(documents)}
答案:
"""
response = llm.invoke([HumanMessage(content=prompt)]).content
print(f" 生成答案: {response[:50]}...")
return {"generation": response, "need_re_generation": False} # 假设生成完成,不需要重新生成
# 答案质量评估器节点 (基于文档)
def grade_generation_with_documents(state: AgentState, config: Optional[RunnableConfig] = None) -> dict:
print(f"---NODE: GRADE GENERATION WITH DOCUMENTS (Iteration: {state['iteration_count']})---")
question = state["question"]
documents = state["documents"]
generation = state["generation"]
if not generation or "对不起" in generation: # 如果生成失败,直接标记为不满意
print(" 生成内容为空或为错误信息,标记为不满意。")
return {"need_re_generation": True}
# 模拟LLM调用来判断生成答案是否满意
prompt = f"""
鉴于以下用户问题、检索到的文档和生成的答案,请判断生成的答案是否充分回答了问题,并且与文档内容一致。
如果满意,返回 'satisfactory'。如果不满意,返回 'not_satisfactory'。
问题: {question}
文档: {' '.join(documents)}
答案: {generation}
"""
response = llm.invoke([HumanMessage(content=prompt)]).content
if "satisfactory" in response.lower():
print(" 答案生成被评定为满意。")
return {"need_re_generation": False}
else:
print(" 答案生成被评定为不满意,需要重新生成。")
return {"need_re_generation": True}
# 模拟用户反馈节点 (或者更复杂的系统反馈)
# 为了演示方便,这里简化为根据问题和生成内容判断
def get_user_feedback(state: AgentState, config: Optional[RunnableConfig] = None) -> dict:
print(f"---NODE: GET USER FEEDBACK (Iteration: {state['iteration_count']})---")
current_generation = state["generation"]
question = state["question"]
# 真实场景中,这里会等待用户输入或从其他系统获取反馈
# 为了演示,我们模拟一个简单的反馈逻辑
if "LangGraph" in question and "LangGraph" in str(current_generation):
print(" 模拟用户反馈:满意。")
return {"feedback": "satisfactory"}
else:
print(" 模拟用户反馈:不满意,答案不够全面或不准确。")
return {"feedback": "not_satisfactory"}
# 3. 定义条件边函数
def decide_to_continue(state: AgentState, config: Optional[RunnableConfig] = None) -> str:
print(f"---DECISION NODE: DECIDE TO CONTINUE (Iteration: {state['iteration_count']})---")
# 增加迭代次数
current_iteration_count = state.get("iteration_count", 0)
# LangGraph的reducer会在节点执行后合并返回的状态,这里直接修改state是演示,
# 实际中通常是返回一个字典 {"iteration_count": current_iteration_count + 1}
# 但为了条件判断的即时性,这里我们可以假设它在节点执行前已更新或在返回中更新。
# 为了简洁,我们假设reducer已经处理了iteration_count的累加。
# 这里的state['iteration_count']是上一个节点执行后,reducer合并后的最新值。
# 简单的循环控制,防止无限循环
MAX_ITERATIONS = 3
if state["iteration_count"] >= MAX_ITERATIONS:
print(f" 达到最大迭代次数 ({MAX_ITERATIONS}),终止。")
return END # 使用END表示终止
# 优先处理文档相关性问题
if state["need_re_retrieval"]:
print(" 需要重新检索文档。")
return "retrieve"
# 如果文档相关,但生成答案不满意,则尝试重新生成
if state["need_re_generation"]:
print(" 需要重新生成答案。")
return "generate"
# 如果文档相关且答案生成满意,但用户不满意,也尝试重新生成 (可能需要不同的策略或更多文档)
if state["feedback"] == "not_satisfactory":
print(" 用户不满意,尝试重新生成答案。")
return "generate"
# 如果所有条件都满意,或者没有其他可行的路径,则终止
print(" 所有条件满意,或无更多操作,流程结束。")
return END
# 4. 构建 LangGraph
workflow = StateGraph(AgentState)
# 添加节点
workflow.add_node("retrieve", retrieve)
workflow.add_node("grade_documents", grade_documents)
workflow.add_node("generate", generate)
workflow.add_node("grade_generation_with_documents", grade_generation_with_documents)
workflow.add_node("get_user_feedback", get_user_feedback)
# 设置入口点
workflow.set_entry_point("retrieve")
# 添加边 - 线性流
workflow.add_edge("retrieve", "grade_documents")
workflow.add_edge("grade_documents", "generate") # 无论文档是否相关,先尝试生成,再根据反馈处理
workflow.add_edge("generate", "grade_generation_with_documents")
workflow.add_edge("grade_generation_with_documents", "get_user_feedback")
# 添加条件边 - 引入循环的关键
# 从 get_user_feedback 节点,根据 decide_to_continue 的结果决定下一步
workflow.add_conditional_edges(
"get_user_feedback", # 源节点
decide_to_continue, # 条件函数
{ # 分支映射
"retrieve": "retrieve",
"generate": "generate",
END: END # 如果决定终止,则结束
}
)
# 编译图
app = workflow.compile()
# 运行图并观察循环
print("n--- Running Workflow: Scenario 1 (Successful) ---")
inputs = {"question": "什么是LangGraph?", "documents": [], "generation": None,
"feedback": None, "iteration_count": 0, "need_re_retrieval": False, "need_re_generation": False}
final_state = app.invoke(inputs)
print("nFinal State (Scenario 1):")
print(final_state)
print(f"最终答案: {final_state['generation']}")
print("n--- Running Workflow: Scenario 2 (Needs Re-generation) ---")
inputs_2 = {"question": "请详细解释Python的并发编程。", "documents": [], "generation": None,
"feedback": None, "iteration_count": 0, "need_re_retrieval": False, "need_re_generation": False}
# 为了模拟需要重新生成,我们假设第一次检索到的文档不足以生成好的答案
# 在实际中,这会由 grade_generation_with_documents 或 get_user_feedback 决定
# 这里我们让 retrieve 和 grade_documents 正常,但 generate 可能会被评估为不满意
final_state_2 = app.invoke(inputs_2)
print("nFinal State (Scenario 2):")
print(final_state_2)
print(f"最终答案: {final_state_2['generation']}")
print("n--- Running Workflow: Scenario 3 (Limited Iterations) ---")
# 故意设置一个问题,让它在几次迭代内都无法达到满意
inputs_3 = {"question": "一个关于不存在的技术的复杂问题。", "documents": [], "generation": None,
"feedback": None, "iteration_count": 0, "need_re_retrieval": False, "need_re_generation": False}
final_state_3 = app.invoke(inputs_3)
print("nFinal State (Scenario 3):")
print(final_state_3)
print(f"最终答案: {final_state_3['generation']}")
代码解析:
AgentState: 定义了RAG代理在任何时刻的全部信息,包括问题、文档、生成的答案、反馈、迭代计数以及两个关键的布尔标志need_re_retrieval和need_re_generation,它们是控制循环的核心。- 节点函数:
retrieve、grade_documents、generate、grade_generation_with_documents、get_user_feedback。每个函数都接收AgentState,执行特定任务,并返回一个字典来更新AgentState。例如,grade_documents会根据文档相关性设置need_re_retrieval。 decide_to_continue函数: 这是实现循环的关键条件函数。它根据AgentState中的iteration_count、need_re_retrieval、need_re_generation和feedback来决定流程的下一步:- 如果达到最大迭代次数,则
END。 - 如果
need_re_retrieval为True,则返回"retrieve",将流程引导回检索节点。 - 如果
need_re_generation为True,则返回"generate",将流程引导回生成节点。 - 如果用户反馈
not_satisfactory,也返回"generate",尝试改进答案。 - 否则,返回
END,流程结束。
- 如果达到最大迭代次数,则
workflow.add_conditional_edges: 这行代码是魔法所在。它告诉LangGraph,在get_user_feedback节点执行完毕后,不要简单地跳转到下一个固定节点,而是调用decide_to_continue函数,根据其返回值("retrieve","generate", 或END)来决定下一步的去向。这正是反馈环的实现。- 循环演示:
- Scenario 1: 一个简单的问题,所有步骤顺利通过,最终答案满意,流程结束。
- Scenario 2: 模拟一个需要重新生成答案的情况。
grade_generation_with_documents可能会判断初始生成不满意,或者get_user_feedback返回not_satisfactory,然后decide_to_continue将流程引导回generate节点,形成一个循环。 - Scenario 3: 演示迭代限制。即使问题一直无法得到满意解决,流程也会在达到最大迭代次数后强制终止。
通过这个例子,我们可以清晰地看到LangGraph如何通过AgentState作为共享上下文,以及add_conditional_edges作为动态决策机制,有效地构建和执行包含复杂反馈环的非DAG工作流。这使得智能体能够进行自我修正、迭代推理和适应性行为。
第六章:高级议题与设计考量
构建复杂的非DAG工作流需要考虑一些高级议题,以确保系统的稳定性、可维护性和可扩展性。
6.1 终止条件与收敛性
如前所述,循环必须有终止条件。在LangGraph中,通常通过以下方式实现:
- 迭代计数限制: 最直接的方法,如示例中的
MAX_ITERATIONS。这保证了即使逻辑错误导致无限循环,系统也能在一定次数后停止。 - 状态满足条件: 当
AgentState达到某个“成功”或“满意”状态时,条件函数返回END。 - 状态无变化: 如果连续几次迭代后,关键状态字段不再发生有意义的变化,可以认为系统已收敛,进而终止。这需要更精细的状态比较逻辑。
6.2 非确定性与鲁棒性
LLM的输出本质上是非确定性的,这在循环中可能导致有趣的挑战:
- 不稳定的循环: 相同的输入可能在不同时间导致LLM给出不同的结果,从而影响条件边的判断,可能导致循环行为不稳定。
- 错误恢复: LLM可能会生成无效的输出或导致工具调用失败。工作流需要有机制来捕获这些错误,并决定是重试、跳过还是以其他方式处理。这可以在节点内部实现,也可以通过额外的条件边来处理。
设计时应考虑这些非确定性,引入重试机制、错误处理节点或更复杂的决策逻辑来提高鲁棒性。
6.3 调试与可视化
复杂的非DAG工作流,尤其是包含多个嵌套循环和条件分支的,可能难以理解和调试。LangGraph提供了一些工具来帮助开发者:
- LangSmith集成: LangGraph与LangSmith(LangChain的观测平台)无缝集成,可以记录每次执行的完整调用链、状态变化和决策路径。这是理解复杂工作流行为的强大工具。
- 图可视化: LangGraph可以导出图的DOT表示,可以用于生成SVG或PNG图像,直观地展示工作流的结构,包括所有节点和条件边。
6.4 并行化与性能
虽然循环本身通常意味着顺序执行,但一个大型工作流中可能存在可以并行执行的子图或节点。LangGraph的核心执行器是顺序的,但你可以将并行任务封装在单个节点中(例如,使用asyncio.gather或线程池),或者通过更高级的LangGraph特性(如subgraphs)来管理更复杂的并行模式。
6.5 状态的原子性与一致性
在多并发场景下,共享的可变状态需要原子性更新和一致性保证。LangGraph通过其reducer机制来管理状态合并,这有助于保持状态的一致性。然而,如果节点内部执行了复杂的并发操作或外部副作用,开发者仍需自行确保这些操作的原子性和线程安全。
6.6 扩展性与模块化
对于非常大的工作流,将所有逻辑放在一个图中可能会变得难以管理。LangGraph支持:
- 子图(Subgraphs): 将一个复杂的子流程封装成一个独立的
StateGraph,然后作为另一个图的节点。这允许模块化设计和代码复用。 - 节点复用: 相同的节点函数可以在图的不同位置被多次引用。
这些特性有助于构建可扩展和易于维护的复杂智能体系统。
第七章:超越LangGraph:非DAG工作流的更广阔应用
LangGraph虽然专注于LLM驱动的智能体,但其处理非DAG工作流的核心思想——即基于状态的动态决策和迭代执行——在计算机科学和工程的许多其他领域都有广泛应用:
- 业务流程管理(BPM): 许多复杂的业务流程,如审批流程、订单履行,都包含条件分支、人工干预和反复修改,本质上是非DAG的。现代BPM系统需要支持这种灵活性。
- 控制系统: 自动化控制系统(例如,机器人控制、工业自动化)持续感知环境、决策、行动,然后再次感知,形成经典的反馈控制循环。
- 游戏AI: 游戏中的NPC行为往往是状态驱动的。例如,一个敌人AI可能在“巡逻”状态下,如果发现玩家则进入“追逐”状态,如果失去玩家视野则返回“搜索”状态,如果受到攻击则进入“战斗”状态,这些都是包含循环的状态转换。
- 复杂的ETL/ELT管道: 在某些数据处理场景中,可能需要根据数据质量或业务规则,反复清洗、转换数据,直到满足特定条件。
- 编译器优化: 编译器中的某些优化阶段会反复执行,直到代码达到一个稳定状态或满足某些性能目标。
LangGraph的出现,不仅为LLM智能体的开发带来了革命性的便利,也再次提醒我们,在设计复杂系统时,跳出传统DAG思维的限制,拥抱包含反馈和迭代的非DAG模型,是通向更智能、更自适应未来的关键。它将图论、状态机和不动点迭代等理论概念,以一种实用的、可编程的方式呈现出来,使得开发者能够构建出真正具有自主决策和学习能力的AI应用。
LangGraph通过其对共享状态和条件边的巧妙设计,为构建能够进行迭代推理、自我修正和适应性行为的LLM驱动智能体提供了强大的框架。它将有向无环图的局限性转化为机遇,使得开发者能够以一种直观且数学上严谨的方式,建模并执行复杂且富有生命力的AI工作流。理解其背后的状态机、不动点和图论本质,将有助于我们更好地设计、调试和优化这些下一代智能系统。