各位编程专家、架构师和对AI智能体系统充满好奇的朋友们,大家好!
今天,我们将深入探讨在构建复杂、动态的AI智能体系统时,两种核心的工作流模式:编排 (Orchestration) 与 编舞 (Choreography)。特别地,我们将聚焦于 LangGraph 这个强大的框架,分析在这两种模式下,LangGraph 如何帮助我们处理那些充满不确定性和多变性的“动态任务”。作为一名编程专家,我的目标是为大家提供一个既有理论深度又具实践指导意义的讲座,包含严谨的逻辑、丰富的代码示例,并以清晰易懂的语言呈现。
1. AI智能体与动态任务的挑战
在当今AI领域,构建能够自主思考、规划和执行任务的智能体(AI Agent)已成为前沿热点。这些智能体不再仅仅是简单的问答系统,它们需要与外部工具交互、进行复杂推理、处理不确定信息,甚至在执行过程中根据反馈动态调整策略。
动态任务 (Dynamic Tasks) 在这里指的是那些执行路径不固定、依赖于运行时条件、可能需要人机协作、或者涉及多步骤工具调用的任务。例如:
- 用户意图理解与任务分解: 用户输入“帮我预订下周从上海到北京的机票,并查找那边的酒店推荐”,智能体需要分解为机票查询、酒店查询两个子任务,并根据查询结果决定下一步行动。
- 复杂数据分析与报告生成: 智能体需要从多个数据源获取信息、执行ETL操作、进行统计分析,并根据分析结果决定报告的结构和内容。
- 故障诊断与修复: 智能体接收到系统告警后,需要执行一系列诊断命令、分析日志、识别问题,并尝试自动修复,若无法修复则上报人工处理。
- 人机协作流程: 智能体完成某一步骤后,需要等待人类审批或提供额外信息,然后才能继续执行。
传统的顺序执行或固定图(Fixed Graph)的工作流模式,在处理这类动态任务时显得力不从心。它们缺乏灵活性,难以适应运行时条件的变化。这就是 LangGraph 的用武之地。
LangGraph 是 LangChain 的一个扩展,它允许我们使用有向无环图 (DAG) 或有环图 (Cycle Graph) 的方式来定义智能体的行为。通过节点 (Nodes) 和边 (Edges) 的概念,LangGraph 提供了一种强大而直观的方式来构建复杂的、多步骤的、状态驱动的AI应用。它的核心优势在于能够表示并执行动态逻辑,包括条件分支、循环和人机协作,从而让智能体能够根据实时情况做出决策。
那么,在 LangGraph 的语境下,我们如何组织这些动态任务的执行流呢?这便引出了编排与编舞这两种模式的讨论。
2. 工作流模式:编排 (Orchestration) vs. 编舞 (Choreography)
在分布式系统和复杂业务流程设计中,编排和编舞是两种核心的工作流协调模式。理解它们的异同对于设计健壮、可维护的智能体系统至关重要。
2.1 编排 (Orchestration)
定义: 编排模式的核心思想是集中式控制。存在一个或少数几个中心化的“编排者”(Orchestrator),负责定义、管理和驱动整个工作流的执行。编排者明确地知道所有参与者的角色、任务以及它们之间的依赖关系,并按照预设或动态生成的逻辑,一步一步地调用各个参与者,并协调它们之间的交互。它就像一个乐队指挥,精确地指导每个乐手何时演奏、演奏什么。
核心特征:
- 中心控制点: 有一个明确的组件(编排器)来控制流程。
- 显式调用: 编排器直接调用参与者,或通过消息队列发送指令。
- 全局视图: 编排器对整个流程有清晰的端到端视图。
- 状态管理: 编排器通常负责维护整个流程的全局状态。
- 强耦合(相对而言): 编排器需要了解每个参与者的接口和行为。
优点:
- 清晰的流程可见性: 整个流程的执行路径清晰可见,易于理解和审计。
- 易于调试和监控: 由于是中心化控制,流程的每一步状态和遇到的问题都可以在编排器层面追踪。
- 强大的控制力: 编排器可以轻松地处理错误、实现重试逻辑、引入人工审批等复杂控制流。
- 一致性保证: 集中式的状态管理有助于维护数据的一致性。
- 适用于复杂业务逻辑: 当业务流程复杂且需要严格的顺序和条件判断时,编排模式更易于实现。
缺点:
- 单点瓶颈/故障: 编排器可能成为性能瓶颈或单点故障。
- 可伸缩性挑战: 随着系统规模的扩大,编排器可能难以扩展以处理所有交互。
- 较强的耦合: 编排器与参与者之间存在一定程度的耦合,修改参与者可能需要调整编排器。
- 缺乏自治性: 参与者缺乏自主性,完全由编排器驱动。
2.2 编舞 (Choreography)
定义: 编舞模式的核心思想是去中心化控制。系统中没有一个中心化的控制器。相反,各个参与者(服务、组件、智能体)通过事件驱动的方式进行交互。每个参与者都监听特定的事件,当接收到自己感兴趣的事件时,它会执行相应的操作,并可能发出新的事件,从而触发其他参与者的响应。这就像一群舞者,他们没有指挥,而是通过观察彼此的动作、听着音乐的节奏,自主地做出反应,共同完成一段舞蹈。
核心特征:
- 去中心化控制: 没有中心控制器,每个参与者都是独立的。
- 事件驱动: 参与者通过发布和订阅事件进行通信。
- 隐式流: 整体流程是事件序列的副产品,没有明确的全局视图。
- 局部状态: 每个参与者只维护自己的局部状态。
- 松耦合: 参与者之间通过事件契约解耦,不直接调用彼此。
优点:
- 高弹性与可伸缩性: 由于没有单点故障,系统更具弹性,并且可以独立扩展各个参与者。
- 松耦合: 参与者之间通过事件解耦,修改一个参与者通常不会影响其他参与者。
- 高自治性: 每个参与者都是独立的,可以自主地执行任务。
- 适用于微服务架构: 与微服务架构的理念非常契合。
- 易于引入新功能: 只需添加新的事件监听者即可扩展功能。
缺点:
- 流程可见性差: 难以获得整个流程的端到端视图,追踪事件流可能非常复杂。
- 调试和监控困难: 跨多个服务追踪问题和事件链条非常具有挑战性,需要复杂的分布式追踪工具。
- 复杂性管理: 随着事件类型和参与者数量的增加,事件风暴 (Event Storm) 和回调地狱 (Callback Hell) 的风险增高。
- 事务一致性挑战: 跨多个服务的分布式事务(例如Saga模式)实现复杂。
- 缺乏显式控制: 难以在流程中插入人工干预或复杂的决策逻辑。
2.3 比较表格
| 特征 | 编排 (Orchestration) | 编舞 (Choreography) |
|---|---|---|
| 控制方式 | 集中式,由一个编排器驱动 | 去中心化,由事件驱动 |
| 耦合度 | 相对较高,编排器需了解参与者接口 | 松散,通过事件契约解耦 |
| 通信方式 | 直接调用或消息指令 | 发布/订阅事件 |
| 流程可见性 | 高,编排器提供全局视图 | 低,流程是事件序列的副产品 |
| 调试难度 | 相对较低,集中式日志和状态 | 较高,需要分布式追踪和日志聚合 |
| 可伸缩性 | 编排器可能成为瓶颈,横向扩展复杂 | 高,各参与者可独立扩展 |
| 弹性/容错 | 编排器是单点故障风险 | 高,单个参与者故障不影响整体 |
| 适用场景 | 复杂业务流程、严格顺序、强一致性、需人工干预 | 微服务、高伸缩性、高弹性、事件驱动系统 |
| LangGraph中 | LangGraph 自身就是一种强大的编排器 | 可用于构建与外部系统协同的组件,或模拟内部事件驱动 |
3. LangGraph 基础:构建动态工作流的基石
在深入探讨模式选择之前,我们先快速回顾一下 LangGraph 的核心概念,它们是实现编排或编舞的基础。
3.1 核心概念
- 图状态 (Graph State): 这是 LangGraph 的核心,一个可变的数据结构,代表了整个工作流的当前状态。所有节点都可以读取和更新这个状态。它通常是一个字典,其中包含键值对,用于存储模型输出、工具结果、用户输入、决策标志等。
- 节点 (Nodes): 图中的基本处理单元。每个节点都是一个可执行的函数,它可以是:
- LLM 调用: 调用大型语言模型进行推理、生成文本。
- 工具调用: 执行外部工具(如API调用、数据库查询、文件操作)。
- 自定义函数: 任何Python函数,用于数据处理、逻辑判断、状态更新等。
- 人机节点: 暂停执行,等待用户输入或审批。
- 边 (Edges): 连接节点的路径,决定了工作流的流向。
- 直接边 (Direct Edges): 从一个节点到另一个节点的无条件跳转。
- 条件边 (Conditional Edges): 根据图状态或前一个节点的输出动态决定下一个节点。这是实现动态任务的关键。
- Entry Point & Finish Point: 图的起始和结束点。
- Checkpoints: LangGraph 允许将图的当前状态保存到持久化存储中。这对于实现人机协作、长时任务恢复、以及调试非常有用。
3.2 简单 LangGraph 示例:工具调用与条件判断
让我们通过一个简单的例子来展示 LangGraph 的基本构造。假设我们有一个智能体,它可以回答问题,如果问题需要实时信息,它会调用一个搜索工具。
from typing import TypedDict, Annotated, List, Union
from langchain_core.tools import tool
from langchain_core.messages import BaseMessage, HumanMessage, AIMessage
from langchain_openai import ChatOpenAI
from langgraph.graph import StateGraph, END, START
import operator
# 1. 定义图状态 (Graph State)
# 这是一个字典,用于在节点之间传递信息
class AgentState(TypedDict):
messages: Annotated[List[BaseMessage], operator.add]
# 可能需要存储工具调用结果、决策等
# 2. 定义工具
@tool
def search_web(query: str):
"""Searches the web for the given query."""
print(f"DEBUG: Performing web search for: {query}")
# 模拟网络搜索结果
if "最新新闻" in query:
return "最新的AI新闻:AI大模型在医疗领域取得突破性进展。"
elif "天气" in query:
return "上海今天多云,气温20-28摄氏度。"
else:
return f"搜索结果:关于 '{query}' 的信息。"
tools = [search_web]
llm = ChatOpenAI(model="gpt-4o", temperature=0) # 假设已配置OpenAI API密钥
# 3. 定义节点函数
# 3.1 LLM 节点:生成响应或工具调用
def call_llm(state: AgentState):
messages = state["messages"]
# 绑定工具,让LLM知道哪些工具可用
response = llm.bind_tools(tools).invoke(messages)
return {"messages": [response]}
# 3.2 工具节点:执行工具
def call_tool(state: AgentState):
messages = state["messages"]
last_message = messages[-1]
tool_calls = last_message.tool_calls
tool_outputs = []
for tool_call in tool_calls:
tool_name = tool_call["name"]
tool_args = tool_call["args"]
if tool_name == "search_web":
print(f"DEBUG: Calling tool search_web with args: {tool_args}")
output = search_web.invoke(tool_args)
tool_outputs.append(AIMessage(content=str(output), name=tool_name)) # 将工具输出作为AIMessage返回
else:
tool_outputs.append(AIMessage(content=f"Unknown tool: {tool_name}", name=tool_name))
return {"messages": tool_outputs}
# 4. 定义条件边函数
def route_agent(state: AgentState):
"""
根据LLM的输出决定下一步是继续LLM还是调用工具。
"""
last_message = state["messages"][-1]
if last_message.tool_calls:
print("DEBUG: LLM requested tool call. Routing to 'call_tool'.")
return "call_tool"
else:
print("DEBUG: LLM provided a final answer. Routing to 'END'.")
return "END"
# 5. 构建图
workflow = StateGraph(AgentState)
workflow.add_node("call_llm", call_llm)
workflow.add_node("call_tool", call_tool)
# 设置入口点
workflow.set_entry_point("call_llm")
# 从LLM节点出发的条件边
workflow.add_conditional_edges(
"call_llm", # 源节点
route_agent, # 条件函数
{
"call_tool": "call_tool", # 如果条件函数返回"call_tool",则去call_tool节点
"END": END # 如果条件函数返回"END",则结束
}
)
# 从工具节点出发,执行完工具后再次回到LLM,让LLM处理工具结果
workflow.add_edge("call_tool", "call_llm")
app = workflow.compile()
# 运行智能体
print("--- 智能体开始运行 ---")
inputs1 = {"messages": [HumanMessage(content="上海今天天气怎么样?")]}
for s in app.stream(inputs1):
print(s)
print("--- 任务结束 ---")
print("n--- 智能体开始运行 ---")
inputs2 = {"messages": [HumanMessage(content="给我讲个笑话。")]}
for s in app.stream(inputs2):
print(s)
print("--- 任务结束 ---")
"""
预期输出示例 (可能因LLM模型不同而略有差异):
--- 智能体开始运行 ---
DEBUG: Performing web search for: 上海今天天气怎么样?
DEBUG: LLM requested tool call. Routing to 'call_tool'.
{'call_llm': {'messages': [AIMessage(content='', additional_kwargs={'tool_calls': [{'id': 'call_SgG9D3j043t4', 'function': {'arguments': '{"query":"上海今天天气怎么样?"}', 'name': 'search_web'}}], 'name': 'search_web'})]}}
DEBUG: Calling tool search_web with args: {'query': '上海今天天气怎么样?'}
{'call_tool': {'messages': [AIMessage(content='上海今天多云,气温20-28摄氏度。', name='search_web')]}}
DEBUG: LLM provided a final answer. Routing to 'END'.
{'call_llm': {'messages': [AIMessage(content='上海今天多云,气温20-28摄氏度。', response_metadata={'token_usage': {'completion_tokens': 16, 'prompt_tokens': 105, 'total_tokens': 121}, 'model_name': 'gpt-4o', 'system_fingerprint': 'fp_3ec9b05763', 'finish_reason': 'stop'}, id='run-c3e0204d-e99d-4874-a690-335195e26978-0')]}}
{'__end__': {'messages': [HumanMessage(content='上海今天天气怎么样?'), AIMessage(content='', additional_kwargs={'tool_calls': [{'id': 'call_SgG9D3j043t4', 'function': {'arguments': '{"query":"上海今天天气怎么样?"}', 'name': 'search_web'}}], 'name': 'search_web'}), AIMessage(content='上海今天多云,气温20-28摄氏度。', name='search_web'), AIMessage(content='上海今天多云,气温20-28摄氏度。', response_metadata={'token_usage': {'completion_tokens': 16, 'prompt_tokens': 105, 'total_tokens': 121}, 'model_name': 'gpt-4o', 'system_fingerprint': 'fp_3ec9b05763', 'finish_reason': 'stop'}, id='run-c3e0204d-e99d-4874-a690-335195e26978-0')]}}
--- 任务结束 ---
--- 智能体开始运行 ---
DEBUG: LLM provided a final answer. Routing to 'END'.
{'call_llm': {'messages': [AIMessage(content='为什么计算机总是在找朋友?nn因为它们有很多字节(bytes)需要分享!', response_metadata={'token_usage': {'completion_tokens': 26, 'prompt_tokens': 78, 'total_tokens': 104}, 'model_name': 'gpt-4o', 'system_fingerprint': 'fp_3ec9b05763', 'finish_reason': 'stop'}, id='run-16c87910-184e-4f7f-a64f-4d2f093a1f87-0')]}}
{'__end__': {'messages': [HumanMessage(content='给我讲个笑话。'), AIMessage(content='为什么计算机总是在找朋友?nn因为它们有很多字节(bytes)需要分享!', response_metadata={'token_usage': {'completion_tokens': 26, 'prompt_tokens': 78, 'total_tokens': 104}, 'model_name': 'gpt-4o', 'system_fingerprint': 'fp_3ec9b05763', 'finish_reason': 'stop'}, id='run-16c87910-184e-4f7f-a64f-4d2f093a1f87-0')]}}
--- 任务结束 ---
"""
这个例子展示了 LangGraph 如何通过 StateGraph、add_node、add_conditional_edges 和共享的 AgentState 来构建一个动态的工作流。LLM 的输出决定了下一步是执行工具还是直接结束,这就是一个典型的编排行为。
4. 在 LangGraph 中实现编排
可以说,LangGraph 的设计哲学本身就非常倾向于编排模式。LangGraph 的 StateGraph 就是一个强大的编排器。它维护着一个全局的 AgentState,并根据预定义的节点、边和条件逻辑,精确地控制着流程的每一步。
4.1 核心原理
- 中心化状态:
AgentState是所有节点共享的单一数据源,它记录了整个流程的进展和结果。 - 显式流程定义: 通过
add_node、add_edge和add_conditional_edges,我们明确地定义了所有可能的执行路径。 - 决策点: 条件边函数(如
route_agent)充当了流程中的决策点,根据AgentState的内容决定下一步应该执行哪个节点。 - LLM 作为决策者: 在许多 LangGraph 应用中,LLM 本身被用作智能的决策者,根据上下文和工具集,决定是回答问题、调用工具,还是寻求更多信息。
4.2 案例:智能项目管理助手
让我们构建一个更复杂的智能体,作为项目管理助手。它需要处理以下动态任务:
- 用户提出需求。
- LLM 判断需求类型:
- 查询任务: 需要从项目数据库中查询信息(例如,项目状态、成员、截止日期)。
- 创建任务: 需要在项目管理系统中创建一个新任务。
- 需要澄清: LLM 无法直接理解或执行,需要向用户提问。
- 需要人工审批: 涉及敏感操作或高风险决策,需要人类专家介入。
- 根据判断结果,智能体调用相应的工具,或者与用户交互,或者进入人工审批流程。
- 执行完成后,将结果反馈给用户。
工具定义:
from datetime import datetime
import json
@tool
def query_project_database(project_id: str = None, task_id: str = None, status: str = None, assignee: str = None):
"""
Queries the project management database for information about projects or tasks.
Can filter by project ID, task ID, status, or assignee.
Returns a JSON string of relevant project/task data.
"""
print(f"DEBUG: Querying project database with: project_id={project_id}, task_id={task_id}, status={status}, assignee={assignee}")
# 模拟数据库查询结果
mock_data = {
"P101": {"name": "Website Redesign", "status": "In Progress", "due_date": "2024-12-31", "tasks": [
{"id": "T001", "name": "Design UI", "status": "Completed", "assignee": "Alice"},
{"id": "T002", "name": "Develop Backend", "status": "In Progress", "assignee": "Bob"},
]},
"P102": {"name": "Mobile App Launch", "status": "Planning", "due_date": "2025-03-01", "tasks": [
{"id": "T003", "name": "Market Research", "status": "Pending", "assignee": "Charlie"},
]}
}
results = []
if project_id:
if project_id in mock_data:
results.append(mock_data[project_id])
elif task_id:
for p_id, p_data in mock_data.items():
for task in p_data.get("tasks", []):
if task["id"] == task_id:
results.append(task)
break
elif status:
for p_id, p_data in mock_data.items():
if p_data["status"].lower() == status.lower():
results.append(p_data)
for task in p_data.get("tasks", []):
if task["status"].lower() == status.lower():
results.append(task)
elif assignee:
for p_id, p_data in mock_data.items():
for task in p_data.get("tasks", []):
if task["assignee"].lower() == assignee.lower():
results.append(task)
return json.dumps(results, indent=2, ensure_ascii=False)
@tool
def create_new_task(project_id: str, task_name: str, description: str, assignee: str, due_date: str):
"""
Creates a new task in the project management system.
Requires project ID, task name, description, assignee, and due date (YYYY-MM-DD).
Returns the ID of the newly created task.
"""
print(f"DEBUG: Creating new task: Project={project_id}, Name={task_name}, Assignee={assignee}, Due Date={due_date}")
# 模拟任务创建
task_id = f"T{len(project_id) + len(task_name) + len(assignee) % 1000:03d}" # 简单生成ID
print(f"DEBUG: Task '{task_name}' created with ID '{task_id}' for project '{project_id}'.")
return f"Task '{task_name}' (ID: {task_id}) has been created for project '{project_id}' and assigned to {assignee}, due by {due_date}."
project_tools = [query_project_database, create_new_task]
LangGraph 智能体定义:
from typing import TypedDict, Annotated, List, Union
from langchain_core.messages import BaseMessage, HumanMessage, AIMessage, ToolMessage
from langchain_openai import ChatOpenAI
from langgraph.graph import StateGraph, END, START
import operator
import json
# 1. 定义图状态 (AgentState)
# 除了消息,我们还可能需要一个字段来存储人类反馈或请求人工审批的标志
class AgentState(TypedDict):
messages: Annotated[List[BaseMessage], operator.add]
human_input: str # 用于接收人类反馈
needs_human_approval: bool # 标志是否需要人工审批
approval_granted: bool # 标志人工审批是否通过
# 2. 初始化LLM
llm = ChatOpenAI(model="gpt-4o", temperature=0)
# 3. 定义节点函数
# 3.1 LLM 节点:主要决策者
def call_llm(state: AgentState):
messages = state["messages"]
# 绑定所有工具,让LLM知道它能做什么
response = llm.bind_tools(project_tools).invoke(messages)
return {"messages": [response]}
# 3.2 工具节点:执行具体操作
def call_tool(state: AgentState):
messages = state["messages"]
last_message = messages[-1]
tool_calls = last_message.tool_calls
tool_outputs = []
for tool_call in tool_calls:
tool_name = tool_call["name"]
tool_args = tool_call["args"]
print(f"DEBUG: Calling tool '{tool_name}' with args: {tool_args}")
try:
if tool_name == "query_project_database":
output = query_project_database.invoke(tool_args)
elif tool_name == "create_new_task":
output = create_new_task.invoke(tool_args)
else:
output = f"Unknown tool: {tool_name}"
tool_outputs.append(ToolMessage(content=str(output), tool_call_id=tool_call["id"], name=tool_name))
except Exception as e:
tool_outputs.append(ToolMessage(content=f"Error calling tool {tool_name}: {e}", tool_call_id=tool_call["id"], name=tool_name))
return {"messages": tool_outputs}
# 3.3 人工审批节点:暂停流程,等待人类输入
def human_approval_node(state: AgentState):
print("n--- 等待人工审批 ---")
print(f"当前状态:{state['messages'][-1].content}")
print("请问您是否同意此操作?(y/n)")
# 这里可以集成一个外部的审批系统,例如通过API调用发送审批请求
# 为了演示,我们直接在控制台获取输入
user_response = input("您的选择: ").strip().lower()
if user_response == 'y':
print("人工审批通过。")
return {"approval_granted": True, "needs_human_approval": False, "messages": [HumanMessage(content="人工审批:同意")]}
else:
print("人工审批拒绝。")
return {"approval_granted": False, "needs_human_approval": False, "messages": [HumanMessage(content="人工审批:拒绝")]}
# 3.4 收集人类反馈节点(如果LLM需要澄清)
def collect_human_feedback(state: AgentState):
print("n--- 需要人类反馈 ---")
print(f"智能体消息: {state['messages'][-1].content}")
feedback = input("请提供更多信息: ").strip()
return {"human_input": feedback, "messages": [HumanMessage(content=f"用户反馈: {feedback}")]}
# 4. 定义条件边函数:这是编排的核心决策逻辑
def route_agent_for_project_manager(state: AgentState):
"""
根据LLM的输出决定下一步是调用工具、结束、请求人工审批还是收集人类反馈。
"""
last_message = state["messages"][-1]
# 检查是否需要人工审批(LLM可能通过特殊指令触发)
if "ACTION: HUMAN_APPROVAL_REQUIRED" in last_message.content:
print("DEBUG: LLM requested human approval. Routing to 'human_approval'.")
return "human_approval"
# 检查是否需要人类反馈(LLM可能通过特殊指令触发)
if "ACTION: COLLECT_HUMAN_FEEDBACK" in last_message.content:
print("DEBUG: LLM requested human feedback. Routing to 'collect_human_feedback'.")
return "collect_human_feedback"
# 如果LLM请求工具调用
if last_message.tool_calls:
print("DEBUG: LLM requested tool call. Routing to 'call_tool'.")
return "call_tool"
else:
# 如果LLM提供了最终答案
print("DEBUG: LLM provided a final answer. Routing to 'END'.")
return "END"
# 定义一个在人工审批后或人类反馈后,重新回到LLM的路由函数
def route_after_human_interaction(state: AgentState):
if state.get("approval_granted") is False:
print("DEBUG: Approval denied. Routing to 'END'.")
return "END" # 审批被拒绝,流程结束
# 无论是审批通过还是提供了反馈,都回到LLM继续处理
print("DEBUG: Human interaction complete. Routing back to 'call_llm'.")
return "call_llm"
# 5. 构建图
workflow = StateGraph(AgentState)
workflow.add_node("call_llm", call_llm)
workflow.add_node("call_tool", call_tool)
workflow.add_node("human_approval", human_approval_node)
workflow.add_node("collect_human_feedback", collect_human_feedback)
workflow.set_entry_point("call_llm")
# 从LLM节点出发的条件边
workflow.add_conditional_edges(
"call_llm",
route_agent_for_project_manager,
{
"call_tool": "call_tool",
"human_approval": "human_approval",
"collect_human_feedback": "collect_human_feedback",
"END": END
}
)
# 工具执行完毕后,回到LLM让它处理工具结果
workflow.add_edge("call_tool", "call_llm")
# 人工审批或人类反馈后,回到LLM继续处理
workflow.add_conditional_edges(
"human_approval",
route_after_human_interaction, # 审批通过回到LLM,审批拒绝结束
{
"call_llm": "call_llm",
"END": END
}
)
workflow.add_edge("collect_human_feedback", "call_llm")
app = workflow.compile()
# 运行智能体
print("--- 智能项目管理助手开始运行 ---")
# 场景1: 查询任务
print("n--- 场景1: 查询项目 P101 的所有任务 ---")
inputs1 = {"messages": [HumanMessage(content="查询项目 P101 的所有任务。")]}
for s in app.stream(inputs1):
print(s)
print("--- 任务结束 ---")
# 场景2: 创建任务 (可能需要人工审批)
# 为了演示,我们让LLM识别到"高风险"关键词时触发人工审批
# 实际中LLM需要被提示如何触发ACTION: HUMAN_APPROVAL_REQUIRED
print("n--- 场景2: 创建高风险任务 ---")
# 假设LLM被提示,当任务描述包含"高风险"时,输出 "ACTION: HUMAN_APPROVAL_REQUIRED"
# 为了简化,这里直接模拟LLM输出
class MockLLMForApproval:
def invoke(self, messages):
last_msg_content = messages[-1].content
if "高风险" in last_msg_content:
return AIMessage(content="ACTION: HUMAN_APPROVAL_REQUIREDn创建高风险任务需要人工审批。")
return llm.bind_tools(project_tools).invoke(messages)
# 临时替换LLM,用于演示审批流程
_original_llm = llm
llm = MockLLMForApproval()
app_approval_test = workflow.compile() # 重新编译以使用新的LLM
inputs2 = {"messages": [HumanMessage(content="请在项目P101中创建一个新任务:'进行安全审计',描述:'这是一项高风险的安全审计任务,请Bob在2024-11-01前完成。'")]}
for s in app_approval_test.stream(inputs2):
print(s)
print("--- 任务结束 ---")
llm = _original_llm # 恢复原始LLM
# 场景3: LLM需要澄清 (模拟)
# 假设LLM被提示,当信息不足时,输出 "ACTION: COLLECT_HUMAN_FEEDBACK"
class MockLLMForFeedback:
def invoke(self, messages):
last_msg_content = messages[-1].content
if "我不太明白" in last_msg_content: # 模拟LLM识别到需要更多信息
return AIMessage(content="ACTION: COLLECT_HUMAN_FEEDBACKn我不太明白您想做什么,请提供更多细节。")
return llm.bind_tools(project_tools).invoke(messages)
_original_llm = llm
llm = MockLLMForFeedback()
app_feedback_test = workflow.compile() # 重新编译以使用新的LLM
print("n--- 场景3: LLM需要澄清 ---")
inputs3 = {"messages": [HumanMessage(content="帮我处理一下那个事情,我不太明白。")]}
for s in app_feedback_test.stream(inputs3):
print(s)
print("--- 任务结束 ---")
llm = _original_llm # 恢复原始LLM
"""
预期输出示例 (部分):
--- 智能项目管理助手开始运行 ---
--- 场景1: 查询项目 P101 的所有任务 ---
{'call_llm': {'messages': [AIMessage(content='', additional_kwargs={'tool_calls': [{'id': 'call_pM9D...Q0g', 'function': {'arguments': '{"project_id":"P101"}', 'name': 'query_project_database'}}]}, response_metadata={'token_usage': {'completion_tokens': 17, 'prompt_tokens': 161, 'total_tokens': 178}, 'model_name': 'gpt-4o', 'system_fingerprint': 'fp_3ec9b05763', 'finish_reason': 'tool_calls'}, id='run-2a5c...2c-0')]}}
DEBUG: LLM requested tool call. Routing to 'call_tool'.
DEBUG: Calling tool 'query_project_database' with args: {'project_id': 'P101'}
{'call_tool': {'messages': [ToolMessage(content='[n {n "name": "Website Redesign",n "status": "In Progress",n "due_date": "2024-12-31",n "tasks": [n {n "id": "T001",n "name": "Design UI",n "status": "Completed",n "assignee": "Alice"n },n {n "id": "T002",n "name": "Develop Backend",n "status": "In Progress",n "assignee": "Bob"n }n ]n }n]', tool_call_id='call_pM9D...Q0g', name='query_project_database')]}}
DEBUG: LLM provided a final answer. Routing to 'END'.
{'call_llm': {'messages': [AIMessage(content='项目 P101 "Website Redesign" 包含以下任务:nn- 任务 ID T001: "Design UI", 状态 "Completed", 负责人 "Alice"n- 任务 ID T002: "Develop Backend", 状态 "In Progress", 负责人 "Bob"', response_metadata={'token_usage': {'completion_tokens': 86, 'prompt_tokens': 267, 'total_tokens': 353}, 'model_name': 'gpt-4o', 'system_fingerprint': 'fp_3ec9b05763', 'finish_reason': 'stop'}, id='run-774f...71-0')]}}
{'__end__': {'messages': [HumanMessage(content='查询项目 P101 的所有任务。'), AIMessage(content='', additional_kwargs={'tool_calls': [{'id': 'call_pM9D...Q0g', 'function': {'arguments': '{"project_id":"P101"}', 'name': 'query_project_database'}}]}, response_metadata={'token_usage': {'completion_tokens': 17, 'prompt_tokens': 161, 'total_tokens': 178}, 'model_name': 'gpt-4o', 'system_fingerprint': 'fp_3ec9b05763', 'finish_reason': 'tool_calls'}, id='run-2a5c...2c-0'), ToolMessage(content='[n {n "name": "Website Redesign",n "status": "In Progress",n "due_date": "2024-12-31",n "tasks": [n {n "id": "T001",n "name": "Design UI",n "status": "Completed",n "assignee": "Alice"n },n {n "id": "T002",n "name": "Develop Backend",n "status": "In Progress",n "assignee": "Bob"n }n ]n }n]', tool_call_id='call_pM9D...Q0g', name='query_project_database'), AIMessage(content='项目 P101 "Website Redesign" 包含以下任务:nn- 任务 ID T001: "Design UI", 状态 "Completed", 负责人 "Alice"n- 任务 ID T002: "Develop Backend", 状态 "In Progress", 负责人 "Bob"', response_metadata={'token_usage': {'completion_tokens': 86, 'prompt_tokens': 267, 'total_tokens': 353}, 'model_name': 'gpt-4o', 'system_fingerprint': 'fp_3ec9b05763', 'finish_reason': 'stop'}, id='run-774f...71-0')]}}
--- 任务结束 ---
--- 场景2: 创建高风险任务 ---
--- 等待人工审批 ---
当前状态:ACTION: HUMAN_APPROVAL_REQUIRED
创建高风险任务需要人工审批。
请问您是否同意此操作?(y/n)
您的选择: y
人工审批通过。
{'human_approval': {'approval_granted': True, 'needs_human_approval': False, 'messages': [HumanMessage(content='人工审批:同意')]}}
DEBUG: Human interaction complete. Routing back to 'call_llm'.
{'call_llm': {'messages': [AIMessage(content='', additional_kwargs={'tool_calls': [{'id': 'call_J33c...b0K', 'function': {'arguments': '{"project_id":"P101","task_name":"进行安全审计","description":"这是一项高风险的安全审计任务","assignee":"Bob","due_date":"2024-11-01"}', 'name': 'create_new_task'}}]}, response_metadata={'token_usage': {'completion_tokens': 42, 'prompt_tokens': 200, 'total_tokens': 242}, 'model_name': 'gpt-4o', 'system_fingerprint': 'fp_3ec9b05763', 'finish_reason': 'tool_calls'}, id='run-507c...7e-0')]}}
DEBUG: LLM requested tool call. Routing to 'call_tool'.
DEBUG: Calling tool 'create_new_task' with args: {'project_id': 'P101', 'task_name': '进行安全审计', 'description': '这是一项高风险的安全审计任务', 'assignee': 'Bob', 'due_date': '2024-11-01'}
DEBUG: Task '进行安全审计' created with ID 'T111' for project 'P101'.
{'call_tool': {'messages': [ToolMessage(content="Task '进行安全审计' (ID: T111) has been created for project 'P101' and assigned to Bob, due by 2024-11-01.", tool_call_id='call_J33c...b0K', name='create_new_task')]}}
DEBUG: LLM provided a final answer. Routing to 'END'.
{'call_llm': {'messages': [AIMessage(content='好的,我已经为您在项目 P101 中创建了新任务:“进行安全审计”(ID: T111),并已分配给 Bob,截止日期为 2024-11-01。', response_metadata={'token_usage': {'completion_tokens': 86, 'prompt_tokens': 324, 'total_tokens': 410}, 'model_name': 'gpt-4o', 'system_fingerprint': 'fp_3ec9b05763', 'finish_reason': 'stop'}, id='run-e40d...11-0')]}}
{'__end__': {'messages': [HumanMessage(content='请在项目P101中创建一个新任务:'进行安全审计',描述:'这是一项高风险的安全审计任务,请Bob在2024-11-01前完成。''), AIMessage(content='ACTION: HUMAN_APPROVAL_REQUIREDn创建高风险任务需要人工审批。'), HumanMessage(content='人工审批:同意'), AIMessage(content='', additional_kwargs={'tool_calls': [{'id': 'call_J33c...b0K', 'function': {'arguments': '{"project_id":"P101","task_name":"进行安全审计","description":"这是一项高风险的安全审计任务","assignee":"Bob","due_date":"2024-11-01"}', 'name': 'create_new_task'}}]}, response_metadata={'token_usage': {'completion_tokens': 42, 'prompt_tokens': 200, 'total_tokens': 242}, 'model_name': 'gpt-4o', 'system_fingerprint': 'fp_3ec9b05763', 'finish_reason': 'tool_calls'}, id='run-507c...7e-0'), ToolMessage(content="Task '进行安全审计' (ID: T111) has been created for project 'P101' and assigned to Bob, due by 2024-11-01.", tool_call_id='call_J33c...b0K', name='create_new_task'), AIMessage(content='好的,我已经为您在项目 P101 中创建了新任务:“进行安全审计”(ID: T111),并已分配给 Bob,截止日期为 2024-11-01。', response_metadata={'token_usage': {'completion_tokens': 86, 'prompt_tokens': 324, 'total_tokens': 410}, 'model_name': 'gpt-4o', 'system_fingerprint': 'fp_3ec9b05763', 'finish_reason': 'stop'}, id='run-e40d...11-0')]}}
--- 任务结束 ---
--- 场景3: LLM需要澄清 ---
--- 需要人类反馈 ---
智能体消息: ACTION: COLLECT_HUMAN_FEEDBACK
我不太明白您想做什么,请提供更多细节。
请提供更多信息: 我想查一下所有未完成的任务。
{'collect_human_feedback': {'human_input': '我查一下所有未完成的任务。', 'messages': [HumanMessage(content='用户反馈: 我查一下所有未完成的任务。')]}}
DEBUG: Human interaction complete. Routing back to 'call_llm'.
{'call_llm': {'messages': [AIMessage(content='', additional_kwargs={'tool_calls': [{'id': 'call_jV7e...1_W', 'function': {'arguments': '{"status":"In Progress"}', 'name': 'query_project_database'}}]}, response_metadata={'token_usage': {'completion_tokens': 16, 'prompt_tokens': 177, 'total_tokens': 193}, 'model_name': 'gpt-4o', 'system_fingerprint': 'fp_3ec9b05763', 'finish_reason': 'tool_calls'}, id='run-1215...0b-0')]}}
DEBUG: LLM requested tool call. Routing to 'call_tool'.
DEBUG: Calling tool 'query_project_database' with args: {'status': 'In Progress'}
{'call_tool': {'messages': [ToolMessage(content='[n {n "name": "Website Redesign",n "status": "In Progress",n "due_date": "2024-12-31",n "tasks": [n {n "id": "T001",n "name": "Design UI",n "status": "Completed",n "assignee": "Alice"n },n {n "id": "T002",n "name": "Develop Backend",n "status": "In Progress",n "assignee": "Bob"n }n ]n },n {n "id": "T002",n "name": "Develop Backend",n "status": "In Progress",n "assignee": "Bob"n }n]', tool_call_id='call_jV7e...1_W', name='query_project_database')]}}
DEBUG: LLM provided a final answer. Routing to 'END'.
{'call_llm': {'messages': [AIMessage(content='好的,以下是当前“进行中”的任务和项目:nn**项目:**n- **Website Redesign (P101):** 状态 "In Progress", 截止日期 "2024-12-31"n - **任务:** "Develop Backend" (ID: T002), 状态 "In Progress", 负责人 "Bob"nn请问您还需要了解其他信息吗?', response_metadata={'token_usage': {'completion_tokens': 159, 'prompt_tokens': 301, 'total_tokens': 460}, 'model_name': 'gpt-4o', 'system_fingerprint': 'fp_3ec9b05763', 'finish_reason': 'stop'}, id='run-0604...1c-0')]}}
{'__end__': {'messages': [HumanMessage(content='帮我处理一下那个事情,我不太明白。'), AIMessage(content='ACTION: COLLECT_HUMAN_FEEDBACKn我不太明白您想做什么,请提供更多细节。'), HumanMessage(content='用户反馈: 我查一下所有未完成的任务。'), AIMessage(content='', additional_kwargs={'tool_calls': [{'id': 'call_jV7e...1_W', 'function': {'arguments': '{"status":"In Progress"}', 'name': 'query_project_database'}}]}, response_metadata={'token_usage': {'completion_tokens': 16, 'prompt_tokens': 177, 'total_tokens': 193}, 'model_name': 'gpt-4o', 'system_fingerprint': 'fp_3ec9b05763', 'finish_reason': 'tool_calls'}, id='run-1215...0b-0'), ToolMessage(content='[n {n "name": "Website Redesign",n "status": "In Progress",n "due_date": "2024-12-31",n "tasks": [n {n "id": "T001",n "name": "Design UI",n "status": "Completed",n "assignee": "Alice"n },n {n "id": "T002",
"""
这个项目管理助手的例子充分展示了 LangGraph 在编排模式下的强大能力:
- 多路径动态决策:
route_agent_for_project_manager函数根据 LLM 的输出(是否请求工具、人工审批或反馈)动态选择下一跳。 - 人机协作:
human_approval_node和collect_human_feedback节点允许流程暂停,等待人类的介入,然后根据人类的反馈继续或终止流程。 - 状态管理:
AgentState存储了整个对话历史、审批状态、人类输入等,确保了流程的连贯性。 - 工具调用编排: LLM 根据当前需求智能地选择并调用不同的工具,LangGraph 负责协调工具执行和结果回传。
总结: 在 LangGraph 中,编排模式是其自然的表达方式。通过显式的节点、条件边和共享状态,我们可以构建出高度可控、可预测且易于调试的复杂动态工作流。对于一个单一的、职责明确的智能体系统而言,编排通常是最佳选择。
5. 在 LangGraph 中实现编舞 (或模拟)
纯粹的编舞模式在 LangGraph 的单一图实例中是难以直接实现的,因为 LangGraph 本身就是围绕一个中心化的状态机进行编排的。它的边定义了显式的流转规则,这与编舞的去中心化、事件驱动的理念相悖。
然而,我们可以在以下两种场景中探讨“编舞”:
- 在 LangGraph 内部模拟编舞特性: 通过设计节点和状态,让节点对特定的“事件”或状态变化做出响应,而非严格地被上一个节点直接调用。
- 将 LangGraph 作为外部编舞系统中的一个参与者: 多个 LangGraph 实例或其他服务通过外部事件总线进行通信,形成一个更大的编舞系统。
5.1 方法一:在 LangGraph 内部模拟事件驱动的编舞
这种方法通过精心设计 AgentState 和条件边函数,使得节点看起来像是“监听”状态中的特定“事件”或标志,并根据这些事件自主触发。
核心思想:
AgentState中包含一个event_queue或status_flags字段。- 节点在完成任务后,不是直接返回下一个节点的名字,而是向
event_queue中添加一个事件,或者设置一个status_flag。 - 条件边函数则不再是简单地检查上一个 LLM 的意图,而是检查
event_queue中是否存在某个事件,或者某个status_flag是否被设置。多个条件边可以监听同一个事件。
案例:简化的多模块数据处理流程
假设一个智能体需要处理用户上传的数据:
- 接收数据。
- 数据清洗模块 (Node A):清洗数据,完成后发出
DATA_CLEANED事件。 - 数据验证模块 (Node B):验证数据,完成后发出
DATA_VALIDATED事件。 - 数据存储模块 (Node C):存储数据,完成后发出
DATA_STORED事件。 - 报告生成模块 (Node D):监听
DATA_STORED事件,生成报告。 - 通知模块 (Node E):监听
DATA_VALIDATED事件,如果验证失败则发送通知。
在这种设计中,数据清洗完成,会发出一个 DATA_CLEANED 事件。理论上,多个节点可以同时监听这个事件并开始工作。这里我们简化为顺序流程,但强调其“事件驱动”的理念。
from typing import TypedDict, Annotated, List
from langchain_core.messages import BaseMessage, HumanMessage, AIMessage
from langgraph.graph import StateGraph, END, START
import operator
import json
class DataProcessingState(TypedDict):
messages: Annotated[List[BaseMessage], operator.add]
raw_data: str
cleaned_data: str
is_valid: bool
processed_report: str
events: Annotated[List[str], operator.add] # 存储事件列表
# 节点函数
def receive_data(state: DataProcessingState):
raw_data = state["messages"][-1].content # 假设用户输入就是原始数据
print(f"DEBUG: Data received: {raw_data[:50]}...")
return {"raw_data": raw_data, "events": ["DATA_RECEIVED"]}
def clean_data(state: DataProcessingState):
raw_data = state["raw_data"]
cleaned_data = raw_data.replace("dirty_keyword", "").strip() # 模拟清洗
print(f"DEBUG: Data cleaned. Cleaned data: {cleaned_data[:50]}...")
return {"cleaned_data": cleaned_data, "events": ["DATA_CLEANED"]}
def validate_data(state: DataProcessingState):
cleaned_data = state["cleaned_data"]
is_valid = len(cleaned_data) > 10 # 模拟验证:数据长度大于10为有效
print(f"DEBUG: Data validated. Is valid: {is_valid}")
return {"is_valid": is_valid, "events": ["DATA_VALIDATED"]}
def store_data(state: DataProcessingState):
if not state["is_valid"]:
print("DEBUG: Data invalid, skipping storage.")
return {"messages": [AIMessage(content="数据无效,未存储。")], "events": ["DATA_STORAGE_SKIPPED"]}
cleaned_data = state["cleaned_data"]
# 模拟存储操作
storage_path = f"/data/processed_{len(cleaned_data)}.txt"
with open(f"temp_storage_{len(cleaned_data)}.txt", "w") as f: # 写入临时文件模拟
f.write(cleaned_data)
print(f"DEBUG: Data stored to {storage_path}.")
return {"messages": [AIMessage(content=f"数据已存储到 {storage_path}")], "events": ["DATA_STORED"]}
def generate_report(state: DataProcessingState):
if "DATA_STORED" not in state["events"]:
print("DEBUG: Data not stored, cannot generate report.")
return {"messages": [AIMessage(content="数据未存储,无法生成报告。")]}
cleaned_data = state["cleaned_data"]
report = f"处理报告:n原始数据长度: {len(state['raw_data'])}n清洗后数据长度: {len(cleaned_data)}n数据是否有效: {state['is_valid']}n生成时间: {datetime.now().isoformat()}"
print(f"DEBUG: Report generated.")
return {"processed_report": report, "messages": [AIMessage(content="报告已生成。")]}
def send_notification(state: DataProcessingState):
if "DATA_VALIDATED" not in state["events"]:
print("DEBUG: Data not validated, skipping notification.")
return {}
if not state["is_valid"]:
notification_msg = f"警告:数据验证失败!原始数据:{state['raw_data'][:50]}..."
print(f"DEBUG: Sending notification: {notification_msg}")
return {"messages": [AIMessage(content=f"通知:{notification_msg}")]}
else:
print("DEBUG: Data valid, no failure notification needed.")
return {} # 数据有效则不发送失败通知
# 条件边函数
def route_on_event(state: DataProcessingState):
latest_event = state["events"][-1] if state["events"] else None
print(f"DEBUG: Latest event: {latest_event}")
if latest_event == "DATA_RECEIVED":
return "clean_data"
elif latest_event == "DATA_CLEANED":
return "validate_data"
elif latest_event == "DATA_VALIDATED":
# 验证模块完成,可以并行触发存储和通知(如果LangGraph支持并行,这里简化为先存储)
# 如果要模拟并行,这里需要更复杂的路由或使用子图
return "store_data"
elif latest_event == "DATA_STORED":
return "generate_report"
elif latest_event == "DATA_STORAGE_SKIPPED": # 如果存储跳过,直接尝试生成报告(报告会说无法生成)
return "generate_report"
elif latest_event == "REPORT_GENERATED": # 报告生成后,可以结束或继续其他操作
return "send_notification" # 假设报告生成后,触发通知(这里是序列,但可设计为并行)
# 对于 send_notification 节点,它可能不产生新的事件,或者它本身就是流程的末端
# 为了演示,我们让它完成后就结束
return "END" # 默认结束
# 构建图
workflow = StateGraph(DataProcessingState)
workflow.add_node("receive_data", receive_data)
workflow.add_node("clean_data", clean_data)
workflow.add_node("validate_data", validate_data)
workflow.add_node("store_data", store_data)
workflow.add_node("generate_report", generate_report)
workflow.add_node("send_notification", send_notification)
workflow.set_entry_point("receive_data")
# 所有的节点都通过检查最新事件来决定下一步
workflow.add_conditional_edges(
"receive_data", route_on_event,
{"clean_data": "clean_data"}
)
workflow.add_conditional_edges(
"clean_data", route_on_event,
{"validate_data": "validate_data"}
)
workflow.add_conditional_edges(
"validate_data", route_on_event,
{"store_data": "store_data"}
)
workflow.add_conditional_edges(
"store_data", route_on_event,
{"generate_report": "generate_report"}
)
workflow.add_conditional_edges(
"generate_report", route_on_event,
{"send_notification": "send_notification"}
)
workflow.add_conditional_edges(
"send_notification", route_on_event, # send_notification完成后就结束
{"END": END}
)
# 确保在某些情况下可以直接结束,例如数据无效跳过存储后
# 可以在 generate_report 之后直接 END,或者根据实际业务逻辑调整
workflow.add_edge("generate_report", "send_notification") # 报告生成后,总是尝试发送通知
app = workflow.compile()
# 运行智能体
print("--- 内部模拟编舞数据处理开始 ---")
inputs1 = {"messages": [HumanMessage(content="这是我的原始数据:user_id_123, product_A, quantity_10, price_100.0, dirty_keyword")]}
for s in app.stream(inputs1):
print(s)
print("--- 任务结束 ---")
print("n--- 内部模拟编舞数据处理 (短数据,验证失败) ---")
inputs2 = {"messages": [HumanMessage(content="短数据")]}
for s in app.stream(inputs2):
print(s)
print("--- 任务结束 ---")
"""
预期输出示例 (部分):
--- 内部模拟编舞数据处理开始 ---
DEBUG: Data received: 这是我的原始数据:user_id_123, product_A, quantity_10,...
{'receive_data': {'raw_data': '这是我的原始数据:user_id_123, product_A, quantity_10, price_100.0, dirty_keyword', 'events': ['DATA_RECEIVED']}}
DEBUG: Latest event: DATA_RECEIVED
DEBUG: Data cleaned. Cleaned data: 这是我的原始数据:user_id_123, product_A, quantity_10,...
{'clean_data': {'cleaned_data': '这是我的原始数据:user_id_123, product_A, quantity_10, price_100.0,', 'events': ['DATA_CLEANED']}}
DEBUG: Latest event: DATA_CLEANED
DEBUG: Data validated. Is valid: True
{'validate_data': {'is_valid': True, 'events': ['DATA_VALIDATED']}}
DEBUG: Latest event: DATA_VALIDATED
DEBUG: Data stored to /data/processed_50.txt.
{'store_data': {'messages': [AIMessage(content='数据已存储到 /data/processed_50.txt')], 'events': ['DATA_STORED']}}
DEBUG: Latest event: DATA_STORED
DEBUG: Report generated.
{'generate_report': {'processed_report': '处理报告:n原始数据长度: 74n清洗后数据长度: 50n数据是否有效: Truen生成时间: 2024-07-29T...', 'messages': [AIMessage(content='报告已生成。')]}}
DEBUG: Latest event: DATA_STORED
DEBUG: Data valid, no failure notification needed.
{'send_notification': {}}
DEBUG: Latest event: DATA_STORED
{'__end__': {'messages': [HumanMessage(content='这是我的原始数据:user_id_123, product_A, quantity_10, price_100.0, dirty_keyword'), AIMessage(content='数据已存储到 /data/processed_50.txt'), AIMessage(content='报告已生成。')], 'raw_data': '这是我的原始数据:user_id_123, product_A, quantity_10, price_100.0, dirty_keyword', 'cleaned_data': '这是我的原始数据:user_id_123, product_A, quantity_10, price_100.0,', 'is_valid': True, 'processed_report': '处理报告:n原始数据长度: 74n清洗后数据长度: 50n数据是否有效: Truen生成时间: 2024-07-29T...', 'events': ['DATA_RECEIVED', 'DATA_CLEANED', 'DATA_VALIDATED', 'DATA_STORED']}}
--- 任务结束 ---
--- 内部模拟编舞数据处理 (短数据,验证失败) ---
DEBUG: Data received: 短数据...
{'receive_data': {'raw_data': '短数据', 'events': ['DATA_RECEIVED']}}
DEBUG: Latest event: DATA_RECEIVED
DEBUG: Data cleaned. Cleaned data: 短数据...
{'clean_data': {'cleaned_data': '短数据', 'events': ['DATA_CLEANED']}}
DEBUG: Latest event: DATA_CLEANED
DEBUG: Data validated. Is valid: False
{'validate_data': {'is_valid': False, 'events': ['DATA_VALIDATED']}}
DEBUG: Latest event: DATA_VALIDATED
DEBUG: Data invalid, skipping storage.
{'store_data': {'messages': [AIMessage(content='数据无效,未存储。')], 'events': ['DATA_STORAGE_SKIPPED']}}
DEBUG: Latest event: DATA_STORAGE_SKIPPED
DEBUG: Data not stored, cannot generate report.
{'generate_report': {'messages': [AIMessage(content='数据未存储,无法生成报告。')]}}
DEBUG: Latest event: DATA_STORAGE_SKIPPED
DEBUG: Sending notification: 警告:数据验证失败!原始数据:短数据...
{'send_notification': {'messages': [AIMessage(content='通知:警告:数据验证失败!原始数据:短数据...')]}}
DEBUG: Latest event: DATA_STORAGE_SKIPPED
{'__end__': {'messages': [HumanMessage(content='短数据'), AIMessage(content='数据无效,未存储。'), AIMessage(content='数据未存储,无法生成报告。'), AIMessage(content='通知:警告:数据验证失败!原始数据:短数据...')], 'raw_data': '短数据', 'cleaned_data': '短数据', 'is_valid': False, 'events': ['DATA_RECEIVED', 'DATA_CLEANED', 'DATA_VALIDATED', 'DATA_STORAGE_SKIPPED']}}
--- 任务结束 ---
"""
这个例子虽然在 LangGraph 内部仍然使用条件边进行编排,但通过 events 列表模拟了事件驱动的特性。每个节点完成任务后,会向 events 列表中添加一个事件,后续的路由逻辑(route_on_event)则根据这个事件来决定流向。send_notification 节点也可以被视为一个“监听者”,它检查 is_valid 状态来决定是否执行。
这种“内部编舞”的优点是:
- 稍微增强了模块化: 节点更多地关注于自身任务和事件发布,而不是直接调用下一个节点。
- 更容易并行扩展: 如果 LangGraph 支持并行执行,可以有多个节点同时监听同一个事件并独立启动。
缺点是:
- 仍然是编排: 最终的流转逻辑仍然集中在
route_on_event函数中,没有完全去中心化。 - 调试复杂: 事件链追踪可能比直接的函数调用更困难。
5.2 方法二:多个 LangGraph 智能体在外部编舞系统中协作
这是更接近“纯粹编舞”的场景。在这种架构中:
- 每个 LangGraph 实例都是一个独立的、功能专一的微服务或自治智能体。
- 这些 LangGraph 实例各自负责其内部的编排逻辑。
- 它们之间不直接调用,而是通过一个外部消息队列(如 Kafka, RabbitMQ, Redis Pub/Sub)或事件总线进行异步通信。
- 每个 LangGraph 实例监听它感兴趣的事件,并发布它产生的事件。
架构示意:
+----------------+ +----------------+ +----------------+
| LangGraph A | | LangGraph B | | LangGraph C |
| (订单处理代理) | ----> | (库存管理代理) | ----> | (物流调度代理) |
+----------------+ +----------------+ +----------------+
| ^ ^
| | |
+---------- Event Bus (e.g., Kafka) ---------------+
(OrderCreated, ItemReserved, ShippingScheduled)
概念代码片段(非完整可运行代码,仅示意):
LangGraph A (订单处理代理):
# order_processing_agent.py
from langgraph.graph import StateGraph, END
from typing import TypedDict, List, Annotated
import operator
# 假设有一个外部消息发布者
from external_messaging import publish_event
class OrderState(TypedDict):
order_details: dict
order_id: str
messages: Annotated[List[str], operator.add]
def process_order_node(state: OrderState):
# 模拟订单处理逻辑
order_id = "ORD" + str(hash(json.dumps(state["order_details"])))[:6]
print(f"LangGraph A: Processing order {order_id}...")
# 假设订单处理成功,发布 OrderCreated 事件
publish_event("OrderCreated", {"order_id": order_id, "items": state["order_details"]["items"]})
return {"order_id": order_id, "messages": [f"Order {order_id} processed, OrderCreated event published."]}
workflow_a = StateGraph(OrderState)
workflow_a.add_node("process_order", process_order_node)
workflow_a.set_entry_point("process_order")
workflow_a.add_edge("process_order", END) # 订单处理完成后,LangGraph A 的本次执行结束
app_a = workflow_a.compile()
# 如何触发 LangGraph A:
# app_a.invoke({"order_details": {"customer_id": "C123", "items": [{"id": "P001", "qty": 2}]}})
LangGraph B (库存管理代理):
# inventory_management_agent.py
from langgraph.graph import StateGraph, END
from typing import TypedDict, List, Annotated
import operator
from external_messaging import subscribe_event, publish_event
class InventoryState(TypedDict):
order_id: str
items: List[dict]
inventory_reserved: bool
messages: Annotated[List[str], operator.add]
def reserve_inventory_node(state: InventoryState):
print(f"LangGraph B: Reserving inventory for order {state['order_id']} items: {state['items']}")
# 模拟库存预留逻辑
success = True # 假设总是成功
if success:
publish_event("ItemReserved", {"order_id": state['order_id'], "status": "success"})
return {"inventory_reserved": True, "messages": [f"Inventory reserved for order {state['order_id']}."]}
else:
publish_event("ItemReservationFailed", {"order_id": state['order_id'], "status": "failed"})
return {"inventory_reserved": False, "messages": [f"Inventory reservation failed for order {state['order_id']}."]}
workflow_b = StateGraph(InventoryState)
workflow_b.add_node("reserve_inventory", reserve_inventory_node)
workflow_b.set_entry_point("reserve_inventory")
workflow_b.add_edge("reserve_inventory", END)
app_b = workflow_b.compile()
# 监听 OrderCreated 事件并触发 LangGraph B:
def on_order_created(event_data):
print(f"LangGraph B Listener: Received OrderCreated event: {event_data}")
# 触发 LangGraph B 实例
app_b.invoke({"order_id": event_data["order_id"], "items": event_data["items"], "inventory_reserved": False})
# subscribe_event("OrderCreated", on_order_created)
LangGraph C (物流调度代理):
# shipping_agent.py
from langgraph.graph import StateGraph, END
from typing import TypedDict, List, Annotated
import operator
from external_messaging import subscribe_event
class ShippingState(TypedDict):
order_id: str
shipping_scheduled: bool
messages: Annotated[List[str], operator.add]
def schedule_shipping_node(state: ShippingState):
print(f"LangGraph C: Scheduling shipping for order {state['order_id']}")
# 模拟物流调度逻辑
# ...
return {"shipping_scheduled": True, "messages": [f"Shipping scheduled for order {state['order_id']}."]}
workflow_c = StateGraph(ShippingState)
workflow_c.add_node("schedule_shipping", schedule_shipping_node)
workflow_c.set_entry_point("schedule_shipping")
workflow_c.add_edge("schedule_shipping", END)
app_c = workflow_c.compile()
# 监听 ItemReserved 事件并触发 LangGraph C:
def on_item_reserved(event_data):
if event_data["status"] == "success":
print(f"LangGraph C Listener: Received ItemReserved event (success): {event_data}")
# 触发 LangGraph C 实例
app_c.invoke({"order_id": event_data["order_id"], "shipping_scheduled": False})
else:
print(f"LangGraph C Listener: Received ItemReserved event (failed): {event_data}")
# 处理预留失败的情况,例如通知用户或进行补偿
# subscribe_event("ItemReserved", on_item_reserved)
在这种“外部编舞”模式中,每个 LangGraph 实例都是一个自治的、内部编排的智能体,它们通过松耦合的事件进行通信。
优点:
- 高度解耦和自治: 每个 LangGraph 实例可以独立开发、部署和扩展。
- 高弹性: 一个 LangGraph 实例的故障不会直接影响其他实例。
- 可伸缩性: 可以根据需要独立扩展各个 LangGraph 服务。
- 符合微服务架构: LangGraph 实例可以很好地作为微服务中的智能代理。
缺点:
- 端到端流程可见性差: 很难从一个地方全面了解整个业务流程的进展。
- 分布式事务复杂: 跨多个 LangGraph 实例的事务一致性(例如,Saga模式)实现起来非常复杂。
- 调试和追踪困难: 需要复杂的分布式追踪和日志聚合工具来理解事件流和错误。
- 事件契约管理: 事件的定义和版本管理变得至关重要。
6. LangGraph 中动态任务的最佳模式选择
现在我们回到了核心问题:在 LangGraph 中,哪种模式更适合处理动态任务?
6.1 关键考虑因素
在做选择时,我们需要权衡以下因素:
-
系统规模与复杂性:
- 单一、紧密耦合的智能体: 如果你的AI应用是一个单一的、职责明确的智能体,内部逻辑复杂但边界清晰,那么编排是更自然的匹配。
- 多个、松散耦合的服务/智能体: 如果你的系统由多个自治的AI智能体或微服务组成,它们需要独立运行并异步协作,那么外部编舞模式更合适。
-
流程可见性与控制需求:
- 需要清晰的端到端视图和强控制: 如果你对流程的每一步都需要严格控制、易于审计和调试,并且需要频繁介入