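Colleagues and fellow engineers,
Welcome to today's technical talk. We will examine a problem that is central to agent design and also genuinely hard. Suppose an agent operating in a complex graph, especially one that contains cycles, gets stuck in an infinite loop because a tool it depends on fails at random. How do we design a general and robust "escape mechanism" for it?
This is not merely a theoretical question. It is a core practice for keeping systems stable and resilient when we build automation, AI agents, and microservice orchestration. Imagine an agent responsible for a critical business process. An intermittent third-party API failure or a transient internal service error traps it in a loop that repeats the same work and never produces a result. The consequences range from resource exhaustion and stalled business processes to inconsistent data. As engineers, our job is to anticipate and resolve this kind of deep system behavior.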
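1. The Dilemma and the Opportunity: Where Agents, Cyclic Graphs, and Random Errors Meet
First, let us define the problem scenario precisely.
The nature of the agent: Our agent is an entity that can perceive, decide, and act. It navigates a graph made of nodes and edges. To reach its goal, it uses a set of external or internal "tools" that perform specific operations. Each step may change the agent's state or invoke a tool.
Properties of a cyclic graph: By "cyclic graph" we do not mean a cycle graph in the strict mathematical sense. We mean any graph structure that contains a cycle. In such a graph, an agent can follow a path and eventually return to a node it has already visited, which closes a loop. This is very common in practice, for example:
- Business processes: an approval workflow may send a request back to the previous step under certain conditions.
- Network topology: packets may loop between routers.
- Microservice calls: service A calls service B, which indirectly calls service A again, forming a dependency cycle.
- State machines: some state transitions lead the agent back to an earlier state.
Random tool errors: This is the core trigger. To act, the agent calls various tools, such as database operations, API calls, file I/O, and machine-learning inference. These tools are not 100% reliable and can fail "randomly" for the following reasons:
- Transient errors: network jitter, database connection timeouts, concurrency conflicts, temporary resource shortages. These usually recover if retried after a short delay.
- Persistent errors: misconfiguration, insufficient permissions, incompatible API versions, a downstream service that is down. Retrying does not help, and deeper intervention is required.
- Business-logic errors: the tool executes successfully but returns a business result the agent did not expect. The agent then makes a wrong decision and enters a loop.
How an infinite loop forms: As the agent moves through a cyclic graph, it tries to pass through some node or path, and a key tool on that path keeps failing. One possibility is that the agent simply cannot get through. Another is that, after each failure, its default strategy takes it back to the starting node or to an earlier node. That strategy might be a plain retry, or picking a neighboring node that merely "looks" viable. Either way, an infinite loop forms. The agent keeps attempting the same set of actions, calling the same tools, and hitting the same error pattern, without ever breaking out.
Such a loop wastes computing resources. It can also interrupt business processes, leave data inconsistent, or even bring down the whole system. Designing a general "escape mechanism" that can recognize and break this kind of loop is therefore key to building robust, adaptive agent systems.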
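The failure mode can be made concrete with a tiny simulation. This is a minimal sketch under stated assumptions: the graph, the `flaky_tool` stand-in, and the "take the next edge on failure" policy are all hypothetical. A naive agent that falls back to another edge whenever the tool on its preferred edge fails will bounce between two nodes. Only an external step budget stops it.

```python
from typing import Dict, List

# Hypothetical cyclic graph: A -> B -> C (goal), with a fallback edge B -> A.
GRAPH: Dict[str, List[str]] = {"A": ["B"], "B": ["C", "A"], "C": []}


def flaky_tool(edge: str) -> bool:
    """Stand-in tool: succeeds everywhere except on B->C, where it always fails."""
    return edge != "B->C"


def naive_walk(start: str, goal: str, max_steps: int) -> List[str]:
    """Naive policy: take the first outgoing edge; if its tool call fails, take the next edge."""
    node, path = start, [start]
    for _ in range(max_steps):
        if node == goal:
            break
        first, *fallbacks = GRAPH[node]
        if flaky_tool(f"{node}->{first}"):
            node = first
        elif fallbacks:
            node = fallbacks[0]
        path.append(node)
    return path


trace = naive_walk("A", "C", max_steps=8)
print(" -> ".join(trace))  # A -> B -> A -> B -> A -> B -> A -> B -> A
```

Without `max_steps` this walk would never terminate. A fixed step budget is the crudest possible escape mechanism, and the components below aim to replace it with something that can recognize the loop and act on it.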
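2. Core Design Principles of the Escape Mechanism
An effective general-purpose escape mechanism must follow these core principles:
- Observability: every agent action must be recorded in detail. This includes nodes visited, tools called, tool inputs and outputs, and any errors. It is the basis for problem analysis and loop detection.
- State awareness: the escape mechanism needs access to the agent's current state, and it must understand the agent's history and operating context. It should not intervene blindly; its decisions are based on the agent's environment and past behavior.
- Loop detection: this is the core function. It needs algorithms that recognize when the agent is repeating the same behavior pattern, or visiting the same set of nodes frequently within a short time.
- Intelligent intervention: once a loop is detected, the mechanism should not simply stop the agent. It applies a layered, strategic sequence of interventions to help the agent escape. The choice depends on the nature of the error, the depth of the loop, and the agent's goal.
- Adaptation and learning: a more advanced mechanism learns from past failures and adjusts its detection thresholds and intervention strategies. It can even give the agent feedback so that it avoids similar traps in the future.
- Generality and decoupling: the escape mechanism itself should be decoupled from the agent's business logic and tool implementations. It should be a standalone, pluggable component that can monitor any agent conforming to a given interface.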
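3. Key Components of the Escape Mechanism
Based on these principles, the escape mechanism can be broken down into several cooperating components:
3.1 Behavior History Tracker
This is the eyes and ears of the escape mechanism. It captures every relevant event and state during the agent's execution.
Purpose: record the agent's trajectory in detail. That means its tool calls and their outcomes (successes and failures), plus any error information.
Data structure: a fixed-size ring buffer or a growable list is typically used. A fixed-size buffer bounds memory use but discards older history. A growable list keeps everything, but its memory consumption must be watched. For generality, we store abstract AgentEvent objects.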
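The ring-buffer behavior described above can be shown with the standard library's `collections.deque` and its `maxlen` argument. The event strings in this small sketch are hypothetical placeholders.

```python
from collections import deque

# Bounded history: once maxlen events are stored, each append evicts the oldest one.
history = deque(maxlen=3)
for event in ["visit:A", "call:ToolX", "fail:ToolX", "visit:B"]:
    history.append(event)

print(list(history))  # ['call:ToolX', 'fail:ToolX', 'visit:B']
```

One sizing consequence follows. A loop of period p can only be detected if p times the required repetition count fits in the buffer. A single step can emit several events (a node visit, a tool call, and its result), so a cycle through k nodes may occupy about 3k entries. The buffer should therefore be several times larger than `min_loop_length * min_repetitions`.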
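Stored content: each event record should contain at least the following fields:
| Field | Type | Description |
|---|---|---|
| timestamp | datetime | When the event occurred |
| event_type | str | Event type (e.g., 'NODE_VISIT', 'TOOL_CALL', 'TOOL_SUCCESS', 'TOOL_FAILURE') |
| agent_state | dict | Snapshot of the agent's state at the time of the event (e.g., current_node_id, goal) |
| tool_name | str | For tool-call events, the name of the tool |
| tool_input | dict | Input parameters of the tool call |
| tool_output | dict | Output of the tool call (on success) |
| error_details | dict | Error details (on failure, e.g., error_code, message, stack_trace) |
| path_id | str | Unique identifier of the path the agent is currently on (if the agent has such a concept) |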
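The field list above can also be written down as a single flat, serializable record, which is convenient for structured logging. This sketch is an alternative to the class hierarchy shown next, not a replacement for it. `EventRecord` is a hypothetical name, and the sample values are illustrative only.

```python
import dataclasses
import datetime
import json
from typing import Any, Dict, Optional


@dataclasses.dataclass
class EventRecord:
    """Hypothetical flat record with one attribute per field in the table above."""
    timestamp: datetime.datetime
    event_type: str                                  # e.g. "NODE_VISIT", "TOOL_FAILURE"
    agent_state: Dict[str, Any]                      # e.g. {"current_node_id": ..., "goal": ...}
    tool_name: Optional[str] = None
    tool_input: Optional[Dict[str, Any]] = None
    tool_output: Optional[Dict[str, Any]] = None     # set on success
    error_details: Optional[Dict[str, Any]] = None   # set on failure
    path_id: Optional[str] = None

    def to_json(self) -> str:
        """Serializes the record as one JSON line, e.g. for a structured log."""
        data = dataclasses.asdict(self)
        data["timestamp"] = self.timestamp.isoformat()
        return json.dumps(data, sort_keys=True)


rec = EventRecord(
    timestamp=datetime.datetime(2024, 1, 1, 12, 0, 0),
    event_type="TOOL_FAILURE",
    agent_state={"current_node_id": "node_B", "goal": "goal_X"},
    tool_name="tool_P",
    error_details={"code": 500, "message": "Internal Server Error"},
)
print(rec.to_json())
```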
Code example: the HistoryTracker class
```python
import collections
import datetime
import hashlib
import json
from typing import Any, Callable, Deque, Dict, List, Optional


class AgentState:
    """Represents the agent's state at a given point."""

    def __init__(self, current_node_id: str, goal: str, **kwargs):
        self.current_node_id = current_node_id
        self.goal = goal
        self.custom_state = kwargs

    def to_dict(self) -> Dict[str, Any]:
        return {
            "current_node_id": self.current_node_id,
            "goal": self.goal,
            **self.custom_state,
        }

    def __hash__(self) -> int:
        # Hashes the canonical JSON form; custom state values must be JSON-serializable.
        return hash(json.dumps(self.to_dict(), sort_keys=True))

    def __eq__(self, other: Any) -> bool:
        if not isinstance(other, AgentState):
            return NotImplemented
        return self.to_dict() == other.to_dict()

    def __repr__(self) -> str:
        return f"AgentState(node='{self.current_node_id}', goal='{self.goal}', custom={self.custom_state})"


class AgentEvent:
    """Base class for all agent events."""

    def __init__(self, timestamp: datetime.datetime, agent_state: AgentState):
        self.timestamp = timestamp
        self.agent_state = agent_state

    def to_dict(self) -> Dict[str, Any]:
        return {
            "timestamp": self.timestamp.isoformat(),
            "agent_state": self.agent_state.to_dict(),
            "event_type": self.__class__.__name__,
        }


class NodeVisitEvent(AgentEvent):
    """Event representing an agent visiting a node."""

    def __init__(self, timestamp: datetime.datetime, agent_state: AgentState, node_id: str):
        super().__init__(timestamp, agent_state)
        self.node_id = node_id

    def to_dict(self) -> Dict[str, Any]:
        data = super().to_dict()
        data["node_id"] = self.node_id
        return data


class ToolCallEvent(AgentEvent):
    """Event representing an agent calling a tool."""

    def __init__(self, timestamp: datetime.datetime, agent_state: AgentState,
                 tool_name: str, tool_input: Dict[str, Any]):
        super().__init__(timestamp, agent_state)
        self.tool_name = tool_name
        self.tool_input = tool_input

    def to_dict(self) -> Dict[str, Any]:
        data = super().to_dict()
        data["tool_name"] = self.tool_name
        data["tool_input"] = self.tool_input
        return data


class ToolResultEvent(AgentEvent):
    """Event representing the result (success or failure) of a tool call."""

    def __init__(self, timestamp: datetime.datetime, agent_state: AgentState,
                 tool_name: str, tool_output: Optional[Dict[str, Any]] = None,
                 error_details: Optional[Dict[str, Any]] = None, success: bool = True):
        super().__init__(timestamp, agent_state)
        self.tool_name = tool_name
        self.tool_output = tool_output
        self.error_details = error_details
        self.success = success

    def to_dict(self) -> Dict[str, Any]:
        data = super().to_dict()
        data["tool_name"] = self.tool_name
        data["tool_output"] = self.tool_output
        data["error_details"] = self.error_details
        data["success"] = self.success
        return data


class HistoryTracker:
    """Manages the agent's event history in a bounded ring buffer."""

    def __init__(self, max_history_size: int = 100):
        # deque(maxlen=...) silently evicts the oldest event once it is full.
        self.history: Deque[AgentEvent] = collections.deque(maxlen=max_history_size)
        self.max_history_size = max_history_size

    def record_event(self, event: AgentEvent):
        """Records an event in the history."""
        self.history.append(event)

    def get_history(self) -> List[AgentEvent]:
        """Returns the current history as a list, oldest first."""
        return list(self.history)

    def get_last_n_events(self, n: int) -> List[AgentEvent]:
        """Returns the last n events (fewer if the history is shorter)."""
        if n <= 0:
            return []
        return list(self.history)[-n:]

    def clear(self):
        """Clears the history."""
        self.history.clear()


# Example usage:
# tracker = HistoryTracker(max_history_size=10)
# agent_state_1 = AgentState("node_A", "goal_X")
# tracker.record_event(NodeVisitEvent(datetime.datetime.now(), agent_state_1, "node_A"))
# tracker.record_event(ToolCallEvent(datetime.datetime.now(), agent_state_1, "tool_P", {"param": 1}))
# tracker.record_event(ToolResultEvent(datetime.datetime.now(), agent_state_1, "tool_P", error_details={"code": 500}, success=False))
# agent_state_2 = AgentState("node_B", "goal_X")
# tracker.record_event(NodeVisitEvent(datetime.datetime.now(), agent_state_2, "node_B"))
# print(tracker.get_history())
```
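3.2 Loop Detector
With a history in place, the next step is to analyze it and decide whether the agent is stuck in a loop.
Purpose: analyze the agent's behavior history and determine whether it contains a repeating pattern that amounts to an infinite loop.
Methods:
- Node visit frequency: simple but effective. If the agent visits the same node or set of nodes frequently within a short span (for example, within the last N events), that may signal a loop.
- Action sequence matching: more precise. Here the agent not only revisits nodes but also repeats the same series of tool calls and decisions, for example NodeA -> ToolX(fail) -> NodeB -> ToolY(fail) -> NodeA.
- State hashing: if the agent's full state can be hashed (via its __hash__ method), a repeating sequence of state hashes is a strong loop signal, especially when the repeats are consecutive. This requires AgentState to have a good hashable representation.
- Time-windowed detection: look for repeated patterns only within a given time window.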
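One caveat applies to state hashing. The AgentState class shown earlier hashes its JSON form with Python's built-in `hash()`. For `str` values, CPython randomizes that hash per interpreter process (see `PYTHONHASHSEED`). That is fine for in-process comparisons, but such hashes cannot be persisted or compared across worker processes. The sketch below is an assumption-labeled alternative, not the original design: it derives a stable fingerprint from a `hashlib` digest of canonical JSON. `state_fingerprint` is a hypothetical helper.

```python
import hashlib
import json
from typing import Any, Dict


def state_fingerprint(state: Dict[str, Any]) -> str:
    """Stable SHA-256 digest of a JSON-serializable state; dict key order does not matter."""
    canonical = json.dumps(state, sort_keys=True, separators=(",", ":"))
    return hashlib.sha256(canonical.encode("utf-8")).hexdigest()


a = state_fingerprint({"current_node_id": "NodeA", "goal": "GoalX"})
b = state_fingerprint({"goal": "GoalX", "current_node_id": "NodeA"})
c = state_fingerprint({"current_node_id": "NodeB", "goal": "GoalX"})
print(a == b, a == c)  # True False
```

`sort_keys` only canonicalizes dictionary keys. List order still matters, and non-JSON-serializable values raise `TypeError`.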
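Thresholds and sensitivity:
- Loop length: how long a sequence must be to count as a loop.
- Repetition count: how many times a sequence must repeat before it is considered a loop.
- Time threshold: the time span within which the repetitions must occur to count as a loop.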
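For the question "is the agent stuck *right now*?", it can be enough to check only the tail of the history. The check is whether the most recent events are `min_repetitions` back-to-back copies of one block of length at least `min_loop_length`. This is a simplification swapped in for illustration; it is not the nested search used in the LoopDetector below. It is cheap enough to run after every event. The token format mimics the strings built by `_hash_event_sequence`, and `find_tail_loop` is a hypothetical helper.

```python
from typing import List, Optional, Sequence


def find_tail_loop(tokens: Sequence[str], min_len: int = 3,
                   min_reps: int = 2) -> Optional[List[str]]:
    """Return the shortest block of length >= min_len that fills the last
    len(block) * min_reps tokens with min_reps consecutive copies, else None."""
    n = len(tokens)
    for p in range(min_len, n // min_reps + 1):
        block = list(tokens[n - p:])
        # Compare the block against the min_reps - 1 blocks immediately before it.
        if all(list(tokens[n - (k + 1) * p: n - k * p]) == block for k in range(1, min_reps)):
            return block
    return None


tokens = ["NODE:Start",
          "NODE:NodeA", "TOOL_RESULT:ToolX:False", "NODE:NodeB",
          "NODE:NodeA", "TOOL_RESULT:ToolX:False", "NODE:NodeB"]
print(find_tail_loop(tokens))  # ['NODE:NodeA', 'TOOL_RESULT:ToolX:False', 'NODE:NodeB']
```

Returning the shortest matching period means that a two-step loop repeated four times is reported as the two-step block. That block is usually what an intervention needs in order to identify the failing edge.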
Code example: the LoopDetector class
```python
# Continues the module above (AgentState, the AgentEvent classes, HistoryTracker).
import datetime
import hashlib
from typing import List, Optional


class LoopDetector:
    """Detects loops in the agent's behavior history."""

    def __init__(self,
                 history_tracker: HistoryTracker,
                 min_loop_length: int = 3,        # minimum length of a sequence to count as a loop
                 min_repetitions: int = 2,        # minimum number of back-to-back repetitions
                 time_window_seconds: int = 60):  # only events this recent are examined
        self.history_tracker = history_tracker
        self.min_loop_length = min_loop_length
        self.min_repetitions = min_repetitions
        self.time_window_seconds = time_window_seconds

    def _get_relevant_events(self) -> List[AgentEvent]:
        """Returns the events that fall within the time window."""
        now = datetime.datetime.now()
        return [event for event in self.history_tracker.get_history()
                if (now - event.timestamp).total_seconds() <= self.time_window_seconds]

    def _hash_event_sequence(self, sequence: List[AgentEvent]) -> str:
        """Hashes an event sequence using only its key attributes."""
        # Only node IDs, tool names and success flags are hashed; a stricter hash
        # would also include tool inputs/outputs or the full state.
        seq_data = []
        for event in sequence:
            if isinstance(event, NodeVisitEvent):
                seq_data.append(f"NODE:{event.node_id}")
            elif isinstance(event, ToolCallEvent):
                seq_data.append(f"TOOL_CALL:{event.tool_name}")
            elif isinstance(event, ToolResultEvent):
                seq_data.append(f"TOOL_RESULT:{event.tool_name}:{event.success}")
        return hashlib.sha256("->".join(seq_data).encode("utf-8")).hexdigest()

    def detect_node_repetition(self, node_id: str, threshold: int = 5) -> bool:
        """
        Detects whether a specific node has been visited too often recently.

        Args:
            node_id: The ID of the node to check.
            threshold: Number of visits within the time window that counts as "too many".

        Returns:
            True if the node has been visited at least `threshold` times.
        """
        relevant_events = self._get_relevant_events()
        visit_count = sum(1 for event in relevant_events
                          if isinstance(event, NodeVisitEvent) and event.node_id == node_id)
        return visit_count >= threshold

    def detect_action_sequence_loop(self) -> Optional[List[AgentEvent]]:
        """
        Detects whether a sequence of actions (nodes visited, tools called, results)
        repeats back to back. More precise than counting node visits.
        """
        relevant_events = self._get_relevant_events()
        n = len(relevant_events)
        if n < self.min_loop_length * self.min_repetitions:
            return None
        # Scan candidate start positions from the end down to index 0 (inclusive).
        for i in range(n - self.min_loop_length, -1, -1):
            for j in range(self.min_loop_length, (n - i) // 2 + 1):
                # Candidate sequence: events i .. i + j - 1
                candidate_sequence = relevant_events[i:i + j]
                candidate_hash = self._hash_event_sequence(candidate_sequence)
                repetitions = 1
                current_idx = i + j
                while current_idx + j <= n:
                    next_sequence = relevant_events[current_idx:current_idx + j]
                    if self._hash_event_sequence(next_sequence) == candidate_hash:
                        repetitions += 1
                        current_idx += j
                    else:
                        break
                if repetitions >= self.min_repetitions:
                    print(f"Detected loop: sequence repeated {repetitions} times.")
                    return candidate_sequence  # the repeating sequence
        return None

    def detect_state_loop(self) -> Optional[List[AgentState]]:
        """
        Detects whether the agent's state sequence repeats in a cycle.
        Requires AgentState to be hashable and comparable.
        Note: several events at the same node share one state, so a long stay
        at a single node can also look like a repetition.
        """
        relevant_events = self._get_relevant_events()
        state_sequence = [event.agent_state for event in relevant_events]
        n = len(state_sequence)
        if n < self.min_loop_length * self.min_repetitions:
            return None
        for i in range(n - self.min_loop_length, -1, -1):
            for j in range(self.min_loop_length, (n - i) // 2 + 1):
                candidate_state_seq = state_sequence[i:i + j]
                repetitions = 1
                current_idx = i + j
                while current_idx + j <= n:
                    next_state_seq = state_sequence[current_idx:current_idx + j]
                    if candidate_state_seq == next_state_seq:  # uses AgentState.__eq__
                        repetitions += 1
                        current_idx += j
                    else:
                        break
                if repetitions >= self.min_repetitions:
                    print(f"Detected state loop: sequence repeated {repetitions} times.")
                    return candidate_state_seq
        return None

    def is_in_loop(self) -> bool:
        """Checks whether the agent is currently in any detected loop."""
        if self.detect_action_sequence_loop() is not None:
            return True
        if self.detect_state_loop() is not None:
            return True
        # A node-repetition check for the current node could be added here:
        # if self.history_tracker.history:
        #     current_node = self.history_tracker.history[-1].agent_state.current_node_id
        #     if self.detect_node_repetition(current_node, threshold=self.min_repetitions + 1):
        #         return True
        return False


# Example usage:
# tracker = HistoryTracker(max_history_size=20)
# detector = LoopDetector(tracker, min_loop_length=3, min_repetitions=2, time_window_seconds=300)
#
# # Simulate a loop: A -> B -> C -> A -> B -> C
# for _ in range(3):
#     s_a = AgentState("NodeA", "GoalX")
#     tracker.record_event(NodeVisitEvent(datetime.datetime.now(), s_a, "NodeA"))
#     tracker.record_event(ToolCallEvent(datetime.datetime.now(), s_a, "ToolX", {"val": 1}))
#     tracker.record_event(ToolResultEvent(datetime.datetime.now(), s_a, "ToolX", success=True))
#
#     s_b = AgentState("NodeB", "GoalX")
#     tracker.record_event(NodeVisitEvent(datetime.datetime.now(), s_b, "NodeB"))
#     tracker.record_event(ToolCallEvent(datetime.datetime.now(), s_b, "ToolY", {"val": 2}))
#     tracker.record_event(ToolResultEvent(datetime.datetime.now(), s_b, "ToolY", success=False, error_details={"code": 500}))  # tool error
#
#     s_c = AgentState("NodeC", "GoalX")
#     tracker.record_event(NodeVisitEvent(datetime.datetime.now(), s_c, "NodeC"))
#     tracker.record_event(ToolCallEvent(datetime.datetime.now(), s_c, "ToolZ", {"val": 3}))
#     tracker.record_event(ToolResultEvent(datetime.datetime.now(), s_c, "ToolZ", success=True))
#
# print(f"Is in loop? {detector.is_in_loop()}")
#
# # Prints True: the 9-event cycle (3 nodes x 3 events) repeats twice within the last 20 events.
```
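3.3 Error Classifier
Random errors are the direct cause of the infinite loop. Distinguishing between error types is essential for choosing the right intervention.
Purpose: classify a tool error as transient, persistent, or unknown, based on its error information (error code, message, stack trace).
Methods:
- Rule-based matching: the most direct approach. Define a set of regular expressions or keywords that match error codes or messages.
  - Transient errors: Timeout, Connection Refused, Too Many Requests (429), Internal Server Error (500) (sometimes transient).
  - Persistent errors: Unauthorized (401), Forbidden (403), Not Found (404), Bad Request (400), Invalid Configuration.
- Statistical analysis: a tool error that recurs repeatedly within a short time may point to a deeper persistent problem, even when its error code is nominally transient. For example, the service may be up but continuously overloaded.
- External knowledge base / machine learning: more sophisticated systems can train models on historical data to recognize error patterns automatically.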
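The rule-based and statistical methods can be combined. The sketch below makes several assumptions. It matches integer HTTP status codes exactly, so a value such as 5000 is not mistaken for 500. It escalates a "transient" error to persistent once the same tool has produced more than `max_transient` transient errors among its last `window` recorded results. The status-code sets, the thresholds, and the `EscalatingClassifier` name are all hypothetical choices for illustration.

```python
from collections import deque
from typing import Any, Deque, Dict, Optional

TRANSIENT_STATUS = {408, 429, 500, 502, 503, 504}
PERSISTENT_STATUS = {400, 401, 403, 404, 405, 409, 410, 422}


def classify_status(code: Optional[int]) -> str:
    """Rule-based classification on an integer HTTP status code (exact match only)."""
    if code in TRANSIENT_STATUS:
        return "TRANSIENT"
    if code in PERSISTENT_STATUS:
        return "PERSISTENT"
    return "UNKNOWN"


class EscalatingClassifier:
    """Statistical escalation: too many transient errors for one tool within
    its last `window` recorded results are reported as PERSISTENT."""

    def __init__(self, window: int = 10, max_transient: int = 3):
        self.window = window
        self.max_transient = max_transient
        self.recent: Dict[str, Deque[bool]] = {}  # tool -> "was a transient error?"

    def _outcomes(self, tool_name: str) -> Deque[bool]:
        return self.recent.setdefault(tool_name, deque(maxlen=self.window))

    def record_success(self, tool_name: str) -> None:
        self._outcomes(tool_name).append(False)

    def classify(self, tool_name: str, error: Optional[Dict[str, Any]]) -> str:
        base = classify_status(error.get("code")) if error else "UNKNOWN"
        outcomes = self._outcomes(tool_name)
        outcomes.append(base == "TRANSIENT")
        if base == "TRANSIENT" and sum(outcomes) > self.max_transient:
            return "PERSISTENT"
        return base


clf = EscalatingClassifier(window=10, max_transient=3)
print([clf.classify("db", {"code": 503}) for _ in range(5)])
```

Codes arriving as strings (for example "503") would need converting to `int` first; that conversion is left out of this sketch.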
Code example: the ErrorClassifier class
```python
from typing import Any, Dict, Optional


class ErrorClassifier:
    """Classifies errors into transient, persistent, or unknown."""

    TRANSIENT_PATTERNS = [
        "timeout", "connection refused", "network error", "service unavailable",
        "too many requests", "rate limit exceeded", "internal server error",
        "429", "500", "502", "503", "504",  # common HTTP codes for transient failures
    ]
    PERSISTENT_PATTERNS = [
        "unauthorized", "forbidden", "not found", "bad request",
        "invalid credentials", "permission denied", "configuration error",
        "400", "401", "403", "404", "405", "406", "409", "410",  # common HTTP codes for persistent failures
    ]

    @staticmethod
    def _matches(pattern: str, message: str, code: str) -> bool:
        # Numeric patterns (HTTP status codes) must equal the code exactly, so that
        # e.g. code "5003" or a message such as "took 400 ms" is not misclassified;
        # textual patterns are matched as substrings of the message or the code.
        if pattern.isdigit():
            return pattern == code
        return pattern in message or pattern in code

    def classify_error(self, error_details: Optional[Dict[str, Any]]) -> str:
        """
        Classifies an error based on its details.
        Returns 'TRANSIENT', 'PERSISTENT', or 'UNKNOWN'.
        """
        if not error_details:
            return "UNKNOWN"
        error_message = str(error_details.get("message", "")).lower()
        error_code = str(error_details.get("code", "")).lower()
        # Transient patterns are checked first.
        for pattern in self.TRANSIENT_PATTERNS:
            if self._matches(pattern, error_message, error_code):
                return "TRANSIENT"
        for pattern in self.PERSISTENT_PATTERNS:
            if self._matches(pattern, error_message, error_code):
                return "PERSISTENT"
        return "UNKNOWN"

    def is_transient(self, error_details: Optional[Dict[str, Any]]) -> bool:
        return self.classify_error(error_details) == "TRANSIENT"

    def is_persistent(self, error_details: Optional[Dict[str, Any]]) -> bool:
        return self.classify_error(error_details) == "PERSISTENT"


# Example usage:
# classifier = ErrorClassifier()
# print(classifier.classify_error({"code": 503, "message": "Service Unavailable"}))  # TRANSIENT
# print(classifier.classify_error({"code": 401, "message": "Unauthorized access"}))  # PERSISTENT
# print(classifier.classify_error({"message": "Unknown error type"}))  # UNKNOWN
```
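3.4 Intelligent Intervention Engine
This is the decision center of the escape mechanism. Based on the results of loop detection and error classification, it selects and executes the most appropriate escape strategy.
Purpose: when the agent is stuck in an infinite loop, apply a set of predefined strategies to guide it out.
Strategy types:
- Backoff and retry:
  - When to use: mainly for transient errors.
  - Mechanism: after each failed retry, wait for a progressively longer interval (exponential backoff). Random jitter can be added to avoid a "thundering herd".
  - Limit: the number of retries must be capped to avoid retrying forever.
- Path exploration:
  - When to use: the agent keeps failing at a node or on a path, and no clear persistent error is blocking progress.
  - Mechanism: force the agent to try the current node's outgoing edges (next_node) that it has not visited or has not tried recently, even if they are not its current best choice. This may require the agent to provide a get_available_paths() or get_alternative_actions() interface.
  - Risk: the agent may drift away from its goal into even less familiar states.
- Tool substitution:
  - When to use: a specific tool keeps failing, and a backup tool with similar functionality exists.
  - Mechanism: the intervention engine tells the agent to call the backup tool. For example, if the primary database connection fails, try a read-only replica; if the primary API service fails, try a mirror API.
  - Requirement: the backup tools must be registered with the agent or the environment in advance.
- State rollback:
  - When to use: the loop has left the agent's state confused or inconsistent.
  - Mechanism: restore the agent's state to a known safe point from before the loop began. This requires the agent's state to be serializable and restorable.
  - Challenge: a rollback may require undoing external side effects, which is usually very complex.
- Circuit breaker pattern:
  - When to use: persistent tool errors.
  - Mechanism: when a tool's failure count or error rate reaches a threshold, the breaker "opens" and blocks calls to that tool for a period of time. While it is open, every call to the tool fails immediately instead of waiting for the tool to actually run. When the period ends, the breaker enters a "half-open" state and lets a small number of requests through to test whether the tool has recovered.
  - Advantage: the agent does not waste resources on futile calls to a tool that is known to be failing.
- Human intervention:
  - When to use: all automated strategies have failed, or the problem is complex enough to require human judgment.
  - Mechanism: raise an alert (email, SMS, log) that includes the agent's history and current state, and request human handling. The agent can pause or enter a restricted mode.
- Emergency stop:
  - When to use: the system is in an unrecoverable loop, or continuing could cause serious damage.
  - Mechanism: forcibly terminate the agent. This is the last resort.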
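The first strategy, backoff and retry, can be sketched as a delay function. This version uses the "full jitter" variant: the wait is drawn uniformly from 0 up to the exponentially growing ceiling, which spreads out clients that failed at the same moment. The base delay of 0.1 s and the 60 s cap are assumed defaults, not values from any particular library.

```python
import random
from typing import Optional


def backoff_delay(attempt: int, base: float = 0.1, cap: float = 60.0,
                  rng: Optional[random.Random] = None) -> float:
    """Exponential backoff with full jitter: a uniform draw from
    [0, min(cap, base * 2 ** (attempt - 1))]. Attempts are numbered from 1."""
    if attempt < 1:
        raise ValueError("attempt must be >= 1")
    ceiling = min(cap, base * (2 ** (attempt - 1)))
    return (rng or random).uniform(0.0, ceiling)


rng = random.Random(42)  # seeded only to make the demo reproducible
print([round(backoff_delay(a, rng=rng), 3) for a in range(1, 6)])
```

The function only computes a delay. The retry cap from the list above still has to be enforced by the caller.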
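Priority and combination: intervention strategies are usually combined in priority order, for example:
- Transient errors: backoff and retry.
- After repeated retry failures, or when a persistent error is detected: circuit breaker plus path exploration.
- If those fail and the loop persists: tool substitution or state rollback.
- When all automated strategies are exhausted: human intervention or emergency stop.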
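The priority list above can be encoded as an escalation ladder, where each failed intervention moves the agent one rung up. In this minimal sketch, the action names reuse the EscapeAction types from the code below. Two rules are assumptions not stated in the text: UNKNOWN errors are treated like persistent ones, and the ladder saturates at its last rung.

```python
from typing import List

# Hypothetical escalation ladder mirroring the priority list above.
LADDER: List[List[str]] = [
    ["RETRY_WITH_BACKOFF"],
    ["CIRCUIT_BREAKER_OPEN", "EXPLORE_PATH"],
    ["TOOL_SUBSTITUTION", "STATE_ROLLBACK"],
    ["HUMAN_INTERVENTION", "EMERGENCY_STOP"],
]


def next_actions(error_type: str, failed_interventions: int) -> List[str]:
    """Transient errors start at rung 0; persistent or unknown errors skip retries
    and start at rung 1. Each failed intervention climbs one rung."""
    start = 0 if error_type == "TRANSIENT" else 1
    rung = min(start + failed_interventions, len(LADDER) - 1)
    return LADDER[rung]


print(next_actions("TRANSIENT", 0))   # ['RETRY_WITH_BACKOFF']
print(next_actions("PERSISTENT", 5))  # ['HUMAN_INTERVENTION', 'EMERGENCY_STOP']
```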
Code example: the InterventionEngine class
```python
# Continues the module above (AgentState, ToolResultEvent, ErrorClassifier).
import collections
import datetime
from typing import Any, Dict, List, Optional


class EscapeAction:
    """Represents an action proposed by the escape mechanism."""

    def __init__(self, action_type: str, details: Optional[Dict[str, Any]] = None):
        # e.g. "RETRY_WITH_BACKOFF", "EXPLORE_PATH", "CIRCUIT_BREAKER_OPEN",
        # "HUMAN_INTERVENTION", "EMERGENCY_STOP"
        self.action_type = action_type
        self.details = details if details is not None else {}

    def __repr__(self):
        return f"EscapeAction(type='{self.action_type}', details={self.details})"


class InterventionEngine:
    """Decides and suggests escape actions based on detected loops and errors."""

    def __init__(self,
                 error_classifier: ErrorClassifier,
                 max_retries_on_transient: int = 3,
                 circuit_breaker_threshold: int = 5,  # consecutive failures
                 circuit_breaker_timeout_sec: int = 30):
        self.error_classifier = error_classifier
        self.max_retries_on_transient = max_retries_on_transient
        self.circuit_breaker_threshold = circuit_breaker_threshold
        self.circuit_breaker_timeout_sec = circuit_breaker_timeout_sec
        self.tool_failure_counts: Dict[str, int] = collections.defaultdict(int)
        self.circuit_breakers: Dict[str, "CircuitBreaker"] = {}  # tool name -> breaker
        self.tool_substitution_map: Dict[str, str] = {}  # e.g. {"primary_db": "read_replica_db"}

    def _get_circuit_breaker(self, tool_name: str) -> "CircuitBreaker":
        if tool_name not in self.circuit_breakers:
            self.circuit_breakers[tool_name] = CircuitBreaker(
                failure_threshold=self.circuit_breaker_threshold,
                reset_timeout_seconds=self.circuit_breaker_timeout_sec,
            )
        return self.circuit_breakers[tool_name]

    def reset_tool_failure_count(self, tool_name: str):
        """Called after a successful tool call."""
        self.tool_failure_counts[tool_name] = 0
        if tool_name in self.circuit_breakers:
            # Closes a half-open breaker and clears the consecutive-failure count.
            self.circuit_breakers[tool_name].record_success()

    def propose_escape_action(self,
                              agent_state: AgentState,
                              last_tool_result: Optional[ToolResultEvent],
                              is_loop_detected: bool,
                              available_paths: Optional[List[str]] = None) -> List[EscapeAction]:
        """
        Proposes a list of escape actions, ordered by priority.

        Args:
            agent_state: Current state of the agent.
            last_tool_result: Result of the last tool call, or None if no tool was called.
            is_loop_detected: True if the LoopDetector has detected a loop.
            available_paths: Alternative node IDs or path identifiers the agent can take.

        Returns:
            A list of EscapeAction objects, ordered by priority.
        """
        actions: List[EscapeAction] = []

        # 1. Handle direct tool failures.
        if last_tool_result and not last_tool_result.success:
            tool_name = last_tool_result.tool_name
            error_details = last_tool_result.error_details
            error_type = self.error_classifier.classify_error(error_details)
            cb = self._get_circuit_breaker(tool_name)
            cb.record_failure()

            if cb.is_open():
                # Do not call this tool for now; path exploration or human intervention takes over.
                print(f"Circuit breaker for {tool_name} is OPEN. Suggesting alternative/stop.")
                actions.append(EscapeAction("CIRCUIT_BREAKER_OPEN", {"tool_name": tool_name}))
            elif error_type == "TRANSIENT":
                self.tool_failure_counts[tool_name] += 1
                attempt = self.tool_failure_counts[tool_name]
                if attempt <= self.max_retries_on_transient:
                    actions.append(EscapeAction("RETRY_WITH_BACKOFF",
                                                {"tool_name": tool_name, "attempt": attempt}))
                else:
                    # Too many transient failures: treat as persistent for this cycle.
                    print(f"Tool {tool_name} had too many transient failures. Escalating.")
                    actions.append(EscapeAction("CONSIDER_PERSISTENT_ERROR", {"tool_name": tool_name}))
            elif error_type == "PERSISTENT":
                # Persistent errors should lead straight to alternative paths or tool substitution.
                actions.append(EscapeAction("PERSISTENT_ERROR",
                                            {"tool_name": tool_name, "details": error_details}))

            # Breaker is half-open and nothing else was proposed: send one test request.
            if cb.is_half_open() and not actions:
                actions.append(EscapeAction("TEST_TOOL_HALF_OPEN", {"tool_name": tool_name}))

        # 2. A loop was detected: look for a way out. (Triggering this on `not actions`
        # as well would propose a random detour on every healthy step.)
        if is_loop_detected:
            if available_paths:
                actions.append(EscapeAction("EXPLORE_PATH", {
                    "current_node": agent_state.current_node_id,
                    "alternatives": available_paths,
                }))
            # Tool substitution, if a backup tool is registered:
            # if last_tool_result and last_tool_result.tool_name in self.tool_substitution_map:
            #     actions.append(EscapeAction("TOOL_SUBSTITUTION", {
            #         "original": last_tool_result.tool_name,
            #         "substitute": self.tool_substitution_map[last_tool_result.tool_name]}))
            # State rollback, if the agent's state is trackable and reversible:
            # actions.append(EscapeAction("STATE_ROLLBACK", {"target_state_hash": "previous_safe_hash"}))

        # 3. Last resort when no automated action applies.
        if not actions and is_loop_detected:
            actions.append(EscapeAction("HUMAN_INTERVENTION",
                                        {"reason": "Persistent loop detected, automated strategies exhausted."}))
            actions.append(EscapeAction("EMERGENCY_STOP",
                                        {"reason": "Unresolvable loop, potential resource drain."}))
        return actions


class CircuitBreaker:
    """Implements the Circuit Breaker pattern."""

    STATE_CLOSED = "CLOSED"
    STATE_OPEN = "OPEN"
    STATE_HALF_OPEN = "HALF_OPEN"

    def __init__(self, failure_threshold: int = 5, reset_timeout_seconds: int = 30):
        self.failure_threshold = failure_threshold
        self.reset_timeout_seconds = reset_timeout_seconds
        self.state = self.STATE_CLOSED
        self.failures = 0
        self.last_failure_time: Optional[datetime.datetime] = None

    def record_failure(self):
        if self.state == self.STATE_CLOSED:
            self.failures += 1
            self.last_failure_time = datetime.datetime.now()
            if self.failures >= self.failure_threshold:
                self.open()
        elif self.state == self.STATE_HALF_OPEN:
            # The test request failed: go back to OPEN and restart the timeout.
            self.open()

    def record_success(self):
        if self.state == self.STATE_HALF_OPEN:
            self.reset()  # the test request succeeded: close the breaker
        elif self.state == self.STATE_CLOSED:
            self.failures = 0  # only consecutive failures count

    def open(self):
        self.state = self.STATE_OPEN
        self.last_failure_time = datetime.datetime.now()
        print(f"Circuit Breaker OPENED. Failures: {self.failures}")

    def half_open(self):
        self.state = self.STATE_HALF_OPEN
        print("Circuit Breaker HALF_OPEN.")

    def reset(self):
        self.state = self.STATE_CLOSED
        self.failures = 0
        self.last_failure_time = None
        print("Circuit Breaker CLOSED.")

    def is_open(self) -> bool:
        """True while OPEN; moves to HALF_OPEN (and returns False) once the timeout elapses."""
        if self.state == self.STATE_OPEN:
            if (datetime.datetime.now() - self.last_failure_time).total_seconds() > self.reset_timeout_seconds:
                self.half_open()
                return False  # allow one test request
            return True
        return False

    def is_half_open(self) -> bool:
        return self.state == self.STATE_HALF_OPEN

    def allows_request(self) -> bool:
        """Determines whether a request may pass through the breaker."""
        if self.state == self.STATE_CLOSED:
            return True
        elif self.state == self.STATE_HALF_OPEN:
            return True  # allow a single test request
        elif self.state == self.STATE_OPEN:
            if (datetime.datetime.now() - self.last_failure_time).total_seconds() > self.reset_timeout_seconds:
                self.half_open()
                return True  # the first request after the timeout is the test request
            return False
        return False


# Example usage:
# cb = CircuitBreaker(failure_threshold=3, reset_timeout_seconds=5)
# print(f"Initial state: {cb.state}")
# for i in range(5):
#     if not cb.allows_request():
#         print(f"Request blocked by CB at attempt {i+1}")
#         continue
#     # Simulate a tool call
#     if i < 3:  # the first 3 calls fail
#         cb.record_failure()
#         print(f"Tool failed. CB state: {cb.state}")
#     else:
#         cb.record_success()
#         print(f"Tool succeeded. CB state: {cb.state}")
#
# # The breaker opens on the third failure and blocks attempts 4 and 5; it only moves
# # to HALF_OPEN once reset_timeout_seconds have elapsed.
```
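3.5 Policy Manager
To make the escape mechanism general and configurable, we need a policy manager that centralizes its parameters and behaviors.
Purpose: provide a single interface for configuring loop-detection thresholds, error-classification rules, and the priorities and parameters of intervention strategies.
Configurability:
- Dynamically adjust max_history_size.
- Configure min_loop_length, min_repetitions, and time_window_seconds.
- Update TRANSIENT_PATTERNS and PERSISTENT_PATTERNS.
- Modify max_retries_on_transient, circuit_breaker_threshold, and circuit_breaker_timeout_sec.
- Define the tool substitution map.
- Configure how human intervention is notified.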
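A plain nested dictionary accepts typos and nonsensical values silently. One option, shown here as an assumption rather than as part of the design below, is to load each config section into a frozen dataclass that validates itself. `LoopDetectorPolicy` is a hypothetical name, and its field names mirror the "loop_detector" section. An unknown key such as a misspelled parameter raises `TypeError` at construction time.

```python
import dataclasses


@dataclasses.dataclass(frozen=True)
class LoopDetectorPolicy:
    """Typed, validated view of the "loop_detector" config section (hypothetical)."""
    min_loop_length: int = 3
    min_repetitions: int = 2
    time_window_seconds: int = 60

    def __post_init__(self) -> None:
        if self.min_loop_length < 1:
            raise ValueError("min_loop_length must be >= 1")
        if self.min_repetitions < 2:
            raise ValueError("min_repetitions must be >= 2 (a single occurrence is not a loop)")
        if self.time_window_seconds <= 0:
            raise ValueError("time_window_seconds must be positive")

    def min_history_size(self) -> int:
        """Smallest history that can hold min_repetitions copies of the shortest loop."""
        return self.min_loop_length * self.min_repetitions


policy = LoopDetectorPolicy(**{"min_loop_length": 2, "time_window_seconds": 120})
print(policy, policy.min_history_size())
```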
Code example: the EscapePolicyManager class
```python
# Continues the module above (ErrorClassifier).
import collections.abc
from typing import Any, Dict, Mapping


class EscapePolicyManager:
    """Manages all configurable policies and parameters for the escape mechanism."""

    def __init__(self):
        self.config: Dict[str, Any] = {
            "history_tracker": {
                "max_history_size": 100,
            },
            "loop_detector": {
                "min_loop_length": 3,
                "min_repetitions": 2,
                "time_window_seconds": 60,
            },
            "error_classifier": {
                # Copies, so that editing the config never mutates the class defaults.
                "transient_patterns": list(ErrorClassifier.TRANSIENT_PATTERNS),
                "persistent_patterns": list(ErrorClassifier.PERSISTENT_PATTERNS),
            },
            "intervention_engine": {
                "max_retries_on_transient": 3,
                "circuit_breaker_threshold": 5,
                "circuit_breaker_timeout_sec": 30,
                "tool_substitution_map": {},  # e.g. {"primary_db": "read_replica_db"}
            },
        }

    def update_config(self, new_config: Mapping[str, Any]):
        """Recursively merges new_config into the current configuration."""
        def _update(d: Dict[str, Any], u: Mapping[str, Any]) -> Dict[str, Any]:
            for k, v in u.items():
                if isinstance(v, collections.abc.Mapping):
                    base = d.get(k)
                    d[k] = _update(base if isinstance(base, dict) else {}, v)
                else:
                    d[k] = v
            return d
        self.config = _update(self.config, new_config)
        print("Escape policies updated.")

    def get_config(self) -> Dict[str, Any]:
        return self.config


# Example usage:
# policy_manager = EscapePolicyManager()
# new_settings = {
#     "loop_detector": {
#         "min_loop_length": 2,
#         "time_window_seconds": 120
#     },
#     "intervention_engine": {
#         "max_retries_on_transient": 5
#     }
# }
# policy_manager.update_config(new_settings)
# print(policy_manager.get_config())
```
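4. Integration: A General Escape Mechanism Framework
We now combine all the components into a general escape mechanism framework.
4.1 Architecture
The escape mechanism should be a standalone, pluggable module, decoupled from the agent's core business logic. It works by observing the agent's event stream.
Agent interface and tool wrapping:
For generality, we assume that agents and tools follow certain interface conventions.
- AbstractAgent:
  - get_current_state() -> AgentState: returns the agent's current state.
  - execute_action(action_name: str, action_params: Dict[str, Any]) -> Optional[ToolResultEvent]: executes an action, which may involve a tool call, and returns the tool result.
  - get_available_paths() -> List[str]: returns the alternative paths or node IDs the agent can explore.
  - apply_escape_action(action: EscapeAction): the agent receives an instruction from the escape mechanism and carries it out.
- AbstractTool:
  - execute(input_params: Dict[str, Any]) -> Dict[str, Any]: runs the tool's business logic.
  - The tool either handles errors internally or raises an exception, which the caller catches and converts into a ToolResultEvent.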
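The "raise, then catch and convert" contract for tools can be sketched as one wrapper function. To keep the example self-contained, this sketch makes several assumptions. It uses a simplified `ToolResult` dataclass as a stand-in for ToolResultEvent, and `run_tool`, `HttpError`, and `flaky_api` are hypothetical names. The `code` and `message` keys match what ErrorClassifier reads from error_details.

```python
import datetime
from dataclasses import dataclass, field
from typing import Any, Callable, Dict, Optional


@dataclass
class ToolResult:
    """Simplified, hypothetical stand-in for ToolResultEvent."""
    tool_name: str
    success: bool
    tool_output: Optional[Dict[str, Any]] = None
    error_details: Optional[Dict[str, Any]] = None
    timestamp: datetime.datetime = field(default_factory=datetime.datetime.now)


def run_tool(tool_name: str,
             fn: Callable[[Dict[str, Any]], Dict[str, Any]],
             params: Dict[str, Any]) -> ToolResult:
    """Runs a tool that raises on failure and turns either outcome into a result record."""
    try:
        return ToolResult(tool_name, True, tool_output=fn(params))
    except Exception as exc:  # BaseException (e.g. KeyboardInterrupt) still propagates
        return ToolResult(tool_name, False, error_details={
            "code": getattr(exc, "code", None),  # e.g. an HTTP status, if the exception has one
            "message": str(exc),
            "exception_type": type(exc).__name__,
        })


class HttpError(Exception):
    """Hypothetical exception type carrying an HTTP status code."""

    def __init__(self, code: int, message: str):
        super().__init__(message)
        self.code = code


def flaky_api(params: Dict[str, Any]) -> Dict[str, Any]:
    raise HttpError(503, "Service Unavailable")


ok = run_tool("echo", lambda p: {"echo": p["x"]}, {"x": 1})
bad = run_tool("flaky_api", flaky_api, {})
print(ok.success, bad.success, bad.error_details)
```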
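4.2 Runtime Flow
- The agent takes a step: following its own logic, the agent chooses an action (for example, moving to a new node or calling a tool).
- Record history: before or after the agent acts, the HistoryTracker records a NodeVisitEvent or ToolCallEvent.
- Tool call and result: if the action involves a tool, the agent runs it. Whether the call succeeds or fails, the HistoryTracker records a ToolResultEvent.
- Loop/error detection: the LoopDetector inspects the history for loops, and the ErrorClassifier analyzes the latest tool error.
- Intervention decision: the InterventionEngine proposes a list of EscapeActions, based on the outputs of the LoopDetector and ErrorClassifier and on the agent's current state.
- Apply the intervention: on receiving an EscapeAction, the agent adjusts its behavior accordingly. This may mean:
  - Waiting (backoff).
  - Retrying.
  - Choosing a non-optimal but viable alternative path.
  - No longer using a failing tool (circuit breaker).
  - Rolling back state.
  - Raising an alert and pausing.
- Resume or terminate: after carrying out the intervention, the agent tries to continue its task, or terminates if instructed to.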
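The whole cycle can be compressed into one toy, single-file sketch. Every part of it is a simplification standing in for the real components, not the framework itself:

- The graph and the always-failing B->C edge are hypothetical.
- A plain token list plays the HistoryTracker.
- A tail-repetition check plays the LoopDetector.
- "Ban every edge that failed inside the loop" plays the combination of circuit breaker and path exploration.
- A fixed step budget plays the emergency stop.

```python
from typing import Dict, List, Optional, Sequence, Set, Tuple

# Toy environment (hypothetical): the tool on edge B->C always fails; B->D->C is a detour.
GRAPH: Dict[str, List[str]] = {"A": ["B"], "B": ["C", "D"], "D": ["C"], "C": []}
BROKEN_EDGES: Set[Tuple[str, str]] = {("B", "C")}


def call_tool(src: str, dst: str) -> bool:
    return (src, dst) not in BROKEN_EDGES


def tail_loop(tokens: Sequence[str], min_len: int = 2, reps: int = 2) -> bool:
    """True if the last p*reps tokens are reps copies of one block, for some p >= min_len."""
    n = len(tokens)
    for p in range(min_len, n // reps + 1):
        block = list(tokens[n - p:])
        if all(list(tokens[n - (k + 1) * p: n - k * p]) == block for k in range(1, reps)):
            return True
    return False


def run(start: str, goal: str, budget: int = 30) -> Tuple[List[str], Set[Tuple[str, str]]]:
    node: str = start
    prev: Optional[str] = None
    visited: List[str] = [start]
    tokens: List[str] = []                   # recorded history, one token per tool call
    banned: Set[Tuple[str, str]] = set()     # edges whose "breaker" is open
    for _ in range(budget):                  # hard step budget = emergency stop
        if node == goal:
            break
        options = [n for n in GRAPH[node] if (node, n) not in banned]
        if not options:                      # nothing left to try: stop and escalate
            break
        nxt = options[0]                     # the agent's own (naive) choice
        ok = call_tool(node, nxt)            # tool call and result
        tokens.append(f"{node}->{nxt}:{'ok' if ok else 'fail'}")
        if ok:
            prev, node = node, nxt
        elif prev is not None:               # naive fallback: step back
            prev, node = node, prev
        visited.append(node)
        if tail_loop(tokens):                # loop detection
            for t in tokens:                 # intervention: ban every edge that failed
                edge, status = t.rsplit(":", 1)
                if status == "fail":
                    src, dst = edge.split("->")
                    banned.add((src, dst))
            tokens.clear()                   # fresh history after intervening
    return visited, banned


path, banned = run("A", "C")
print(path, banned)  # ['A', 'B', 'A', 'B', 'A', 'B', 'D', 'C'] {('B', 'C')}
```

The agent bounces A -> B -> A twice, the tail check fires, the failing edge is banned, and the agent's own policy then finds the detour through D.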
Table: component responsibilities and interactions
| Component | Main responsibility | Input | Output | Depends on |
|---|---|---|---|---|
| Agent | Business logic, action execution, state management | EscapeAction | AgentState, ToolResultEvent | EscapeMechanismOrchestrator |
| Tool | Provides concrete functionality, performs operations | Input from the agent | Tool output or error | Agent |
| HistoryTracker | Records all agent actions and events | AgentEvent | List[AgentEvent] | – |
| LoopDetector | Analyzes history, detects loop patterns | List[AgentEvent] (from the tracker) | bool (is_in_loop), Optional[List[AgentEvent]] | HistoryTracker |
| ErrorClassifier | Classifies tool errors | Dict[str, Any] (error_details) | str (error_type) | – |
| InterventionEngine | Decides escape strategies | AgentState, ToolResultEvent, bool (is_loop), List[str] (paths) | List[EscapeAction] | ErrorClassifier |
| EscapePolicyManager | Manages all configuration and policy parameters | Dict[str, Any] (new config) | Dict[str, Any] (current config) | – |
| EscapeMechanismOrchestrator | Coordinates all components, drives the flow | The agent's event stream | EscapeAction | All other escape-mechanism components, Agent |
4.3 EscapeMechanismOrchestrator 类
这个类是整个逃逸机制的协调者,它将所有组件连接起来,并提供给Agent一个统一的接口。
import time
import random
import abc
# Assuming AgentState, AgentEvent classes, HistoryTracker, LoopDetector, ErrorClassifier,
# InterventionEngine, EscapeAction, EscapePolicyManager, CircuitBreaker are defined as above.
# --- Agent and Tool Interfaces for integration ---
class AbstractAgent(abc.ABC):
"""Abstract base class for an Agent that can be monitored and controlled."""
@abc.abstractmethod
def get_current_state(self) -> AgentState:
pass
@abc.abstractmethod
def execute_action(self, action_name: str, action_params: Dict[str, Any]) -> Optional[ToolResultEvent]:
"""
Executes an action, which might involve calling a tool.
Returns ToolResultEvent or None if no tool was called/result recorded directly.
"""
pass
@abc.abstractmethod
def get_available_paths(self) -> List[str]:
"""Returns a list of alternative node IDs or paths the agent can take."""
pass
@abc.abstractmethod
def apply_escape_action(self, action: EscapeAction):
"""Agent applies the escape action suggested by the mechanism."""
pass
@abc.abstractmethod
def is_goal_reached(self) -> bool:
"""Checks if the agent has reached its goal."""
pass
@abc.abstractmethod
def is_terminated(self) -> bool:
"""Checks if the agent has terminated (e.g., due to emergency stop)."""
pass
class AbstractTool(abc.ABC):
"""Abstract base class for a Tool used by the Agent."""
@abc.abstractmethod
def execute(self, input_params: Dict[str, Any]) -> Dict[str, Any]:
"""Executes the tool logic. Should raise exceptions on failure."""
pass
# --- EscapeMechanismOrchestrator ---
class EscapeMechanismOrchestrator:
"""
Coordinates the escape mechanism components and interacts with the Agent.
"""
def __init__(self, policy_manager: EscapePolicyManager):
self.policy_manager = policy_manager
config = self.policy_manager.get_config()
self.history_tracker = HistoryTracker(
max_history_size=config["history_tracker"]["max_history_size"]
)
self.error_classifier = ErrorClassifier()
self.error_classifier.TRANSIENT_PATTERNS = config["error_classifier"]["transient_patterns"]
self.error_classifier.PERSISTENT_PATTERNS = config["error_classifier"]["persistent_patterns"]
self.loop_detector = LoopDetector(
history_tracker=self.history_tracker,
min_loop_length=config["loop_detector"]["min_loop_length"],
min_repetitions=config["loop_detector"]["min_repetitions"],
time_window_seconds=config["loop_detector"]["time_window_seconds"]
)
self.intervention_engine = InterventionEngine(
error_classifier=self.error_classifier,
max_retries_on_transient=config["intervention_engine"]["max_retries_on_transient"],
circuit_breaker_threshold=config["intervention_engine"]["circuit_breaker_threshold"],
circuit_breaker_timeout_sec=config["intervention_engine"]["circuit_breaker_timeout_sec"]
)
# Add tool substitution map to intervention engine if needed
self.intervention_engine.tool_substitution_map = config["intervention_engine"].get("tool_substitution_map", {})
self._last_tool_call_timestamp: Dict[str, datetime.datetime] = {}
self._retry_delays: Dict[str, float] = collections.defaultdict(lambda: 0.1) # Initial delay for backoff
def record_agent_event(self, event: AgentEvent):
"""Records any event from the agent."""
self.history_tracker.record_event(event)
if isinstance(event, ToolResultEvent) and event.success:
self.intervention_engine.reset_tool_failure_count(event.tool_name)
self._retry_delays[event.tool_name] = 0.1 # Reset delay on success
elif isinstance(event, ToolCallEvent):
self._last_tool_call_timestamp[event.tool_name] = event.timestamp
def get_backoff_delay(self, tool_name: str, attempt: int) -> float:
"""Calculates exponential backoff delay with jitter."""
base_delay = self._retry_delays[tool_name]
delay = base_delay * (2 ** (attempt - 1)) + random.uniform(0, 0.1 * base_delay)
self._retry_delays[tool_name] = delay # Store for next attempt
return min(delay, 60) # Cap at 60 seconds to prevent excessively long delays
    def check_and_intervene(self, agent: AbstractAgent, last_tool_result: Optional[ToolResultEvent]) -> bool:
        """
        Checks for loops/errors and proposes/applies interventions.
        Returns True if an intervention was applied and the agent's state might have changed.
        """
        current_agent_state = agent.get_current_state()
        is_loop_detected = self.loop_detector.is_in_loop()
        available_paths = agent.get_available_paths()
        proposed_actions = self.intervention_engine.propose_escape_action(
            agent_state=current_agent_state,
            last_tool_result=last_tool_result,
            is_loop_detected=is_loop_detected,
            available_paths=available_paths
        )
        if not proposed_actions:
            return False  # No intervention needed
        # Apply the highest-priority action that this orchestrator can handle
        action_applied = False
        for action in proposed_actions:
            print(f"Applying escape action: {action}")
            if action.action_type == "RETRY_WITH_BACKOFF":
                tool_name = action.details["tool_name"]
                attempt = action.details["attempt"]
                delay = self.get_backoff_delay(tool_name, attempt)
                print(f"Backing off for {delay:.2f} seconds before retrying {tool_name} (attempt {attempt}).")
                time.sleep(delay)
                # The agent's next action should be a retry
                agent.apply_escape_action(action)  # Inform agent to retry
                action_applied = True
                break
            elif action.action_type == "EXPLORE_PATH":
                if available_paths:
                    # Ideally choose a path that hasn't been tried recently or is not part of the detected loop.
                    # For simplicity, pick a random one here; a real implementation would be smarter.
                    chosen_path = random.choice(available_paths)
                    print(f"Exploring alternative path: {chosen_path}")
                    agent.apply_escape_action(EscapeAction("SET_NEXT_NODE", {"node_id": chosen_path}))
                    action_applied = True
                    break
            elif action.action_type == "CIRCUIT_BREAKER_OPEN":
                tool_name = action.details["tool_name"]
                print(f"Tool '{tool_name}' circuit breaker is OPEN. Agent should avoid this tool.")
                # The agent must be told to avoid this tool for a while
                agent.apply_escape_action(action)
                action_applied = True
                break
            elif action.action_type == "HUMAN_INTERVENTION":
                print("--- ALERT: Human Intervention Required! ---")
                print(f"Reason: {action.details.get('reason', 'Unknown')}")
                print(f"Current Agent State: {current_agent_state.to_dict()}")
                print(f"History: {[e.to_dict() for e in self.history_tracker.get_last_n_events(10)]}")
                # In a real system, send an email/SMS or log to a monitoring system
                agent.apply_escape_action(action)  # Agent might pause or await human input
                action_applied = True
                break
            elif action.action_type == "EMERGENCY_STOP":
                print("--- CRITICAL: Emergency Stop Initiated! ---")
                print(f"Reason: {action.details.get('reason', 'Unresolvable issue.')}")
                agent.apply_escape_action(action)  # Agent should terminate
                action_applied = True
                break
            # Add other action types (TOOL_SUBSTITUTION, STATE_ROLLBACK, etc.)
        return action_applied
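Because a backoff schedule is easy to get subtly wrong, it helps to check the arithmetic in isolation. The following minimal sketch reproduces the formula used by get_backoff_delay (0.1 s base, up to 10% additive jitter, 60 s cap) as a standalone function that keeps the base fixed across attempts, so the nominal delays run 0.1, 0.2, 0.4, 0.8 s and so on until the cap. The name backoff_delay and the injectable rng parameter are illustrative assumptions, not part of the orchestrator.

```python
import random
from typing import Optional


def backoff_delay(attempt: int, base: float = 0.1, cap: float = 60.0,
                  rng: Optional[random.Random] = None) -> float:
    """Hypothetical helper: exponential backoff with additive jitter and a cap.

    delay = min(cap, base * 2 ** (attempt - 1) + U(0, 0.1 * base))
    `base` is never updated between attempts, so growth is plain doubling.
    """
    if attempt < 1:
        raise ValueError("attempt numbering starts at 1")
    rng = rng or random.Random()
    raw = base * (2 ** (attempt - 1)) + rng.uniform(0, 0.1 * base)
    return min(raw, cap)


if __name__ == "__main__":
    seeded = random.Random(0)  # seeded so the jitter is reproducible
    for attempt in range(1, 8):
        print(f"attempt {attempt}: {backoff_delay(attempt, rng=seeded):.3f} s")
```

Passing a seeded random.Random makes the jitter reproducible in tests; by attempt 11 the nominal delay (0.1 × 1024 ≈ 102 s) already exceeds the 60 s cap.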
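5. Advanced Considerations and Optimizations
Building a general-purpose escape mechanism is not a one-off effort; the following advanced issues also need attention: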
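- Performance overhead: the memory footprint of the HistoryTracker and the computational cost of the LoopDetector. History depth has to be balanced against detection speed; hashing and fixed-length queues are the usual optimizations.
- False positives and false negatives: overly sensitive loop detection can trigger unnecessary interventions (false positives), while insufficiently sensitive detection can leave the agent looping for too long (false negatives). The two are balanced by tuning thresholds and combining several detection signals.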
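- Agent cognition: if the agent itself has a deeper understanding of the graph structure, the tools' capabilities, and its own goal, it can cooperate with the escape mechanism more intelligently, for example by proactively reporting "I'm stuck" or by using the mechanism's suggestions to choose alternative paths more precisely.
- Distributed agents: when several agents work together, the escape mechanism must coordinate across them. One agent's infinite loop can affect other agents or even the whole system, which calls for a centralized escape service or an inter-agent coordination protocol.
- Learning and adaptation: with reinforcement learning, the agent or the escape mechanism can learn which interventions work best in which situations. For example, record the agent state, the intervention applied, and the outcome every time a loop occurs, then train a model to predict the best intervention.
- Visualization and monitoring: provide a real-time dashboard showing the agent's current state, path history, tool error rates, loop-detection status, and interventions. This is essential for debugging and for human intervention.
- Idempotency and transactions: state rollback and retries require the agent and the tools it calls to be idempotent (repeating an operation produces no additional side effects) and transactional (an operation either fully succeeds or fully fails), which is a major challenge in distributed systems.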
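The performance and false-positive/false-negative points above can be illustrated together with a small sketch. It is not the article's LoopDetector: HashedLoopDetector, fingerprint, and detect_loop are hypothetical names, and the algorithm is one simple choice among many. Each step is reduced to a 16-hex-character SHA-256 prefix of its canonical JSON, only the last max_history fingerprints are kept in a fixed-length collections.deque, and a loop is reported when the most recent block of length L (between min_loop_length and max_loop_length) repeats min_repetitions times back to back.

```python
import collections
import hashlib
import json
from typing import Any, Deque, Dict, List


class HashedLoopDetector:
    """Hypothetical loop detector over a bounded history of hashed steps."""

    def __init__(self, max_history: int = 50, min_loop_length: int = 2,
                 max_loop_length: int = 10, min_repetitions: int = 2) -> None:
        # Fixed-length queue: memory stays O(max_history) however long the agent runs.
        self._history: Deque[str] = collections.deque(maxlen=max_history)
        self.min_loop_length = min_loop_length
        self.max_loop_length = max_loop_length
        self.min_repetitions = min_repetitions

    @staticmethod
    def fingerprint(step: Dict[str, Any]) -> str:
        # Canonical JSON (sorted keys) so equal steps always hash identically;
        # a short hex prefix keeps each history entry small.
        payload = json.dumps(step, sort_keys=True, default=str)
        return hashlib.sha256(payload.encode("utf-8")).hexdigest()[:16]

    def record(self, step: Dict[str, Any]) -> None:
        self._history.append(self.fingerprint(step))

    def detect_loop(self) -> int:
        """Return the length of a cycle repeating at the end of the history, or 0."""
        h: List[str] = list(self._history)
        for length in range(self.min_loop_length, self.max_loop_length + 1):
            if length * self.min_repetitions > len(h):
                break  # not enough history for this (or any longer) cycle
            tail = h[-length:]
            # The last `length` fingerprints must repeat min_repetitions times back to back.
            if all(h[-(k + 1) * length: len(h) - k * length] == tail
                   for k in range(self.min_repetitions)):
                return length
        return 0


if __name__ == "__main__":
    detector = HashedLoopDetector(min_loop_length=2, min_repetitions=2)
    for node in ["NodeA", "NodeB", "NodeC", "NodeA", "NodeB", "NodeC"]:
        detector.record({"node": node, "tool": "ToolB", "error_code": 500})
    print(detector.detect_loop())  # 3
```

Raising min_repetitions reduces false positives at the cost of detecting real loops later. Including the tool name and error code in each step, not just the node, helps separate a legitimate revisit (such as an approval sent back one step) from failing the same way again.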
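6. Practical Example and Code Walkthrough
Let's use a simplified simulated agent to show how these components are integrated and how the escape mechanism is triggered.
Suppose we have an agent whose goal is to move from start_node to end_node, possibly calling tools along the way. We will simulate a circular graph in which one tool fails at random.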
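As a sanity check on the scenario, the following standalone sketch uses an iterative three-color depth-first search to find a cycle in an adjacency-list graph of the same shape as the one in run_simulation below (A → B → C → A, with C → D → E leading out). The function name find_cycle is hypothetical and not part of the escape mechanism; it only confirms statically that the graph contains a cycle the agent could be trapped in.

```python
from typing import Dict, List, Optional


def find_cycle(graph: Dict[str, List[str]]) -> Optional[List[str]]:
    """Return one cycle as a node list (first node repeated at the end), or None."""
    WHITE, GRAY, BLACK = 0, 1, 2  # unvisited / on the current DFS path / finished
    color = {node: WHITE for node in graph}
    parent: Dict[str, Optional[str]] = {}

    def dfs(start: str) -> Optional[List[str]]:
        stack = [(start, iter(graph.get(start, [])))]
        color[start] = GRAY
        parent[start] = None
        while stack:
            node, neighbors = stack[-1]
            nxt = next(neighbors, None)
            if nxt is None:
                color[node] = BLACK  # all descendants explored
                stack.pop()
            elif color.get(nxt, WHITE) == GRAY:
                # Back edge node -> nxt: walk parent links from node back to nxt.
                cycle = [node]
                while cycle[-1] != nxt:
                    cycle.append(parent[cycle[-1]])
                cycle.reverse()
                return cycle + [nxt]
            elif color.get(nxt, WHITE) == WHITE:
                color[nxt] = GRAY
                parent[nxt] = node
                stack.append((nxt, iter(graph.get(nxt, []))))
        return None

    for node in graph:
        if color[node] == WHITE:
            found = dfs(node)
            if found:
                return found
    return None


graph = {"NodeA": ["NodeB"], "NodeB": ["NodeC"], "NodeC": ["NodeA", "NodeD"],
         "NodeD": ["NodeE"], "NodeE": []}
print(find_cycle(graph))  # ['NodeA', 'NodeB', 'NodeC', 'NodeA']
```

A static cycle is only a precondition for trouble: whether the agent actually gets stuck depends on tool failures at run time, which is what the runtime loop detector watches.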
# Re-import necessary modules in case this is run standalone
import collections
import datetime
import hashlib
import json
import time
import random
import abc
from typing import Dict, Any, List, Optional, Callable, Tuple
# Assume all previous classes (AgentState, AgentEvent, NodeVisitEvent, ToolCallEvent, ToolResultEvent,
# HistoryTracker, LoopDetector, ErrorClassifier, EscapeAction, InterventionEngine, CircuitBreaker,
# EscapePolicyManager, EscapeMechanismOrchestrator, AbstractAgent, AbstractTool) are defined and available here.
# For brevity, only the custom Agent and Tool used by the simulation are included below.
# --- Custom Agent Implementation for Simulation ---
class MyAgent(AbstractAgent):
    def __init__(self, agent_id: str, start_node: str, end_node: str, graph: Dict[str, List[str]], tools: Dict[str, AbstractTool]):
        self.agent_id = agent_id
        self.current_node = start_node
        self.goal_node = end_node
        self.graph = graph  # Adjacency list representation: {node_id: [neighbor_node_ids]}
        self.tools = tools
        self.path_history: List[str] = [start_node]
        self.current_goal = "Reach " + end_node
        self._terminated = False
        self._next_forced_node: Optional[str] = None  # For EXPLORE_PATH action
        self._avoid_tools: collections.deque[str] = collections.deque(maxlen=5)  # Tools to avoid temporarily
        self._retry_tool_name: Optional[str] = None  # Tool name to retry after backoff
    def get_current_state(self) -> AgentState:
        return AgentState(self.current_node, self.current_goal, path=self.path_history[-5:])
    def execute_action(self, action_name: str, action_params: Dict[str, Any]) -> Optional[ToolResultEvent]:
        current_state = self.get_current_state()
        if action_name == "MOVE":
            next_node = action_params["node_id"]
            if next_node in self.graph.get(self.current_node, []):
                self.current_node = next_node
                self.path_history.append(next_node)
                print(f"Agent {self.agent_id} moved to node: {self.current_node}")
                return None  # No tool result for a simple move
            else:
                print(f"Agent {self.agent_id} tried to move to invalid node: {next_node}")
                # Report the invalid move as a failed result of a pseudo "move_tool"
                return ToolResultEvent(datetime.datetime.now(), current_state, "move_tool", success=False, error_details={"code": 400, "message": "Invalid move"})
        elif action_name == "CALL_TOOL":
            tool_name = action_params["tool_name"]
            tool_input = action_params["tool_input"]
            if tool_name in self._avoid_tools:
                print(f"Agent {self.agent_id} tried to call avoided tool {tool_name}. Skipping.")
                return ToolResultEvent(datetime.datetime.now(), current_state, tool_name, success=False, error_details={"code": 503, "message": f"Tool {tool_name} temporarily avoided by escape mechanism"})
            if tool_name not in self.tools:
                print(f"Agent {self.agent_id} tried to call unknown tool: {tool_name}")
                return ToolResultEvent(datetime.datetime.now(), current_state, tool_name, success=False, error_details={"code": 404, "message": "Tool not found"})
            print(f"Agent {self.agent_id} calling tool: {tool_name} with {tool_input}")
            try:
                tool_output = self.tools[tool_name].execute(tool_input)
            except Exception as e:
                print(f"Agent {self.agent_id} tool {tool_name} failed: {e}")
                return ToolResultEvent(datetime.datetime.now(), current_state, tool_name, success=False, error_details={"code": 500, "message": str(e)})
            # A successful call "unlocks" the edge it was made for (ToolB guards the move
            # to NodeB, ToolC the move to NodeC), so move there if it is a valid neighbor.
            # Without this, the agent would call ToolB forever and never leave NodeA.
            target_node = tool_input.get("node_data")
            if target_node in self.graph.get(self.current_node, []):
                self.current_node = target_node
                self.path_history.append(target_node)
                print(f"Agent {self.agent_id} moved to node: {self.current_node}")
            return ToolResultEvent(datetime.datetime.now(), current_state, tool_name, tool_output=tool_output, success=True)
        else:
            print(f"Agent {self.agent_id} received unknown action: {action_name}")
            return None
    def get_available_paths(self) -> List[str]:
        """Returns neighbor nodes as available paths for exploration."""
        return self.graph.get(self.current_node, [])
    def apply_escape_action(self, action: EscapeAction):
        """Agent adjusts its behavior based on escape action."""
        if action.action_type == "SET_NEXT_NODE":
            self._next_forced_node = action.details["node_id"]
            print(f"Agent {self.agent_id} forced to consider node: {self._next_forced_node}")
        elif action.action_type == "CIRCUIT_BREAKER_OPEN":
            tool_name = action.details["tool_name"]
            self._avoid_tools.append(tool_name)
            print(f"Agent {self.agent_id} instructed to avoid tool: {tool_name}")
        elif action.action_type == "RETRY_WITH_BACKOFF":
            self._retry_tool_name = action.details["tool_name"]
            print(f"Agent {self.agent_id} instructed to retry tool {self._retry_tool_name} after delay.")
        elif action.action_type == "HUMAN_INTERVENTION":
            print(f"Agent {self.agent_id} pausing for human intervention.")
            # In a real system, the agent might enter a suspended state.
            pass
        elif action.action_type == "EMERGENCY_STOP":
            self._terminated = True
            print(f"Agent {self.agent_id} terminated by emergency stop.")
        else:
            print(f"Agent {self.agent_id} received unhandled escape action: {action.action_type}")
    def is_goal_reached(self) -> bool:
        return self.current_node == self.goal_node
    def is_terminated(self) -> bool:
        return self._terminated
    def decide_next_step(self) -> Tuple[str, Dict[str, Any]]:
        """Agent's internal logic to decide the next action."""
        # If the escape mechanism forced a node, move there
        if self._next_forced_node:
            node_to_visit = self._next_forced_node
            self._next_forced_node = None  # Consume the forced action
            return "MOVE", {"node_id": node_to_visit}
        # If instructed to retry a tool
        if self._retry_tool_name:
            tool_name = self._retry_tool_name
            self._retry_tool_name = None  # Consume the retry instruction
            return "CALL_TOOL", {"tool_name": tool_name, "tool_input": {"data": f"retry_data_{tool_name}"}}
        # Default agent logic:
        # 1. If at the goal, do nothing.
        if self.is_goal_reached():
            return "IDLE", {}
        # 2. Try to move towards the goal (simplified: just pick any neighbor for now)
        neighbors = self.graph.get(self.current_node, [])
        if neighbors:
            # Simple logic: exclude the current node, then pick a neighbor at random
            next_node_options = [n for n in neighbors if n != self.current_node]
            if next_node_options:
                next_node = random.choice(next_node_options)
                # Assume moving to NodeB requires ToolB and moving to NodeC requires ToolC.
                # Here we simulate specific tool calls for specific nodes.
                if next_node == "NodeB":
                    return "CALL_TOOL", {"tool_name": "ToolB", "tool_input": {"node_data": next_node}}
                elif next_node == "NodeC":
                    return "CALL_TOOL", {"tool_name": "ToolC", "tool_input": {"node_data": next_node}}
                else:
                    return "MOVE", {"node_id": next_node}
            else:
                # If there are no other options, call a generic tool
                return "CALL_TOOL", {"tool_name": "GenericProcessTool", "tool_input": {"node_data": self.current_node}}
        # If there are no neighbors (and the goal is not reached), fall back to a generic processing tool
        return "CALL_TOOL", {"tool_name": "GenericProcessTool", "tool_input": {"node_data": self.current_node}}
# --- Custom Tool Implementation for Simulation ---
class MyTool(AbstractTool):
    def __init__(self, name: str, fail_probability: float = 0.3, is_critical: bool = False):
        self.name = name
        self.fail_probability = fail_probability  # Probability that a single call raises
        self.is_critical = is_critical  # Only changes the error message; the failure rate comes from fail_probability
    def execute(self, input_params: Dict[str, Any]) -> Dict[str, Any]:
        print(f"  Executing tool '{self.name}' with input: {input_params}")
        if random.random() < self.fail_probability:
            if self.is_critical:
                raise Exception(f"Critical tool '{self.name}' failed randomly (simulated transient error).")
            else:
                raise Exception(f"Tool '{self.name}' failed randomly (simulated error).")
        return {"status": "success", "tool_name": self.name, "processed_data": input_params}
# --- Main Simulation Loop ---
def run_simulation():
    # 1. Set up the environment
    # A circular graph: A -> B -> C -> A
    graph = {
        "NodeA": ["NodeB"],
        "NodeB": ["NodeC"],
        "NodeC": ["NodeA", "NodeD"],  # NodeD is the escape path
        "NodeD": ["NodeE"],
        "NodeE": []  # Goal node
    }
    # Define tools
    tools = {
        "ToolB": MyTool("ToolB", fail_probability=0.6, is_critical=True),  # High-failure-rate tool
        "ToolC": MyTool("ToolC", fail_probability=0.1),
        "GenericProcessTool": MyTool("GenericProcessTool", fail_probability=0.2),
    }
    # Agent
    agent = MyAgent("Agent001", "NodeA", "NodeE", graph, tools)
    # Escape mechanism setup
    policy_manager = EscapePolicyManager()
    policy_manager.update_config({
        "history_tracker": {"max_history_size": 30},
        "loop_detector": {"min_loop_length": 3, "min_repetitions": 2, "time_window_seconds": 120},
        "error_classifier": {
            "transient_patterns": ["failed randomly", "critical tool"],  # Custom patterns for simulation
            "persistent_patterns": ["tool not found", "invalid move"]  # Must match MyAgent's error messages ("Tool not found", "Invalid move")
        },
        "intervention_engine": {
            "max_retries_on_transient": 2,
            "circuit_breaker_threshold": 3,  # Lower threshold so the breaker opens sooner
            "circuit_breaker_timeout_sec": 10
        }
    })
    orchestrator = EscapeMechanismOrchestrator(policy_manager)
    print("--- Simulation Started ---")
    step_count = 0
    max_steps = 100  # Prevent truly infinite loops in the simulation
    while not agent.is_goal_reached() and not agent.is_terminated() and step_count < max_steps:
        step_count += 1
        print(f"\n--- Step {step_count} ---")
        current_agent_state = agent.get_current_state()
        orchestrator.record_agent_event(NodeVisitEvent(datetime.datetime.now(), current_agent_state, current_agent_state.current_node_id))
        # Agent decides what to do next
        action_name, action_params = agent.decide_next_step()
        print(f"Agent decided to: {action_name} {action_params}")
        last_tool_result = None
        if action_name != "IDLE":
            last_tool_result = agent.execute_action(action_name, action_params)
            if last_tool_result:
                orchestrator.record_agent_event(last_tool_result)
        # Escape mechanism checks and intervenes
        intervened = orchestrator.check_and_intervene(agent, last_tool_result)
        # If the agent was terminated by an intervention, stop the loop
        if agent.is_terminated():
            break
        # If the agent was told to back off, the orchestrator has already slept.
        # Otherwise, add a small delay to simulate real-world processing.
        if not intervened:
            time.sleep(0.1)  # Simulate some processing time
    print("\n--- Simulation Finished ---")
    if agent.is_goal_reached():
        print(f"Agent {agent.agent_id} reached goal '{agent.goal_node}' in {step_count} steps.")
    elif agent.is_terminated():
        print(f"Agent {agent.agent_id} terminated by escape mechanism.")
    else:
        print(f"Agent {agent.agent_id} did not reach goal or terminate within {max_steps} steps.")
    print("Final Agent State:", agent.get_current_state().to_dict())
    print("Last 10 Events:")
    for event in orchestrator.history_tracker.get_last_n_events(10):
        print(event.to_dict