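Collaboration and Challenges in Multi-Agent Systems: Task Allocation and Result Auditing Between Manager and Executor Agents
Distinguished experts, scholars, and colleagues with a passion for Multi-Agent Systems (MAS):
Hello, everyone!
In today's complex and fast-changing technical environment, a single centralized system often struggles to keep up with rapidly shifting requirements and highly parallel workloads. Multi-agent systems, a distributed artificial intelligence paradigm, let multiple autonomous, cooperating agents solve problems together, which gives them considerable potential and flexibility. Across MAS applications, effective task allocation and reliable result auditing are key to whether the system runs efficiently and robustly. Today we will look closely at one core MAS collaboration pattern: the task allocation and result auditing mechanism between a Manager Agent and Executor Agents. We will start from the definitions, work through the main implementation strategies, and support them with detailed Python code examples, with the aim of building a technical framework that is logically rigorous and practical.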
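Agent Role Definitions: Manager and Executor
In a MAS, agent roles are dynamic and varied. In many practical applications, however, a hierarchical collaboration pattern works very well: one agent (or group of agents) is responsible for high-level decisions, planning, and coordination, while another group focuses on executing concrete, fine-grained tasks. This is the core division of labor between manager agents and executor agents.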
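1. Manager Agent
The manager agent is the coordinator and administrator of the system. Its main responsibilities are:
- Task decomposition and planning: break a high-level goal into a series of executable subtasks.
- Resource management: track the state, capabilities, and load of the resources available in the system, including executor agents.
- Task allocation: assign each subtask to the most suitable executor agent based on the task's characteristics and the executor's capabilities, load, and reputation.
- Progress monitoring: track the execution state of assigned tasks and identify potential delays or failures.
- Result auditing: receive the results submitted by executor agents, then verify and audit them to ensure correctness and compliance.
- Exception handling: when a task fails or its result does not meet expectations, take remedial action such as reassignment or rollback.
- Decision-making and optimization: use historical execution data and feedback to improve future allocation strategies and overall system performance.
The manager agent usually has stronger reasoning, planning, and decision-making abilities, and it maintains a macro-level view of the whole system (or of the subsystem it is responsible for).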
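2. Executor Agent
The executor agent is the workforce of the system and focuses on carrying out concrete tasks. Its main responsibilities are:
- Capability registration and maintenance: declare its capabilities, specialties, and current state to the manager agent (or to a registry).
- Task reception and parsing: receive tasks assigned by the manager agent and understand their specific requirements.
- Task execution: carry out the operations described by the task using its own tools, knowledge, and interfaces.
- Status reporting: report the current state and progress of a task to the manager agent during or after execution.
- Result submission: once the task is finished, submit the result (success or failure status, output data, logs, and so on) to the manager agent for auditing.
- Local error handling: try to handle local errors encountered during execution, within the limits of its own capabilities.
An executor agent usually focuses on one specific domain, where it has strong domain knowledge and operational ability, but its view of the overall system may be limited.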
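3. Role Interaction Model
The manager agent and the executor agents interact through a clear Request-Response or Publish-Subscribe pattern. The manager agent publishes a task, an executor agent responds and executes it, then reports the result; the manager agent audits the result and may take further action. This clear division of labor supports modularity, scalability, and fault tolerance.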
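Standardized Task Representation
In any collaborative system, a clear and unambiguous task representation is the basis for effective communication and allocation. A standardized task structure ensures that the manager agent and the executor agents share the same understanding of what a task contains. We can define a task as a data structure with multiple fields.
1. Task Structure Fields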
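| Field | Data type | Description | Example value |
|---|---|---|---|
| task_id | String | Unique identifier of the task. | "T1001" |
| task_type | String | Type of the task, indicating the required capability or domain. | "data_processing", "web_scraping" |
| description | String | Short description of the task. | "Scrape news titles and content from the given URL" |
| parameters | Dict/JSON | Specific parameters needed to execute the task. | {"url": "https://example.com/news", "selector": "h2.title"} |
| deadline | Timestamp/date | Deadline for the task. | "2024-12-31T23:59:59Z" |
| priority | Enum/integer | Priority of the task (for example: low, medium, high). | "HIGH" |
| expected_output_format | Dict/JSON schema | Data structure or format expected for the result. | {"title": "string", "content": "string"} |
| requester_id | String | Requester of the task, if any. | "User_X" |
| assigned_to | String | ID of the executor agent currently assigned (initially empty). | null |
| status | String | Current state of the task (for example: PENDING, ASSIGNED, IN_PROGRESS, COMPLETED, FAILED, AUDITED). | "PENDING" |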
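As a rough illustration of how a receiver might check an incoming task against the fields listed above, the sketch below validates a plain dictionary. The names validate_task, REQUIRED_FIELDS, and ALLOWED_STATUSES are assumptions made for this example, and the choice of which fields count as mandatory is also an assumption rather than something the field table prescribes.

```python
from datetime import datetime

# Assumed for illustration: which fields are mandatory and which states are legal.
REQUIRED_FIELDS = {"task_id": str, "task_type": str, "description": str, "parameters": dict}
ALLOWED_STATUSES = {"PENDING", "ASSIGNED", "IN_PROGRESS", "COMPLETED", "FAILED", "AUDITED"}


def validate_task(task: dict) -> list:
    """Return a list of problems; an empty list means the task looks well-formed."""
    problems = []
    for field, expected_type in REQUIRED_FIELDS.items():
        if field not in task:
            problems.append(f"missing field: {field}")
        elif not isinstance(task[field], expected_type):
            problems.append(f"wrong type for {field}")
    status = task.get("status", "PENDING")
    if status not in ALLOWED_STATUSES:
        problems.append(f"unknown status: {status}")
    deadline = task.get("deadline")
    if deadline is not None:
        try:
            # Accept a trailing "Z" by rewriting it as an explicit UTC offset.
            datetime.fromisoformat(deadline.replace("Z", "+00:00"))
        except (ValueError, AttributeError):
            problems.append("deadline is not an ISO 8601 timestamp")
    return problems
```

Calling validate_task before a task enters the pending queue lets the manager reject malformed requests early instead of discovering the problem during execution.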
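2. Task Representation Example (JSON/Python Dictionary)
In real systems, JSON and YAML are common task representation formats because they are readable and work across languages.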
import uuid
import datetime

class Task:
    def __init__(self, task_type, description, parameters, deadline=None, priority="MEDIUM", requester_id=None):
        self.task_id = str(uuid.uuid4())  # Unique task ID
        self.task_type = task_type
        self.description = description
        self.parameters = parameters
        # Default deadline: one hour from now, stored as an ISO 8601 string
        self.deadline = deadline if deadline else (datetime.datetime.now() + datetime.timedelta(hours=1)).isoformat()
        self.priority = priority
        self.expected_output_format = {"data": "any", "status": "string"}  # Simplified; real schemas are more detailed
        self.requester_id = requester_id
        self.assigned_to = None
        self.status = "PENDING"
        self.creation_time = datetime.datetime.now().isoformat()
        self.assignment_history = []  # Records every assignment of this task

    def to_dict(self):
        return {
            "task_id": self.task_id,
            "task_type": self.task_type,
            "description": self.description,
            "parameters": self.parameters,
            "deadline": self.deadline,
            "priority": self.priority,
            "expected_output_format": self.expected_output_format,
            "requester_id": self.requester_id,
            "assigned_to": self.assigned_to,
            "status": self.status,
            "creation_time": self.creation_time,
            "assignment_history": self.assignment_history
        }

    @classmethod
    def from_dict(cls, data):
        task = cls(
            task_type=data['task_type'],
            description=data['description'],
            parameters=data['parameters'],
            deadline=data.get('deadline'),
            priority=data.get('priority', 'MEDIUM'),
            requester_id=data.get('requester_id')
        )
        # Restore the fields that the constructor would otherwise regenerate
        task.task_id = data['task_id']
        task.assigned_to = data.get('assigned_to')
        task.status = data.get('status', 'PENDING')
        task.creation_time = data.get('creation_time')
        task.assignment_history = data.get('assignment_history', [])
        return task

# Example task creation
example_task = Task(
    task_type="web_scraping",
    description="Scrape top 5 articles from a tech blog.",
    parameters={"url": "https://techblog.com", "num_articles": 5, "xpath": "//article/h2/a/@href"},
    priority="HIGH"
)
print(example_task.to_dict())
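Task Allocation Mechanisms in Detail
Task allocation is one of the manager agent's core functions, and it directly affects the system's efficiency, load balance, and fault tolerance. Allocation mechanisms range from simple assignment to complex negotiation and bidding.
1. Overview: From Simple Assignment to Intelligent Negotiation
- Simple assignment: the manager agent assigns a task directly to some executor agent without weighing many factors. This suits scenarios with a single task type and homogeneous executors.
- Capability matching: the manager agent matches task requirements against the capabilities that executor agents have registered. This is the most basic form of intelligent allocation.
- Load balancing: on top of capability matching, the manager considers each executor's current workload so that no single agent is overloaded.
- Negotiation/bidding: executor agents take an active part in allocation, competing for tasks by submitting bids or declaring willingness. This increases agent autonomy.
- Reputation/trust: the manager agent decides based on each executor's track record (success rate, response time, audit results, and so on) and prefers the more reliable agents.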
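2. Centralized Assignment
In centralized assignment, the manager agent holds global knowledge and makes every allocation decision.
Advantages: decisions are simple and efficient, and global optimization is easy to achieve.
Disadvantages: the manager agent can become a bottleneck or a single point of failure.
Common algorithms:
- Round Robin: assign tasks to executor agents in a fixed order. This suits homogeneous agents with even load.
- Shortest Queue: assign the task to the executor agent whose task queue is currently shortest.
- Capability and load matching: the most commonly used and effective centralized strategy. The manager agent maintains a registry of executor agents that records their capabilities and current load.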
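The first two algorithms in the list above are small enough to sketch directly. The helper names round_robin_assigner and shortest_queue, and the executor IDs, are assumptions made for illustration; the tie-breaking rule in shortest_queue (alphabetical by ID) is likewise just one reasonable choice.

```python
from itertools import cycle


def round_robin_assigner(executor_ids):
    """Return a function that hands out executor IDs in a fixed rotation."""
    rotation = cycle(executor_ids)
    return lambda: next(rotation)


def shortest_queue(queue_lengths):
    """Pick the executor with the fewest queued tasks; ties are broken by ID."""
    return min(queue_lengths, key=lambda eid: (queue_lengths[eid], eid))


next_executor = round_robin_assigner(["exec_a", "exec_b", "exec_c"])
rr_order = [next_executor() for _ in range(4)]   # wraps around after exec_c
sq_choice = shortest_queue({"exec_a": 3, "exec_b": 1, "exec_c": 1})
```

Round robin ignores how busy each executor is, which is why it only suits homogeneous agents; shortest queue needs up-to-date queue lengths, so it depends on executors reporting their load.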
Python Code Example: Capability and Load Matching
We first define a simple BaseAgent base class, followed by the ExecutorAgent and ManagerAgent classes.
import time
import random
import threading
from collections import deque

# Base agent class
class BaseAgent:
    def __init__(self, agent_id):
        self.agent_id = agent_id
        self.mailbox = deque()  # Mailbox used for inter-agent communication
        self.stop_event = threading.Event()

    def send_message(self, receiver_agent, message):
        # Deliver by appending directly to the receiver object's mailbox
        receiver_agent.mailbox.append(message)
        # print(f"[{self.agent_id}] Sent message to [{receiver_agent.agent_id}]: {message['performative']}")

    def receive_message(self):
        if self.mailbox:
            return self.mailbox.popleft()
        return None

    def run(self):
        print(f"Agent {self.agent_id} started.")
        while not self.stop_event.is_set():
            message = self.receive_message()
            if message:
                self.handle_message(message)
            time.sleep(0.1)  # Avoid busy-waiting
        print(f"Agent {self.agent_id} stopped.")

    def handle_message(self, message):
        raise NotImplementedError("Subclasses must implement handle_message method.")

    def stop(self):
        self.stop_event.set()
class ExecutorAgent(BaseAgent):
    def __init__(self, agent_id, capabilities):
        super().__init__(agent_id)
        self.capabilities = capabilities  # e.g. {"web_scraping": True, "data_processing": True}
        self.current_load = 0  # Number of tasks currently being processed
        self.reputation = 0.5  # Initial reputation value
        self.tasks_in_progress = {}  # {task_id: task_object}
        self.manager_ref = None  # Manager object, recorded at registration time

    def register_self(self, manager_agent):
        # Register capabilities with the manager and keep a reference to it:
        # messages carry only the sender's ID string, but send_message needs the object.
        self.manager_ref = manager_agent
        message = {
            "performative": "REGISTER",
            "sender": self.agent_id,
            "capabilities": self.capabilities
        }
        self.send_message(manager_agent, message)
        print(f"[{self.agent_id}] Registered with manager, capabilities: {self.capabilities}")

    def handle_message(self, message):
        performative = message['performative']
        sender = message['sender']
        if performative == "ASSIGN_TASK":
            task_data = message['content']['task']
            task = Task.from_dict(task_data)
            print(f"[{self.agent_id}] Received task {task.task_id}: {task.description}")
            self.current_load += 1
            self.tasks_in_progress[task.task_id] = task
            # Execute in a worker thread so the mailbox loop stays responsive
            threading.Thread(target=self._execute_task, args=(task, self.manager_ref)).start()
        elif performative == "REQUEST_CAPABILITIES":
            # The manager is querying capabilities; reply with the current state
            response = {
                "performative": "INFORM_CAPABILITIES",
                "sender": self.agent_id,
                "receiver": sender,
                "content": {
                    "capabilities": self.capabilities,
                    "load": self.current_load,
                    "reputation": self.reputation
                }
            }
            self.send_message(self.manager_ref, response)
        else:
            print(f"[{self.agent_id}] Received unknown message: {message}")

    def _execute_task(self, task, manager_agent):
        print(f"[{self.agent_id}] Started executing task {task.task_id} ({task.task_type}).")
        # Simulate the task's execution time
        execution_time = random.uniform(2, 5)
        time.sleep(execution_time)
        # Simulate the outcome with a 5% failure rate
        success = random.random() > 0.05
        result_data = None
        status = "FAILED"
        error_message = None
        if success:
            status = "COMPLETED"
            result_data = {"processed_data": f"Result for {task.task_id} by {self.agent_id}", "execution_time": execution_time}
            print(f"[{self.agent_id}] Task {task.task_id} completed successfully.")
        else:
            error_message = "Simulated execution failure."
            print(f"[{self.agent_id}] Task {task.task_id} failed: {error_message}")
        # Report the result to the manager agent
        report_message = {
            "performative": "INFORM_RESULT",
            "sender": self.agent_id,
            "receiver": manager_agent.agent_id,
            "content": {
                "task_id": task.task_id,
                "status": status,
                "result": result_data,
                "error": error_message,
                "execution_time": execution_time
            }
        }
        self.send_message(manager_agent, report_message)
        self.current_load -= 1
        del self.tasks_in_progress[task.task_id]
class ManagerAgent(BaseAgent):
    def __init__(self, agent_id):
        super().__init__(agent_id)
        self.executor_agents = {}  # {agent_id: {"capabilities": {}, "load": 0, "reputation": 0.5, "agent_ref": executor_agent_object}}
        self.pending_tasks = deque()
        self.tasks_in_progress = {}  # {task_id: task_object}
        self.completed_tasks = {}
        self.failed_tasks = {}

    def handle_message(self, message):
        performative = message['performative']
        sender = message['sender']
        if performative == "REGISTER":
            capabilities = message['capabilities']
            self.executor_agents[sender] = {
                "capabilities": capabilities,
                "load": 0,
                "reputation": 0.5,  # Initial reputation
                "agent_ref": None  # The actual agent object is set later, in the main program
            }
            print(f"[{self.agent_id}] Executor {sender} registered with capabilities: {capabilities}")
        elif performative == "INFORM_CAPABILITIES":
            # Reply to a manager query: refresh the executor's latest state
            if sender in self.executor_agents:
                self.executor_agents[sender]['capabilities'] = message['content']['capabilities']
                self.executor_agents[sender]['load'] = message['content']['load']
                self.executor_agents[sender]['reputation'] = message['content']['reputation']
                # print(f"[{self.agent_id}] Updated info for {sender}: {self.executor_agents[sender]}")
        elif performative == "INFORM_RESULT":
            task_id = message['content']['task_id']
            status = message['content']['status']
            result = message['content']['result']
            error = message['content']['error']
            execution_time = message['content']['execution_time']
            if task_id in self.tasks_in_progress:
                task = self.tasks_in_progress[task_id]
                task.status = status
                task.assigned_to = None  # Release the assignment once a result arrives
                # Undo the load estimate that was added when the task was assigned
                if sender in self.executor_agents:
                    self.executor_agents[sender]['load'] = max(0, self.executor_agents[sender]['load'] - 1)
                print(f"[{self.agent_id}] Received result for task {task_id} from {sender}. Status: {status}")
                # Audit the result
                audit_passed = self._audit_task_result(task, result, error, status, execution_time)
                if status == "COMPLETED" and audit_passed:
                    self.completed_tasks[task_id] = task
                    print(f"[{self.agent_id}] Task {task_id} AUDITED_SUCCESSFULLY.")
                    self._update_executor_reputation(sender, True)
                else:
                    if status == "COMPLETED":
                        # Mark audit failures explicitly so _reassign_task re-queues them
                        task.status = "AUDIT_FAILED"
                    self.failed_tasks[task_id] = task
                    print(f"[{self.agent_id}] Task {task_id} FAILED or AUDIT_FAILED. Error: {error if error else 'Audit failed.'}")
                    self._update_executor_reputation(sender, False)
                    self._reassign_task(task)  # Try to reassign
                del self.tasks_in_progress[task_id]
            else:
                print(f"[{self.agent_id}] Received result for unknown or already processed task {task_id}.")
        else:
            print(f"[{self.agent_id}] Received unknown message: {message}")
    def add_task(self, task):
        self.pending_tasks.append(task)
        print(f"[{self.agent_id}] Added new task {task.task_id} ({task.task_type}). Pending tasks: {len(self.pending_tasks)}")

    def _find_best_executor(self, task):
        eligible_executors = []
        for exec_id, info in self.executor_agents.items():
            # Check whether the capability matches
            if task.task_type in info['capabilities'] and info['capabilities'][task.task_type]:
                eligible_executors.append((exec_id, info['load'], info['reputation']))
        if not eligible_executors:
            return None
        # Combine load and reputation.
        # A weighted score is one option: reputation weight 0.7, load weight 0.3
        # (lower load is better, so use (1 - load_ratio) or simply sort).
        # This assumes every executor has the same maximum load, or that only relative load matters.
        # Simplified here: prefer the lowest load, then the highest reputation.
        eligible_executors.sort(key=lambda x: (x[1], -x[2]))  # Load ascending, reputation descending
        return eligible_executors[0][0]

    def assign_next_task(self):
        if not self.pending_tasks:
            return
        task = self.pending_tasks.popleft()
        executor_id = self._find_best_executor(task)
        if not executor_id:
            print(f"[{self.agent_id}] No suitable executor found for task {task.task_id}. Re-queuing.")
            self.pending_tasks.appendleft(task)  # Put it back in the queue
            return
        executor_agent_ref = self.executor_agents[executor_id]['agent_ref']
        if not executor_agent_ref:
            print(f"[{self.agent_id}] Error: Executor agent reference not found for {executor_id}. Re-queuing task {task.task_id}.")
            self.pending_tasks.appendleft(task)  # Put it back in the queue
            return
        # Record the assignment only once the task can actually be dispatched
        task.assigned_to = executor_id
        task.status = "ASSIGNED"
        task.assignment_history.append({"executor": executor_id, "timestamp": datetime.datetime.now().isoformat()})
        self.tasks_in_progress[task.task_id] = task
        assign_message = {
            "performative": "ASSIGN_TASK",
            "sender": self.agent_id,
            "receiver": executor_id,
            "content": {"task": task.to_dict()}
        }
        self.send_message(executor_agent_ref, assign_message)
        # Update the manager's own load record. This is only an estimate;
        # the value reported by the executor agent is authoritative.
        self.executor_agents[executor_id]['load'] += 1
        print(f"[{self.agent_id}] Assigned task {task.task_id} to {executor_id}.")
    def _audit_task_result(self, task, result_data, error, status, execution_time):
        """
        Audit logic for a task result.
        It can include:
        1. Result format validation
        2. Logical validation of the content (e.g. for a numeric computation, check the value is in a sensible range)
        3. Whether the execution time is within the expected range
        4. Whether the error information is reasonable
        """
        audit_passed = True
        # 1. Status check
        if status == "FAILED" and error is None:
            print(f"[{self.agent_id}] Audit Warning: Task {task.task_id} failed but no error message provided.")
            audit_passed = False
        # 2. Result format check (simplified example)
        if status == "COMPLETED" and not isinstance(result_data, dict):
            print(f"[{self.agent_id}] Audit Error: Task {task.task_id} result format invalid.")
            audit_passed = False
        # 3. A more specific logical check (e.g. a web_scraping result should contain 'processed_data')
        if status == "COMPLETED" and task.task_type == "web_scraping":
            if not (result_data and 'processed_data' in result_data):
                print(f"[{self.agent_id}] Audit Error: Web scraping task {task.task_id} missing 'processed_data'.")
                audit_passed = False
        # 4. Execution time audit
        if execution_time > 10 and status == "COMPLETED":  # Assume anything over 10 seconds is slow
            print(f"[{self.agent_id}] Audit Warning: Task {task.task_id} completed but took too long ({execution_time}s).")
            # This is only a warning and does not by itself fail the audit
        return audit_passed

    def _update_executor_reputation(self, executor_id, success):
        """
        Update an executor agent's reputation from a task outcome.
        Simple reputation model: success raises reputation, failure lowers it.
        """
        if executor_id in self.executor_agents:
            current_reputation = self.executor_agents[executor_id]['reputation']
            if success:
                self.executor_agents[executor_id]['reputation'] = min(1.0, current_reputation + 0.1)
            else:
                self.executor_agents[executor_id]['reputation'] = max(0.0, current_reputation - 0.2)  # Failure is penalized more heavily
            print(f"[{self.agent_id}] Updated reputation for {executor_id}: {self.executor_agents[executor_id]['reputation']:.2f} (Success: {success})")

    def _reassign_task(self, task):
        """
        Try to reassign a task that failed or did not pass the audit.
        """
        if task.status == "FAILED" or task.status == "AUDIT_FAILED":
            # Enforce a retry limit
            if task.assignment_history and len(task.assignment_history) > 3:  # Give up after more than 3 retries
                print(f"[{self.agent_id}] Task {task.task_id} failed too many times, giving up.")
                return
            print(f"[{self.agent_id}] Re-queuing task {task.task_id} for re-assignment.")
            task.status = "PENDING"
            task.assigned_to = None
            self.pending_tasks.appendleft(task)  # Back to the head of the queue so it is handled first

    def run(self):
        print(f"Manager Agent {self.agent_id} started.")
        while not self.stop_event.is_set():
            message = self.receive_message()
            if message:
                self.handle_message(message)
            # Periodically try to assign a new task
            if self.pending_tasks:
                self.assign_next_task()
            time.sleep(0.5)
        print(f"Manager Agent {self.agent_id} stopped.")
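The comments in _find_best_executor mention a weighted alternative (reputation 0.7, load 0.3) without implementing it. A minimal sketch of that idea follows, assuming every executor shares the same maximum load; the function names score_executor and pick_by_score and the max_load default of 5 are hypothetical and not part of the classes above.

```python
def score_executor(load, reputation, max_load=5, w_reputation=0.7, w_load=0.3):
    """Higher is better. Load is normalised to [0, 1] and inverted so idle agents score higher."""
    load_ratio = min(load, max_load) / max_load
    return w_reputation * reputation + w_load * (1.0 - load_ratio)


def pick_by_score(candidates):
    """candidates: {executor_id: {"load": int, "reputation": float}}. Returns the best ID or None."""
    if not candidates:
        return None
    return max(
        candidates,
        key=lambda eid: score_executor(candidates[eid]["load"], candidates[eid]["reputation"]),
    )


candidates = {
    "exec_idle_unreliable": {"load": 0, "reputation": 0.2},
    "exec_busy_reliable": {"load": 2, "reputation": 0.9},
}
best = pick_by_score(candidates)
```

With these weights the busier but more reliable executor wins, whereas the load-first sort used in the class would pick the idle one. Which behaviour is preferable depends on how costly a failed task is compared with a delayed one.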
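3. Decentralized Assignment & Negotiation
Decentralized assignment hands part or all of the allocation decision to the executor agents. This makes the system more robust and resilient, but it also increases communication overhead and decision complexity.
Advantages: greater agent autonomy, a more resilient system, and no single bottleneck.
Disadvantages: global optimization is hard, suboptimal outcomes are possible, and communication overhead grows.
Main mechanism:
- Contract Net Protocol (CNP): one of the classic negotiation protocols in MAS. The manager agent (as initiator) announces a task for bidding, executor agents (as bidders) evaluate the task and submit bids, and the manager agent selects the best bid and awards the "contract".
  CNP flow:
  - Call for Proposals (CFP): the manager agent broadcasts a CFP message describing the task requirements.
  - Propose: each executor agent that receives the CFP evaluates its own capabilities and load. If it can execute the task, it sends the manager agent a proposal containing its bid (such as execution time, cost, and confidence).
  - Accept/Reject Proposal: the manager agent collects all proposals, selects one or more of the best according to its internal policy (such as lowest price, fastest time, or highest reputation), sends an acceptance message to the chosen executor agents, and sends rejection messages to the others.
  - Execute and report (Inform Done/Failure): the executor agent whose proposal was accepted executes the task and reports the result to the manager agent when it finishes.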
Python Code Example: Simulating the CNP Flow
To simulate CNP, we need to modify the communication logic of ManagerAgent and ExecutorAgent.
# ManagerAgent for CNP
class ManagerAgentCNP(ManagerAgent):
    def __init__(self, agent_id):
        super().__init__(agent_id)
        self.cfps_in_progress = {}  # {task_id: {"task_obj": task, "proposals": {executor_id: proposal_content}}}

    def handle_message(self, message):
        performative = message['performative']
        sender = message['sender']
        if performative == "PROPOSE":
            task_id = message['content']['task_id']
            proposal = message['content']['proposal']
            if task_id in self.cfps_in_progress:
                self.cfps_in_progress[task_id]["proposals"][sender] = proposal
                print(f"[{self.agent_id}] Received proposal for task {task_id} from {sender}: {proposal}")
            else:
                print(f"[{self.agent_id}] Received proposal for unknown CFP task {task_id} from {sender}.")
        else:
            # Everything else (REGISTER, INFORM_RESULT, ...) uses the parent's generic handling
            super().handle_message(message)

    def assign_next_task(self):
        if not self.pending_tasks:
            return
        task = self.pending_tasks.popleft()
        # Issue the CFP
        cfp_message = {
            "performative": "CALL_FOR_PROPOSAL",
            "sender": self.agent_id,
            "content": {
                "task": task.to_dict(),
                "task_id": task.task_id
            }
        }
        self.cfps_in_progress[task.task_id] = {"task_obj": task, "proposals": {}}
        # Broadcast the CFP to every registered executor agent
        for exec_id, info in self.executor_agents.items():
            executor_agent_ref = info['agent_ref']
            if executor_agent_ref:
                self.send_message(executor_agent_ref, cfp_message)
        print(f"[{self.agent_id}] Issued CFP for task {task.task_id} ({task.task_type}).")
        # Evaluate proposals after a short wait, in a background thread
        threading.Thread(target=self._evaluate_proposals, args=(task.task_id,)).start()

    def _evaluate_proposals(self, task_id):
        # A real system may need to wait longer here to collect all proposals
        time.sleep(1)  # Simulate waiting for proposals
        if task_id not in self.cfps_in_progress:
            return
        cfp_data = self.cfps_in_progress[task_id]
        task = cfp_data["task_obj"]
        proposals = cfp_data["proposals"]
        if not proposals:
            print(f"[{self.agent_id}] No proposals received for task {task_id}. Re-queuing.")
            self.pending_tasks.appendleft(task)
            del self.cfps_in_progress[task_id]
            return
        # Evaluate proposals: pick the best (e.g. lowest bid, fastest completion, good reputation).
        # The proposal content is free-form, e.g. {"estimated_time": 5, "cost": 100, "confidence": 0.9}
        best_executor_id = None
        best_score = float('inf')  # We minimize a score, e.g. (estimated_time / reputation)
        for exec_id, proposal in proposals.items():
            if "estimated_time" in proposal and "confidence" in proposal:
                executor_reputation = self.executor_agents.get(exec_id, {}).get("reputation", 0.5)
                # Possible score: shorter time, higher reputation, and higher confidence are all better
                # score = proposal["estimated_time"] / (executor_reputation * proposal["confidence"])
                # Simplified: consider only the shortest estimated time
                score = proposal["estimated_time"]
                if score < best_score:
                    best_score = score
                    best_executor_id = exec_id
        if best_executor_id:
            # Accept the best proposal
            task.assigned_to = best_executor_id
            task.status = "ASSIGNED"
            task.assignment_history.append({"executor": best_executor_id, "timestamp": datetime.datetime.now().isoformat()})
            self.tasks_in_progress[task.task_id] = task
            executor_agent_ref = self.executor_agents[best_executor_id]['agent_ref']
            if executor_agent_ref:
                accept_message = {
                    "performative": "ACCEPT_PROPOSAL",
                    "sender": self.agent_id,
                    "receiver": best_executor_id,
                    "content": {"task": task.to_dict(), "task_id": task.task_id}
                }
                self.send_message(executor_agent_ref, accept_message)
                print(f"[{self.agent_id}] Accepted proposal for task {task_id} from {best_executor_id}.")
            # Reject the other proposals
            for exec_id, _ in proposals.items():
                if exec_id != best_executor_id:
                    other_executor_ref = self.executor_agents[exec_id]['agent_ref']
                    if other_executor_ref:
                        reject_message = {
                            "performative": "REJECT_PROPOSAL",
                            "sender": self.agent_id,
                            "receiver": exec_id,
                            "content": {"task_id": task_id}
                        }
                        self.send_message(other_executor_ref, reject_message)
                        # print(f"[{self.agent_id}] Rejected proposal for task {task_id} from {exec_id}.")
        else:
            print(f"[{self.agent_id}] No best executor selected for task {task_id}. Re-queuing.")
            self.pending_tasks.appendleft(task)
        del self.cfps_in_progress[task_id]

# ExecutorAgent for CNP
class ExecutorAgentCNP(ExecutorAgent):
    def __init__(self, agent_id, capabilities):
        super().__init__(agent_id, capabilities)
        self.manager_ref = None  # Manager object, recorded at registration time

    def register_self(self, manager_agent):
        # Messages carry only the sender's ID string, so keep the manager object for replies
        self.manager_ref = manager_agent
        super().register_self(manager_agent)

    def handle_message(self, message):
        performative = message['performative']
        sender = message['sender']
        if performative == "CALL_FOR_PROPOSAL":
            task_data = message['content']['task']
            task_id = message['content']['task_id']
            task_type = task_data['task_type']
            if task_type in self.capabilities and self.capabilities[task_type]:
                # Evaluate the task and build a proposal
                estimated_time = random.uniform(2, 7) + self.current_load * 1  # Higher load means a longer estimate
                confidence = random.uniform(0.7, 0.99)  # The executor's confidence that it can complete the task
                proposal = {
                    "task_id": task_id,
                    "estimated_time": estimated_time,
                    "cost": estimated_time * 10,  # Simulated cost
                    "confidence": confidence
                }
                propose_message = {
                    "performative": "PROPOSE",
                    "sender": self.agent_id,
                    "receiver": sender,
                    "content": {"task_id": task_id, "proposal": proposal}
                }
                self.send_message(self.manager_ref, propose_message)
                print(f"[{self.agent_id}] Sent proposal for task {task_id}: {proposal}")
            else:
                # If the task cannot be executed, stay silent (or send a refusal)
                # print(f"[{self.agent_id}] Cannot perform task {task_id} ({task_type}). Not proposing.")
                pass
        elif performative == "ACCEPT_PROPOSAL":
            task_data = message['content']['task']
            task = Task.from_dict(task_data)
            print(f"[{self.agent_id}] Accepted proposal for task {task.task_id}. Starting execution.")
            self.current_load += 1
            self.tasks_in_progress[task.task_id] = task
            threading.Thread(target=self._execute_task, args=(task, self.manager_ref)).start()
        elif performative == "REJECT_PROPOSAL":
            task_id = message['content']['task_id']
            # print(f"[{self.agent_id}] Proposal for task {task_id} rejected.")
        else:
            super().handle_message(message)  # Let the parent class handle it, e.g. REQUEST_CAPABILITIES
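4. Dynamic Matching Based on Capabilities and Preferences
In more complex systems, executor agents register more than static capabilities. They may also express preferences (for example, a stronger interest in certain task types), and their capabilities may change as they learn and gain experience. The manager agent then needs a smarter matching engine that supports:
- Capability lookup: find which agents have the capability a task requires.
- State lookup: obtain each agent's real-time load and availability.
- Preference matching: when several agents have the capability, prefer the ones with the highest preference.
- Dynamic adjustment: adjust the assessment of each agent's capabilities, or its selection weight, based on past performance and feedback.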
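The first three steps in the list above (capability lookup, state lookup, preference matching) can be sketched as a single function. The registry layout with a "preferences" dictionary, the match_with_preferences name, and the max_load threshold are all assumptions made for this illustration; the classes in this talk do not store preferences.

```python
def match_with_preferences(task_type, agents, max_load=3):
    """agents: {id: {"capabilities": set, "load": int, "preferences": {task_type: weight}}}."""
    # Capability lookup plus state lookup: keep capable agents that still have spare capacity.
    eligible = [
        (agent_id, info)
        for agent_id, info in agents.items()
        if task_type in info["capabilities"] and info["load"] < max_load
    ]
    if not eligible:
        return None
    # Preference matching: highest preference first, lower load breaks ties.
    best_id, _ = max(
        eligible,
        key=lambda item: (item[1].get("preferences", {}).get(task_type, 0.0), -item[1]["load"]),
    )
    return best_id


registry = {
    "exec_a": {"capabilities": {"web_scraping"}, "load": 0, "preferences": {"web_scraping": 0.2}},
    "exec_b": {"capabilities": {"web_scraping"}, "load": 1, "preferences": {"web_scraping": 0.9}},
    "exec_c": {"capabilities": {"web_scraping"}, "load": 3, "preferences": {"web_scraping": 1.0}},
}
chosen = match_with_preferences("web_scraping", registry)
```

Here exec_c has the strongest preference but is at capacity, so the match falls to exec_b. Dynamic adjustment would then amount to updating the preference or capability entries in the registry as feedback arrives.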
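Inter-Agent Communication Protocols and Message Formats
Effective communication between agents is the foundation of a working MAS. A good communication protocol defines the structure, semantics, and transport of messages.
1. FIPA ACL (Agent Communication Language)
FIPA (Foundation for Intelligent Physical Agents) is an international organization dedicated to promoting agent interoperability, and the ACL it defines is one of the established standards for agent communication. FIPA ACL defines a rich set of communicative acts (performatives) together with the structure of a message.
FIPA ACL message structure:
A message usually contains the following key fields:
- performative: the type of communicative act, such as REQUEST, INFORM, CFP (call for proposals), PROPOSE, ACCEPT_PROPOSAL, REJECT_PROPOSAL, FAILURE, or AGREE. The FIPA specification writes these in lowercase with hyphens (for example cfp and accept-proposal); this talk uses uppercase names with underscores to match its Python examples.
- sender: identifier of the sending agent.
- receiver: identifier of the receiving agent (a single agent, a list of agents, or a group).
- content: the actual content of the message, usually structured data (such as XML or JSON).
- language: the language used to encode content (such as Python-dict, JSON, or XML).
- ontology: the ontology describing the concepts and vocabulary of the domain that content belongs to.
- reply-with: an identifier for this message that replies can refer to, which supports asynchronous communication.
- in-reply-to: in a reply, the identifier of the original message being answered.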
Simulating FIPA ACL Messages with Python Dictionaries
Our example code already uses Python dictionaries to simulate FIPA ACL messages, for example:
# Example CFP message
cfp_message = {
    "performative": "CALL_FOR_PROPOSAL",
    "sender": "manager_alpha",
    "receiver": "all",  # One way to express a broadcast
    "content": {
        "task": example_task.to_dict(),
        "task_id": example_task.task_id
    },
    "language": "Python-dict",
    "ontology": "TaskAllocation"
}
# Example PROPOSE message
propose_message = {
    "performative": "PROPOSE",
    "sender": "executor_01",
    "receiver": "manager_alpha",
    "content": {
        "task_id": "T1001",
        "proposal": {"estimated_time": 5, "cost": 100, "confidence": 0.9}
    },
    "language": "Python-dict",
    "ontology": "TaskAllocation"
}
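The reply-with and in-reply-to fields described earlier are not used in the dictionaries above. The following sketch shows one way they could tie a reply to its request; new_message and make_reply are hypothetical helpers written for this illustration, and using a UUID as the correlation ID is an assumption rather than something FIPA mandates.

```python
import uuid


def new_message(performative, sender, receiver, content, ontology="TaskAllocation"):
    """Build a message dict with a fresh correlation ID in 'reply-with'."""
    return {
        "performative": performative,
        "sender": sender,
        "receiver": receiver,
        "content": content,
        "language": "Python-dict",
        "ontology": ontology,
        "reply-with": str(uuid.uuid4()),
    }


def make_reply(original, performative, content):
    """Swap sender and receiver and point 'in-reply-to' at the original message."""
    return {
        "performative": performative,
        "sender": original["receiver"],
        "receiver": original["sender"],
        "content": content,
        "language": original.get("language", "Python-dict"),
        "ontology": original.get("ontology"),
        "in-reply-to": original["reply-with"],
    }


cfp = new_message("CALL_FOR_PROPOSAL", "manager_alpha", "executor_01", {"task_id": "T1001"})
reply = make_reply(cfp, "PROPOSE", {"task_id": "T1001", "proposal": {"estimated_time": 5}})
```

With a correlation ID in place, the manager can match each PROPOSE to the exact CFP it answers even when several CFPs for similar tasks are outstanding at once.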
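2. Transport Protocols
FIPA ACL defines the semantics and structure of messages, while delivery relies on an underlying communication protocol:
- HTTP/REST: simple and easy to use, well suited to web service interaction, but a weaker fit for long-lived connections and real-time delivery.
- MQTT: a lightweight publish-subscribe protocol suited to IoT and resource-constrained environments, with support for asynchronous communication.
- WebSocket: provides full-duplex communication, suited to scenarios that need real-time, persistent connections.
- gRPC: an RPC framework built on HTTP/2 and Protocol Buffers that offers high performance, strong typing, and multi-language support.
- Message queues (such as Kafka and RabbitMQ): provide decoupled, asynchronous, high-throughput messaging and are a strong choice for building large-scale MAS.
To keep the Python examples simple, we used direct method calls between agent objects and an internal deque as the mailbox to simulate message passing. In a real system, send_message would wrap a call to one of the transport protocols above.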
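Result Reporting and the Feedback Loop
After a task finishes, the executor agent must report the result to the manager agent. This is a key feedback step: the manager agent relies on it to audit results, update agent reputation, and make later decisions.
1. Report Content
A complete task result report should contain:
- task_id: identifies which task the result belongs to.
- status: the final state of the task (COMPLETED, FAILED, CANCELLED, and so on).
- result_data: the actual output data of the task (if it succeeded).
- execution_logs: log information from the execution, useful for debugging and tracing.
- error_details: if the task failed, detailed error information and a stack trace.
- execution_time: the time the task took from start to finish.
- resource_consumption: the resources consumed during execution (CPU, memory, network, and so on), used for performance auditing.
- timestamp: the time at which the report was generated.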
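2. Reporting Mechanisms
- Asynchronous notification (Push): the executor agent sends the result report to the manager agent on its own initiative when the task finishes. This is the most common pattern.
- Periodic polling (Pull): the manager agent periodically asks the executor agent for task status and results. This suits long-running tasks whose progress must be checked often, or executors that cannot push.
Our code example uses asynchronous notification: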
# Reporting logic in ExecutorAgent
report_message = {
    "performative": "INFORM_RESULT",
    "sender": self.agent_id,
    "receiver": manager_agent.agent_id,
    "content": {
        "task_id": task.task_id,
        "status": status,
        "result": result_data,
        "error": error_message,
        "execution_time": execution_time
    }
}
self.send_message(manager_agent, report_message)
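Result Auditing and Verification Strategies
Result auditing is one of the manager agent's most important duties, since it safeguards the quality and reliability of the system's output. An audit checks not only whether the result is correct but also whether the execution was efficient and compliant.
1. Audit Goals
- Correctness: does the result meet the task requirements and expectations?
- Completeness: have all expected outputs been produced?
- Timeliness: was the task finished before its deadline?
- Efficiency: did the execution use a reasonable amount of resources and time?
- Compliance: were the relevant business rules or protocols followed?
2. Audit Strategies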
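| Audit type | Description | Example |
|---|---|---|
| Data validation | Check that the format, type, range, and basic logic of the result data are correct. | Whether a string has the expected format, a number lies in a sensible range, or a required field is missing. |
| Consistency check | Compare the result with known facts, historical data, or results obtained another way (such as redundant computation). | Compare scraped data with a reference database; have several agents execute the same task in parallel, then vote on or compare the results. |
| Performance audit | Assess whether the execution time and resource consumption are within acceptable limits. | Whether the execution time exceeds the deadline or the average; whether CPU or memory usage is abnormal. |
| Log and evidence audit | Inspect the logs, intermediate steps, or supporting evidence supplied by the executor agent. | Whether the execution log is complete, contains the key operations, and shows any abnormal errors. |
| Reputation and trust system | Assess the credibility of this result in light of the executor agent's track record. | Results from an agent with a high historical success rate are more trustworthy. |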
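The consistency check row mentions running the same task on several agents and voting on the results. A minimal sketch of such a vote follows, assuming results are JSON-serializable so that they can be compared by value; majority_vote and its quorum parameter are names invented for this example.

```python
import json
from collections import Counter


def majority_vote(results, quorum=0.5):
    """results: {executor_id: result}. Returns (winning_result, dissenting_ids).

    If no result is shared by more than `quorum` of the executors,
    returns (None, all executor IDs).
    """
    if not results:
        return None, []
    # Serialise with sorted keys so equal dicts compare equal regardless of key order.
    encoded = {eid: json.dumps(res, sort_keys=True) for eid, res in results.items()}
    winner, votes = Counter(encoded.values()).most_common(1)[0]
    if votes / len(results) <= quorum:
        return None, sorted(results)
    dissenters = sorted(eid for eid, enc in encoded.items() if enc != winner)
    return json.loads(winner), dissenters


agreed, dissenters = majority_vote({
    "exec_1": {"title": "A"},
    "exec_2": {"title": "A"},
    "exec_3": {"title": "B"},
})
```

The dissenting IDs are a natural input to the reputation system: an executor that repeatedly disagrees with the majority is a candidate for a reputation penalty. Redundant execution multiplies cost, so it is usually reserved for high-value tasks.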
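3. Python Code Example: Data Validation and the Reputation System
The _audit_task_result method of ManagerAgent already showed the basic audit logic. A slightly stricter version looks like this: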
def _audit_task_result(self, task, result_data, error, status, execution_time):
    audit_passed = True
    # 1. Status check
    if status == "FAILED" and error is None:
        print(f"[{self.agent_id}] Audit Warning: Task {task.task_id} failed but no error message provided.")
        audit_passed = False
    # 2. Result format check (example: a web_scraping result must be a dict containing 'processed_data')
    if status == "COMPLETED" and task.task_type == "web_scraping":
        if not isinstance(result_data, dict):
            print(f"[{self.agent_id}] Audit Error: Task {task.task_id} result format invalid (expected dict).")
            audit_passed = False
        elif 'processed_data' not in result_data:
            print(f"[{self.agent_id}] Audit Error: Web scraping task {task.task_id} missing 'processed_data'.")
            audit_passed = False
        # The structure of processed_data can be checked further, e.g. whether it is a list of dicts
        elif not isinstance(result_data.get('processed_data'), str) or not result_data.get('processed_data').startswith("Result for"):
            print(f"[{self.agent_id}] Audit Error: Web scraping task {task.task_id} processed_data content invalid.")
            audit_passed = False
    # 3. Execution time audit
    if execution_time > 6 and status == "COMPLETED":  # Assume 6 seconds is the normal upper bound
        print(f"[{self.agent_id}] Audit Warning: Task {task.task_id} completed but took too long ({execution_time:.2f}s).")
        # This is only a warning and does not set audit_passed = False
    return audit_passed

def _update_executor_reputation(self, executor_id, success):
    if executor_id in self.executor_agents:
        current_reputation = self.executor_agents[executor_id]['reputation']
        if success:
            self.executor_agents[executor_id]['reputation'] = min(1.0, current_reputation + 0.1)
        else:
            self.executor_agents[executor_id]['reputation'] = max(0.0, current_reputation - 0.2)
        # print(f"[{self.agent_id}] Updated reputation for {executor_id}: {self.executor_agents[executor_id]['reputation']:.2f} (Success: {success})")
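Trust and reputation system:
The reputation system is an extension of auditing. By recording each agent's past performance, it provides a basis for future allocation decisions. An agent's reputation is usually a number between 0 and 1 that represents its reliability.
- Initialization: every agent can start with a medium reputation.
- Update: each task that succeeds and passes the audit raises reputation; a failure or a failed audit lowers it. The penalty for failure is usually larger than the reward for success, which encourages agents to be more careful.
- Forgetting mechanism: to keep old data from dominating the picture of current performance, a forgetting factor can be introduced so that recent performance has more influence on reputation.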
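One common way to realize a forgetting factor is an exponentially weighted moving average, sketched below. Note that this is a different update rule from the fixed +0.1 / -0.2 steps used in _update_executor_reputation: it is symmetric and has no extra failure penalty. The function name and the alpha default of 0.2 are assumptions chosen for illustration.

```python
def update_reputation(current, success, alpha=0.2):
    """Exponentially weighted update: alpha is the weight given to the newest outcome.

    Older outcomes decay by a factor of (1 - alpha) with every new observation,
    so the result always stays within [0, 1] when it starts there.
    """
    outcome = 1.0 if success else 0.0
    return (1.0 - alpha) * current + alpha * outcome


reputation = 0.5  # Medium starting reputation
for task_succeeded in [True, True, False]:
    reputation = update_reputation(reputation, task_succeeded)
# 0.5 -> 0.6 -> 0.68 -> 0.544
```

A larger alpha forgets faster and reacts more sharply to the latest task; a smaller alpha gives a smoother, slower-moving score. An asymmetric variant could use a larger alpha for failures to keep the heavier penalty described above.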
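Error Handling and Task Reassignment
In a distributed system, errors are unavoidable. The manager agent needs robust error handling and task reassignment so that the system can recover from failures and keep running.
1. Failure Types
- Execution failure: the executor agent hits an error while executing the task and reports failure.
- Timeout: the task is not finished, or its result is not reported, before the deadline.
- Unexpected result: the executor agent reports completion, but the manager agent's audit finds the result incorrect or incomplete.
- Agent offline/unresponsive: the executor agent crashes or loses its connection during execution.
2. Retry Strategies
When a task fails, the manager agent can apply the following retry strategies:
- Fixed retry count: set a maximum number of retries for each task.
- Exponential backoff: increase the wait between successive retries so that retries do not put excessive pressure on the system.
- Switch executor: if a task fails repeatedly on the same agent, assign it to a different capable agent.
- Task rollback/decomposition: for complex tasks with dependencies, it may be necessary to roll back to the last successful state or to decompose the task again.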
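Two of the strategies above, exponential backoff and switching executors, can be sketched in a few lines. The helper names, the 60-second cap, and the optional random jitter are assumptions made for this example; the assignment_history entries follow the {"executor": ...} shape used by the Task class.

```python
import random


def backoff_delay(attempt, base=1.0, cap=60.0, jitter=True, rng=random):
    """Delay in seconds before retry number `attempt` (1-based): base * 2^(attempt-1), capped."""
    delay = min(cap, base * (2 ** (attempt - 1)))
    # Jitter picks a random point in [0, delay] so that many retries do not fire at once.
    return rng.uniform(0, delay) if jitter else delay


def pick_other_executor(eligible_ids, assignment_history):
    """Prefer an executor that has not yet attempted the task; otherwise fall back to any eligible one."""
    tried = {entry["executor"] for entry in assignment_history}
    untried = [eid for eid in eligible_ids if eid not in tried]
    pool = untried or list(eligible_ids)
    return pool[0] if pool else None


delays = [backoff_delay(n, jitter=False) for n in range(1, 5)]
next_executor_id = pick_other_executor(["exec_1", "exec_2"], [{"executor": "exec_1"}])
```

A manager could combine both: wait backoff_delay(attempt) seconds before re-queuing, and pass the task's assignment_history to pick_other_executor when choosing where to send it next.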
The _reassign_task method of ManagerAgent implements a simple retry mechanism:
def _reassign_task(self, task):
    if task.status == "FAILED" or task.status == "AUDIT_FAILED":
        # Enforce a retry limit.
        # Attempts are tracked in task.assignment_history: every assignment adds one record.
        if len(task.assignment_history) > 3:  # Give up once the task has been retried more than 3 times
            print(f"[{self.agent_id}] Task {task.task_id} failed too many times ({len(task.assignment_history)} attempts), giving up.")
            return
        print(f"[{self.agent_id}] Re-queuing task {task.task_id} for re-assignment. Attempt: {len(task.assignment_history) + 1}")
        task.status = "PENDING"
        task.assigned_to = None
        self.pending_tasks.appendleft(task)  # Back to the head of the queue so it is handled first
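System Scalability and Performance Optimization
As the number of agents and the task load grow, scalability and performance optimization become critical.
- Concurrent processing: both manager and executor agents should be designed for concurrency. The manager agent can handle multiple inbound messages and manage multiple allocation flows at once, and an executor agent can execute several assigned tasks at once.
  - Implementation: use a thread pool, a process pool, or asynchronous I/O (such as Python's asyncio).
- Message queues: introduce a message queue (such as RabbitMQ or Kafka) to decouple communication between agents. The manager agent publishes tasks to a queue, executor agents pull tasks from it, and results are reported through a queue as well. This improves throughput and resilience.
- Distributed state management: the manager agent has to maintain executor state (capabilities, load, reputation) and task state. In a distributed environment this calls for a reliable distributed store (such as Redis, ZooKeeper, or etcd) to hold and synchronize that state.
- Agent pooling: maintain a pool of executor agents and start or stop instances dynamically according to demand.
- Load prediction and proactive allocation: the manager agent can predict future task load from historical data and proactively adjust its allocation strategy, or even start additional executor agents in advance.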
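As a small illustration of the asynchronous I/O option, the sketch below runs several simulated tasks concurrently with asyncio while capping how many execute at once. The execute and run_batch coroutines and the max_concurrency parameter are hypothetical names; asyncio.sleep stands in for real I/O-bound work such as an HTTP request.

```python
import asyncio


async def execute(executor_id, task_id, duration):
    await asyncio.sleep(duration)  # Stand-in for real, I/O-bound work
    return {"task_id": task_id, "executor": executor_id, "status": "COMPLETED"}


async def run_batch(assignments, max_concurrency=2):
    """assignments: list of (executor_id, task_id, duration) tuples."""
    semaphore = asyncio.Semaphore(max_concurrency)

    async def guarded(assignment):
        # At most max_concurrency tasks hold the semaphore at the same time.
        async with semaphore:
            return await execute(*assignment)

    # gather returns results in the same order as the input, not completion order.
    return await asyncio.gather(*(guarded(a) for a in assignments))


batch_results = asyncio.run(
    run_batch([("exec_1", "T1", 0.01), ("exec_2", "T2", 0.01), ("exec_1", "T3", 0.01)])
)
```

Compared with one thread per task, this keeps everything on a single thread, which avoids unsynchronized updates to shared counters but only helps when the work is I/O-bound; CPU-heavy tasks would still need a process pool.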
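Security Considerations
Security cannot be ignored in a MAS, especially when it handles sensitive data or executes critical tasks.
- Authentication and authorization: ensure that only legitimate agents can take part in the system, and restrict their access to resources.
  - Implementation: authenticate with API keys, tokens, digital certificates, and similar credentials.
- Data encryption: protect the confidentiality of the content exchanged between agents.
  - Implementation: encrypt the communication channel with TLS/SSL.
- Tamper protection: ensure the integrity of task content and execution results so that a malicious agent cannot alter them.
  - Implementation: verify task and result data with hashes, or use digital signatures.
- Isolation: place different tasks, or agents with different trust levels, in separate sandboxes to keep malicious behavior from spreading.
- Logging and audit trails: record the behavior of every agent in detail so that security incidents can be audited and traced.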
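For the tamper-protection point, one lightweight option is a keyed hash (HMAC) over the serialized message, sketched below with Python's standard hmac and hashlib modules. The sign_message and verify_message helpers and the envelope layout are assumptions made for this example, and the approach presumes that sender and receiver already share a secret key; digital signatures would be needed where they do not.

```python
import hashlib
import hmac
import json


def _canonical_bytes(message):
    # Sorted keys and fixed separators give the same bytes for equal dicts.
    return json.dumps(message, sort_keys=True, separators=(",", ":")).encode("utf-8")


def sign_message(message, secret):
    """Wrap a message dict in an envelope carrying an HMAC-SHA256 signature."""
    signature = hmac.new(secret, _canonical_bytes(message), hashlib.sha256).hexdigest()
    return {"message": message, "signature": signature}


def verify_message(envelope, secret):
    """Recompute the signature and compare in constant time."""
    expected = hmac.new(secret, _canonical_bytes(envelope["message"]), hashlib.sha256).hexdigest()
    return hmac.compare_digest(expected, envelope["signature"])


shared_key = b"demo-key-not-for-production"
envelope = sign_message({"task_id": "T1001", "status": "COMPLETED"}, shared_key)
valid_before = verify_message(envelope, shared_key)
envelope["message"]["status"] = "FAILED"  # Simulate tampering in transit
valid_after = verify_message(envelope, shared_key)
```

The manager would verify each INFORM_RESULT envelope before auditing its content and drop any message whose signature does not match.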
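Python in Practice: Building a Simplified Manager-Executor Agent System
We now bring the concepts above together in a more complete Python example. We use threading to simulate concurrent agents and deque as the message queue.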
import uuid
import datetime
import time
import random
import threading
from collections import deque

# --- 1. Standardized task representation ---
class Task:
    def __init__(self, task_type, description, parameters, deadline=None, priority="MEDIUM", requester_id=None):
        self.task_id = str(uuid.uuid4())
        self.task_type = task_type
        self.description = description
        self.parameters = parameters
        self.deadline = deadline if deadline else (datetime.datetime.now() + datetime.timedelta(hours=1)).isoformat()
        self.priority = priority
        self.expected_output_format = {"data": "any", "status": "string"}  # Simplified
        self.requester_id = requester_id
        self.assigned_to = None
        self.status = "PENDING"
        self.creation_time = datetime.datetime.now().isoformat()
        self.assignment_history = []  # Records {"executor": id, "timestamp": time, "attempt": N}

    def to_dict(self):
        return {
            "task_id": self.task_id,
            "task_type": self.task_type,
            "description": self.description,
            "parameters": self.parameters,
            "deadline": self.deadline,
            "priority": self.priority,
            "expected_output_format": self.expected_output_format,
            "requester_id": self.requester_id,
            "assigned_to": self.assigned_to,
            "status": self.status,
            "creation_time": self.creation_time,
            "assignment_history": self.assignment_history
        }

    @classmethod
    def from_dict(cls, data):
        task = cls(
            task_type=data['task_type'],
            description=data['description'],
            parameters=data['parameters'],
            deadline=data.get('deadline'),
            priority=data.get('priority', 'MEDIUM'),
            requester_id=data.get('requester_id')
        )
        task.task_id = data['task_id']
        task.assigned_to = data.get('assigned_to')
        task.status = data.get('status', 'PENDING')
        task.creation_time = data.get('creation_time')
        task.assignment_history = data.get('assignment_history', [])
        return task
# --- 2. Inter-agent communication basics ---
class BaseAgent:
    def __init__(self, agent_id):
        self.agent_id = agent_id
        self.mailbox = deque()
        self.stop_event = threading.Event()
        self.agent_manager_ref = None  # Reference to the global agent manager for message routing

    def set_agent_manager(self, manager):
        self.agent_manager_ref = manager

    def send_message(self, receiver_id, message):
        # Messages are addressed by agent ID and delivered by the routing component
        if self.agent_manager_ref:
            self.agent_manager_ref.route_message(self.agent_id, receiver_id, message)
        else:
            print(f"[{self.agent_id}] Error: No agent manager to route message to {receiver_id}.")

    def receive_message(self):
        if self.mailbox:
            return self.mailbox.popleft()
        return None

    def run(self):
        print(f"Agent {self.agent_id} started.")
        while not self.stop_event.is_set():
            message = self.receive_message()
            if message:
                self.handle_message(message)
            time.sleep(0.05)  # Small delay to prevent busy-waiting
        print(f"Agent {self.agent_id} stopped.")

    def handle_message(self, message):
        raise NotImplementedError("Subclasses must implement handle_message method.")

    def stop(self):
        self.stop_event.set()
# --- 3. 执行智能体 (ExecutorAgent) ---
class ExecutorAgent(BaseAgent):
def __init__(self, agent_id, capabilities):
super().__init__(agent_id)
self.capabilities = capabilities
self.current_load = 0
self.reputation = 0.5 # Initial reputation
self.tasks_in_progress = {} # {task_id: task_object}
self.manager_agent_id = None # Store manager ID for direct communication
def register_self(self, manager_agent_id):
self.manager_agent_id = manager_agent_id
message = {
"performative": "REGISTER",
"sender": self.agent_id,
"capabilities": self.capabilities,
"load": self.current_load,
"reputation": self.reputation
}
self.send_message(manager_agent_id, message)
print(f"[{self.agent_id}] Registered with manager {manager_agent_id}, capabilities: {self.capabilities}")

    def handle_message(self, message):
        performative = message['performative']
        sender = message['sender']
        if performative == "ASSIGN_TASK":
            task_data = message['content']['task']
            task = Task.from_dict(task_data)
            print(f"[{self.agent_id}] Received task {task.task_id[:8]}... ({task.task_type}).")
            self.current_load += 1
            self.tasks_in_progress[task.task_id] = task
            threading.Thread(target=self._execute_task, args=(task,)).start()
        elif performative == "REQUEST_CAPABILITIES":
            response = {
                "performative": "INFORM_CAPABILITIES",
                "sender": self.agent_id,
                "receiver": sender,
                "content": {
                    "capabilities": self.capabilities,
                    "load": self.current_load,
                    "reputation": self.reputation
                }
            }
            self.send_message(sender, response)
        elif performative == "CALL_FOR_PROPOSAL":  # CNP part
            task_data = message['content']['task']
            task_id = message['content']['task_id']
            task_type = task_data['task_type']
            if task_type in self.capabilities and self.capabilities[task_type]:
                estimated_time = random.uniform(2, 7) + self.current_load * 0.5  # Load affects time
                confidence = random.uniform(0.7, 0.99)
                proposal = {
                    "task_id": task_id,
                    "estimated_time": estimated_time,
                    "cost": estimated_time * 10,
                    "confidence": confidence
                }
                propose_message = {
                    "performative": "PROPOSE",
                    "sender": self.agent_id,
                    "receiver": sender,
                    "content": {"task_id": task_id, "proposal": proposal}
                }
                self.send_message(sender, propose_message)
                print(f"[{self.agent_id}] Sent proposal for task {task_id[:8]}...: {proposal}")
        elif performative == "ACCEPT_PROPOSAL":  # CNP part
            task_data = message['content']['task']
            task = Task.from_dict(task_data)
            print(f"[{self.agent_id}] Accepted proposal for task {task.task_id[:8]}.... Starting execution.")
            self.current_load += 1
            self.tasks_in_progress[task.task_id] = task
            threading.Thread(target=self._execute_task, args=(task,)).start()
        elif performative == "REJECT_PROPOSAL":  # CNP part
            task_id = message['content']['task_id']
            # print(f"[{self.agent_id}] Proposal for task {task_id[:8]}... rejected.")
        else:
            print(f"[{self.agent_id}] Received unknown message: {message['performative']}")

    def _execute_task(self, task):
        """Simulate task execution in a worker thread, then report the outcome to the manager."""
        print(f"[{self.agent_id}] Started executing task {task.task_id[:8]}... ({task.task_type}).")
        execution_time = random.uniform(2, 5)
        time.sleep(execution_time)
        success = random.random() > 0.1  # 10% failure rate
        result_data = None
        status = "FAILED"
        error_message = None
        if success:
            status = "COMPLETED"
            result_data = {
                "processed_data": f"Result for {task.task_id[:8]} by {self.agent_id}",
                "execution_time": execution_time
            }
            print(f"[{self.agent_id}] Task {task.task_id[:8]}... completed successfully.")
        else:
            error_message = "Simulated execution failure."
            print(f"[{self.agent_id}] Task {task.task_id[:8]}... failed: {error_message}")
        report_message = {
            "performative": "INFORM_RESULT",
            "sender": self.agent_id,
            "receiver": self.manager_agent_id,
            "content": {
                "task_id": task.task_id,
                "status": status,
                "result": result_data,
                "error": error_message,
                "execution_time": execution_time
            }
        }
        # Release local bookkeeping before reporting: the manager may re-assign
        # this task to the same executor as soon as it sees the result, and a
        # late delete would then remove the new entry.
        self.current_load -= 1
        self.tasks_in_progress.pop(task.task_id, None)
        self.send_message(self.manager_agent_id, report_message)


# --- 4. Manager agent (ManagerAgent) ---
class ManagerAgent(BaseAgent):
    def __init__(self, agent_id, assignment_strategy="CNP"):
        super().__init__(agent_id)
        self.executor_agents_info = {}  # {agent_id: {"capabilities": {}, "load": 0, "reputation": 0.5}}
        self.pending_tasks = deque()
        self.tasks_in_progress = {}  # {task_id: task_obj}
        self.completed_tasks = {}
        self.failed_tasks = {}
        self.assignment_strategy = assignment_strategy
        # For CNP: {task_id: {"task_obj": task, "proposals": {executor_id: proposal_content}, "cfp_time": float}}
        self.cfps_in_progress = {}

    def handle_message(self, message):
        performative = message['performative']
        sender = message['sender']
        if performative == "REGISTER":
            capabilities = message['capabilities']
            load = message.get('load', 0)
            reputation = message.get('reputation', 0.5)
            self.executor_agents_info[sender] = {
                "capabilities": capabilities,
                "load": load,
                "reputation": reputation
            }
            print(f"[{self.agent_id}] Executor {sender} registered. Capabilities: {capabilities}")
        elif performative == "INFORM_CAPABILITIES":  # Update executor info
            if sender in self.executor_agents_info:
                self.executor_agents_info[sender]['capabilities'] = message['content']['capabilities']
                self.executor_agents_info[sender]['load'] = message['content']['load']
                self.executor_agents_info[sender]['reputation'] = message['content']['reputation']
        elif performative == "INFORM_RESULT":
            task_id = message['content']['task_id']
            status = message['content']['status']
            result = message['content']['result']
            error = message['content']['error']
            execution_time = message['content']['execution_time']
            if task_id in self.tasks_in_progress:
                # Remove the task from the in-progress table first, so that a
                # re-assignment triggered below cannot be undone by a later delete.
                task = self.tasks_in_progress.pop(task_id)
                task.status = status
                task.assigned_to = None  # Task completed or failed, so it's no longer assigned
                print(f"[{self.agent_id}] Received result for task {task_id[:8]}... from {sender}. Status: {status}")
                audit_passed = self._audit_task_result(task, result, error, status, execution_time)
                if status == "COMPLETED" and audit_passed:
                    self.completed_tasks[task_id] = task
                    print(f"[{self.agent_id}] Task {task_id[:8]}... AUDITED_SUCCESSFULLY.")
                    self._update_executor_reputation(sender, True)
                else:
                    self.failed_tasks[task_id] = task
                    print(f"[{self.agent_id}] Task {task_id[:8]}... FAILED or AUDIT_FAILED. Error: {error or 'Audit failed.'}")
                    self._update_executor_reputation(sender, False)
                    self._reassign_task(task)
            else:
                print(f"[{self.agent_id}] Received result for unknown or already processed task {task_id[:8]}...")
        elif performative == "PROPOSE" and self.assignment_strategy == "CNP":  # CNP part
            task_id = message['content']['task_id']
            proposal = message['content']['proposal']
            if task_id in self.cfps_in_progress:
                self.cfps_in_progress[task_id]["proposals"][sender] = proposal
                # print(f"[{self.agent_id}] Received proposal for task {task_id[:8]}... from {sender}: {proposal}")
            else:
                print(f"[{self.agent_id}] Received proposal for unknown CFP task {task_id[:8]}... from {sender}.")
        else:
            print(f"[{self.agent_id}] Received unknown message: {message['performative']}")

    def add_task(self, task):
        self.pending_tasks.append(task)
        print(f"[{self.agent_id}] Added new task {task.task_id[:8]}... ({task.task_type}). Pending tasks: {len(self.pending_tasks)}")

    def _find_best_executor_centralized(self, task):
        eligible_executors = []
        for exec_id, info in self.executor_agents_info.items():
            if task.task_type in info['capabilities'] and info['capabilities'][task.task_type]:
                eligible_executors.append((exec_id, info['load'], info['reputation']))
        if not eligible_executors:
            return None
        # Sort by load (ascending), then by reputation (descending)
        eligible_executors.sort(key=lambda x: (x[1], -x[2]))
        best_executor_id = eligible_executors[0][0]
        return best_executor_id

    def _assign_task_centralized(self):
        if not self.pending_tasks:
            return
        task = self.pending_tasks.popleft()
        executor_id = self._find_best_executor_centralized(task)
        if executor_id:
            task.assigned_to = executor_id
            task.status = "ASSIGNED"
            task.assignment_history.append({
                "executor": executor_id,
                "timestamp": datetime.datetime.now().isoformat(),
                "attempt": len(task.assignment_history) + 1
            })
            self.tasks_in_progress[task.task_id] = task
            assign_message = {
                "performative": "ASSIGN_TASK",
                "sender": self.agent_id,
                "receiver": executor_id,
                "content": {"task": task.to_dict()}
            }
            self.send_message(executor_id, assign_message)
            print(f"[{self.agent_id}] Assigned task {task.task_id[:8]}... to {executor_id}.")
        else:
            print(f"[{self.agent_id}] No suitable executor found for task {task.task_id[:8]}.... Re-queuing.")
            self.pending_tasks.appendleft(task)

    def _assign_task_cnp(self):
        if not self.pending_tasks:
            return
        task = self.pending_tasks.popleft()
        cfp_message = {
            "performative": "CALL_FOR_PROPOSAL",
            "sender": self.agent_id,
            "content": {
                "task": task.to_dict(),
                "task_id": task.task_id
            }
        }
        # Record the CFP before broadcasting, so that a fast PROPOSE reply is
        # never treated as belonging to an unknown CFP.
        self.cfps_in_progress[task.task_id] = {"task_obj": task, "proposals": {}, "cfp_time": time.time()}
        # Broadcast CFP to all registered executors
        for exec_id in self.executor_agents_info:
            self.send_message(exec_id, cfp_message)
        print(f"[{self.agent_id}] Issued CFP for task {task.task_id[:8]}... ({task.task_type}).")

    def _evaluate_proposals(self, task_id):
        if task_id not in self.cfps_in_progress:
            return
        cfp_data = self.cfps_in_progress[task_id]
        task = cfp_data["task_obj"]
        proposals = cfp_data["proposals"]
        if not proposals:
            print(f"[{self.agent_id}] No proposals received for task {task_id[:8]}.... Re-queuing.")
            self.pending_tasks.appendleft(task)
            del self.cfps_in_progress[task_id]
            return
        best_executor_id = None
        best_score = float('inf')  # Lower score is better
        for exec_id, proposal in proposals.items():
            if "estimated_time" in proposal and "confidence" in proposal:
                executor_reputation = self.executor_agents_info.get(exec_id, {}).get("reputation", 0.5)
                # Score = Estimated_time / (Reputation * Confidence) - lower is better
                # Add a small epsilon to avoid division by zero if reputation or confidence is 0
                score = proposal["estimated_time"] / (executor_reputation * proposal["confidence"] + 0.01)
                if score < best_score:
                    best_score = score
                    best_executor_id = exec_id
        if best_executor_id:
            task.assigned_to = best_executor_id
            task.status = "ASSIGNED"
            task.assignment_history.append({
                "executor": best_executor_id,
                "timestamp": datetime.datetime.now().isoformat(),
                "attempt": len(task.assignment_history) + 1
            })
            self.tasks_in_progress[task.task_id] = task
            accept_message = {
                "performative": "ACCEPT_PROPOSAL",
                "sender": self.agent_id,
                "receiver": best_executor_id,
                "content": {"task": task.to_dict(), "task_id": task.task_id}
            }
            self.send_message(best_executor_id, accept_message)
            print(f"[{self.agent_id}] Accepted proposal for task {task_id[:8]}... from {best_executor_id}.")
            # Reject other proposals
            for exec_id in proposals:
                if exec_id != best_executor_id:
                    reject_message = {
                        "performative": "REJECT_PROPOSAL",
                        "sender": self.agent_id,
                        "receiver": exec_id,
                        "content": {"task_id": task_id}
                    }
                    self.send_message(exec_id, reject_message)
        else:
            print(f"[{self.agent_id}] No best executor selected for task {task_id[:8]}.... Re-queuing.")
            self.pending_tasks.appendleft(task)
        del self.cfps_in_progress[task_id]

    def _audit_task_result(self, task, result_data, error, status, execution_time):
        """Return True if the reported result passes all blocking audit checks."""
        audit_passed = True
        # 1. Status check
        if status == "FAILED" and error is None:
            print(f"[{self.agent_id}] Audit Warning: Task {task.task_id[:8]}... failed but no error message provided.")
            audit_passed = False
        # 2. Result format check (simplified for web_scraping)
        if status == "COMPLETED" and task.task_type == "web_scraping":
            if not isinstance(result_data, dict):
                print(f"[{self.agent_id}] Audit Error: Task {task.task_id[:8]}... result format invalid (expected dict).")
                audit_passed = False
            elif 'processed_data' not in result_data:
                print(f"[{self.agent_id}] Audit Error: Web scraping task {task.task_id[:8]}... missing 'processed_data'.")
                audit_passed = False
            elif not isinstance(result_data.get('processed_data'), str) or not result_data.get('processed_data').startswith("Result for"):
                print(f"[{self.agent_id}] Audit Error: Web scraping task {task.task_id[:8]}... processed_data content invalid.")
                audit_passed = False
        # 3. Execution time audit (warning only; does not fail the audit)
        if execution_time > 6 and status == "COMPLETED":
            print(f"[{self.agent_id}] Audit Warning: Task {task.task_id[:8]}... completed but took too long ({execution_time:.2f}s).")
        return audit_passed

    def _update_executor_reputation(self, executor_id, success):
        if executor_id in self.executor_agents_info:
            current_reputation = self.executor_agents_info[executor_id]['reputation']