各位同仁,各位对智能系统和协作架构充满热情的工程师们,大家好。
今天,我们将深入探讨一个在构建复杂智能代理(Agent)系统时至关重要的话题——Handoff Protocol,即“交接协议”。特别是,我们将聚焦于如何定义一种标准化的“交接消息”,以使得具备不同“性格”或内部逻辑的智能代理能够实现无缝、高效的协作。
在当今快速发展的AI领域,我们不再满足于单一功能的智能体。我们正迈向一个由多个、专业化代理协同工作的时代。想象一下,一个客户服务助理代理需要将一个复杂的技术问题转交给技术支持代理;一个数据分析代理完成报告后,需要通知决策支持代理进行下一步的策略制定。这些场景无不涉及到代理间的“交接”。
然而,代理的“性格”差异,即它们各自的专业领域、处理逻辑、优先级偏好、甚至对信息的解读方式,是实现无缝协作的巨大挑战。一个“性格”严谨细致的代理可能需要大量上下文信息,而一个“性格”高效简洁的代理可能只关注核心指令。如果没有一个统一的协议,这种差异将导致信息丢失、理解偏差、重复工作甚至系统崩溃。因此,一套健壮、可扩展的交接协议及其标准化的交接消息格式,是构建多代理系统协作基石。
智能代理的本质与“性格”差异
在深入协议细节之前,我们首先明确什么是“智能代理”以及我们所指的“性格”差异。
智能代理(Intelligent Agent)通常被定义为一个能够感知环境、做出决策并采取行动以实现其目标的自主实体。它具备以下一些关键特征:
- 自主性(Autonomy):能在一定程度上独立运行,无需持续的人类干预。
- 反应性(Reactivity):能够响应环境变化。
- 主动性(Proactiveness):能够主动发起行动以实现目标。
- 社会性(Sociality):能够与其他代理或人类交互。
当我们谈论代理的“性格”时,我们并非指人类情感上的性格,而是其内在操作逻辑、决策偏好、专业领域和处理风格的差异。这些差异来源于它们被设计时的目的、训练数据、算法模型以及所承担的职责。
| “性格”维度 | 描述 | 对交接协议的影响 |
|---|---|---|
| 专业领域 | 财务、法务、技术支持、客户服务等,决定其对信息的关注点。 | 需要协议能清晰标识领域相关信息,或能路由到特定领域代理。 |
| 认知风格 | 基于规则、机器学习、符号推理、统计分析等,影响其处理信息的方式。 | 需要协议提供结构化、可解析的数据,以便不同认知风格的代理都能理解。 |
| 决策偏好 | 风险规避、风险偏好、效率优先、准确性优先等。 | 需要协议能传递优先级、时效性、重要程度等元数据。 |
| 信息粒度偏好 | 倾向于高层次概括还是低层次细节。 | 需要协议支持不同粒度的数据传递,或提供按需获取细节的机制。 |
| 沟通风格 | 简洁明了、详细冗长、正式非正式(尽管我们正在标准化消息本身)。 | 协议本身应简洁高效,但需包含足够的上下文,以满足所有代理的需求。 |
| 任务范围 | 专注于单一任务、管理整个工作流、协调多个子任务等。 | 协议需要支持任务ID、工作流状态、子任务列表等概念。 |
这些差异使得一个代理产生的原始输出或内部状态,可能无法被另一个代理直接理解或有效利用。因此,定义一套标准化的交接消息,成为了弥合这些“性格”鸿沟的关键。
Handoff Protocol的核心原则
一个成功的Handoff Protocol,必须遵循以下核心原则:
- 清晰性(Clarity):消息内容必须明确无歧义,避免模糊的表述。
- 完整性(Completeness):包含所有接收代理完成任务所需的必要上下文信息,避免接收方需要额外查询。
- 简洁性(Conciseness):去除所有不必要的冗余信息,提高传输效率和可读性。
- 可操作性(Actionability):清晰指明接收代理应采取的下一步行动或期望结果。
- 持久性(Durability):消息应能被存储、检索和审计,以支持错误恢复和流程追溯。
- 可扩展性(Extensibility):协议应允许在不破坏现有系统的前提下,增加新的消息类型或字段。
- 可验证性(Verifiability):接收方应能验证消息的完整性和有效性,并提供反馈机制。
- 幂等性(Idempotency):在某些情况下,重复处理同一条消息不应引起副作用(尤其在消息队列系统中)。
标准化交接消息的结构设计
为了实现上述原则,我们推荐使用JSON (JavaScript Object Notation)作为交接消息的载体。JSON因其轻量级、易于读写、结构化良好以及广泛的语言支持而成为现代系统间通信的理想选择。对于对性能和体积有极致要求的场景,Protocol Buffers (Protobuf) 也是一个优秀的选择,但JSON在可读性和调试方面更具优势。
一个通用的标准化交接消息应包含以下几个核心部分:
1. 元数据 (Metadata)
提供关于消息本身和其在系统中的流转信息。
message_id(UUID): 消息的唯一标识符,用于追踪和去重。timestamp(ISO 8601): 消息创建的时间戳,精确到毫秒。protocol_version(String): 当前使用的协议版本,用于兼容性管理。sender_id(String): 发送消息的代理的唯一标识。recipient_id(String | Array): 接收消息的代理的唯一标识,可以是单个代理或代理组。correlation_id(UUID, Optional): 用于关联属于同一业务流程的不同消息,便于端到端追踪。task_id(UUID): 消息所属的业务任务的唯一标识符。priority(Enum: "LOW", "MEDIUM", "HIGH", "CRITICAL"): 消息的处理优先级。expiration_time(ISO 8601, Optional): 消息的有效期限,过期后不再处理。
2. 上下文信息 (Context)
提供任务的历史背景和当前状态,帮助接收代理快速理解情况。
workflow_state(String): 当前任务在整个工作流中的状态(例如:“INITIAL_REQUEST”, “DATA_COLLECTION_COMPLETE”, “AWAITING_APPROVAL”, “ESCALATED_TO_TIER2”)。previous_actions(Arrayhistorical_data_summary(String, Optional): 对历史数据的简要总结,避免传输大量原始数据。user_interaction_history(Array
3. 核心负载 (Payload)
这是交接消息的核心内容,包含具体的业务数据和交接类型。这是“性格”差异最需要被标准化和结构化的地方。
handoff_type(Enum): 定义了本次交接的类型,这是指导接收代理如何处理消息的关键字段。例如:TASK_TRANSFER: 完整任务的移交。REQUEST_INFORMATION: 请求特定信息。PROVIDE_INFORMATION: 提供请求的信息或主动分享信息。ESCALATION: 任务升级,通常意味着当前代理无法处理。NOTIFICATION: 仅通知某个事件的发生。STATUS_UPDATE: 更新任务的进度或状态。APPROVAL_REQUEST: 请求批准。
data(Object): 一个动态的、根据handoff_type而结构化的对象。这是协议可扩展性的关键点。每个handoff_type都应有一个对应的JSON Schema来定义其data对象的结构。
4. 指令与期望 (Instructions & Expectations)
明确接收代理应执行的操作和期望达成的结果。
next_steps_suggestion(String, Optional): 发送代理对下一步行动的建议。required_actions(Array, Optional): 接收代理必须执行的具体操作列表。constraints(Object, Optional): 针对接收代理操作的限制,例如time_limit、resource_limit等。success_criteria(String, Optional): 接收代理完成其部分任务的判断标准。failure_handling_strategy(Object, Optional): 如果接收代理无法完成任务,应如何处理(例如,retry_count,escalate_to,fallback_action)。
示例 JSON 消息结构
{
"metadata": {
"message_id": "a1b2c3d4-e5f6-7890-1234-567890abcdef",
"timestamp": "2023-10-27T10:30:00.123Z",
"protocol_version": "1.0.0",
"sender_id": "customer_service_agent_001",
"recipient_id": "technical_support_agent_pool",
"correlation_id": "flow-xyz-123",
"task_id": "task-abc-456",
"priority": "HIGH",
"expiration_time": "2023-10-27T12:00:00.000Z"
},
"context": {
"workflow_state": "CUSTOMER_ISSUE_ESCALATED",
"previous_actions": [
{
"action_type": "INITIAL_GREETING",
"details": "User greeted and problem described.",
"timestamp": "2023-10-27T10:20:05.000Z"
},
{
"action_type": "DIAGNOSTIC_ATTEMPT",
"details": "Attempted to troubleshoot network connectivity issues.",
"timestamp": "2023-10-27T10:25:30.000Z"
}
],
"historical_data_summary": "User 'John Doe' reported intermittent internet connection loss for 2 days. Basic router restart failed. User provided router model and ISP details.",
"user_interaction_history": [
{
"type": "CHAT_MESSAGE",
"sender": "User",
"content": "My internet keeps dropping every hour.",
"timestamp": "2023-10-27T10:21:00.000Z"
},
{
"type": "CHAT_MESSAGE",
"sender": "Agent",
"content": "Please restart your router and modem.",
"timestamp": "2023-10-27T10:23:00.000Z"
}
]
},
"payload": {
"handoff_type": "ESCALATION",
"data": {
"issue_category": "NETWORK_CONNECTIVITY",
"severity": "CRITICAL",
"customer_info": {
"user_id": "[email protected]",
"name": "John Doe",
"contact_number": "+1-555-123-4567"
},
"technical_details": {
"router_model": "RouterX-Pro",
"isp": "GlobalNet",
"symptoms": "Intermittent disconnections, unable to access specific websites.",
"logs_attached": true,
"log_references": ["log_file_id_123"]
}
}
},
"instructions": {
"next_steps_suggestion": "Please review logs, schedule a remote diagnostic session with the customer.",
"required_actions": ["DIAGNOSE_ROOT_CAUSE", "COMMUNICATE_SOLUTION_TO_CUSTOMER"],
"constraints": {
"time_limit": "PT2H"
},
"success_criteria": "Root cause identified and communicated to customer.",
"failure_handling_strategy": {
"escalate_to": "level3_network_expert_agent_pool",
"fallback_action": "notify_customer_of_delay"
}
}
}
JSON Schema 定义与验证
为了确保交接消息的标准化和一致性,JSON Schema是不可或缺的工具。它允许我们精确定义JSON文档的结构、数据类型、必填字段、枚举值、格式限制等。
基础 JSON Schema 定义
首先,我们定义一个基础的HandoffMessage Schema。
// base_handoff_message.schema.json
{
"$schema": "http://json-schema.org/draft-07/schema#",
"title": "Base Handoff Message",
"description": "Standardized structure for agent handoff messages.",
"type": "object",
"required": [
"metadata",
"context",
"payload",
"instructions"
],
"properties": {
"metadata": {
"type": "object",
"required": [
"message_id",
"timestamp",
"protocol_version",
"sender_id",
"recipient_id",
"task_id",
"priority"
],
"properties": {
"message_id": { "type": "string", "format": "uuid" },
"timestamp": { "type": "string", "format": "date-time" },
"protocol_version": { "type": "string", "pattern": "^\d+\.\d+\.\d+$" },
"sender_id": { "type": "string" },
"recipient_id": {
"oneOf": [
{ "type": "string" },
{ "type": "array", "items": { "type": "string" } }
]
},
"correlation_id": { "type": "string", "format": "uuid" },
"task_id": { "type": "string" },
"priority": { "type": "string", "enum": ["LOW", "MEDIUM", "HIGH", "CRITICAL"] },
"expiration_time": { "type": "string", "format": "date-time" }
},
"additionalProperties": false
},
"context": {
"type": "object",
"required": ["workflow_state", "previous_actions"],
"properties": {
"workflow_state": { "type": "string" },
"previous_actions": {
"type": "array",
"items": {
"type": "object",
"required": ["action_type", "details", "timestamp"],
"properties": {
"action_type": { "type": "string" },
"details": { "type": "string" },
"timestamp": { "type": "string", "format": "date-time" }
},
"additionalProperties": true
}
},
"historical_data_summary": { "type": "string" },
"user_interaction_history": {
"type": "array",
"items": {
"type": "object",
"required": ["type", "sender", "content", "timestamp"],
"properties": {
"type": { "type": "string" },
"sender": { "type": "string" },
"content": { "type": "string" },
"timestamp": { "type": "string", "format": "date-time" }
},
"additionalProperties": true
}
}
},
"additionalProperties": false
},
"payload": {
"type": "object",
"required": ["handoff_type", "data"],
"properties": {
"handoff_type": { "type": "string", "enum": ["TASK_TRANSFER", "REQUEST_INFORMATION", "PROVIDE_INFORMATION", "ESCALATION", "NOTIFICATION", "STATUS_UPDATE", "APPROVAL_REQUEST"] },
"data": {
"type": "object",
"description": "Specific data structure based on handoff_type. This will be defined by an external schema via '$ref'."
}
},
"additionalProperties": false
},
"instructions": {
"type": "object",
"properties": {
"next_steps_suggestion": { "type": "string" },
"required_actions": { "type": "array", "items": { "type": "string" } },
"constraints": { "type": "object" },
"success_criteria": { "type": "string" },
"failure_handling_strategy": {
"type": "object",
"properties": {
"retry_count": { "type": "integer", "minium": 0 },
"escalate_to": { "type": "string" },
"fallback_action": { "type": "string" }
},
"additionalProperties": false
}
},
"additionalProperties": false
}
},
"additionalProperties": false
}
针对 handoff_type 的扩展 Schema
payload.data字段的灵活性通过引用外部Schema实现。例如,为ESCALATION类型定义一个专门的Schema:
// escalation_data.schema.json
{
"$schema": "http://json-schema.org/draft-07/schema#",
"title": "Escalation Data Schema",
"description": "Schema for the 'data' field when handoff_type is ESCALATION.",
"type": "object",
"required": [
"issue_category",
"severity",
"customer_info",
"technical_details"
],
"properties": {
"issue_category": { "type": "string" },
"severity": { "type": "string", "enum": ["LOW", "MEDIUM", "HIGH", "CRITICAL"] },
"customer_info": {
"type": "object",
"required": ["user_id", "name"],
"properties": {
"user_id": { "type": "string" },
"name": { "type": "string" },
"contact_number": { "type": "string" }
},
"additionalProperties": false
},
"technical_details": {
"type": "object",
"properties": {
"router_model": { "type": "string" },
"isp": { "type": "string" },
"symptoms": { "type": "string" },
"logs_attached": { "type": "boolean" },
"log_references": { "type": "array", "items": { "type": "string" } }
},
"additionalProperties": true
}
},
"additionalProperties": false
}
在实际应用中,您会有一系列这样的data Schema文件,并在主Schema中通过运行时逻辑或更复杂的oneOf/if-then-else结构引用它们,例如:
// In base_handoff_message.schema.json, within payload.properties.data
"data": {
"type": "object",
"oneOf": [
{
"if": { "properties": { "handoff_type": { "const": "ESCALATION" } } },
"then": { "$ref": "escalation_data.schema.json" }
},
{
"if": { "properties": { "handoff_type": { "const": "REQUEST_INFORMATION" } } },
"then": { "$ref": "request_info_data.schema.json" }
}
// ... more handoff types
]
}
验证交接消息
在Python中,我们可以使用jsonschema库来验证消息:
import json
from jsonschema import validate, RefResolver
def load_schema(schema_path):
with open(schema_path, 'r', encoding='utf-8') as f:
return json.load(f)
def validate_handoff_message(message_data, base_schema, schema_store=None):
"""
Validate a handoff message against the base schema and dynamically
load sub-schemas for the 'data' payload based on 'handoff_type'.
"""
if schema_store is None:
schema_store = {}
# Initialize resolver for dynamic schema loading
# The 'base_uri' should point to the directory where schemas are located
resolver = RefResolver(base_uri="file:///path/to/your/schemas/", store=schema_store)
try:
# First, validate the overall structure against the base schema
validate(instance=message_data, schema=base_schema, resolver=resolver)
# Now, specifically validate the 'data' payload based on 'handoff_type'
handoff_type = message_data["payload"]["handoff_type"]
data_payload = message_data["payload"]["data"]
# Dynamically determine the data schema path
data_schema_name = f"{handoff_type.lower()}_data.schema.json"
data_schema_path = f"/path/to/your/schemas/{data_schema_name}" # Adjust path as needed
if data_schema_name not in schema_store:
# Attempt to load the specific data schema
try:
data_schema = load_schema(data_schema_path)
schema_store[data_schema_name] = data_schema
except FileNotFoundError:
print(f"Warning: No specific schema found for handoff_type '{handoff_type}'. Data payload will not be validated further.")
return True # Consider it valid if specific schema is missing
except json.JSONDecodeError:
print(f"Error: Invalid JSON schema file for '{handoff_type}'.")
return False
# Validate the data payload against its specific schema
validate(instance=data_payload, schema=schema_store[data_schema_name])
print("Handoff message is valid.")
return True
except Exception as e:
print(f"Handoff message validation failed: {e}")
return False
# --- Example Usage ---
if __name__ == "__main__":
# Create dummy schema files for demonstration
import os
schemas_dir = "schemas"
os.makedirs(schemas_dir, exist_ok=True)
with open(os.path.join(schemas_dir, "base_handoff_message.schema.json"), "w", encoding="utf-8") as f:
json.dump(load_schema("base_handoff_message.schema.json"), f, indent=2) # Assuming you saved schema above
with open(os.path.join(schemas_dir, "escalation_data.schema.json"), "w", encoding="utf-8") as f:
json.dump(load_schema("escalation_data.schema.json"), f, indent=2) # Assuming you saved schema above
with open(os.path.join(schemas_dir, "request_information_data.schema.json"), "w", encoding="utf-8") as f:
json.dump({
"$schema": "http://json-schema.org/draft-07/schema#",
"title": "Request Information Data Schema",
"type": "object",
"required": ["requested_info_keys"],
"properties": {
"requested_info_keys": { "type": "array", "items": { "type": "string" } },
"query_parameters": { "type": "object" }
},
"additionalProperties": false
}, f, indent=2)
# Load base schema
base_schema_path = os.path.join(schemas_dir, "base_handoff_message.schema.json")
base_handoff_schema = load_schema(base_schema_path)
# Prepare schema store for dynamic loading (resolver expects schema content, not path)
schema_store = {
f"file:///{schemas_dir}/base_handoff_message.schema.json": base_handoff_schema,
f"file:///{schemas_dir}/escalation_data.schema.json": load_schema(os.path.join(schemas_dir, "escalation_data.schema.json")),
f"file:///{schemas_dir}/request_information_data.schema.json": load_schema(os.path.join(schemas_dir, "request_information_data.schema.json"))
}
# Valid Escalation Message
valid_escalation_message = {
"metadata": {
"message_id": "a1b2c3d4-e5f6-7890-1234-567890abcdef",
"timestamp": "2023-10-27T10:30:00.123Z",
"protocol_version": "1.0.0",
"sender_id": "customer_service_agent_001",
"recipient_id": "technical_support_agent_pool",
"correlation_id": "flow-xyz-123",
"task_id": "task-abc-456",
"priority": "HIGH"
},
"context": {
"workflow_state": "CUSTOMER_ISSUE_ESCALATED",
"previous_actions": [
{
"action_type": "DIAGNOSTIC_ATTEMPT",
"details": "Attempted to troubleshoot network connectivity issues.",
"timestamp": "2023-10-27T10:25:30.000Z"
}
],
"historical_data_summary": "User 'John Doe' reported intermittent internet connection loss."
},
"payload": {
"handoff_type": "ESCALATION",
"data": {
"issue_category": "NETWORK_CONNECTIVITY",
"severity": "CRITICAL",
"customer_info": {
"user_id": "[email protected]",
"name": "John Doe",
"contact_number": "+1-555-123-4567"
},
"technical_details": {
"router_model": "RouterX-Pro",
"isp": "GlobalNet",
"symptoms": "Intermittent disconnections."
}
}
},
"instructions": {
"next_steps_suggestion": "Review logs.",
"required_actions": ["DIAGNOSE_ROOT_CAUSE"]
}
}
# Invalid Escalation Message (missing required field in technical_details)
invalid_escalation_message = {
"metadata": {
"message_id": "a1b2c3d4-e5f6-7890-1234-567890abcdef",
"timestamp": "2023-10-27T10:30:00.123Z",
"protocol_version": "1.0.0",
"sender_id": "customer_service_agent_001",
"recipient_id": "technical_support_agent_pool",
"correlation_id": "flow-xyz-123",
"task_id": "task-abc-456",
"priority": "HIGH"
},
"context": {
"workflow_state": "CUSTOMER_ISSUE_ESCALATED",
"previous_actions": [
{
"action_type": "DIAGNOSTIC_ATTEMPT",
"details": "Attempted to troubleshoot network connectivity issues.",
"timestamp": "2023-10-27T10:25:30.000Z"
}
],
"historical_data_summary": "User 'John Doe' reported intermittent internet connection loss."
},
"payload": {
"handoff_type": "ESCALATION",
"data": {
"issue_category": "NETWORK_CONNECTIVITY",
"severity": "CRITICAL",
"customer_info": {
"user_id": "[email protected]",
"name": "John Doe",
"contact_number": "+1-555-123-4567"
},
"technical_details": {
# Missing 'router_model' which is not explicitly required in this example's schema,
# but would fail if 'customer_info' was missing 'name' for instance.
# Let's make an explicit error: missing customer_info.name
"isp": "GlobalNet",
"symptoms": "Intermittent disconnections."
}
}
},
"instructions": {
"next_steps_suggestion": "Review logs.",
"required_actions": ["DIAGNOSE_ROOT_CAUSE"]
}
}
# To demonstrate an actual error, let's modify the invalid_escalation_message slightly to break 'customer_info'
del invalid_escalation_message['payload']['data']['customer_info']['name']
print("n--- Valid Message Validation ---")
# For resolver to work correctly, base_uri needs to reflect where schemas are
# In this script, we're using a relative path, so let's adjust the resolver base_uri.
# A cleaner way is to pass the schema_store directly to validate.
# For this simplified example, we'll manually handle the data schema validation.
# A more robust solution would integrate 'oneOf' directly in the base schema.
# Simplified validation for demo purposes (without full RefResolver setup in the function)
def validate_handoff_message_simplified(message_data, base_schema, specific_schemas_map):
try:
validate(instance=message_data, schema=base_schema)
handoff_type = message_data["payload"]["handoff_type"]
data_payload = message_data["payload"]["data"]
if handoff_type in specific_schemas_map:
validate(instance=data_payload, schema=specific_schemas_map[handoff_type])
print("Handoff message is valid.")
return True
except Exception as e:
print(f"Handoff message validation failed: {e}")
return False
specific_schemas = {
"ESCALATION": load_schema(os.path.join(schemas_dir, "escalation_data.schema.json")),
"REQUEST_INFORMATION": load_schema(os.path.join(schemas_dir, "request_information_data.schema.json"))
}
validate_handoff_message_simplified(valid_escalation_message, base_handoff_schema, specific_schemas)
print("n--- Invalid Message Validation ---")
validate_handoff_message_simplified(invalid_escalation_message, base_handoff_schema, specific_schemas)
(注意:上述Python代码中的load_schema和validate_handoff_message函数需要根据您实际的Schema文件路径和RefResolver的base_uri进行调整。为了简化演示,我提供了一个更直接的validate_handoff_message_simplified。在实际生产环境中,建议使用RefResolver来管理引用,或者将所有相关Schema嵌入到主Schema中。)
实现策略与代理交互逻辑
有了标准化的消息结构和验证机制,下一步就是如何在代理系统中实现它。
1. 消息类/数据结构
在编程语言中,将JSON Schema映射为强类型的数据结构(例如Python中的dataclass或Pydantic模型,Java中的POJO,Go中的struct)是最佳实践。这提供了编译时检查和更好的代码可读性。
Python Pydantic 示例:
from pydantic import BaseModel, Field, HttpUrl
from typing import List, Optional, Union, Dict, Any
from datetime import datetime
from enum import Enum
import uuid
# Enums
class Priority(str, Enum):
LOW = "LOW"
MEDIUM = "MEDIUM"
HIGH = "HIGH"
CRITICAL = "CRITICAL"
class HandoffType(str, Enum):
TASK_TRANSFER = "TASK_TRANSFER"
REQUEST_INFORMATION = "REQUEST_INFORMATION"
PROVIDE_INFORMATION = "PROVIDE_INFORMATION"
ESCALATION = "ESCALATION"
NOTIFICATION = "NOTIFICATION"
STATUS_UPDATE = "STATUS_UPDATE"
APPROVAL_REQUEST = "APPROVAL_REQUEST"
# Metadata Models
class Metadata(BaseModel):
message_id: uuid.UUID = Field(default_factory=uuid.uuid4)
timestamp: datetime = Field(default_factory=datetime.utcnow)
protocol_version: str = "1.0.0"
sender_id: str
recipient_id: Union[str, List[str]]
correlation_id: Optional[uuid.UUID] = None
task_id: uuid.UUID
priority: Priority = Priority.MEDIUM
expiration_time: Optional[datetime] = None
# Context Models
class PreviousAction(BaseModel):
action_type: str
details: str
timestamp: datetime = Field(default_factory=datetime.utcnow)
class UserInteraction(BaseModel):
type: str
sender: str
content: str
timestamp: datetime = Field(default_factory=datetime.utcnow)
class Context(BaseModel):
workflow_state: str
previous_actions: List[PreviousAction] = []
historical_data_summary: Optional[str] = None
user_interaction_history: Optional[List[UserInteraction]] = None
# Instructions Models
class FailureHandlingStrategy(BaseModel):
retry_count: Optional[int] = Field(None, ge=0)
escalate_to: Optional[str] = None
fallback_action: Optional[str] = None
class Instructions(BaseModel):
next_steps_suggestion: Optional[str] = None
required_actions: Optional[List[str]] = None
constraints: Optional[Dict[str, Any]] = None
success_criteria: Optional[str] = None
failure_handling_strategy: Optional[FailureHandlingStrategy] = None
# Payload Data Models (Specific to handoff_type)
class CustomerInfo(BaseModel):
user_id: str
name: str
contact_number: Optional[str] = None
class TechnicalDetails(BaseModel):
router_model: Optional[str] = None
isp: Optional[str] = None
symptoms: Optional[str] = None
logs_attached: Optional[bool] = False
log_references: Optional[List[str]] = None
class EscalationData(BaseModel):
issue_category: str
severity: Priority
customer_info: CustomerInfo
technical_details: TechnicalDetails
class RequestInformationData(BaseModel):
requested_info_keys: List[str]
query_parameters: Optional[Dict[str, Any]] = None
# Main Handoff Message
class HandoffPayload(BaseModel):
handoff_type: HandoffType
data: Any # This will be validated dynamically or through a Union of specific data models
class HandoffMessage(BaseModel):
metadata: Metadata
context: Context
payload: HandoffPayload
instructions: Instructions
# Example Usage
if __name__ == "__main__":
task_id = uuid.uuid4()
sender_agent_id = "customer_service_agent_001"
recipient_agent_id = "technical_support_agent_pool"
# Create an Escalation Handoff Message
escalation_data = EscalationData(
issue_category="NETWORK_CONNECTIVITY",
severity=Priority.CRITICAL,
customer_info=CustomerInfo(
user_id="[email protected]",
name="John Doe",
contact_number="+1-555-123-4567"
),
technical_details=TechnicalDetails(
router_model="RouterX-Pro",
isp="GlobalNet",
symptoms="Intermittent disconnections, unable to access specific websites."
)
)
escalation_message = HandoffMessage(
metadata=Metadata(
sender_id=sender_agent_id,
recipient_id=recipient_agent_id,
task_id=task_id,
priority=Priority.HIGH
),
context=Context(
workflow_state="CUSTOMER_ISSUE_ESCALATED",
previous_actions=[
PreviousAction(action_type="INITIAL_GREETING", details="User greeted.", timestamp=datetime(2023, 10, 27, 10, 20, 5)),
PreviousAction(action_type="DIAGNOSTIC_ATTEMPT", details="Troubleshooting failed.", timestamp=datetime(2023, 10, 27, 10, 25, 30))
],
historical_data_summary="User reported internet issues for 2 days. Basic troubleshooting ineffective."
),
payload=HandoffPayload(
handoff_type=HandoffType.ESCALATION,
data=escalation_data.dict() # Pydantic model to dict
),
instructions=Instructions(
next_steps_suggestion="Review logs, schedule remote diagnostic.",
required_actions=["DIAGNOSE_ROOT_CAUSE", "COMMUNICATE_SOLUTION"],
constraints={"time_limit": "PT2H"},
failure_handling_strategy=FailureHandlingStrategy(escalate_to="level3_network_expert")
)
)
print("--- Escalation Handoff Message (JSON) ---")
print(escalation_message.json(indent=2))
# Create a Request Information Handoff Message
request_info_data = RequestInformationData(
requested_info_keys=["user_preferences", "account_status"],
query_parameters={"user_id": "[email protected]"}
)
request_info_message = HandoffMessage(
metadata=Metadata(
sender_id="data_analysis_agent_001",
recipient_id="user_profile_agent_001",
task_id=uuid.uuid4(),
priority=Priority.MEDIUM
),
context=Context(
workflow_state="REPORT_GENERATION_PENDING_DATA",
previous_actions=[
PreviousAction(action_type="REPORT_TEMPLATE_LOADED", details="Loaded template.", timestamp=datetime.utcnow())
]
),
payload=HandoffPayload(
handoff_type=HandoffType.REQUEST_INFORMATION,
data=request_info_data.dict()
),
instructions=Instructions(
success_criteria="Relevant user data provided."
)
)
print("n--- Request Information Handoff Message (JSON) ---")
print(request_info_message.json(indent=2))
# To deserialize a message:
json_str = escalation_message.json()
deserialized_message = HandoffMessage.parse_raw(json_str)
print(f"nDeserialized message sender: {deserialized_message.metadata.sender_id}")
print(f"Deserialized message handoff type: {deserialized_message.payload.handoff_type}")
# Accessing specific data requires casting the 'data' field
if deserialized_message.payload.handoff_type == HandoffType.ESCALATION:
escalation_payload_data = EscalationData.parse_obj(deserialized_message.payload.data)
print(f"Escalation Issue Category: {escalation_payload_data.issue_category}")
2. 消息传递基础设施
代理之间的交接消息通常通过消息队列系统(如Apache Kafka, RabbitMQ, AWS SQS/SNS)进行异步传输。
- 解耦:发送方和接收方不需要直接知道彼此的存在。
- 持久性:消息被存储,即使接收方宕机也能在恢复后处理。
- 可伸缩性:轻松扩展消费者数量以处理负载。
- 路由:可以根据
recipient_id或handoff_type将消息路由到不同的队列或主题。
对于需要强实时性或紧密耦合的场景,也可以通过RPC (Remote Procedure Call)框架(如gRPC)或RESTful API进行同步通信。
3. 发送代理逻辑 (Sender Agent Logic)
发送代理负责:
- 收集信息:从其内部状态和任务上下文中收集所需的所有信息。
- 构建消息:使用标准化的数据模型(如Pydantic模型)构建交接消息,填充元数据、上下文、负载和指令。
- 序列化:将消息对象序列化为JSON字符串。
- 发送消息:将JSON字符串发布到消息队列或通过API发送。
# sender_agent.py
import json
from pydantic import ValidationError
# Assume HandoffMessage and related models are imported from handoff_models.py
from handoff_models import HandoffMessage, Metadata, Context, HandoffPayload, Instructions,
HandoffType, Priority, PreviousAction, EscalationData, CustomerInfo, TechnicalDetails
import uuid
from datetime import datetime
class CustomerServiceAgent:
def __init__(self, agent_id: str):
self.agent_id = agent_id
self.current_task_id = None
self.customer_data = {}
self.conversation_history = []
# Simulate a message queue producer
self.message_producer = self._init_message_producer()
def _init_message_producer(self):
# In a real system, this would be a KafkaProducer, RabbitMQPublisher etc.
print(f"{self.agent_id}: Initializing message producer...")
class MockProducer:
def send(self, topic: str, message: str):
print(f"[{datetime.utcnow().isoformat()}] {agent_id} sent message to topic '{topic}':n{message[:200]}...") # Print first 200 chars
return MockProducer()
def handle_customer_request(self, customer_info: dict, issue_details: str):
self.current_task_id = uuid.uuid4()
self.customer_data = customer_info
self.conversation_history.append(f"Customer reported: {issue_details}")
print(f"{self.agent_id}: Received new request for task {self.current_task_id}. Issue: {issue_details}")
# Simulate initial troubleshooting
print(f"{self.agent_id}: Attempting initial diagnostic...")
self.conversation_history.append("Attempted basic troubleshooting.")
if "network" in issue_details.lower():
self.escalate_to_technical_support(issue_details)
else:
print(f"{self.agent_id}: Handled issue directly. Task {self.current_task_id} complete.")
def escalate_to_technical_support(self, issue_description: str):
print(f"{self.agent_id}: Issue requires technical support. Escalating task {self.current_task_id}...")
# 1. Collect information
customer_info_model = CustomerInfo(
user_id=self.customer_data.get("email"),
name=self.customer_data.get("name"),
contact_number=self.customer_data.get("phone")
)
technical_details_model = TechnicalDetails(
symptoms=issue_description
)
escalation_data = EscalationData(
issue_category="GENERAL_SUPPORT", # This might be refined by a more intelligent agent
severity=Priority.HIGH,
customer_info=customer_info_model,
technical_details=technical_details_model
)
# 2. Build HandoffMessage
message = HandoffMessage(
metadata=Metadata(
sender_id=self.agent_id,
recipient_id="technical_support_agent_pool",
task_id=self.current_task_id,
priority=Priority.HIGH
),
context=Context(
workflow_state="CUSTOMER_ISSUE_ESCALATED",
previous_actions=[
PreviousAction(action_type="INITIAL_DIAGNOSIS", details="Basic troubleshooting attempted, issue persists.", timestamp=datetime.utcnow())
],
historical_data_summary=f"Customer '{customer_info_model.name}' reported '{issue_description}'. Conversation history: {'; '.join(self.conversation_history)}"
),
payload=HandoffPayload(
handoff_type=HandoffType.ESCALATION,
data=escalation_data.dict()
),
instructions=Instructions(
next_steps_suggestion="Please investigate the technical issue and contact the customer.",
required_actions=["DIAGNOSE_ROOT_CAUSE", "COMMUNICATE_SOLUTION_TO_CUSTOMER"],
failure_handling_strategy={"escalate_to": "level3_expert_agent"}
)
)
# 3. Serialize and Send
try:
json_message = message.json(indent=2)
self.message_producer.send(topic="agent_handoffs", message=json_message)
print(f"{self.agent_id}: Successfully sent ESCALATION message for task {self.current_task_id}.")
except ValidationError as e:
print(f"Error validating message before sending: {e}")
except Exception as e:
print(f"Error sending message: {e}")
# --- Demo Sender Agent ---
if __name__ == "__main__":
cs_agent = CustomerServiceAgent("customer_service_agent_001")
customer_data = {"name": "Alice Smith", "email": "[email protected]", "phone": "123-456-7890"}
cs_agent.handle_customer_request(customer_data, "My home internet is not working at all.")
print("-" * 50)
cs_agent.handle_customer_request(customer_data, "I can't log in to my account.") # This would not escalate in current simple logic
4. 接收代理逻辑 (Receiver Agent Logic)
接收代理负责:
- 接收消息:从消息队列或API接收JSON字符串。
- 反序列化:将JSON字符串反序列化为消息对象(如Pydantic模型)。
- 验证消息:使用JSON Schema或Pydantic的内置验证机制验证消息的结构和内容。
- 解析与分派:根据
handoff_type和其他元数据,将消息分派到其内部相应的处理函数。 - 执行操作:根据负载和指令执行其特定的业务逻辑。
# receiver_agent.py
import json
from pydantic import ValidationError
# Assume HandoffMessage and related models are imported from handoff_models.py
from handoff_models import HandoffMessage, HandoffType, EscalationData, RequestInformationData
import time
from datetime import datetime
class TechnicalSupportAgent:
def __init__(self, agent_id: str):
self.agent_id = agent_id
# Simulate a message queue consumer
self.message_consumer = self._init_message_consumer()
self.task_queue = [] # In real world, this would be a proper task management system
def _init_message_consumer(self):
print(f"{self.agent_id}: Initializing message consumer...")
class MockConsumer:
_messages = [] # This will be filled by a mock producer in a real test scenario
def consume(self):
if self._messages:
return self._messages.pop(0) # Get the oldest message
return None
def add_message(self, message: str):
self._messages.append(message)
return MockConsumer()
def start_listening(self):
print(f"{self.agent_id}: Started listening for handoff messages...")
while True:
raw_message = self.message_consumer.consume()
if raw_message:
self.process_handoff_message(raw_message)
else:
time.sleep(1) # Simulate polling interval
def process_handoff_message(self, raw_message: str):
try:
# 1. Deserialize
message_obj = HandoffMessage.parse_raw(raw_message)
print(f"n{self.agent_id}: Received message {message_obj.metadata.message_id} from {message_obj.metadata.sender_id}")
# 2. Validation (Pydantic handles this on parse_raw)
# If validation fails, parse_raw would raise ValidationError
# 3. Parse and Dispatch based on handoff_type
handoff_type = message_obj.payload.handoff_type
print(f"{self.agent_id}: Handoff Type: {handoff_type}")
if handoff_type == HandoffType.ESCALATION:
self._handle_escalation(message_obj)
elif handoff_type == HandoffType.REQUEST_INFORMATION:
self._handle_request_information(message_obj)
else:
print(f"{self.agent_id}: Unhandled handoff type: {handoff_type}. Task {message_obj.metadata.task_id}")
except ValidationError as e:
print(f"{self.agent_id}: Failed to validate incoming message: {e}")
except Exception as e:
print(f"{self.agent_id}: Error processing message: {e}")
def _handle_escalation(self, message: HandoffMessage):
print(f"{self.agent_id}: Handling ESCALATION for task {message.metadata.task_id}...")
# Different 'personalities' can interpret and act differently here.
# This TechnicalSupportAgent focuses on technical details.
# Access specific escalation data
escalation_data = EscalationData.parse_obj(message.payload.data)
print(f" Customer: {escalation_data.customer_info.name} ({escalation_data.customer_info.user_id})")
print(f" Issue Category: {escalation_data.issue_category}, Severity: {escalation_data.severity}")
print(f" Symptoms: {escalation_data.technical_details.symptoms}")
print(f" Suggested Next Steps: {message.instructions.next_steps_suggestion}")
# Simulate performing the required actions
print(f"{self.agent_id}: Performing diagnosis for task {message.metadata.task_id}...")
time.sleep(2) # Simulate work
print(f"{self.agent_id}: Diagnosis complete. Identified root cause: 'Faulty ISP configuration'.")
# In a real scenario, this would involve updating the task status and potentially sending a new message.
self.task_queue.append(f"Completed diagnosis for {message.metadata.task_id}")
def _handle_request_information(self, message: HandoffMessage):
print(f"{self.agent_id}: Handling REQUEST_INFORMATION for task {message.metadata.task_id}...")
request_data = RequestInformationData.parse_obj(message.payload.data)
print(f" Requested keys: {request_data.requested_info_keys}")
print(f" Query parameters: {request_data.query_parameters}")
# Simulate fetching information (e.g., from a database or another internal service)
# Then, send a PROVIDE_INFORMATION message back to the sender
print(f"{self.agent_id}: Simulating data retrieval and sending PROVIDE_INFORMATION message...")
time.sleep(1)
# This part would involve creating a new HandoffMessage of type PROVIDE_INFORMATION
# and sending it back.
# For brevity, we skip the actual sending back here.
self.task_queue.append(f"Processed info request for {message.metadata.task_id}")
# --- Demo Receiver Agent ---
if __name__ == "__main__":
# Setup for demonstration:
# We need to simulate the CustomerServiceAgent sending a message to this receiver.
# In a real system, they'd be separate processes communicating via a message broker.
# First, let's create a dummy message to be consumed
task_id_demo = uuid.uuid4()
sender_agent_id_demo = "customer_service_agent_001"
recipient_agent_id_demo = "technical_support_agent_pool"
escalation_data_demo = EscalationData(
issue_category="NETWORK_CONNECTIVITY",
severity=Priority.CRITICAL,
customer_info=CustomerInfo(
user_id="[email protected]",
name="John Doe",
contact_number="+1-555-123-4567"
),
technical_details=TechnicalDetails(
router_model="RouterX-Pro",
isp="GlobalNet",
symptoms="Intermittent disconnections, unable to access specific websites."
)
)
escalation_message_demo = HandoffMessage(
metadata=Metadata(
sender_id=sender_agent_id_demo,
recipient_id=recipient_agent_id_demo,
task_id=task_id_demo,
priority=Priority.HIGH
),
context=Context(
workflow_state="CUSTOMER_ISSUE_ESCALATED",
previous_actions=[
PreviousAction(action_type="INITIAL_DIAGNOSIS", details="Basic troubleshooting attempted, issue persists.", timestamp=datetime.utcnow())
],
historical_data_summary="User reported internet issues for 2 days. Basic troubleshooting ineffective."
),
payload=HandoffPayload(
handoff_type=HandoffType.ESCALATION,
data=escalation_data_demo.dict()
),
instructions=Instructions(
next_steps_suggestion="Review logs, schedule remote diagnostic.",
required_actions=["DIAGNOSE_ROOT_CAUSE", "COMMUNICATE_SOLUTION"],
failure_handling_strategy={"escalate_to": "level3_expert_agent"}
)
)
# Another message for request info
request_info_data_demo = RequestInformationData(
requested_info_keys=["user_id", "subscription_level"],
query_parameters={"customer_email": "[email protected]"}
)
request_info_message_demo = HandoffMessage(
metadata=Metadata(
sender_id="billing_agent_001",
recipient_id="technical_support_agent_pool", # This agent can also handle requests for info
task_id=uuid.uuid4(),
priority=Priority.MEDIUM
),
context=Context(
workflow_state="BILLING_INQUIRY",
previous_actions=[]
),
payload=HandoffPayload(
handoff_type=HandoffType.REQUEST_INFORMATION,
data=request_info_data_demo.dict()
),
instructions=Instructions(
next_steps_suggestion="Provide customer subscription details.",
required_actions=["FETCH_CUSTOMER_SUBSCRIPTION"]
)
)
# Instantiate the receiver agent
tech_agent = TechnicalSupportAgent("technical_support_agent_001")
# Manually add messages to the mock consumer for testing
tech_agent.message_consumer.add_message(escalation_message_demo.json(indent=2))
tech_agent.message_consumer.add_message(request_info_message_demo.json(indent=2))
# Start processing (just a few cycles for demo)
print("Simulating agent listening for messages...")
for _ in range(3): # Process a few messages, then stop
tech_agent.start_listening()
time.sleep(0.1) # Give it a moment, then break loop if no more messages
if not tech_agent.message_consumer._messages:
break
print("nSimulated processing finished.")
print(f"Technical Support Agent's completed tasks: {tech_agent.task_queue}")
5. 应对“性格”差异
标准化交接消息本身并不能消除代理的“性格”差异,但它提供了一个共同的语言和框架,使得这些差异变得可管理。
- 统一接口,多样实现:协议定义了消息的结构和语义,但每个代理如何内部处理这些信息,则由其“性格”决定。例如,一个“技术支持”代理收到
ESCALATION消息会启动诊断流程,而一个“法务合规”代理收到ESCALATION消息可能会启动审计流程。它们都理解ESCALATION的含义,但采取的行动路径不同。 - 上下文的重要性:
context字段的详细性对于弥合粒度偏好差异至关重要。一个“严谨”的代理可以深入解析所有历史数据,而一个“高效”的代理可能只读取historical_data_summary。 - 指令的精确性:
instructions字段允许发送代理明确表达期望,减少接收代理的猜测空间。required_actions和success_criteria是关键。 - 动态路由:基于
handoff_type、priority、payload.data中的特定字段(如issue_category),消息传递系统可以智能地将消息路由到最适合处理该“性格”或专业领域的代理池。 - 反馈机制:代理可以通过发送
STATUS_UPDATE或REQUEST_INFORMATION消息来提供进度反馈或请求更多信息,形成一个双向沟通循环。
高级考量
安全性
交接消息可能包含敏感信息。必须实施:
- 加密:传输中(TLS/SSL)和静态存储时(AES-256)。
- 认证与授权:确保只有合法的代理才能发送和接收消息,并限制其对特定消息类型的访问。
- 数据脱敏:在某些场景下,对非必要敏感数据进行脱敏处理。
可观测性
为了诊断问题和优化性能,需要强大的可观测性:
- 日志记录:记录所有消息的发送、接收、处理状态。
- 链路追踪:使用
correlation_id追踪一个业务流程中所有相关的交接消息,形成端到端的可视化。 - 监控:监控消息队列的积压、处理延迟、错误率等指标。
版本控制
随着系统演进,交接消息的Schema会发生变化。
- 协议版本号:
metadata.protocol_version字段允许代理识别并兼容不同版本的消息。 - 向后兼容性:新版本应尽量兼容旧版本,例如,新增可选字段而不是修改现有字段的语义。
- 增量更新:采用增量Schema更新,而非大刀阔斧的修改。
状态管理
代理在处理交接任务时,需要维护自身的状态。这些状态可能需要跨多个交接消息持久化。
- 外部状态存储:使用数据库(关系型或NoSQL)或键值存储(Redis)来持久化代理的长期状态和任务上下文。
- 幂等性处理:设计代理的业务逻辑使其在接收到重复消息时不会产生副作用,这对于异步消息系统至关重要。
编排与协作模式
交接协议是实现协作的基础,但具体的协作模式还需要进一步设计:
- 编排(Orchestration):由一个中心化的“协调者”代理来指导整个工作流,明确哪个代理何时接收哪个交接消息。
- 编舞(Choreography):代理们通过响应事件和交接消息自主协作,没有中心协调者。这通常通过发布/订阅模式实现。
标准化交接消息是构建弹性、可扩展的多代理协作系统的核心。它提供了一个清晰、一致的接口,使得无论代理的内部“性格”如何,都能在一个统一的框架下进行有效沟通和协作。通过精心设计消息结构、严格的Schema验证以及健壮的实现策略,我们能够克服代理间异构性带来的挑战,释放多代理系统的强大潜力。
最终,我们追求的不仅仅是消息的传递,更是意义的传递,让每一个代理都能在清晰的指引下,贡献其独特的智能。