在构建复杂的AI系统时,多智能体(Multi-Agent System, MAS)架构已成为一种强大而灵活的范式。LangChain Graph(LangGraph)作为LangChain生态系统的核心组件,为我们提供了定义和执行这些智能体工作流的强大能力。然而,如何有效地协调这些智能体,使其协同工作以解决复杂问题,是设计MAS时的关键挑战。本文将深入探讨两种截然不同的智能体协调模式:中心化指挥(Supervisor)与去中心化协同(Swarm),并结合LangGraph的代码实现,详细阐述它们的差异、适用场景以及优缺点。
第一部分:引言——复杂问题的智能体解决方案
随着大型语言模型(LLM)能力的飞速发展,单个LLM在处理开放域、多步骤或需要专业知识的任务时,往往会遇到瓶颈。多智能体系统通过将复杂问题分解为多个子任务,并分配给具有不同能力或角色的智能体,从而能够更高效、更鲁棒地解决问题。
LangGraph正是为构建这类系统而生。它允许我们将智能体定义为图中的节点,智能体之间的交互和状态流转定义为边。通过这种方式,我们可以清晰地可视化和控制智能体的工作流程。然而,在设计智能体图时,一个核心决策点在于如何建立智能体之间的控制和协作机制。这引出了我们今天讨论的两个主要范式:Supervisor模式和Swarm模式。
第二部分:中心化指挥——Supervisor模式
2.1 概念与原理
Supervisor模式,顾名思义,是一种中心化的控制模式。在这种模式下,一个或少数几个特设的“主管”智能体(Supervisor Agent)负责整个工作流的规划、任务分配、进度监控和决策。其他智能体(Worker Agents)则作为执行者,接收主管的指令,完成特定任务,并将结果反馈给主管。主管根据反馈和整体目标,决定下一步由哪个智能体执行,或者如何调整策略。
这种模式类似于项目经理与团队成员的关系。项目经理(Supervisor)了解项目的整体蓝图,将任务拆分给不同的团队成员(Workers),并协调他们的工作,确保项目按计划进行。
优点:
- 控制力强: 主管智能体对整个流程有完全的控制,易于理解和调试。
- 结构清晰: 任务流转路径明确,责任划分清晰。
- 状态管理集中: 整体状态由主管统一维护,避免冲突。
- 适用于线性或树状任务: 对于有明确先后顺序或层级关系的任务非常有效。
缺点:
- 单点瓶颈: 主管智能体可能成为性能瓶颈,尤其是在处理大量并发任务时。
- 单点故障: 如果主管智能体出现问题,整个系统可能停摆。
- 扩展性受限: 增加新智能体可能需要修改主管的决策逻辑。
- 不够灵活: 对突发情况或需要快速适应的环境响应能力较弱。
2.2 LangGraph中的Supervisor实现
在LangGraph中实现Supervisor模式,通常涉及以下几个关键要素:
- 定义主管智能体(Supervisor Node): 这是一个特殊的节点,其职责是接收当前状态,根据预设的逻辑或LLM的推理,决定下一个要激活的智能体或任务。
- 定义工作智能体(Worker Nodes): 这些是执行具体任务的节点,它们接收主管传递的输入,执行任务,并将结果返回给主管。
- 条件性边(Conditional Edges): 这是实现主管决策的关键。主管节点的输出会作为条件,决定图的下一步走向。
让我们通过一个具体的例子来演示:一个代码生成与审查的流程。
任务场景: 用户提出一个编程需求,系统需要:
- 由一个“规划者”智能体(Planner)生成高层计划。
- 由一个“编码者”智能体(Coder)根据计划编写代码。
- 由一个“审查者”智能体(Reviewer)审查代码并提出改进意见。
- 如果审查通过,则结束;否则,将审查意见反馈给编码者重新修改。
- 一个“主管”智能体(Supervisor)负责整个流程的协调。
import operator
from typing import Annotated, TypedDict, List
from langchain_core.messages import BaseMessage, HumanMessage, AIMessage
from langchain_openai import ChatOpenAI
from langgraph.graph import StateGraph, END
# 1. 定义应用状态
# ---------------------------------------------------------------------
class AgentState(TypedDict):
"""
智能体共享的状态。
`messages`:所有智能体交互的历史消息。
`next_agent`: 决定下一个要激活的智能体。
`plan`: 规划者生成的初始计划。
`code`: 编码者生成的代码。
`review_comments`: 审查者提出的评论。
"""
messages: Annotated[List[BaseMessage], operator.add]
next_agent: str
plan: str
code: str
review_comments: str
# 2. 初始化LLM模型
# ---------------------------------------------------------------------
# 假设你已经设置了 OPENAI_API_KEY 环境变量
llm = ChatOpenAI(model="gpt-4o", temperature=0.7)
# 3. 定义智能体(节点)
# ---------------------------------------------------------------------
class AgentNode:
def __init__(self, name: str, system_message: str):
self.name = name
self.agent = llm.bind_tools([]) # 智能体可以绑定工具,这里简化为无工具
def __call__(self, state: AgentState) -> dict:
messages = state["messages"]
# 将当前智能体的角色消息添加到历史中
messages.append(AIMessage(content=f"<{self.name}> 开始工作..."))
# 核心逻辑:调用LLM生成响应
response = self.agent.invoke(messages)
content = response.content
messages.append(AIMessage(content=f"<{self.name}> 完成工作。输出:n{content}"))
# 返回更新后的状态。具体的字段更新由每个智能体的逻辑决定。
return {"messages": messages}
# 具体的工作智能体
def planner_node(state: AgentState) -> dict:
"""规划者智能体:根据用户需求生成高层计划。"""
print("---PLANNER NODE ACTIVATED---")
messages = state["messages"]
user_query = messages[-1].content if messages and isinstance(messages[-1], HumanMessage) else "无明确用户查询"
prompt = f"""你是一个专业的软件开发规划者。
根据以下用户需求,生成一个详细的开发计划,包括主要步骤和模块。
用户需求:{user_query}
请直接输出计划内容。
"""
response = llm.invoke([HumanMessage(content=prompt)])
new_plan = response.content
messages.append(AIMessage(content=f"规划者已生成计划:n{new_plan}"))
return {"messages": messages, "plan": new_plan, "next_agent": "coder"} # 主管决定下一个是coder
def coder_node(state: AgentState) -> dict:
"""编码者智能体:根据计划编写代码。"""
print("---CODER NODE ACTIVATED---")
plan = state["plan"]
previous_code = state.get("code", "")
review_comments = state.get("review_comments", "")
prompt_parts = [
"你是一个资深的Python工程师。根据以下计划和(可选的)审查意见,编写或修改代码。",
f"计划:n{plan}"
]
if previous_code:
prompt_parts.append(f"现有代码:n```pythonn{previous_code}n```")
if review_comments:
prompt_parts.append(f"审查意见:n{review_comments}n请根据审查意见修改代码。")
prompt_parts.append("请直接输出完整的Python代码块。")
response = llm.invoke([HumanMessage(content="n".join(prompt_parts))])
new_code = response.content
messages = state["messages"]
messages.append(AIMessage(content=f"编码者已生成/修改代码:n{new_code}"))
return {"messages": messages, "code": new_code, "next_agent": "reviewer"}
def reviewer_node(state: AgentState) -> dict:
"""审查者智能体:审查代码并提出意见。"""
print("---REVIEWER NODE ACTIVATED---")
code = state["code"]
prompt = f"""你是一个严格的代码审查专家。
请审查以下Python代码,并提出改进意见。
如果代码非常完美,可以直接回复 "APPROVE"。
如果需要修改,请清晰列出所有改进点。
代码:n```pythonn{code}n```
"""
response = llm.invoke([HumanMessage(content=prompt)])
comments = response.content
messages = state["messages"]
messages.append(AIMessage(content=f"审查者已提出意见:n{comments}"))
if "APPROVE" in comments.upper():
return {"messages": messages, "review_comments": comments, "next_agent": "supervisor_check_approve"}
else:
return {"messages": messages, "review_comments": comments, "next_agent": "supervisor_recode"}
# 4. 定义主管智能体及其决策逻辑 (这里将主管决策分散到不同的函数中,模拟一个中心化的决策流程)
# ---------------------------------------------------------------------
def supervisor_router(state: AgentState) -> str:
"""
主管智能体的核心决策逻辑。
根据 state["next_agent"] 的值来决定下一个节点。
"""
next_agent = state["next_agent"]
print(f"---SUPERVISOR ROUTER ACTIVATED. Next Agent Proposed: {next_agent}---")
if next_agent == "coder":
return "coder"
elif next_agent == "reviewer":
return "reviewer"
elif next_agent == "supervisor_check_approve":
return "check_approval"
elif next_agent == "supervisor_recode":
return "recode_decision"
else:
# 默认或错误处理,例如回到规划或直接结束
return "planner"
def check_approval(state: AgentState) -> str:
"""检查审查结果,决定是否结束。"""
print("---SUPERVISOR: CHECKING APPROVAL---")
review_comments = state["review_comments"]
if "APPROVE" in review_comments.upper():
print("---CODE APPROVED. ENDING PROCESS---")
return "end" # 结束
else:
print("---CODE NOT APPROVED. Rerouting to Coder for modification---")
# 实际这里应该返回 'coder',但是为了通过 LangGraph 的条件边,我们需要一个节点名
# 更好的做法是在 reviewer_node 中直接根据评论决定是返回 'coder' 还是 'end'
# 但为了演示 supervisor 集中决策,这里让 supervisor 再次介入
return "recode_decision" # 明确路由到重新编码决策
def recode_decision(state: AgentState) -> str:
"""决定是否需要重新编码。"""
print("---SUPERVISOR: RECODE DECISION---")
# 实际上,这里我们可以让 LLM 再次评估是否还有重新编码的必要,或者直接返回 coder
# 为了简化,我们直接路由回 coder
return "coder"
# 5. 构建LangGraph图
# ---------------------------------------------------------------------
workflow = StateGraph(AgentState)
# 添加节点
workflow.add_node("planner", planner_node)
workflow.add_node("coder", coder_node)
workflow.add_node("reviewer", reviewer_node)
# 添加主管决策节点 (这里我们用一个函数来模拟主管的决策,而不是一个独立的LLM节点,以简化)
# 实际上 supervisor_router 可以在一个独立的 LLM 调用中完成,根据状态输出下一步的节点名
workflow.add_node("check_approval", check_approval)
workflow.add_node("recode_decision", recode_decision)
# 设置入口点
workflow.set_entry_point("planner")
# 添加边
# 规划者完成后,主管决定下一步
workflow.add_edge("planner", "coder") # 规划者直接导向编码者,因为 planner_node 已经设置了 next_agent='coder'
workflow.add_edge("coder", "reviewer") # 编码者直接导向审查者,因为 coder_node 已经设置了 next_agent='reviewer'
# 审查者完成后,根据 next_agent 属性,由主管进行条件路由
workflow.add_conditional_edges(
"reviewer",
supervisor_router, # 这是一个函数,根据 state["next_agent"] 返回下一个节点名
{
"supervisor_check_approve": "check_approval",
"supervisor_recode": "recode_decision",
# 也可以直接在这里返回 "coder" 或 "end"
}
)
workflow.add_conditional_edges(
"check_approval",
check_approval, # 再次调用 check_approval 函数进行决策
{
"end": END,
"recode_decision": "recode_decision" # 如果未批准,路由到重新编码决策
}
)
workflow.add_edge("recode_decision", "coder") # 重新编码决策后回到编码者
# 编译图
app = workflow.compile()
# 6. 运行Supervisor模式的工作流
# ---------------------------------------------------------------------
print("n---RUNNING SUPERVISOR WORKFLOW---")
initial_query = "请编写一个Python函数,计算斐波那契数列的第n个数字。"
final_state = app.invoke(
{"messages": [HumanMessage(content=initial_query)],
"next_agent": "planner",
"plan": "",
"code": "",
"review_comments": ""}
)
print("n---FINAL STATE MESSAGES---")
for msg in final_state["messages"]:
print(msg.content)
代码解释:
AgentState: 定义了智能体之间共享的状态,包括消息历史、计划、代码、审查意见等。next_agent字段是Supervisor模式下智能体之间显式传递控制权的关键。planner_node,coder_node,reviewer_node: 这些是工作智能体节点。它们接收当前状态,执行各自的任务,并更新状态。注意它们在返回时都会设置next_agent字段,指示它们期望下一个由谁来处理。supervisor_router,check_approval,recode_decision: 这些函数共同构成了主管的决策逻辑。supervisor_router根据工作智能体设置的next_agent字段,将控制权路由到不同的决策分支或下一个工作智能体。check_approval和recode_decision则处理审查后的具体决策。workflow.add_conditional_edges: 这是LangGraph中实现条件路由的核心。它接收一个源节点,一个决定函数,以及一个映射字典。决定函数的输出(字符串)会匹配字典中的键,从而决定下一步走向哪个节点。在本例中,supervisor_router函数充当了主管的角色,根据状态来决定路径。
这个例子清晰地展示了主管智能体(通过supervisor_router函数及其关联的条件边实现)如何根据工作智能体的输出和系统状态,集中控制整个流程的流转。
第三部分:去中心化协同——Swarm模式
3.1 概念与原理
Swarm模式(蜂群模式),或称去中心化协同模式,其灵感来源于自然界中的蜂群、蚁群行为。在这种模式下,没有一个中心化的主管来指挥。每个智能体都是相对独立的,它们通过观察共享环境(共享状态或“黑板”),根据自身的局部规则和目标,自主决定何时行动、如何行动。智能体之间通过共享状态进行隐式或显式的沟通,共同朝着一个宏观目标演进。
这种模式类似于一群自由职业者在同一个共享文档上协作。每个人根据文档的当前状态和自己的专业领域,决定自己接下来要补充、修改或评论哪一部分,最终共同完成一个项目,而无需一个单一的领导者来分配每一个具体的任务。
优点:
- 高韧性与鲁棒性: 单个智能体故障不会导致整个系统崩溃,其他智能体可以继续工作。
- 高可扩展性: 增加新智能体通常只需将其加入共享环境,无需修改中心逻辑。
- 并行性与效率: 理论上多个智能体可以同时工作(如果底层执行环境支持)。
- 适应性强: 系统能更好地适应动态变化的环境和未知情况,通过 emergent behavior(涌现行为)解决复杂问题。
- 避免单点瓶颈: 没有中心化的瓶颈。
缺点:
- 复杂性高: 系统的行为难以预测和调试,因为没有单一的控制流。
- 协调开销: 智能体之间可能需要额外的机制来避免冲突或重复工作。
- 收敛性挑战: 难以保证系统一定会收敛到最佳解决方案,或者何时收敛。
- 状态管理复杂: 共享状态的并发访问和一致性维护可能成为问题。
3.2 LangGraph中的Swarm实现
在LangGraph中实现纯粹的Swarm模式具有一定的挑战性,因为LangGraph的图结构本质上是定义了一个流程,节点通常是顺序执行的。要模拟去中心化,我们需要让每个智能体节点在接收到状态时,自主地判断“我是否需要在这个时候行动?”以及“我行动之后,谁应该接下来行动(或者,任务是否已经完成)?”。这通常通过以下方式实现:
- 共享状态(Blackboard): 所有智能体都读写一个共同的
AgentState,其中包含了任务的进展、已完成的部分、待处理的部分等。 - 智能体自决策: 每个智能体节点内部包含逻辑,判断它是否应该基于当前共享状态采取行动。
- 条件路由(基于状态或智能体输出): 智能体在完成任务后,可以修改共享状态,或者在其输出中包含信息,引导系统进入下一个阶段或由另一个智能体接手。关键在于,这个路由决策不是由一个中心化的主管做出,而是由智能体自身或一套公共规则驱动。
- 终止条件: 系统需要一个明确的终止条件,通常也是基于共享状态的某个属性来判断任务是否完成。
让我们通过一个“协同研究与写作”的例子来演示Swarm模式:
任务场景: 用户提出一个研究主题,系统需要:
- 多个“研究员”智能体(Researcher)各自收集信息。
- 一个“分析师”智能体(Analyst)分析收集到的信息。
- 一个“作者”智能体(Writer)根据分析结果撰写草稿。
- 一个“评论者”智能体(Critic)评估草稿质量。
- 所有智能体根据共享的“研究进展”和“草稿状态”自主决定是否需要采取行动,直到最终报告完成。
import operator
from typing import Annotated, TypedDict, List
from langchain_core.messages import BaseMessage, HumanMessage, AIMessage
from langchain_openai import ChatOpenAI
from langgraph.graph import StateGraph, END
import json
# 1. 定义应用状态 (Blackboard)
# ---------------------------------------------------------------------
class SwarmAgentState(TypedDict):
"""
智能体共享的黑板状态。
`topic`: 研究主题。
`research_data`: 收集到的研究数据列表。
`analysis_summary`: 分析师的总结。
`draft`: 作者撰写的草稿。
`critique`: 评论者对草稿的意见。
`status`: 整体任务状态 (e.g., "researching", "analyzing", "writing", "reviewing", "done")
`turns`: 记录回合数,防止无限循环
"""
messages: Annotated[List[BaseMessage], operator.add]
topic: str
research_data: Annotated[List[str], operator.add]
analysis_summary: str
draft: str
critique: str
status: str
turns: int
# 2. 初始化LLM模型
# ---------------------------------------------------------------------
llm = ChatOpenAI(model="gpt-4o", temperature=0.7)
# 3. 定义智能体(节点)
# ---------------------------------------------------------------------
# 辅助函数:将新消息添加到状态
def add_message(state: SwarmAgentState, agent_name: str, content: str) -> dict:
messages = state["messages"]
messages.append(AIMessage(content=f"<{agent_name}> {content}"))
return {"messages": messages}
def researcher_node(state: SwarmAgentState) -> dict:
"""研究员智能体:根据主题收集信息。"""
print("---RESEARCHER NODE ACTIVATED---")
if state["status"] != "researching" or len(state["research_data"]) >= 3: # 假设最多收集3条数据
print("Researcher: Not my turn or enough data collected.")
return {"turns": state["turns"] + 1} # 增加回合数,但不做实际动作
topic = state["topic"]
# 模拟研究:LLM生成一些与主题相关的信息
prompt = f"""你是一个专业的研究员。根据主题:"{topic}",
查找并提供一条新的、有价值的信息或事实。
请直接输出信息内容,不要包含任何前缀。
已收集信息:{state["research_data"]}
"""
response = llm.invoke([HumanMessage(content=prompt)])
new_info = response.content
new_research_data = state["research_data"] + [new_info]
print(f"Researcher: Found new info - {new_info[:50]}...")
# 如果收集到足够的数据,则更新状态为“analyzing”
next_status = "analyzing" if len(new_research_data) >= 3 else "researching"
return {
"messages": add_message(state, "Researcher", f"收集到新信息: {new_info}"),
"research_data": new_research_data,
"status": next_status,
"turns": state["turns"] + 1
}
def analyst_node(state: SwarmAgentState) -> dict:
"""分析师智能体:分析收集到的数据。"""
print("---ANALYST NODE ACTIVATED---")
if state["status"] != "analyzing" or state["analysis_summary"]: # 如果不是分析阶段或已分析
print("Analyst: Not my turn or already analyzed.")
return {"turns": state["turns"] + 1}
research_data = "n- ".join(state["research_data"])
topic = state["topic"]
prompt = f"""你是一个数据分析专家。
请分析以下关于主题"{topic}"的研究数据,并提炼出关键的洞察和总结。
研究数据:
- {research_data}
请直接输出分析总结。
"""
response = llm.invoke([HumanMessage(content=prompt)])
summary = response.content
print(f"Analyst: Generated summary - {summary[:50]}...")
return {
"messages": add_message(state, "Analyst", f"完成分析总结: {summary}"),
"analysis_summary": summary,
"status": "writing", # 分析完成后,进入写作阶段
"turns": state["turns"] + 1
}
def writer_node(state: SwarmAgentState) -> dict:
"""作者智能体:根据分析结果撰写草稿。"""
print("---WRITER NODE ACTIVATED---")
if state["status"] != "writing" and state["status"] != "reviewing_feedback": # 写作或根据反馈修改
print("Writer: Not my turn.")
return {"turns": state["turns"] + 1}
analysis_summary = state["analysis_summary"]
topic = state["topic"]
current_draft = state["draft"]
critique = state["critique"]
if not analysis_summary:
print("Writer: Waiting for analysis summary.")
return {"turns": state["turns"] + 1}
prompt_parts = [
f"你是一个专业的报告作者。请根据以下分析总结,撰写或修改关于主题'{topic}'的报告草稿。",
f"分析总结:n{analysis_summary}"
]
if current_draft:
prompt_parts.append(f"当前草稿:n{current_draft}")
if critique and state["status"] == "reviewing_feedback":
prompt_parts.append(f"评论者意见:n{critique}n请根据意见修改草稿。")
prompt_parts.append("请直接输出完整的报告草稿。")
response = llm.invoke([HumanMessage(content="n".join(prompt_parts))])
new_draft = response.content
print(f"Writer: Wrote/modified draft - {new_draft[:50]}...")
# 写作完成后,进入评论阶段
return {
"messages": add_message(state, "Writer", f"完成草稿/修改草稿:n{new_draft}"),
"draft": new_draft,
"critique": "", # 清空之前的评论
"status": "reviewing",
"turns": state["turns"] + 1
}
def critic_node(state: SwarmAgentState) -> dict:
"""评论者智能体:评估草稿质量。"""
print("---CRITIC NODE ACTIVATED---")
if state["status"] != "reviewing" or not state["draft"]:
print("Critic: Not my turn or no draft to review.")
return {"turns": state["turns"] + 1}
draft = state["draft"]
prompt = f"""你是一个严格的报告评论者。
请审查以下报告草稿,并提出改进意见。
如果草稿已经足够完善,可以直接回复 "FINALIZED"。
如果需要修改,请清晰列出所有改进点。
草稿:n{draft}
"""
response = llm.invoke([HumanMessage(content=prompt)])
critique_comments = response.content
print(f"Critic: Provided critique - {critique_comments[:50]}...")
if "FINALIZED" in critique_comments.upper():
next_status = "done" # 任务完成
else:
next_status = "reviewing_feedback" # 需要根据反馈修改
return {
"messages": add_message(state, "Critic", f"提供评论: {critique_comments}"),
"critique": critique_comments,
"status": next_status,
"turns": state["turns"] + 1
}
# 4. 定义路由逻辑 (去中心化决策)
# ---------------------------------------------------------------------
def router_node(state: SwarmAgentState) -> str:
"""
这是一个路由节点,但它不是主管。它根据当前的 'status'
和共享黑板上的其他信息,决定下一个最合适的智能体来行动。
它更像是一个协调者,而不是指挥者。
"""
current_status = state["status"]
turns = state["turns"]
print(f"---ROUTER NODE ACTIVATED. Current status: {current_status}, Turns: {turns}---")
# 设置一个最大回合数,防止无限循环
if turns > 15: # 假设15回合为最大尝试次数
print("Max turns reached. Forcing termination.")
return "end"
if current_status == "done":
return "end"
elif current_status == "researching" and len(state["research_data"]) < 3:
return "researcher"
elif current_status == "analyzing" and not state["analysis_summary"]:
return "analyst"
elif current_status == "writing" or current_status == "reviewing_feedback":
return "writer"
elif current_status == "reviewing" and state["draft"]:
return "critic"
else:
# 如果没有明确的下一个智能体,或者出现异常,可以设定一个默认行为
# 例如,循环回到研究阶段,或者直接结束
print(f"Router: No clear next step for status '{current_status}'. Returning to researcher or ending.")
# 如果所有智能体都认为不是自己的回合,但任务未完成,则可能需要重置或结束
# 这里的逻辑需要根据实际情况精心设计
if current_status == "researching": # 尝试再次激活研究员
return "researcher"
return "end" # 默认结束,避免死循环
# 5. 构建LangGraph图
# ---------------------------------------------------------------------
workflow = StateGraph(SwarmAgentState)
workflow.add_node("researcher", researcher_node)
workflow.add_node("analyst", analyst_node)
workflow.add_node("writer", writer_node)
workflow.add_node("critic", critic_node)
workflow.add_node("router", router_node) # 引入一个通用路由器,它不主动“指挥”而是“协调”
# 设置入口点
workflow.set_entry_point("router") # 从路由器开始,让它决定谁先行动
# 添加边
# 所有智能体在完成其任务后,都将控制权返回给路由器
workflow.add_edge("researcher", "router")
workflow.add_edge("analyst", "router")
workflow.add_edge("writer", "router")
workflow.add_edge("critic", "router")
# 路由器根据状态进行条件路由
workflow.add_conditional_edges(
"router",
router_node, # 路由器本身就是一个决策函数
{
"researcher": "researcher",
"analyst": "analyst",
"writer": "writer",
"critic": "critic",
"end": END
}
)
# 编译图
app = workflow.compile()
# 6. 运行Swarm模式的工作流
# ---------------------------------------------------------------------
print("n---RUNNING SWARM WORKFLOW---")
initial_topic = "人工智能在医疗健康领域的应用与挑战"
final_state = app.invoke(
{"messages": [HumanMessage(content=f"请研究并撰写一份关于'{initial_topic}'的报告。")],
"topic": initial_topic,
"research_data": [],
"analysis_summary": "",
"draft": "",
"critique": "",
"status": "researching", # 初始状态
"turns": 0
}
)
print("n---FINAL REPORT---")
print(final_state["draft"])
print("n---FINAL STATE MESSAGES---")
for msg in final_state["messages"]:
print(msg.content)
代码解释:
SwarmAgentState: 这是一个更丰富的共享状态,包含所有智能体可能需要读写的信息。status字段是协调的关键,它指示了任务的当前整体阶段。researcher_node,analyst_node,writer_node,critic_node: 每个智能体节点内部都有一个条件判断 (if state["status"] != "my_turn")。这意味着每个智能体都会检查当前状态,如果不是它应该行动的时候,它就什么也不做(或者只是增加回合数),然后将控制权交还。如果它决定行动,它会更新共享状态(例如,research_data,analysis_summary,draft,critique),并可能改变status字段,从而隐式地引导下一个阶段。router_node: 这个节点在这里的角色不是“主管”,而是“调度员”或“协调器”。它根据共享的status字段,决定将控制权传递给哪个智能体。它不发布命令,而是根据当前任务的客观状态来判断哪个智能体最有可能对完成任务做出贡献。它也包含了终止条件(例如turns > 15或current_status == "done")。- 循环与终止: 智能体之间通过
router_node形成一个循环。每个智能体在完成(或决定不行动)后都返回到router_node,router_node再根据最新的状态决定下一个激活谁。当status变为"done"或达到最大回合数时,系统终止。
这个例子通过智能体内部的条件逻辑和一个协调性的路由器,模拟了去中心化协同。每个智能体都“监听”共享状态,并自主决定是否行动,共同推动任务向前发展。
第四部分:对比分析与混合策略
4.1 核心差异对比
| 特征 | Supervisor模式 (中心化指挥) | Swarm模式 (去中心化协同) |
|---|---|---|
| 控制机制 | 单一或少数主管智能体负责决策和流程控制。 | 无中心指挥,每个智能体根据局部规则和共享状态自主决策。 |
| 任务分配 | 主管明确分配任务给特定工作智能体。 | 智能体自发认领或响应共享状态下的任务。 |
| 通信模式 | 主管与工作智能体之间,工作智能体通常只与主管通信。 | 智能体之间通过共享状态(黑板)进行隐式通信。 |
| 韧性/鲁棒性 | 单点故障风险高,主管智能体失效则系统停摆。 | 高韧性,单个智能体故障不影响整体运行。 |
| 可扩展性 | 添加新智能体可能需要修改主管逻辑,扩展性有限。 | 易于添加新智能体,只需使其理解共享状态和规则。 |
| 复杂度 | 逻辑清晰,易于理解和调试。 | 行为涌现,难以预测和调试,收敛性不易保证。 |
| 性能瓶颈 | 主管智能体可能成为性能瓶颈。 | 无中心瓶颈,理论上可并行执行(如果架构支持)。 |
| 适用场景 | 任务流明确、顺序性强、要求高精度控制的场景。 | 开放性问题、探索性任务、需要高适应性和鲁棒性的场景。 |
| LangGraph实现 | add_conditional_edges 基于主管输出路由,显式next_agent状态字段。 |
智能体节点内部条件逻辑,status字段驱动的通用路由器。 |
4.2 何时选择哪种模式?
-
选择Supervisor模式,如果:
- 问题领域结构化程度高,任务步骤明确且有依赖关系。
- 需要严格控制整个流程,确保每一步都按预期执行。
- 系统规模相对较小,或者主管智能体的决策负载可控。
- 对可解释性和调试便利性有较高要求。
- 例如:自动化测试流程、复杂的审批流、数据处理流水线。
-
选择Swarm模式,如果:
- 问题领域复杂,难以预先定义所有步骤,需要探索和适应。
- 要求系统具有高韧性、高可用性和可扩展性。
- 希望通过智能体之间的协作和涌现行为来解决问题。
- 系统需要处理并发或准并发任务(尽管LangGraph本身是顺序执行,但Swarm模式的逻辑更适合并发思想)。
- 例如:开放域研究、创意内容生成、复杂系统诊断、多视角信息整合。
4.3 混合策略:融合两者的优势
在实际应用中,纯粹的Supervisor或Swarm模式可能都不足以应对所有情况。更常见且强大的做法是采用混合策略,结合两者的优点。
- 分层Supervisor-Swarm: 一个高层主管智能体负责宏观任务的分解和阶段性协调,而每个宏观阶段内部则由一个智能体Swarm来完成具体的、探索性的子任务。
- 例如:一个主管负责“项目规划”->“开发”->“测试”->“部署”的流程。在“开发”阶段,可以启动一个Swarm智能体群(编码者、测试者、调试者)进行迭代开发,直到代码满足要求;然后主管继续推动到“测试”阶段。
- 带有领导者的Swarm: 在一个去中心化的Swarm中,可以临时指定一个“领导者”智能体,在特定阶段负责协调或做出关键决策,一旦该阶段完成,领导者角色可以解除或转移。
- 例如:在研究Swarm中,当收集到足够数据后,可以临时指定一个智能体作为“整合者”,负责将所有信息汇总并生成一个初步报告,然后解除其领导角色,让其他智能体继续评论或修改。
在LangGraph中实现混合策略,意味着在图中可以既有由一个节点显式控制的条件边,也可以有由多个节点共享状态并相互影响的循环。例如,一个Supervisor节点可以根据其LLM的输出,选择激活一个Swarm的入口节点,然后这个Swarm会在内部迭代运行,直到达到其终止条件,再将控制权交还给Supervisor。
# 混合策略的简化概念性代码片段
# 假设有一个高层主管,它可以在必要时启动一个 Swarm 流程
def high_level_supervisor_node(state: AgentState) -> str:
"""高层主管根据任务进展,决定进入哪个阶段。"""
if state["current_phase"] == "planning":
# ... 进行规划,然后决定进入 "research_swarm_start"
return "research_swarm_start"
elif state["current_phase"] == "research_done":
# ... 处理研究结果,然后决定进入 "development_phase"
return "development_phase"
# ... 其他阶段
return END
def research_swarm_entry_node(state: SwarmAgentState) -> dict:
"""研究Swarm的入口,设置初始状态并启动Swarm路由器。"""
# 假设SwarmAgentState与AgentState可以转换或部分共享
return {
"messages": state["messages"],
"topic": state["topic"],
"research_data": [],
"analysis_summary": "",
"draft": "",
"critique": "",
"status": "researching",
"turns": 0
}
# 构建一个包含Swarm子图的整体图
# main_workflow = StateGraph(AgentState)
# main_workflow.add_node("high_level_supervisor", high_level_supervisor_node)
# main_workflow.add_node("research_swarm_entry", research_swarm_entry_node)
# # ... 其他主管控制的节点
#
# # 这里需要将 Swarm 的整个图作为一个子图嵌入
# # LangGraph 目前没有直接的子图嵌套功能,但可以通过在一个节点中 invoke 另一个图来实现
# # 或者将 Swarm 的所有节点和边直接添加到主图中,通过路由逻辑来隔离
#
# # 例如,在 high_level_supervisor_node 内部,如果决定进入研究阶段,
# # 它可以调用 app_swarm.invoke(...) 来执行 Swarm 流程,并将结果返回给主图状态。
# 这种混合方式需要更精细的状态管理和节点设计,但可以充分利用两种模式的优势。
第五部分:高级考量与最佳实践
5.1 状态管理与同步
在多智能体系统中,共享状态的准确性和一致性至关重要。
- 不可变性与操作符: 在LangGraph中,使用
Annotated[List[BaseMessage], operator.add]等方式可以确保列表的追加式更新,避免直接修改历史状态。对于复杂对象,需要确保智能体更新的是状态的副本或通过明确的合并逻辑。 - 状态设计: 谨慎设计
TypedDict状态结构,确保每个字段的语义清晰,并能支持智能体之间所需的所有信息交换。
5.2 工具使用(Tool Use)
无论Supervisor还是Swarm模式,智能体都需要能够使用外部工具(如搜索引擎、代码解释器、数据库查询等)来增强其能力。LangChain的bind_tools功能可以方便地将工具集成到LLM智能体中。
- Supervisor模式: 主管可以根据任务需要,决定哪个工作智能体应该使用哪个工具。
- Swarm模式: 智能体可以自主决定何时以及使用哪个工具,以推进其局部目标。
5.3 错误处理与鲁棒性
多智能体系统比单个智能体更容易出错,因为交互路径复杂。
- 重试机制: 为容易失败的智能体(如LLM调用)实现重试逻辑。
- 回退策略: 当某个智能体失败时,系统应有能力回退到之前的状态,或尝试替代路径。
- 状态验证: 在智能体处理状态之前,对其进行验证,防止无效输入导致后续错误。
- 超时机制: 确保智能体不会无限期地等待或执行。
5.4 可观测性与调试
理解复杂智能体系统的行为是一项挑战。
- 日志记录: 详细记录每个智能体的输入、输出、决策和状态变化。
- 可视化: LangGraph的可视化工具(例如
app.get_graph().draw_mermaid_png())对于理解图结构和流程流转非常有帮助。 - 追踪: 使用LangSmith等工具追踪智能体调用链、中间步骤和LLM的token使用情况。
5.5 性能考量
- LLM调用成本与延迟: LLM调用是主要的性能瓶颈和成本来源。优化智能体提示、减少不必要的调用、使用更小的模型或缓存结果。
- 并行化: 尽管LangGraph节点是顺序执行的,但Swarm模式的逻辑更适合并行化。如果底层任务允许,可以考虑在节点内部使用异步操作或结合其他并行执行框架。
尾声
LangGraph为我们提供了构建强大而灵活的多智能体系统的蓝图。无论是采用中心化的Supervisor模式,还是去中心化的Swarm模式,核心都在于如何巧妙地设计智能体的角色、它们之间的通信方式以及状态流转机制。通过深入理解这两种范式的优缺点,并善用LangGraph的条件路由和状态管理能力,我们可以根据具体问题选择或融合最适合的策略,从而构建出既高效又鲁棒的智能体协同解决方案。