引言:复杂任务的挑战与协作智能的需求
随着人工智能技术的飞速发展,我们正从简单的模式识别迈向能够理解、推理并执行复杂任务的智能系统。然而,在现实世界中,许多问题并非单一算法或模型能够独立解决的。它们往往具有以下特点:
- 复杂性高: 需要多个步骤、多种技能和不同类型的数据处理。
- 动态性强: 初始规划可能因中间结果或环境变化而需要调整。
- 不确定性: 任务执行过程中可能出现意外情况或需要探索性决策。
- 跨领域知识: 涉及不同专业领域的知识和工具。
传统的编程范式,即自上而下、预先定义好所有逻辑流的程序,在面对这类高度复杂和动态的任务时显得力不从心。它们难以灵活适应变化,也难以有效整合不同功能模块。
AI Agent时代的到来,为我们提供了一种新的解决思路。Agent被定义为一个能够感知环境、进行推理、采取行动并实现目标的自主实体。当任务的复杂性达到一定程度时,单个Agent也可能力有不逮。此时,协作智能(Collaborative Intelligence)变得至关重要,即通过多个Agent协同工作来完成宏大目标。
正是在这样的背景下,’Supervisor’ 架构应运而生。它不是简单地将任务分解给多个子Agent,而是在此基础上构建一个逻辑闭环,确保主Agent(Supervisor)能够动态地规划、分发、监控并根据反馈调整任务,从而实现对复杂任务的端到端管理。这种架构旨在模拟人类团队协作的模式,其中有一个项目经理(Supervisor)负责宏观调度,而多个专家(Sub-Graph/Worker Agents)负责具体执行。
‘Supervisor’ 架构核心理念与组成
‘Supervisor’ 架构的核心思想是构建一个智能化的协调系统,它能够将一个复杂的、高层次的目标分解为一系列可管理的、低层次的子任务,并利用专业的执行单元来完成这些子任务,最终将结果整合并反馈给协调者,形成一个持续迭代的闭环。
什么是 ‘Supervisor’ 架构?
‘Supervisor’ 架构是一种分布式智能系统设计模式,其核心是一个智能的中心Agent(Supervisor),它不直接执行所有任务,而是充当一个任务规划者、调度者和协调者。当接收到一个高层级目标时,Supervisor会动态地生成一个任务清单,并将这些任务分发给多个专门的子图(Sub-Graph)或工作者Agent。这些工作者Agent执行各自的任务后,将结果反馈给Supervisor。Supervisor根据这些反馈,评估进度,决定下一步行动,可能是生成新的任务,调整现有任务,或者宣布任务完成。这个反馈与决策的过程构成了一个逻辑闭环,使得系统能够自我纠正、适应变化并逐步逼近最终目标。
架构核心角色与职责
-
核心 Supervisor Agent
- 职责: 接收高层级目标,理解意图,进行任务分解、规划、调度、监控、结果评估和迭代决策。它是整个系统的“大脑”和“项目经理”。
- 关键能力: 强大的推理能力(通常通过大语言模型LLM实现),对全局状态的感知和维护,灵活的任务生成和调整能力。
-
子图(Sub-Graph)/ 工作者 Agent
- 职责: 接收由Supervisor分发的具体任务,利用其专业知识和工具集执行任务,并将执行结果(包括成功、失败、中间产物等)结构化地反馈给Supervisor。它们是系统的“执行者”和“专家”。
- 关键能力: 专注于特定领域或任务类型(例如,代码生成、数据分析、API调用、文档检索),具备稳定性、可靠性和高效的执行能力。一个子图Agent本身可能是一个包含多个步骤的Agent链条或小型工作流。
-
任务清单 (Task List)
- 定义: 由Supervisor动态生成和维护的一系列待执行、正在执行或已完成的任务的集合。每个任务都应有明确的描述、输入、预期输出、状态和优先级。
- 作用: 作为Supervisor与工作者Agent之间沟通的桥梁和工作进度的可视化表示。
-
逻辑闭环 (Closed Loop)
- 定义: 指的是从Supervisor规划任务 -> 工作者执行任务 -> 工作者反馈结果 -> Supervisor评估结果并调整规划 -> 再次分发任务,这样一个持续迭代的循环。
- 作用: 确保系统能够根据实际执行情况进行动态调整和优化,提高任务完成的鲁棒性和适应性。它是Supervisor架构区别于简单任务分解的关键特征。
为什么选择这种架构?
- 处理复杂性: 将一个宏大问题拆解为多个小问题,每个小问题由专门的Agent处理,降低了单个Agent的认知负担。
- 提高效率: 不同的工作者Agent可以并行执行独立的子任务,加速整体任务的完成。
- 增强鲁棒性: 某个工作者Agent的失败不会导致整个系统崩溃,Supervisor可以重新分配任务或寻求替代方案。
- 动态适应性: 通过反馈闭环,系统能够根据中间结果或外部环境的变化,实时调整策略和任务。
- 促进模块化与复用: 工作者Agent是独立的、可复用的组件,可以轻松地添加到不同的Supervisor系统中。
- 可解释性与可控性: 任务清单和执行日志提供了任务进展的清晰视图,有助于调试和理解系统行为。
核心组件深入解析
A. Supervisor Agent:智能核心与决策中心
Supervisor Agent是整个架构的灵魂,它负责全局的协调和智能决策。其设计复杂且关键。
1. 职责细化:规划、分解、调度、监控、评估
- 规划 (Planning): 根据初始目标和当前系统状态,制定一个高层次的执行计划。这通常涉及对问题域的理解和对工作者Agent能力的认知。
- 分解 (Decomposition): 将高层次计划进一步细化为一系列具体、可执行的子任务。这是从抽象到具体的关键一步。
- 调度 (Scheduling): 决定哪些任务应该被执行,以何种顺序,以及由哪个工作者Agent来执行。可能涉及任务优先级、资源可用性等考量。
- 监控 (Monitoring): 跟踪每个任务的执行状态,包括是否正在运行、是否完成、是否失败以及耗时等。
- 评估 (Evaluation): 接收工作者Agent反馈的结果,评估其质量和对高层级目标的贡献。
- 迭代决策 (Iterative Decision-making): 基于评估结果,决定是继续执行下一个任务、重新规划、调整现有任务,还是宣布任务完成。
2. 内部机制:LLM集成、规则引擎、状态机
Supervisor的智能主要来源于其内部机制:
- LLM集成: 大语言模型是Supervisor进行自然语言理解、高级推理、任务分解和动态规划的核心工具。通过精心设计的Prompt,LLM可以根据当前情境和目标,生成下一步的行动计划或任务清单。
- 规则引擎: 用于处理结构化的决策逻辑,例如,当特定条件满足时触发特定任务,或者根据任务结果自动选择后续流程。这补充了LLM的非确定性,提供了确定性的控制流。
- 状态机: 管理整个任务的生命周期和Supervisor自身的运行状态。例如,任务可能经历“待规划”、“规划中”、“任务分发”、“等待结果”、“结果评估”、“完成/失败”等状态。
3. 示例代码:Supervisor Agent 的基本结构
我们使用Python和asyncio来模拟异步操作,并假定存在一个LLM客户端。
import asyncio
import uuid
from enum import Enum
from typing import List, Dict, Any, Optional
from pydantic import BaseModel, Field
# --- 1. 任务数据模型 (Task Data Model) ---
class TaskStatus(str, Enum):
PENDING = "pending"
IN_PROGRESS = "in_progress"
COMPLETED = "completed"
FAILED = "failed"
CANCELLED = "cancelled"
class Task(BaseModel):
task_id: str = Field(default_factory=lambda: str(uuid.uuid4()))
description: str
input_data: Dict[str, Any] = Field(default_factory=dict)
assigned_worker: Optional[str] = None
status: TaskStatus = TaskStatus.PENDING
result: Optional[Dict[str, Any]] = None
error: Optional[str] = None
dependencies: List[str] = Field(default_factory=list) # 任务依赖
# --- 2. 模拟LLM客户端 ---
class MockLLMClient:
async def generate_response(self, prompt: str, temperature: float = 0.7) -> str:
print(f"n--- LLM Input for Planning ---n{prompt}n-----------------------------n")
# 实际应用中,这里会调用OpenAI, Anthropic, 或其他LLM API
# 为了演示,我们模拟一个LLM的响应
if "分解以下目标" in prompt:
# 模拟LLM根据目标分解任务
if "生成一份市场分析报告" in prompt:
return """
[
{"description": "收集市场数据", "worker": "data_collector", "input_data": {"query": "最新科技市场数据"}},
{"description": "分析数据趋势", "worker": "data_analyst", "input_data": {"data_source": "previous_task_result"}},
{"description": "撰写报告草稿", "worker": "report_writer", "input_data": {"analysis_summary": "previous_task_result"}},
{"description": "审查并修订报告", "worker": "editor", "input_data": {"draft_report": "previous_task_result"}}
]
"""
elif "开发一个简单的Web应用" in prompt:
return """
[
{"description": "设计数据库Schema", "worker": "db_designer", "input_data": {"app_name": "MyWebApp"}},
{"description": "编写后端API", "worker": "backend_dev", "input_data": {"schema": "previous_task_result"}},
{"description": "开发前端UI", "worker": "frontend_dev", "input_data": {"api_spec": "previous_task_result"}},
{"description": "部署应用", "worker": "devops_engineer", "input_data": {"code_repo": "previous_task_result"}}
]
"""
elif "评估以下任务结果" in prompt:
# 模拟LLM评估任务结果
if "成功" in prompt:
return "评估结果:任务执行成功,可以进行下一步。"
else:
return "评估结果:任务执行失败,需要重新规划或重试。"
return "[]" # 默认空任务列表
# --- 3. Supervisor Agent ---
class SupervisorAgent:
def __init__(self, name: str, llm_client: MockLLMClient):
self.name = name
self.llm = llm_client
self.task_queue: asyncio.Queue[Task] = asyncio.Queue()
self.completed_tasks: asyncio.Queue[Task] = asyncio.Queue()
self.all_tasks: Dict[str, Task] = {} # 存储所有任务,按task_id索引
self.available_workers: Dict[str, Any] = {} # 存储注册的工作者Agent
self.current_goal: Optional[str] = None
self.global_context: Dict[str, Any] = {} # 全局上下文,用于传递信息
def register_worker(self, worker_name: str, worker_instance: Any):
"""注册一个工作者Agent"""
self.available_workers[worker_name] = worker_instance
print(f"Supervisor: Worker '{worker_name}' registered.")
async def _plan_tasks(self, goal: str, context: Dict[str, Any]) -> List[Task]:
"""
使用LLM根据目标和当前上下文规划任务。
这里模拟LLM返回JSON格式的任务列表。
"""
prompt = f"""
你是一个高级任务规划者。
当前目标是:{goal}
当前上下文:{context}
请将上述目标分解为一系列具体的、可执行的子任务。
每个任务应该包含:
- description: 任务描述
- worker: 推荐执行此任务的工作者Agent名称 (例如: data_collector, data_analyst, report_writer, editor)
- input_data: 任务所需的输入数据,如果依赖前一个任务结果,请写 "previous_task_result" 作为占位符。
请以JSON数组格式返回任务列表,例如:
[
{{"description": "任务1描述", "worker": "worker_name_1", "input_data": {{"key": "value"}}}},
{{"description": "任务2描述", "worker": "worker_name_2", "input_data": {{"key": "value", "dependency": "previous_task_result"}}}}
]
"""
response_str = await self.llm.generate_response(prompt)
try:
task_defs = json.loads(response_str)
tasks = []
for t_def in task_defs:
new_task = Task(
description=t_def["description"],
assigned_worker=t_def.get("worker"),
input_data=t_def.get("input_data", {})
)
self.all_tasks[new_task.task_id] = new_task
tasks.append(new_task)
return tasks
except json.JSONDecodeError as e:
print(f"Error decoding LLM response: {e}nResponse: {response_str}")
return []
async def _evaluate_result(self, task: Task) -> bool:
"""
使用LLM评估任务结果。
"""
prompt = f"""
你是一个任务结果评估者。
请评估以下任务的执行结果:
任务描述: {task.description}
任务状态: {task.status.value}
任务结果: {json.dumps(task.result, indent=2) if task.result else '无'}
错误信息: {task.error if task.error else '无'}
请判断任务是否成功完成并满足预期。如果成功,返回“成功”。如果失败或需要重新规划,返回“失败”。
"""
response = await self.llm.generate_response(prompt)
print(f"Supervisor: 评估任务 '{task.description}' 结果: {response}")
return "成功" in response
async def execute_goal(self, goal: str, initial_context: Dict[str, Any] = None):
"""
启动Supervisor执行一个高层级目标。
"""
self.current_goal = goal
self.global_context = initial_context if initial_context else {}
print(f"nSupervisor: 接收到新目标 - '{goal}'")
# 1. 初始规划
initial_tasks = await self._plan_tasks(goal, self.global_context)
for task in initial_tasks:
await self.task_queue.put(task)
print(f"Supervisor: 规划任务 '{task.description}' ({task.task_id}),加入任务队列。")
# 2. 任务分发与处理循环
while True:
if self.task_queue.empty() and self.is_all_tasks_completed_or_failed():
print(f"nSupervisor: 所有任务已处理完成或失败。")
break
try:
task = self.all_tasks.get(self.get_next_runnable_task_id()) # 获取下一个可运行任务
if not task:
await asyncio.sleep(1) # 没有可运行任务,等待1秒
continue
if task.status == TaskStatus.PENDING:
print(f"nSupervisor: 分发任务 '{task.description}' ({task.task_id}) 给 '{task.assigned_worker}'")
task.status = TaskStatus.IN_PROGRESS
# 模拟将任务发送给工作者Agent
worker = self.available_workers.get(task.assigned_worker)
if worker:
# 填充依赖数据
input_data_with_deps = self._resolve_dependencies(task.input_data)
task.input_data = input_data_with_deps # 更新任务的输入数据
asyncio.create_task(worker.execute_task(task, self.completed_tasks))
else:
task.status = TaskStatus.FAILED
task.error = f"No worker registered for '{task.assigned_worker}'"
await self.completed_tasks.put(task) # 放入完成队列以便Supervisor处理失败
print(f"Supervisor: 错误 - 未找到工作者 '{task.assigned_worker}' 来执行任务 '{task.description}'")
except asyncio.QueueEmpty:
await asyncio.sleep(0.1) # 短暂等待,避免空循环
# 3. 处理已完成任务的反馈
while not self.completed_tasks.empty():
completed_task = await self.completed_tasks.get()
self.all_tasks[completed_task.task_id] = completed_task # 更新全局任务状态
print(f"nSupervisor: 收到任务 '{completed_task.description}' ({completed_task.task_id}) 的反馈。状态: {completed_task.status.value}")
is_success = await self._evaluate_result(completed_task)
if completed_task.status == TaskStatus.COMPLETED and is_success:
# 将任务结果添加到全局上下文,以供后续任务使用
self.global_context[completed_task.task_id] = completed_task.result
print(f"Supervisor: 任务 '{completed_task.description}' 成功。结果已添加到全局上下文。")
# 检查是否有新的任务需要根据此结果生成
# 实际应用中,这里可以再次调用LLM进行Re-planning
# 例如:new_tasks = await self._plan_next_step(self.current_goal, self.global_context)
# for t in new_tasks: await self.task_queue.put(t)
pass # 暂时简化,不动态生成新任务
else:
print(f"Supervisor: 任务 '{completed_task.description}' 失败或评估不通过。错误: {completed_task.error}")
# 失败处理策略:重试、重新规划、通知等
# 暂时标记为失败,不重试
completed_task.status = TaskStatus.FAILED
self.global_context[f"{completed_task.task_id}_error"] = completed_task.error
await asyncio.sleep(0.5) # 避免CPU空转
print(f"nSupervisor: 目标 '{goal}' 执行完成。最终全局上下文:n{json.dumps(self.global_context, indent=2)}")
def is_all_tasks_completed_or_failed(self) -> bool:
"""检查所有任务是否都已完成或失败"""
if not self.all_tasks:
return False # 如果还没有任务,则不认为完成
for task in self.all_tasks.values():
if task.status in [TaskStatus.PENDING, TaskStatus.IN_PROGRESS]:
return False
return True
def get_next_runnable_task_id(self) -> Optional[str]:
"""获取下一个可以运行的任务ID (简化版,不处理复杂依赖)"""
# 简单地返回第一个状态为PENDING的任务
# 实际中需要考虑任务依赖
for task_id, task in self.all_tasks.items():
if task.status == TaskStatus.PENDING:
# 检查依赖 (这里简化,假设无依赖或依赖已满足)
# 实际需要遍历task.dependencies,检查其状态
return task_id
return None
def _resolve_dependencies(self, input_data: Dict[str, Any]) -> Dict[str, Any]:
"""解析输入数据中的依赖占位符"""
resolved_data = {}
for key, value in input_data.items():
if isinstance(value, str) and value == "previous_task_result":
# 查找最近完成任务的结果
# 实际应用中,可能需要更精细的依赖解析,例如指定哪个任务的结果
found_result = None
for task_id, task in self.all_tasks.items():
if task.status == TaskStatus.COMPLETED and task.result:
# 简单的策略:使用最近完成的任务结果
# 更复杂的策略:需要LLM判断哪个结果最合适,或者任务定义中明确指定依赖任务ID
found_result = task.result
break # 找到一个就用,简化处理
resolved_data[key] = found_result if found_result else "Error: Dependency not met"
elif isinstance(value, dict):
resolved_data[key] = self._resolve_dependencies(value) # 递归处理嵌套字典
else:
resolved_data[key] = value
return resolved_data
B. 子图(Sub-Graph)/ 工作者 Agent:专业执行单元
工作者Agent是执行实际操作的专业单元。它们通常是轻量级的,专注于特定类型的任务,并具备调用外部工具或执行特定逻辑的能力。
1. 职责细化:接收任务、执行、报告结果
- 接收任务: 从Supervisor接收结构化的任务对象。
- 执行任务: 根据任务描述和输入数据,利用内置工具或逻辑执行操作。这可能涉及调用外部API、数据库查询、代码执行、数据处理等。
- 报告结果: 将任务执行的结果(成功/失败、输出数据、错误信息)以结构化的方式返回给Supervisor。
2. 特性:原子性、可复用性、隔离性
- 原子性: 每个工作者Agent通常负责一个相对原子化的操作或一组紧密相关的操作。
- 可复用性: 设计为通用组件,可以在不同Supervisor架构或任务中被复用。
- 隔离性: 工作者Agent之间相互独立,一个Agent的失败不应直接影响其他Agent的运行。
3. 示例代码:一个简单的工作者 Agent
我们创建几个模拟的工作者Agent,它们通过一个asyncio.Queue与Supervisor通信。
import json
import asyncio
from typing import Dict, Any
# --- 模拟工作者Agent ---
class BaseWorkerAgent:
def __init__(self, name: str):
self.name = name
async def execute_task(self, task: Task, completed_tasks_queue: asyncio.Queue):
"""
所有工作者Agent的通用执行接口。
具体逻辑由子类实现。
"""
print(f"Worker '{self.name}': 接收任务 '{task.description}' ({task.task_id})")
try:
result_data = await self._run_specific_logic(task.input_data)
task.result = result_data
task.status = TaskStatus.COMPLETED
print(f"Worker '{self.name}': 任务 '{task.description}' ({task.task_id}) 完成。")
except Exception as e:
task.error = str(e)
task.status = TaskStatus.FAILED
print(f"Worker '{self.name}': 任务 '{task.description}' ({task.task_id}) 失败: {e}")
finally:
await completed_tasks_queue.put(task) # 将任务结果反馈给Supervisor
async def _run_specific_logic(self, input_data: Dict[str, Any]) -> Dict[str, Any]:
"""子类需要实现此方法,包含具体业务逻辑。"""
raise NotImplementedError("Subclasses must implement _run_specific_logic method.")
class DataCollectorWorker(BaseWorkerAgent):
def __init__(self):
super().__init__("data_collector")
async def _run_specific_logic(self, input_data: Dict[str, Any]) -> Dict[str, Any]:
query = input_data.get("query", "default_query")
print(f"Worker '{self.name}': 正在收集关于 '{query}' 的数据...")
await asyncio.sleep(2) # 模拟耗时操作
mock_data = {
"query": query,
"raw_data": [
{"item": "Laptop", "sales": 1200, "region": "North"},
{"item": "Keyboard", "sales": 800, "region": "South"},
{"item": "Mouse", "sales": 500, "region": "East"}
],
"timestamp": "2023-10-27"
}
return {"collected_data": mock_data}
class DataAnalystWorker(BaseWorkerAgent):
def __init__(self):
super().__init__("data_analyst")
async def _run_specific_logic(self, input_data: Dict[str, Any]) -> Dict[str, Any]:
data_source = input_data.get("data_source")
if not data_source or "collected_data" not in data_source:
raise ValueError("Missing 'collected_data' in input for analysis.")
print(f"Worker '{self.name}': 正在分析数据趋势...")
await asyncio.sleep(3) # 模拟耗时操作
raw_data = data_source["collected_data"]["raw_data"]
total_sales = sum(item["sales"] for item in raw_data)
regions = list(set(item["region"] for item in raw_data))
analysis_summary = {
"total_sales": total_sales,
"regions_covered": regions,
"top_item": max(raw_data, key=lambda x: x["sales"])["item"]
}
return {"analysis_summary": analysis_summary}
class ReportWriterWorker(BaseWorkerAgent):
def __init__(self):
super().__init__("report_writer")
async def _run_specific_logic(self, input_data: Dict[str, Any]) -> Dict[str, Any]:
analysis_summary = input_data.get("analysis_summary")
if not analysis_summary:
raise ValueError("Missing 'analysis_summary' in input for report writing.")
print(f"Worker '{self.name}': 正在撰写报告草稿...")
await asyncio.sleep(4) # 模拟耗时操作
report_content = (
f"市场分析报告草稿:nn"
f"根据最新数据分析,总销售额达到 {analysis_summary['total_sales']}。"
f"主要销售区域包括 {', '.join(analysis_summary['regions_covered'])}。"
f"最畅销商品是 {analysis_summary['top_item']}。"
f"nn请进行进一步审查和修订。"
)
return {"draft_report": report_content}
class EditorWorker(BaseWorkerAgent):
def __init__(self):
super().__init__("editor")
async def _run_specific_logic(self, input_data: Dict[str, Any]) -> Dict[str, Any]:
draft_report = input_data.get("draft_report")
if not draft_report:
raise ValueError("Missing 'draft_report' in input for editing.")
print(f"Worker '{self.name}': 正在审查并修订报告...")
await asyncio.sleep(2) # 模拟耗时操作
final_report = draft_report.replace("草稿", "最终版").replace("请进行进一步审查和修订。", "报告已修订并准备发布。")
return {"final_report": final_report}
# --- 数据库设计者 (Web App 任务链) ---
class DBDesignerWorker(BaseWorkerAgent):
def __init__(self):
super().__init__("db_designer")
async def _run_specific_logic(self, input_data: Dict[str, Any]) -> Dict[str, Any]:
app_name = input_data.get("app_name", "untitled_app")
print(f"Worker '{self.name}': 正在为 '{app_name}' 设计数据库Schema...")
await asyncio.sleep(2)
schema = {
"users": {"id": "PK", "username": "TEXT", "email": "TEXT"},
"products": {"id": "PK", "name": "TEXT", "price": "REAL"}
}
return {"db_schema": schema}
class BackendDevWorker(BaseWorkerAgent):
def __init__(self):
super().__init__("backend_dev")
async def _run_specific_logic(self, input_data: Dict[str, Any]) -> Dict[str, Any]:
schema = input_data.get("schema")
if not schema:
raise ValueError("Missing 'schema' for backend development.")
print(f"Worker '{self.name}': 正在编写后端API...")
await asyncio.sleep(3)
api_spec = {
"/users": {"GET": "list_users", "POST": "create_user"},
"/products/{id}": {"GET": "get_product", "PUT": "update_product"}
}
return {"api_spec": api_spec, "backend_code_repo": "https://github.com/mywebapp/backend"}
class FrontendDevWorker(BaseWorkerAgent):
def __init__(self):
super().__init__("frontend_dev")
async def _run_specific_logic(self, input_data: Dict[str, Any]) -> Dict[str, Any]:
api_spec = input_data.get("api_spec")
if not api_spec:
raise ValueError("Missing 'api_spec' for frontend development.")
print(f"Worker '{self.name}': 正在开发前端UI...")
await asyncio.sleep(3)
frontend_code_repo = "https://github.com/mywebapp/frontend"
return {"frontend_code_repo": frontend_code_repo}
class DevOpsEngineerWorker(BaseWorkerAgent):
def __init__(self):
super().__init__("devops_engineer")
async def _run_specific_logic(self, input_data: Dict[str, Any]) -> Dict[str, Any]:
code_repo = input_data.get("code_repo") # 假设code_repo是一个包含所有代码的字典或URL
if not code_repo:
raise ValueError("Missing 'code_repo' for deployment.")
print(f"Worker '{self.name}': 正在部署应用...")
await asyncio.sleep(5)
deployment_status = "Deployment successful on AWS EC2."
return {"deployment_status": deployment_status, "app_url": "http://mywebapp.example.com"}
C. 任务管理系统:生命周期与状态追踪
一个健壮的任务管理系统是Supervisor架构稳定运行的基础。它负责任务的创建、存储、查询、更新和状态流转。
1. 任务定义:结构化表示 (Pydantic)
使用Pydantic可以方便地定义任务的数据结构,确保数据类型安全和一致性。
# Task类已在SupervisorAgent的示例代码中定义
# from pydantic import BaseModel, Field
# from enum import Enum
# from typing import List, Dict, Any, Optional
# class TaskStatus(str, Enum):
# PENDING = "pending"
# IN_PROGRESS = "in_progress"
# COMPLETED = "completed"
# FAILED = "failed"
# CANCELLED = "cancelled"
# class Task(BaseModel):
# task_id: str = Field(default_factory=lambda: str(uuid.uuid4()))
# description: str
# input_data: Dict[str, Any] = Field(default_factory=dict)
# assigned_worker: Optional[str] = None
# status: TaskStatus = TaskStatus.PENDING
# result: Optional[Dict[str, Any]] = None
# error: Optional[str] = None
# dependencies: List[str] = Field(default_factory=list) # 任务依赖
2. 任务队列:优先级、调度
Supervisor内部通常会维护一个任务队列。在更复杂的场景中,可以引入优先级、截止日期、资源需求等因素进行调度。asyncio.Queue是一个简单的实现,而对于分布式系统,则会采用消息队列(如RabbitMQ, Kafka, Redis Streams)或数据库。
3. 任务状态:待处理、进行中、完成、失败
任务状态是任务生命周期的核心。Supervisor会根据工作者Agent的反馈更新任务状态,并根据状态进行下一步决策。
4. 示例代码:任务数据模型与状态管理
在SupervisorAgent类中,我们已经看到了self.all_tasks字典用于存储所有任务,self.task_queue用于待处理任务,self.completed_tasks用于已完成任务的反馈。
# SupervisorAgent 内部的简化任务管理
# self.task_queue: asyncio.Queue[Task] = asyncio.Queue() # 待分发任务 (或待运行,更精确)
# self.completed_tasks: asyncio.Queue[Task] = asyncio.Queue() # 工作者完成/失败的任务反馈队列
# self.all_tasks: Dict[str, Task] = {} # 存储所有任务的当前状态,Supervisor是唯一的更新者
D. 通信机制:高效的信息流转
Supervisor与工作者Agent之间需要高效、可靠的通信机制来交换任务和结果。
1. 异步消息队列:解耦、弹性
- 优点: 生产者(Supervisor)和消费者(工作者Agent)解耦,允许异步操作,提高系统弹性、可伸缩性和容错性。如果某个工作者Agent暂时离线,任务仍可在队列中等待。
- 常用技术: Redis Streams、RabbitMQ、Apache Kafka、Celery (基于消息队列的分布式任务队列)。
2. REST/gRPC API:实时交互、控制
- 优点: 适用于需要实时请求-响应模式的场景,或者Supervisor需要主动查询工作者Agent状态或发送控制命令。
- 缺点: 紧耦合,工作者Agent必须在线且可访问。
3. 共享状态/数据库:持久化、协调
- 优点: 提供持久化的任务状态和全局上下文,允许多个Supervisor实例或工作者Agent在共享数据上进行协调。
- 常用技术: PostgreSQL, MongoDB, Redis (作为缓存或简单DB)。
4. 示例代码:基于消息队列的通信模拟
在我们的Python例子中,asyncio.Queue模拟了内存中的消息队列,它实现了Supervisor与工作者Agent之间的异步通信。Supervisor将任务放入task_queue(或直接将任务对象传递给工作者),工作者完成任务后将带结果的Task对象放入completed_tasks_queue。
# SupervisorAgent 的队列
# self.task_queue: asyncio.Queue[Task] = asyncio.Queue() # 从Supervisor角度看,这是待进一步处理的任务队列
# self.completed_tasks: asyncio.Queue[Task] = asyncio.Queue() # 接收来自工作者的结果
# 工作者Agent 的执行方法
# async def execute_task(self, task: Task, completed_tasks_queue: asyncio.Queue):
# # ... 执行逻辑 ...
# await completed_tasks_queue.put(task) # 将结果放入队列
这种asyncio.Queue模拟虽然在单进程内有效,但在分布式环境中需要替换为真正的消息队列服务。
E. 反馈与迭代机制:逻辑闭环的实现
反馈与迭代是Supervisor架构实现动态性和自适应能力的关键。
1. 结果报告:结构化输出
工作者Agent必须以Supervisor能够理解和解析的结构化格式报告结果。这通常是JSON或Pydantic模型。
2. 错误处理与重试策略
- 当工作者Agent报告失败时,Supervisor需要决定如何处理。
- 重试: 对于瞬时错误(网络抖动),可以配置重试次数和指数退避策略。
- 重新规划: 对于持续性错误或逻辑错误,Supervisor可能需要调用LLM重新规划任务,甚至调整工作者Agent的选择。
- 通知: 向系统管理员发送警报。
3. 状态更新与重新规划:Supervisor 的动态调整能力
Supervisor接收到反馈后,会更新全局上下文和任务状态。如果发现任务进展不顺利,或者有更好的执行路径,它会利用LLM或其他决策逻辑进行重新规划 (Re-planning)。这可能是生成新的任务、调整现有任务的优先级、甚至修改高层级计划。这种能力使得系统能够从错误中学习,并适应不断变化的环境。
IV. ‘Supervisor’ 架构的运作流程
A. 整体工作流概览
下表概述了 ‘Supervisor’ 架构中各个阶段的主要参与者和交互。
| 阶段 | 参与者 | 关键活动 | 核心机制 |
|---|---|---|---|
| 1. 目标接收 | 用户/外部系统 | 提交一个高层级目标 | API调用、消息触发 |
| 2. 初始规划 | Supervisor Agent | 理解目标,调用LLM进行初步任务分解,生成初始任务清单,初始化全局上下文 | LLM推理、Prompt Engineering、状态初始化 |
| 3. 任务生成 | Supervisor Agent | 将规划的任务封装为结构化的 Task 对象,并添加到内部任务管理系统 |
Pydantic模型、任务队列 |
| 4. 任务分发 | Supervisor Agent | 从任务清单中选择可执行任务,识别合适的 Worker Agent,并将其分发给 Worker Agent | 调度逻辑、消息队列/直接调用 |
| 5. 工作者执行 | Worker Agent | 接收任务,利用其专业工具和逻辑执行任务,捕获执行结果(包括成功/失败、输出数据) | 工具调用、业务逻辑、沙盒环境(可选) |
| 6. 结果返回 | Worker Agent | 将任务执行结果(Task 对象更新状态和数据)反馈给 Supervisor |
消息队列、回调机制 |
| 7. 结果评估 | Supervisor Agent | 接收 Worker Agent 的反馈,评估任务结果,更新全局上下文,并检查是否满足高层级目标或是否有新的依赖被满足 | LLM推理、规则引擎、状态机、全局上下文管理 |
| 8. 迭代决策 | Supervisor Agent | 根据评估结果,决定下一步行动:继续分发下一个任务、重新规划、生成补充任务、重试失败任务,或宣布目标完成 | LLM推理、条件逻辑、任务状态机 |
| 9. 闭环完成 | Supervisor Agent/用户 | 当所有任务完成且目标达成时,Supervisor 报告最终结果,并可关闭或等待新目标。 | 最终结果输出、系统清理 |
B. 详细步骤分解与代码片段
我们将通过一个具体的例子来演示整个流程:“生成一份市场分析报告”。
1. 初始目标接收
用户通过某个接口向Supervisor提交一个高层级目标。
import json
# 假设这是程序的入口
async def main():
llm_client = MockLLMClient()
supervisor = SupervisorAgent("MainSupervisor", llm_client)
# 注册工作者Agent
supervisor.register_worker("data_collector", DataCollectorWorker())
supervisor.register_worker("data_analyst", DataAnalystWorker())
supervisor.register_worker("report_writer", ReportWriterWorker())
supervisor.register_worker("editor", EditorWorker())
# 注册第二个任务链的工作者
supervisor.register_worker("db_designer", DBDesignerWorker())
supervisor.register_worker("backend_dev", BackendDevWorker())
supervisor.register_worker("frontend_dev", FrontendDevWorker())
supervisor.register_worker("devops_engineer", DevOpsEngineerWorker())
# 初始目标
goal_market_report = "生成一份市场分析报告,涵盖最新的科技市场趋势和销售数据。"
await supervisor.execute_goal(goal_market_report)
print("n" + "="*50 + "n")
# 第二个目标
goal_web_app = "开发一个简单的Web应用,包括数据库、后端API和前端UI,并部署上线。"
await supervisor.execute_goal(goal_web_app)
if __name__ == "__main__":
asyncio.run(main())
2. Supervisor 初始规划与任务分解 (LLM调用)
Supervisor接收目标后,利用LLM进行初步规划,将大目标分解为一系列子任务。
# SupervisorAgent._plan_tasks 方法内部
# prompt = f"""
# 你是一个高级任务规划者。
# 当前目标是:{goal}
# ...
# """
# response_str = await self.llm.generate_response(prompt)
# task_defs = json.loads(response_str)
LLM模拟输出示例 (对应 goal_market_report):
[
{"description": "收集市场数据", "worker": "data_collector", "input_data": {"query": "最新科技市场数据"}},
{"description": "分析数据趋势", "worker": "data_analyst", "input_data": {"data_source": "previous_task_result"}},
{"description": "撰写报告草稿", "worker": "report_writer", "input_data": {"analysis_summary": "previous_task_result"}},
{"description": "审查并修订报告", "worker": "editor", "input_data": {"draft_report": "previous_task_result"}}
]
3. 任务生成与入队
Supervisor将LLM生成的任务定义转换为Task对象,并将其添加到自己的all_tasks字典和task_queue中。
# SupervisorAgent.execute_goal 方法内部
# initial_tasks = await self._plan_tasks(goal, self.global_context)
# for task in initial_tasks:
# await self.task_queue.put(task)
# self.all_tasks[task.task_id] = task
4. 任务分发与工作者启动
Supervisor从task_queue中取出处于PENDING状态且依赖已满足的任务,找到对应的注册工作者Agent,并将任务分发给它。
# SupervisorAgent.execute_goal 循环内部
# task = self.all_tasks.get(self.get_next_runnable_task_id())
# ...
# worker = self.available_workers.get(task.assigned_worker)
# if worker:
# input_data_with_deps = self._resolve_dependencies(task.input_data)
# task.input_data = input_data_with_deps
# asyncio.create_task(worker.execute_task(task, self.completed_tasks))
例如,对于第一个任务“收集市场数据”,Supervisor会将其分发给data_collector。
5. 工作者执行与结果返回
data_collector Agent接收任务,执行数据收集模拟,并将结果封装回Task对象,然后放入completed_tasks_queue。
# DataCollectorWorker._run_specific_logic 方法
# await asyncio.sleep(2)
# mock_data = { ... }
# return {"collected_data": mock_data}
# BaseWorkerAgent.execute_task 方法的最后
# await completed_tasks_queue.put(task)
6. Supervisor 结果评估与下一步决策
Supervisor从completed_tasks_queue中取出已完成的任务,更新其状态,并调用LLM评估结果。
# SupervisorAgent.execute_goal 循环内部
# completed_task = await self.completed_tasks.get()
# self.all_tasks[completed_task.task_id] = completed_task
# is_success = await self._evaluate_result(completed_task)
# if is_success:
# self.global_context[completed_task.task_id] = completed_task.result
# # 动态规划 (此处简化,实际会再次调用LLM)
# else:
# # 错误处理,可能重试或重新规划
例如,data_collector任务完成后,其结果{"collected_data": ...}会被添加到global_context中。当data_analyst任务被分发时,Supervisor会解析input_data中的"previous_task_result"占位符,将其替换为data_collector任务的实际输出。
7. 闭环迭代与最终完成
这个过程会持续进行。每个任务完成后,Supervisor都会评估结果并决定下一个步骤。如果所有任务都成功完成,且评估认为高层级目标已达成,Supervisor会宣布任务完成,并输出最终结果。
例如,当editor完成最终报告的修订后,Supervisor会评估其结果,发现报告已准备好,便可宣布整个“生成市场分析报告”的目标完成。
最终输出示例 (global_context 部分):
{
"task_id_of_data_collector": {
"collected_data": {
"query": "最新科技市场数据",
"raw_data": [
{"item": "Laptop", "sales": 1200, "region": "North"},
{"item": "Keyboard", "sales": 800, "region": "South"},
{"item": "Mouse", "sales": 500, "region": "East"}
],
"timestamp": "2023-10-27"
}
},
"task_id_of_data_analyst": {
"analysis_summary": {
"total_sales": 2500,
"regions_covered": ["North", "South", "East"],
"top_item": "Laptop"
}
},
"task_id_of_report_writer": {
"draft_report": "市场分析报告草稿:nn根据最新数据分析,总销售额达到 2500。n主要销售区域包括 North, South, East。n最畅销商品是 Laptop。nn请进行进一步审查和修订。"
},
"task_id_of_editor": {
"final_report": "市场分析报告最终版:nn根据最新数据分析,总销售额达到 2500。n主要销售区域包括 North, South, East。n最畅销商品是 Laptop。nn报告已修订并准备发布。"
}
}
V. 架构设计考量与最佳实践
构建一个健壮、高效的Supervisor架构需要考虑多方面的设计原则和最佳实践。
A. 模块化与解耦
- Supervisor与Worker的解耦: Supervisor不应知道Worker的内部实现细节,只通过预定义接口(任务定义、结果结构)交互。Worker也应独立于Supervisor运行。
- Worker内部的模块化: 每个Worker应专注于单一职责,其内部逻辑可以进一步分解为更小的函数或工具。
- 好处: 提高可维护性、可测试性,方便独立开发和部署。
B. 扩展性与并发性
- 水平扩展: Supervisor可以调度多个Worker实例,甚至可以有多个Supervisor实例(通过分布式锁或协调服务避免冲突)。通过消息队列,可以轻松增加Worker的数量来处理并发任务。
- 异步编程: 利用
asyncio(Python)或其他语言的异步机制,使得Supervisor在等待Worker结果时不会阻塞,能够同时管理多个任务的生命周期。 - 并行处理: 不同的Worker Agent可以并行执行任务,显著提高整体效率。
C. 容错性与健壮性
- 错误处理: Worker Agent应捕获并报告所有错误,Supervisor根据错误类型执行重试、跳过、重新规划或中止等策略。
- 任务持久化: 任务状态和全局上下文应存储在持久化存储(数据库)中,以防Supervisor或Worker崩溃后数据丢失,支持恢复。
- 心跳机制: Supervisor可以定期检查Worker Agent的活跃状态,及时发现并处理离线Worker。
D. 状态管理与持久化
- 全局上下文: Supervisor必须维护一个全局上下文,记录任务进展、中间结果和关键信息。这个上下文是LLM进行后续规划的重要依据。
- 任务状态: 每个任务的状态(
PENDING,IN_PROGRESS,COMPLETED,FAILED)必须准确更新。 - 持久化存储: 对于长期运行或关键任务,
all_tasks和global_context应定期或实时地写入数据库,以防止系统重启导致状态丢失。
E. 可观测性:日志、监控、追踪
- 详细日志: Supervisor和Worker Agent都应生成详细的日志,记录任务接收、执行、结果、错误等信息,方便调试和问题追溯。
- 任务监控: 提供一个仪表盘或界面,实时显示所有任务的状态、进度、耗时,以及Worker Agent的负载情况。
- 分布式追踪: 对于跨多个服务和Agent的复杂任务,引入分布式追踪系统(如OpenTelemetry, Jaeger)可以帮助理解请求流和性能瓶颈。
F. 安全性考虑
- Worker沙箱: 如果Worker Agent执行外部代码或访问敏感资源,应将其运行在沙箱环境中,限制其权限,防止恶意或错误操作影响整个系统。
- 认证与授权: Supervisor与Worker之间的通信应进行认证和授权,确保只有合法的Agent才能交互。
- 输入验证: 对所有传入任务的输入数据进行严格验证,防止注入攻击或其他恶意输入。
G. 上下文管理与成本优化
- 上下文窗口限制: LLM有上下文窗口限制,Supervisor需要智能地管理传递给LLM的上下文信息,只包含最相关和必要的数据,避免上下文漂移和Token超限。
- Token成本优化: 频繁调用LLM会产生高昂费用。Supervisor应优化Prompt,减少不必要的LLM调用,或使用缓存机制。
- 中间结果总结: Worker返回的详细结果在传递给LLM进行评估或规划前,可以由Supervisor进行摘要或过滤,减少LLM处理的数据量。
VI. 实际应用场景
‘Supervisor’ 架构凭借其动态规划和迭代执行的能力,在多个复杂领域展现出巨大潜力。
A. 智能数据分析与报告生成
- 场景: 用户提出“分析过去一年销售数据,并生成一份市场趋势报告”。
- Supervisor:
- 规划任务:数据抽取、数据清洗、数据分析(趋势、相关性)、图表生成、报告撰写、报告审查。
- 根据分析结果动态调整报告侧重点,或要求补充特定数据。
- Worker Agents:
DataExtractorAgent:从数据库、API、文件系统提取数据。DataCleanerAgent:处理缺失值、异常值、格式转换。DataAnalysisAgent:执行统计分析、机器学习模型预测。ChartGeneratorAgent:根据分析结果生成各种图表。ReportWriterAgent:整合分析结果和图表,撰写报告初稿。ReviewerAgent:审查报告内容、语法、逻辑,并提出修改意见。
B. 自动化软件开发与测试
- 场景: 用户提出“开发一个支持用户注册和登录的简单Web应用”。
- Supervisor:
- 规划任务:需求分析、数据库设计、后端API开发、前端UI开发、单元测试、集成测试、部署。
- 根据测试结果,动态调整开发任务,修复Bug,或重新设计部分模块。
- Worker Agents:
RequirementsAnalystAgent:与用户交互,明确需求。DBDesignerAgent:设计数据库Schema。BackendDevAgent:编写API代码、业务逻辑。FrontendDevAgent:开发用户界面。UnitTestAgent:为代码编写并运行单元测试。IntegrationTestAgent:执行集成测试。BugFixAgent:根据测试报告修复Bug。DevOpsAgent:负责CI/CD和应用部署。
C. 复杂多模态AI应用
- 场景: 用户上传一张图片,要求“描述图片内容,识别其中人物,并根据人物表情猜测其情绪,然后生成一段基于情绪的创意短文”。
- Supervisor:
- 规划任务:图片内容识别、人脸检测、情绪识别、文本生成。
- 根据识别结果,动态调整文本生成的风格和内容。
- Worker Agents:
ImageCaptioningAgent:生成图片描述。FaceDetectionAgent:检测图片中的人脸位置。EmotionRecognitionAgent:分析人脸表情,识别情绪。CreativeWriterAgent:根据图片描述、人物和情绪生成创意短文。
D. 智能客服与任务自动化
- 场景: 客户咨询“我的订单#XYZ出了问题,请查询并解决”。
- Supervisor:
- 规划任务:订单信息查询、问题诊断、解决方案查找、与客户沟通、更新订单状态。
- 根据查询结果和问题类型,动态选择不同的解决方案流程(如退款、换货、技术支持)。
- Worker Agents:
OrderQueryAgent:查询订单数据库,获取详情。ProblemDiagnosisAgent:分析订单问题类型。SolutionFinderAgent:从知识库或API中查找解决方案。CommunicationAgent:通过聊天或邮件与客户沟通。OrderUpdateAgent:更新订单状态或触发后台操作。
VII. 挑战与展望
尽管’Supervisor’ 架构提供了强大的能力,但在实际部署和维护中仍面临一些挑战:
- 上下文漂移与LLM的非确定性: 随着任务链的延长,Supervisor需要维护的上下文会逐渐增大,可能导致LLM的推理能力下降,甚至产生“幻觉”。LLM的非确定性也可能导致每次规划或评估的结果不一致,增加了系统调试的复杂性。
- 调试复杂性: 分布式系统和Agent之间的复杂交互使得问题诊断变得困难。需要强大的日志、监控和追踪工具。
- 计算与成本开销: 频繁的LLM调用(尤其是在规划和评估阶段)会带来显著的计算资源消耗和经济成本。需要精细的Prompt工程和缓存策略来优化。
- 任务依赖管理: 现实世界的任务往往存在复杂的依赖关系(例如,任务B必须在任务A完成后才能开始)。Supervisor需要一个鲁棒的依赖解析和调度机制。
- 安全性与权限控制: 如何确保各个Worker Agent在执行任务时不会越权访问资源,以及如何安全地处理敏感数据,是分布式Agent系统必须解决的问题。
展望未来,’Supervisor’ 架构将朝着更智能化、更自适应的方向发展:
- 自适应学习与优化: Supervisor将不仅仅是执行规划,还能从历史任务的成功与失败中学习,自动优化规划策略和Worker Agent的选择。
- 更强大的多模态理解: 结合视觉、听觉等多模态输入,使Supervisor能够处理更丰富的环境信息。
- 人类与Agent协作: 发展更自然的接口,让人类可以介入Supervisor的决策过程,提供指导或纠正,形成“人机共驾”的智能系统。
- 形式化验证与可信AI: 研究如何对Agent系统的行为进行形式化验证,确保其决策符合预期,提高系统的可信度和安全性。
VIII. 架构的精髓在于智能的分解、协作与迭代,它为构建能够处理现实世界复杂性和动态性的下一代AI系统提供了强大的范式。
通过将复杂目标分解为可管理的子任务,并利用专业的Agent高效执行,同时通过智能的反馈闭环实现动态调整和优化,’Supervisor’ 架构极大地提升了AI系统处理复杂、多阶段、动态任务的能力。它不仅是技术上的创新,更是对人类协作模式在AI领域的一种映射,预示着未来AI系统将以更加灵活、鲁棒和智能的方式,解决我们面临的各种挑战。