各位同仁,各位对AI Agent架构充满热情的开发者们:
欢迎来到今天的讲座。我们今天将深入探讨一个在构建复杂、可控Agent工作流中至关重要的概念:有向无环图(Directed Acyclic Graph, DAG),以及它在LangGraph框架中是如何被发挥到极致的。在AI领域,我们正从简单的“提示-响应”模式,迅速转向需要多步骤推理、工具调用、条件判断、循环修正甚至多Agent协作的复杂系统。面对这种复杂性,传统的线性调用链或简单的函数组合已经显得力不从心。我们迫切需要一种更强大、更灵活、更可控的架构来支撑Agent的智能行为。我将论证,DAG正是这一挑战的终极答案。
一、Agent 工作流的演进与传统模式的局限
在探讨DAG之前,我们首先需要理解为什么Agent的工作流会变得如此复杂,以及我们目前面临的挑战。
早期的AI Agent,比如基于LangChain的简单Chain,通常遵循线性结构:输入 -> LLM -> 输出。这对于特定任务,如文本生成、简单问答,是高效的。然而,当任务需求提升,Agent需要:
- 进行多步骤推理:将复杂问题分解为子问题,逐步解决。
- 动态调用外部工具:根据当前情况,决定何时以及如何使用搜索引擎、数据库、API等。
- 处理条件分支:根据LLM的判断或工具的返回结果,选择不同的执行路径。
- 实现自我修正与迭代:在发现错误或结果不满意时,能够返回之前的步骤进行修正。
- 管理内部状态:Agent需要在多个步骤之间记住上下文、历史信息和中间结果。
- 并行执行任务:在某些情况下,独立子任务可以同时进行以提高效率。
传统的线性Chain或简单的Runnable序列,在面对这些需求时,暴露出了诸多局限性:
- 缺乏灵活的控制流:难以直接表达条件分支、循环或并行逻辑。如果强行实现,代码会变得高度嵌套、难以阅读和维护。
- 状态管理不透明:状态通常通过上下文参数隐式传递,容易混淆,且难以追踪状态在何处被修改。
- 调试与观测困难:当Agent行为不符合预期时,很难准确地知道哪一步出了问题,数据流向如何。
- 扩展性差:添加新的逻辑或工具往往需要大量修改现有结构,牵一发而动全身。
- 难以实现鲁棒的错误处理:链式结构中,一个环节出错可能导致整个流程中断,缺乏精细的错误捕获与恢复机制。
这些挑战促使我们寻找一种更本质、更强大的模型来描述Agent的复杂行为。而这一模型,正是有向无环图(DAG)。
二、有向无环图(DAG)的核心概念与软件工程的基石
2.1 什么是DAG?
有向无环图,顾名思义,是一种图数据结构,它包含:
- 节点(Nodes):表示图中的实体或步骤。
- 边(Edges):连接节点,表示节点之间的关系或数据流向。
- 有向(Directed):每条边都有一个明确的方向,从一个节点指向另一个节点,表示了执行或依赖的顺序。
- 无环(Acyclic):图中不存在任何从一个节点出发,沿着边的方向最终能回到该节点的路径。这意味着没有无限循环。
图1:DAG 示例
A --> B --> D
| ^
v |
C ----|
在这个例子中,A、B、C、D是节点。A到B,A到C,C到B,B到D是边。我们可以从A走到B再到D,但无法从任何节点出发回到自身。
2.2 DAG的数学特性与普适性
DAG拥有一些非常重要的数学特性,使其在计算机科学和软件工程中无处不在:
- 拓扑排序(Topological Sort):可以将DAG中的所有节点排列成一个线性序列,使得序列中每个节点都出现在其所有前驱节点之后。这意味着我们可以确定一个无冲突的执行顺序。
- 可达性(Reachability):可以确定从一个节点是否能到达另一个节点,这对于理解依赖关系和潜在的执行路径至关重要。
- 确定性:给定相同的输入,DAG的执行路径和结果是可预测的。
这些特性使得DAG成为许多复杂系统的核心。它并非LangGraph或AI Agent领域的新发明,而是计算机科学的基石。
表1:DAG在不同领域的应用示例
| 领域 | DAG 应用示例 | 核心优势 |
|---|---|---|
| 编译器 | 抽象语法树(AST)、控制流图(CFG) | 表示程序结构,优化执行,分析依赖 |
| 任务调度系统 | Airflow、Luigi、Celery | 定义任务依赖、并行执行、失败重试、可视化进度 |
| 数据流处理 | Apache Spark (RDDs)、Apache Flink、TensorFlow | 定义数据转换管道、优化计算、容错、并行处理 |
| 版本控制 | Git(提交历史) | 追踪版本演进、分支合并、历史回溯 |
| 依赖管理 | npm、Maven、Gradle | 管理软件包依赖、解决版本冲突、构建顺序 |
| 项目管理 | 甘特图、PERT图(关键路径法) | 规划任务顺序、识别关键路径、估算项目时间 |
| 区块链 | IOTA (Tangle) | 实现分布式账本的交易验证和共识(尽管有其特殊性,但核心是DAG) |
从这些例子中我们可以看到,选择DAG的原因是共同的:它提供了一种清晰、结构化、可控的方式来表示和管理复杂的依赖关系、执行顺序和数据流。它天然地避免了循环依赖导致的死锁或无限循环,并允许系统进行拓扑排序以确定安全的执行顺序。
三、Agent 工作流的固有特性与 DAG 的完美契合
现在,让我们将DAG的普适性优势带回到Agent工作流的特定语境中。为什么说Agent的复杂行为,与DAG的结构是天作之合?
3.1 Agent 决策与执行的本质
一个高级Agent的运行过程,本质上就是一系列的决策和行动:
- 接收输入:用户查询、环境感知。
- 分析与理解:LLM对输入进行语义分析。
- 决策:根据分析结果,决定下一步是回答、寻求更多信息、调用工具,还是进行更深层次的推理。
- 行动:执行决策(例如,调用工具、生成文本)。
- 观察结果:获取行动的反馈(工具返回、用户反馈)。
- 迭代与修正:根据观察结果,重新决策,可能回到之前的步骤或进入新的分支。
- 输出结果:将最终答案或行动反馈给用户/环境。
这个过程充满了条件分支、潜在的循环(迭代)、状态更新和工具调用,完美映射了DAG的结构:
- 节点即步骤:LLM调用、工具执行、条件判断、数据聚合,都可以是图中的一个节点。
- 边即控制流与数据流:连接节点的边表示了执行的顺序和数据从一个步骤流向下一个步骤。
- 状态的显式传递:Agent的上下文(如对话历史、中间结果)被封装在状态对象中,作为输入传递给节点,节点执行后更新状态并将其作为输出。
- 条件分支:通过在决策节点后引出多条条件边,实现动态选择执行路径。
- 迭代/循环:虽然DAG本身无环,但我们可以通过条件边巧妙地“模拟”循环,让执行流回到之前的某个节点,但每次迭代都是基于更新后的状态,并最终导向结束。这实际上是有限状态机的一种高级形式。
- 可预测性与可调试性:图的结构提供了一个清晰的蓝图,使得Agent的每一步行为都可追踪、可预测。
3.2 DAG 如何解决Agent工作流的痛点
| 痛点 | DAG 如何解决 |
|---|---|
| 复杂控制流 | 条件边:根据节点输出动态选择下一节点,轻松实现if/else。子图:将复杂逻辑封装为可重用的模块。并行分支:天然支持独立任务并行执行。 |
| 状态管理不透明 | 显式状态对象:LangGraph的StateGraph要求定义一个共享状态对象,每个节点接收当前状态,并返回更新后的状态。状态流转清晰可见。 |
| 调试与观测困难 | 可视化:图结构天然可渲染,提供Agent执行路径的直观视图。节点粒度日志:每个节点的输入/输出/执行时间可独立记录,快速定位问题。检查点(Checkpointing):允许在任意节点暂停、检查状态、恢复执行,极大简化调试。 |
| 扩展性差 | 模块化:只需添加新节点和新边即可引入新功能或工具,不影响现有逻辑。可重用性:子图可以被多个主图或在图的不同位置复用。 |
| 鲁棒的错误处理 | 节点级错误处理:可以在特定节点捕获异常,并导向专门的错误处理分支。检查点恢复:即使系统崩溃,也可从最近的检查点恢复,无需从头开始。 |
| 无限循环与不确定性 | 无环特性:从设计层面杜绝了真正的无限循环,所有迭代都是有条件的、有限的。即使模拟循环,也必须有明确的终止条件。拓扑排序:确保执行顺序的确定性。 |
| 协作与聚合 | 多分支汇合:并行分支的输出可以在一个节点汇聚,进行整合或总结。多Agent协作:每个Agent可以是图中的一个节点或子图,通过图进行任务分配和结果交换。 |
可以说,DAG为构建“可控、可预测、可调试、可扩展”的Agent提供了最坚实、最自然的架构基础。
四、LangGraph 核心机制:以 DAG 为基石构建 Agent
LangGraph是LangChain生态系统的一部分,专门用于通过图结构构建有状态、多步骤的Agent。它将我们前面讨论的DAG概念,以一种Pythonic且强大的方式实现。
4.1 StateGraph:有状态图的基石
LangGraph的核心是StateGraph。它允许我们定义一个共享的状态对象,这个状态会在图的各个节点之间传递和更新。这解决了传统链式结构中状态管理不透明的问题。
首先,我们需要定义Agent的状态。LangGraph使用TypedDict来定义状态。
from typing import TypedDict, Annotated, List, Union
import operator
# 定义 Agent 的共享状态
class AgentState(TypedDict):
"""
Agent 的共享状态。
Attributes:
input: 用户的初始查询或当前输入。
chat_history: 完整的对话历史记录。
messages: 当前处理批次的消息列表,通常是 Agent 思考过程中的中间消息。
tool_output: 工具调用的结果。
iterations: 迭代次数,用于防止无限循环。
# 可以根据需要添加更多状态,例如:
# search_results: 搜索引擎的返回结果
# data_to_process: 需要处理的数据
"""
input: str
chat_history: Annotated[List[tuple[str, str]], operator.add]
messages: Annotated[List[dict], operator.add] # 使用 operator.add 聚合消息列表
tool_output: Union[str, None]
iterations: int
在这里,Annotated和operator.add是LangGraph的特性,用于定义当多个节点同时更新同一个状态字段时,如何合并这些更新。operator.add表示将列表简单地连接起来。
4.2 节点(Nodes)与边(Edges)
在LangGraph中:
- 节点(Nodes):通常是一个Python函数或一个
Runnable对象。它接收当前AgentState作为输入,执行某些逻辑(如调用LLM、执行工具),然后返回一个字典,用于更新AgentState。 - 边(Edges):连接节点,可以是:
- 普通边 (
add_edge):从一个节点无条件地指向另一个节点。 - 条件边 (
add_conditional_edges):根据源节点的输出,动态地选择下一跳节点。这是实现条件分支和循环的关键。
- 普通边 (
4.3 Entrypoint 和 Exitpoint
set_entry_point:定义图的起始节点。Agent工作流从这里开始。set_finish_point:定义图的结束节点。当执行流到达这个节点时,Agent工作流停止并返回最终状态。
4.4 Checkpointing:状态持久化
LangGraph支持检查点(Checkpointing)机制,可以在Agent执行的任何阶段保存其完整状态。这对于调试、恢复中断的Agent、实现长时间运行的会话至关重要。它利用了DAG的离散步骤特性,使得每个节点执行前后的状态变化都是清晰可追踪的。
4.5 代码示例1:一个简单的工具调用 Agent (基础 DAG)
让我们构建一个简单的Agent,它能够:
- 接收用户输入。
- 决定是否需要调用一个工具(这里我们模拟一个简单的“计算器”工具)。
- 如果需要,调用工具并获取结果。
- 根据工具结果或直接回答用户。
首先,我们需要一些必要的库和工具。
# 确保安装了必要的库
# pip install langchain langchain-openai langgraph
from langchain_core.messages import BaseMessage, HumanMessage, AIMessage
from langchain_core.tools import tool
from langchain_openai import ChatOpenAI
from langgraph.graph import StateGraph, END
from typing import TypedDict, Annotated, List, Union
import operator
import os
# 设置 OpenAI API Key
# os.environ["OPENAI_API_KEY"] = "YOUR_OPENAI_API_KEY"
# 1. 定义 Agent 状态 (与之前相同)
class AgentState(TypedDict):
input: str
chat_history: Annotated[List[tuple[str, str]], operator.add]
messages: Annotated[List[BaseMessage], operator.add] # LangChain的消息类型
tool_output: Union[str, None]
iterations: int = 0 # 初始迭代次数为0
# 2. 定义工具
@tool
def calculator(expression: str) -> str:
"""计算数学表达式。例如:'2 + 2'"""
try:
return str(eval(expression))
except Exception as e:
return f"计算错误: {e}"
tools = [calculator]
# 3. 定义 LLM
llm = ChatOpenAI(model="gpt-4o-mini", temperature=0)
# 4. 绑定工具到 LLM
llm_with_tools = llm.bind_tools(tools)
# 5. 定义图中的节点
def call_llm(state: AgentState) -> dict:
"""调用 LLM 进行推理或生成响应。"""
print(f"--- Calling LLM with messages: {[m.content for m in state['messages']]}")
response = llm_with_tools.invoke(state['messages'])
return {"messages": [response], "iterations": state['iterations'] + 1}
def call_tool(state: AgentState) -> dict:
"""执行 LLM 决定的工具调用。"""
current_message = state['messages'][-1]
tool_calls = current_message.tool_calls
tool_results = []
print(f"--- Calling tool with: {tool_calls}")
for tool_call in tool_calls:
if tool_call.name == "calculator":
result = calculator.invoke(tool_call.args)
tool_results.append(AIMessage(content=result, name=tool_call.name, tool_call_id=tool_call.id))
else:
tool_results.append(AIMessage(content=f"未知工具: {tool_call.name}", name=tool_call.name))
# 将工具结果添加到消息中,等待LLM处理
return {"messages": tool_results, "tool_output": tool_results[0].content if tool_results else None, "iterations": state['iterations']}
# 6. 定义条件判断函数
def should_continue(state: AgentState) -> str:
"""
根据 LLM 的输出判断下一步操作:
- 'call_tool': 如果 LLM 决定调用工具。
- 'end': 如果 LLM 生成了最终响应。
"""
current_message = state['messages'][-1]
if current_message.tool_calls:
print("--- Decision: Call Tool ---")
return "call_tool"
else:
print("--- Decision: End ---")
return "end"
# 7. 构建 StateGraph
workflow = StateGraph(AgentState)
# 添加节点
workflow.add_node("llm_node", call_llm)
workflow.add_node("tool_node", call_tool)
# 设置入口点
workflow.set_entry_point("llm_node")
# 添加条件边:从 llm_node 出来,根据 should_continue 的判断决定去向
workflow.add_conditional_edges(
"llm_node", # 源节点
should_continue, # 条件判断函数
{ # 映射:判断结果 -> 目标节点
"call_tool": "tool_node",
"end": END # 如果是 "end",则结束图的执行
}
)
# 添加普通边:从 tool_node 出来,返回到 llm_node,让 LLM 处理工具结果
workflow.add_edge("tool_node", "llm_node")
# 编译图
app = workflow.compile()
# 可视化图 (需要安装 graphviz: pip install pydot graphviz)
# from IPython.display import Image, display
# display(Image(app.get_graph().draw_png()))
# 运行 Agent
print("================== Agent 运行开始 ==================")
initial_state = {
"input": "2加3是多少?",
"chat_history": [],
"messages": [HumanMessage(content="2加3是多少?")],
"tool_output": None,
"iterations": 0
}
result = app.invoke(initial_state, {"recursion_limit": 5}) # 设置递归限制防止无限循环
print("n--- 最终状态 ---")
print(f"最终消息: {result['messages'][-1].content}")
print(f"总迭代次数: {result['iterations']}")
print("n================== 第二次运行 ==================")
initial_state_2 = {
"input": "今天的天气怎么样?", # 这是一个不需工具的问题
"chat_history": [],
"messages": [HumanMessage(content="今天的天气怎么样?")],
"tool_output": None,
"iterations": 0
}
result_2 = app.invoke(initial_state_2, {"recursion_limit": 5})
print("n--- 最终状态 ---")
print(f"最终消息: {result_2['messages'][-1].content}")
print(f"总迭代次数: {result_2['iterations']}")
代码解释:
AgentState: 定义了Agent的全局状态,其中messages字段用于存储LLM和工具的消息历史。calculator: 一个简单的模拟工具。llm_with_tools: 将calculator工具绑定到ChatOpenAI模型,使其能够通过函数调用(tool_calling)机制识别和调用工具。call_llm节点: 接收当前状态,调用绑定了工具的LLM,并将LLM的响应(可能包含工具调用请求)更新到状态的messages中。call_tool节点: 检查LLM的最新消息,如果包含工具调用请求,则执行相应的工具,并将工具结果作为AIMessage添加到messages中。should_continue函数: 这是我们实现条件逻辑的关键。它检查llm_node输出的最新消息。如果LLM决定调用工具(即current_message.tool_calls非空),则返回"call_tool";否则(LLM直接给出了答案),返回"end"。- 图的构建:
add_node:注册了llm_node和tool_node。set_entry_point("llm_node"):Agent从llm_node开始执行。add_conditional_edges("llm_node", should_continue, {...}):从llm_node出发,根据should_continue的返回值决定下一步。如果返回"call_tool",则去tool_node;如果返回"end",则结束。add_edge("tool_node", "llm_node"):从tool_node执行完工具后,无条件地返回到llm_node。这是为了让LLM能够处理工具的执行结果并决定下一步。
这个简单的例子已经展示了DAG如何实现条件分支和有限迭代(通过tool_node返回llm_node形成一个“循环”)。
运行结果分析:
- 对于“2加3是多少?”,LLM会识别出需要
calculator工具,should_continue会返回"call_tool",流程进入tool_node。tool_node执行计算,然后返回llm_node。此时LLM会看到工具结果"5",然后生成最终答案,should_continue返回"end",流程结束。 - 对于“今天的天气怎么样?”,LLM会直接生成回答,
should_continue会返回"end",流程直接结束。
通过这个DAG,Agent的决策路径变得清晰、可追踪。
五、深入 LangGraph:实现复杂控制流与状态管理
在基础之上,LangGraph提供了更强大的能力来构建极端复杂的Agent行为。
5.1 条件分支 (Conditional Edges) 的威力
条件边是LangGraph实现动态行为的核心。add_conditional_edges允许我们定义一个根据特定逻辑(一个Python函数)动态选择下一个节点的机制。这个函数接收当前AgentState作为输入,返回一个字符串,这个字符串被映射到图中的一个节点名。
代码示例2:基于决策的条件分支 Agent (更复杂的路径选择)
假设我们有一个Agent,它需要根据用户请求的性质,选择不同的处理策略:
- 如果用户需要搜索信息,调用搜索引擎。
- 如果用户需要执行任务(如计算),调用工具。
- 如果用户只是聊天或提问,直接回答。
from langchain_core.messages import BaseMessage, HumanMessage, AIMessage
from langchain_core.tools import tool
from langchain_openai import ChatOpenAI
from langchain_community.tools import DuckDuckGoSearchRun # 假设需要一个搜索引擎工具
from langgraph.graph import StateGraph, END
from typing import TypedDict, Annotated, List, Union
import operator
import os
# os.environ["OPENAI_API_KEY"] = "YOUR_OPENAI_API_KEY"
# AgentState 定义不变
class AgentState(TypedDict):
input: str
chat_history: Annotated[List[tuple[str, str]], operator.add]
messages: Annotated[List[BaseMessage], operator.add]
tool_output: Union[str, None]
iterations: int = 0
# 新增状态,便于决策
action_type: Union[str, None]
# 定义工具
@tool
def calculator(expression: str) -> str:
"""计算数学表达式。例如:'2 + 2'"""
try:
return str(eval(expression))
except Exception as e:
return f"计算错误: {e}"
# 搜索引擎工具
search_tool = DuckDuckGoSearchRun() # 需要安装 duckduckgo-search
tools = [calculator, search_tool]
llm = ChatOpenAI(model="gpt-4o-mini", temperature=0)
llm_with_tools = llm.bind_tools(tools)
# 节点定义
def call_llm(state: AgentState) -> dict:
"""调用 LLM 进行推理或生成响应。"""
print(f"n--- Node: call_llm (messages: {[m.content for m in state['messages']]}) ---")
response = llm_with_tools.invoke(state['messages'])
return {"messages": [response], "iterations": state['iterations'] + 1}
def call_tool(state: AgentState) -> dict:
"""根据LLM的决定执行工具调用。"""
current_message = state['messages'][-1]
tool_calls = current_message.tool_calls
tool_results = []
print(f"n--- Node: call_tool (tool_calls: {tool_calls}) ---")
for tool_call in tool_calls:
if tool_call.name == "calculator":
result = calculator.invoke(tool_call.args)
elif tool_call.name == "duckduckgo_search":
result = search_tool.invoke(tool_call.args)
else:
result = f"未知工具: {tool_call.name}"
tool_results.append(AIMessage(content=result, name=tool_call.name, tool_call_id=tool_call.id))
return {"messages": tool_results, "tool_output": tool_results[0].content if tool_results else None}
def decide_action_type(state: AgentState) -> dict:
"""
让LLM判断用户的意图,决定后续的动作类型。
"""
print(f"n--- Node: decide_action_type ---")
# 构造一个提示,让LLM明确判断意图
prompt = (
f"根据用户的最新消息,判断其意图是 'search' (需要搜索信息), "
f"'calculate' (需要进行计算), 还是 'answer' (直接给出答案)。"
f"只返回 'search', 'calculate' 或 'answer' 中的一个词,不要有其他解释。n"
f"最新消息: {state['messages'][-1].content}"
)
# 这里我们使用一个没有绑定工具的LLM来做纯粹的决策
decision_llm = ChatOpenAI(model="gpt-4o-mini", temperature=0)
response = decision_llm.invoke([HumanMessage(content=prompt)])
action_type = response.content.strip().lower()
print(f"LLM 决策的动作类型: {action_type}")
return {"action_type": action_type, "messages": state['messages']} # 保持消息不变,只更新动作类型
def final_answer(state: AgentState) -> dict:
"""Agent 生成最终答案的节点。"""
print(f"n--- Node: final_answer ---")
# 假设LLM在call_llm节点已经生成了最终答案
return state # 状态无需修改,直接结束
# 定义条件判断函数
def route_action(state: AgentState) -> str:
"""
根据 AgentState 中的 'action_type' 字段路由到不同的节点。
"""
action_type = state.get("action_type")
if action_type == "search":
print("--- Routing: To Search ---")
return "search_flow" # 路由到搜索流程
elif action_type == "calculate":
print("--- Routing: To Calculate ---")
return "calculate_flow" # 路由到计算流程
else: # 默认或其他情况,直接回答
print("--- Routing: To Final Answer ---")
return "final_answer_node"
def should_call_tool_or_end(state: AgentState) -> str:
"""
在 LLM 节点后判断是否需要调用工具或结束。
"""
current_message = state['messages'][-1]
if current_message.tool_calls:
print("--- Decision: Call Tool (after LLM) ---")
return "call_tool_node"
else:
print("--- Decision: End (after LLM) ---")
return "final_answer_node"
# 构建 StateGraph
workflow = StateGraph(AgentState)
# 添加节点
workflow.add_node("decide_action", decide_action_type)
workflow.add_node("llm_node", call_llm)
workflow.add_node("tool_node", call_tool)
workflow.add_node("final_answer_node", final_answer)
# 设置入口点
workflow.set_entry_point("decide_action")
# 从 decide_action 节点开始,根据 LLM 的意图判断结果进行路由
workflow.add_conditional_edges(
"decide_action",
route_action,
{
"search": "llm_node", # 搜索意图,先让LLM思考如何搜索,可能涉及工具调用
"calculate": "llm_node", # 计算意图,先让LLM思考如何计算,可能涉及工具调用
"answer": "llm_node" # 直接回答意图,让LLM直接生成答案
# 注意:这里我们让所有路径都先通过llm_node,让llm决定是否调用工具。
# 更复杂的图可以为每个意图设计独立的子图或更直接的工具调用节点。
}
)
# 从 llm_node 出来,判断是否需要调用工具或结束
workflow.add_conditional_edges(
"llm_node",
should_call_tool_or_end,
{
"call_tool_node": "tool_node",
"final_answer_node": END # 如果是最终答案,则结束
}
)
# 从 tool_node 执行完工具后,返回到 llm_node,让 LLM 处理工具结果
workflow.add_edge("tool_node", "llm_node")
# 从 final_answer_node 结束
workflow.add_edge("final_answer_node", END)
app = workflow.compile()
# 运行 Agent
print("================== Agent 运行开始 (查询天气) ==================")
initial_state_weather = {
"input": "今天上海的天气怎么样?",
"chat_history": [],
"messages": [HumanMessage(content="今天上海的天气怎么样?")],
"tool_output": None,
"iterations": 0,
"action_type": None
}
result_weather = app.invoke(initial_state_weather, {"recursion_limit": 10})
print("n--- 最终状态 (天气查询) ---")
print(f"最终消息: {result_weather['messages'][-1].content}")
print(f"总迭代次数: {result_weather['iterations']}")
print("n================== Agent 运行开始 (计算) ==================")
initial_state_calc = {
"input": "计算 123 乘以 456",
"chat_history": [],
"messages": [HumanMessage(content="计算 123 乘以 456")],
"tool_output": None,
"iterations": 0,
"action_type": None
}
result_calc = app.invoke(initial_state_calc, {"recursion_limit": 10})
print("n--- 最终状态 (计算) ---")
print(f"最终消息: {result_calc['messages'][-1].content}")
print(f"总迭代次数: {result_calc['iterations']}")
print("n================== Agent 运行开始 (闲聊) ==================")
initial_state_chat = {
"input": "你好,Agent!",
"chat_history": [],
"messages": [HumanMessage(content="你好,Agent!")],
"tool_output": None,
"iterations": 0,
"action_type": None
}
result_chat = app.invoke(initial_state_chat, {"recursion_limit": 10})
print("n--- 最终状态 (闲聊) ---")
print(f"最终消息: {result_chat['messages'][-1].content}")
print(f"总迭代次数: {result_chat['iterations']}")
代码解释:
- 新增了
decide_action_type节点,它使用LLM来判断用户意图,并将结果存储在AgentState的action_type字段中。 route_action函数根据action_type的值,将流程路由到不同的逻辑分支。- 为了简化,本例中所有意图最终都汇聚到
llm_node,让LLM决定是否调用工具。在实际应用中,你可以为search、calculate等分支设计更具体的子图或直接工具调用。 final_answer_node专门用于接收最终答案并结束流程。
这个例子展示了如何通过多个条件边和节点组合,实现一个具有多意图识别和多路径处理能力的Agent。每一步的决策和数据流向都清晰地体现在图的结构中。
5.2 循环 (Loops) 的模拟
DAG的无环特性意味着我们不能直接创建无限循环。然而,通过条件边,我们可以有效地模拟“有限迭代”或“带修正的循环”。其核心思想是:当满足特定条件时,执行流返回到图中的某个先前节点,但每次返回都是在一个新的、更新后的状态下进行,并且必须有明确的终止条件。
代码示例3:带修正的迭代 Agent (用户反馈修正)
设想一个Agent,它需要生成一份报告。用户可能会反馈报告中的不足,Agent需要根据反馈进行修正,直到用户满意为止。
# AgentState 略有调整以包含报告和反馈
class ReportAgentState(TypedDict):
input: str
chat_history: Annotated[List[tuple[str, str]], operator.add]
messages: Annotated[List[BaseMessage], operator.add]
report: str
feedback: Union[str, None]
iterations: int = 0
llm = ChatOpenAI(model="gpt-4o-mini", temperature=0)
def generate_report(state: ReportAgentState) -> dict:
"""Agent 生成初始报告或修正后的报告。"""
print(f"n--- Node: generate_report (Iteration: {state['iterations']}) ---")
current_messages = state['messages']
# 构造提示,考虑之前的反馈
if state['feedback']:
prompt = (f"你正在修订一份报告。以下是用户对上一份报告的反馈:n"
f"'{state['feedback']}'n"
f"请根据此反馈,修正并重新生成报告。原始请求是:'{state['input']}'n"
f"当前对话历史:{state['chat_history']}")
else:
prompt = (f"请根据以下输入生成一份详细的报告:'{state['input']}'n"
f"当前对话历史:{state['chat_history']}")
response = llm.invoke([HumanMessage(content=prompt)])
report_content = response.content
print(f"Generated Report: {report_content[:100]}...") # 打印部分报告内容
return {"report": report_content, "messages": [AIMessage(content=report_content)], "iterations": state['iterations'] + 1}
def get_user_feedback(state: ReportAgentState) -> dict:
"""模拟获取用户反馈。在实际应用中,这会是一个用户输入节点。"""
print(f"n--- Node: get_user_feedback ---")
# 模拟用户反馈,实际中可能需要用户交互
if state['iterations'] < 2: # 模拟用户在前两次迭代中提供反馈
feedback_message = input(f"Agent 生成了报告(迭代 {state['iterations']})。请提供反馈 (输入 '满意' 结束): ")
if feedback_message.lower() == "满意":
return {"feedback": None, "messages": [HumanMessage(content="用户:满意")]} # 设置feedback为None表示满意
else:
return {"feedback": feedback_message, "messages": [HumanMessage(content=f"用户反馈: {feedback_message}")]}
else:
print("--- 模拟:用户已满意或达到最大迭代次数 ---")
return {"feedback": None, "messages": [HumanMessage(content="用户:满意")]}
def check_feedback(state: ReportAgentState) -> str:
"""根据用户反馈决定是否需要重新生成报告。"""
print(f"n--- Node: check_feedback ---")
if state['feedback'] is not None and state['iterations'] < 3: # 假设最多迭代3次
print("--- Decision: Needs Revision ---")
return "revise"
else:
print("--- Decision: Finished ---")
return "finish"
# 构建 StateGraph
workflow = StateGraph(ReportAgentState)
# 添加节点
workflow.add_node("generate_report_node", generate_report)
workflow.add_node("get_feedback_node", get_user_feedback)
# 设置入口点
workflow.set_entry_point("generate_report_node")
# 从生成报告节点到获取反馈节点
workflow.add_edge("generate_report_node", "get_feedback_node")
# 从获取反馈节点,根据反馈决定是修正还是结束
workflow.add_conditional_edges(
"get_feedback_node",
check_feedback,
{
"revise": "generate_report_node", # 如果需要修正,回到生成报告节点
"finish": END # 如果满意或达到最大迭代,结束
}
)
app = workflow.compile()
# 运行 Agent
print("================== Agent 运行开始 (报告生成与修正) ==================")
initial_report_state = {
"input": "一份关于2023年AI发展趋势的简要报告。",
"chat_history": [],
"messages": [HumanMessage(content="生成一份关于2023年AI发展趋势的简要报告。")],
"report": "",
"feedback": None,
"iterations": 0
}
result_report = app.invoke(initial_report_state, {"recursion_limit": 5}) # 限制递归深度
print("n--- 最终状态 (报告) ---")
print(f"最终报告:n{result_report['report']}")
print(f"总迭代次数: {result_report['iterations']}")
代码解释:
ReportAgentState:新增report和feedback字段来管理报告内容和用户反馈。generate_report:根据feedback字段决定是生成新报告还是修正报告。get_user_feedback:模拟用户输入反馈。check_feedback:判断是否需要revise(回到generate_report_node)或finish(结束)。- 核心在于
add_conditional_edges将get_feedback_node连接回generate_report_node,形成一个有条件的“后向边”,模拟了迭代修正的过程。iterations字段和recursion_limit用于防止真实无限循环。
这种模式在Agent需要进行多轮对话、自我修正、优化结果等场景中非常有用。
5.3 子图 (Subgraphs) 与嵌套
LangGraph允许将一个StateGraph作为另一个StateGraph的节点来运行,从而实现子图或嵌套图的模式。这极大地提高了模块化和可重用性。你可以将某个复杂的子任务(如“数据收集与清洗”或“多语言翻译”)封装成一个独立的子图,然后在主图中像调用普通节点一样调用它。
# 概念性代码,不完整运行示例,但展示结构
from langgraph.graph import StateGraph, END
from typing import TypedDict, Annotated, List, Union
import operator
# 定义一个子图的状态
class SubgraphState(TypedDict):
data_query: str
raw_data: str
processed_data: str
# 定义子图的节点
def fetch_raw_data(state: SubgraphState) -> dict:
print(f"--- Subgraph: Fetching raw data for: {state['data_query']}")
# 模拟数据获取
return {"raw_data": f"Raw data for '{state['data_query']}'"}
def process_data(state: SubgraphState) -> dict:
print(f"--- Subgraph: Processing raw data: {state['raw_data']}")
# 模拟数据处理
return {"processed_data": f"Processed: {state['raw_data']}"}
# 构建子图
sub_workflow = StateGraph(SubgraphState)
sub_workflow.add_node("fetch_data", fetch_raw_data)
sub_workflow.add_node("process_data", process_data)
sub_workflow.set_entry_point("fetch_data")
sub_workflow.add_edge("fetch_data", "process_data")
sub_workflow.set_finish_point("process_data")
sub_app = sub_workflow.compile() # 编译子图
# 定义主图的状态
class MainAgentState(TypedDict):
user_query: str
final_report: str
# 存储子图的输出
collected_data: str
# 定义主图的节点
def initial_query_analysis(state: MainAgentState) -> dict:
print(f"n--- Main Graph: Analyzing query: {state['user_query']}")
# 假设分析后决定需要收集数据
return {"user_query": state['user_query']}
def generate_final_report(state: MainAgentState) -> dict:
print(f"n--- Main Graph: Generating final report with data: {state['collected_data']}")
return {"final_report": f"Report based on {state['collected_data']}"}
# 构建主图
main_workflow = StateGraph(MainAgentState)
main_workflow.add_node("analyze_query", initial_query_analysis)
main_workflow.add_node("generate_report", generate_final_report)
# 将子图作为一个节点添加到主图
# 当主图执行到这个节点时,会调用子图的 invoke 方法
# 需要一个包装器来适配子图的状态到主图的状态
def call_subgraph_node(state: MainAgentState) -> dict:
print("n--- Main Graph: Calling Subgraph Node ---")
# 准备子图的输入状态
subgraph_input = {"data_query": state['user_query']}
# 调用子图
subgraph_result = sub_app.invoke(subgraph_input)
# 将子图的输出映射回主图的状态
return {"collected_data": subgraph_result['processed_data']}
main_workflow.add_node("data_collection_subgraph", call_subgraph_node)
main_workflow.set_entry_point("analyze_query")
main_workflow.add_edge("analyze_query", "data_collection_subgraph")
main_workflow.add_edge("data_collection_subgraph", "generate_report")
main_workflow.set_finish_point("generate_report")
main_app = main_workflow.compile()
# 运行主图
print("================== Main Agent 运行开始 (带子图) ==================")
main_initial_state = {
"user_query": "关于火星地质的最新数据",
"final_report": "",
"collected_data": ""
}
main_result = main_app.invoke(main_initial_state)
print("n--- 最终状态 (主图) ---")
print(f"最终报告: {main_result['final_report']}")
代码解释:
- 定义了独立的
SubgraphState和sub_workflow,它有自己的节点和边。 - 在
MainAgentState中,我们有一个collected_data字段来接收子图的输出。 call_subgraph_node是一个适配器函数,它负责将主图的状态转换为子图的输入,调用子图,然后将子图的输出合并回主图的状态。main_workflow.add_node("data_collection_subgraph", call_subgraph_node)将这个适配器函数注册为main_workflow的一个节点。
这种方式使得复杂Agent的架构层次分明,易于管理和团队协作。
5.4 并行执行 (Parallelism)
DAG天然支持并行执行不相互依赖的任务分支。在LangGraph中,你可以通过在某个节点之后引出多条并行边(到不同的节点),并在后续节点中等待所有并行分支完成并聚合其结果来实现并行。LangGraph的异步API(aenter, ainvoke)可以支持真正的并行执行。
# 概念性代码,展示并行结构的思路
from langgraph.graph import StateGraph, END
from typing import TypedDict, Annotated, List, Union
import operator
import asyncio
import time
class ParallelAgentState(TypedDict):
query: str
result_A: str
result_B: str
final_combined_result: str
async def task_A(state: ParallelAgentState) -> dict:
print(f"--- Parallel: Task A started for query: {state['query']}")
await asyncio.sleep(2) # 模拟耗时操作
result = f"Result from Task A for '{state['query']}'"
print(f"--- Parallel: Task A finished: {result}")
return {"result_A": result}
async def task_B(state: ParallelAgentState) -> dict:
print(f"--- Parallel: Task B started for query: {state['query']}")
await asyncio.sleep(1) # 模拟耗时操作
result = f"Result from Task B for '{state['query']}'"
print(f"--- Parallel: Task B finished: {result}")
return {"result_B": result}
def combine_results(state: ParallelAgentState) -> dict:
print(f"--- Combine: Combining results A: '{state['result_A']}' and B: '{state['result_B']}'")
combined = f"Combined: {state['result_A']} AND {state['result_B']}"
return {"final_combined_result": combined}
workflow = StateGraph(ParallelAgentState)
workflow.add_node("start_parallel", lambda state: state) # 只是一个占位节点
workflow.add_node("task_A_node", task_A)
workflow.add_node("task_B_node", task_B)
workflow.add_node("combine_node", combine_results)
workflow.set_entry_point("start_parallel")
# 从 start_parallel 节点引出两条并行边
workflow.add_edge("start_parallel", "task_A_node")
workflow.add_edge("start_parallel", "task_B_node")
# 使用 `join` 语法,当所有前驱节点完成时,`combine_node` 才会执行
workflow.add_edge("task_A_node", "combine_node")
workflow.add_edge("task_B_node", "combine_node")
workflow.set_finish_point("combine_node")
app = workflow.compile()
# 异步运行 Agent
async def run_parallel_agent():
print("================== Agent 运行开始 (并行任务) ==================")
initial_state = {
"query": "获取商品信息",
"result_A": "",
"result_B": "",
"final_combined_result": ""
}
start_time = time.time()
result = await app.ainvoke(initial_state)
end_time = time.time()
print("n--- 最终状态 (并行) ---")
print(f"最终合并结果: {result['final_combined_result']}")
print(f"总耗时: {end_time - start_time:.2f} 秒")
# 在主程序中调用异步函数
# asyncio.run(run_parallel_agent())
代码解释:
task_A和task_B是两个异步节点,模拟并行执行的任务。- 从
start_parallel节点引出两条边到task_A_node和task_B_node。 combine_results节点会等待task_A_node和task_B_node都完成后才执行,这通过LangGraph的内部机制自动处理。- 使用
app.ainvoke来异步运行图。
通过这种方式,Agent可以在需要时同时处理多个独立的子任务,显著提高效率。
六、DAG 带来的优势:可控性、可观测性与可扩展性
现在我们已经看到了LangGraph如何利用DAG构建复杂的Agent,是时候系统地总结DAG带来的核心优势了。
6.1 可控性 (Controllability)
DAG结构赋予了Agent无与伦比的控制能力:
- 显式的执行路径:不再是黑盒。每一步LLM的思考、工具的调用、决策的逻辑,都通过图的节点和边清晰地表示出来。你可以精确地知道Agent在任何时刻处于哪个阶段,正在执行什么操作。
- 强制的依赖关系:边定义了严格的执行顺序和数据依赖。一个节点只有在其所有前驱节点完成后才能执行,确保了逻辑的正确性,避免了竞态条件或数据不一致。
- 精细的错误处理:可以在特定节点捕获和处理错误。例如,如果一个工具调用失败,可以路由到专门的错误处理节点,而不是让整个Agent崩溃。这使得Agent更加鲁棒。
- 中断与恢复:结合LangGraph的检查点机制,DAG的离散步骤使得Agent可以在任何节点暂停,保存状态,并在之后从该节点恢复执行。这对于长时间运行的Agent、需要用户介入的Agent,以及故障恢复至关重要。
6.2 可观测性 (Observability)
DAG的结构天生就具备高度的可观测性:
- 直观的可视化:图结构可以轻易地被渲染成图形,提供Agent决策流的直观视图。开发者可以一目了然地看到Agent的潜在行为路径,以及实际执行的路径。
- 细粒度的日志与追踪:每个节点都是一个独立的执行单元,其输入、输出、耗时、内部日志都可以被独立记录和追踪。这使得Agent的每一步行为都透明化。
- 高效的调试:当Agent行为不符合预期时,我们可以通过可视化图和节点日志,快速定位问题发生在哪一个节点、哪一步决策出了问题,大大缩短调试周期。
6.3 可扩展性 (Extensibility)
DAG的模块化特性保证了卓越的扩展性:
- 模块化设计:每个节点都是一个独立的、可替换的模块。添加新的工具、新的决策逻辑、新的LLM调用策略,只需添加新的节点和边,而无需修改现有节点的内部逻辑。
- 高重用性:可以将常用的子任务封装成子图,并在多个主图或图的不同部分复用。例如,“数据清洗子图”、“用户意图识别子图”等。
- 迭代开发:可以从一个简单的Agent图开始,逐步增加复杂性,每次添加新功能都只涉及图中的局部修改,降低了开发风险。
6.4 避免无限循环与不确定性
DAG的“无环”特性从根本上解决了传统Agent设计中可能出现的无限循环问题。即使我们通过条件边模拟了迭代,那也是“有限迭代”,必须有明确的终止条件或最大迭代次数。这保证了Agent行为的确定性和可预测性,避免了资源耗尽或逻辑死锁。
七、挑战与未来展望
尽管DAG为Agent工作流带来了巨大的优势,但我们也要清醒地认识到它可能带来的挑战,并展望未来的发展。
7.1 图的复杂度管理
随着Agent功能的增长,其对应的DAG可能会变得非常庞大和复杂。一个包含几十个甚至上百个节点和边的图,其维护成本将显著增加。
- 解决方案:
- 子图抽象:这是最有效的策略,将相关逻辑封装在子图中,降低主图的复杂度。
- 自动化图生成:对于某些高度结构化或动态变化的Agent,可以考虑通过代码或元数据来自动化生成图结构。
- 约定与模式:定义清晰的节点命名、边连接和状态更新约定,提高可读性。
7.2 可视化工具的演进
虽然LangGraph提供了基本的图可视化功能,但对于大型复杂图,我们需要更强大的交互式可视化工具,能够进行缩放、过滤、高亮、以及实时追踪执行路径。
- 展望:集成更强大的图形库,提供Web UI,实时展示Agent的执行路径、当前状态和性能指标。
7.3 性能优化
对于高并发、低延迟的Agent系统,如何高效地执行大型、深度嵌套的DAG,包括节点间的上下文切换、状态序列化/反序列化、并行任务调度等,都是需要持续优化的领域。
- 展望:更优化的异步运行时、分布式执行、缓存机制、以及与专用AI加速硬件的结合。
7.4 Agentic System Design的演进
LangGraph目前主要关注单个Agent的内部工作流。但未来的Agent系统将是多Agent协作的生态。如何用DAG来描述和管理多个Agent之间的任务分配、沟通协议、结果协调,将是下一个重要的研究方向。
- 展望:将图的概念扩展到多Agent协调层面,例如,每个Agent自身是一个子图,而Agent之间的通信和协作则通过更高层次的图来编排。
VIII. 智能未来,图谱铸就
有向无环图(DAG)为构建复杂、可控的AI Agent工作流提供了坚实且不可或缺的结构基础。LangGraph作为这一理念的卓越实践者,通过其强大的图构建能力、显式状态管理和灵活的控制流机制,将Agent从简单的链式调用提升到可预测、可控、可观测的智能实体。它不仅解决了当前Agent开发中的诸多痛点,也为未来更高级、更鲁棒、更具协作性的AI系统奠定了基石。理解并掌握DAG在LangGraph中的应用,是每一位致力于构建下一代智能Agent的开发者不可或缺的关键能力。