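Logic puzzle: if an Agent traversing a cyclic graph falls into an infinite loop because a Tool throws random errors, how would you design a general-purpose "escape mechanism"?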

Colleagues and fellow technology enthusiasts:

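Welcome to today's technical talk. We will explore a question that is critical to agent design yet genuinely hard: when an Agent operating in a complex graph, particularly a cyclic one, falls into an infinite loop because a tool it depends on fails at random, how do we design a general, robust "escape mechanism"?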

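This is not merely a theoretical question. It is core practice for keeping automated systems, AI Agents, and microservice orchestration stable and resilient. Imagine an Agent responsible for a critical business process that, because of an intermittent third-party API failure or a transient internal service error, gets stuck repeating a cycle that never produces a result. That can exhaust resources, stall the business, and even leave data inconsistent. As programming experts, our job is to anticipate and solve system behavior problems at this depth.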

1. Dilemma and Opportunity: Where Agents, Cyclic Graphs, and Random Errors Meet

First, let us define the problem scenario precisely.

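The nature of the Agent: our Agent is an entity capable of perception, decision-making, and action. It navigates a graph made of nodes and edges and uses a set of external or internal "tools" to perform specific operations toward its goal. Every step the Agent takes may change its state or invoke a tool.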

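Properties of the cyclic graph: "cyclic graph" here does not mean a cycle graph in the strict mathematical sense; it means any graph structure that contains cycles. In such a graph, an Agent may advance along a path and eventually return to a node it has already visited, closing a loop. This is very common in practice, for example: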

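  • Business processes: an approval workflow may send a request back to the previous stage under certain conditions.
  • Network topology: packets may loop between routers.
  • Microservice calls: service A calls service B, which indirectly calls service A again, forming a dependency cycle.
  • State machines: certain transitions may return the Agent to a previous state.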

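Random tool errors: this is the core trigger. To act, the Agent must call various tools (database operations, API calls, file I/O, machine learning model inference, and so on). These tools are not 100% reliable and may produce "random errors" for the following reasons: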

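  • Transient errors: network jitter, database connection timeouts, concurrency conflicts, temporary resource shortages. These usually clear up if the call is retried shortly afterwards.
  • Persistent errors: misconfiguration, insufficient permissions, API version incompatibility, a downstream service that is down. Retrying does not help; deeper intervention is needed.
  • Business logic errors: the tool call succeeds but returns a business result that does not match expectations, leading the Agent to a wrong judgment and into a loop.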

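How the infinite loop forms: as the Agent moves through a cyclic graph, suppose it tries to pass a node or path whose critical tool keeps failing at random. Either the Agent never gets through, or after each failure its default strategy (for example, a simple retry, or picking a neighbor that only "looks" viable) brings it back to the start or to an earlier node. The Agent then keeps attempting the same set of actions, calls the same tools, hits the same error pattern, and never breaks out: an infinite loop.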
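A toy reproduction makes the failure mode concrete. This is a minimal sketch under assumed names (`GRAPH`, `flaky_tool`, and `naive_run` are hypothetical): a three-node cycle A -> B -> C -> A whose only exit, C -> GOAL, is guarded by a tool that fails with a given probability, and a naive Agent whose default recovery on failure is the "safe-looking" edge back to A. With `failure_rate=1.0` (the worst case) the Agent circles indefinitely; `max_steps` exists only so the demo terminates.

```python
import random
from typing import Dict, List, Tuple

# Hypothetical toy graph: C -> GOAL is the only exit, and it is guarded by a tool.
GRAPH: Dict[str, List[str]] = {"A": ["B"], "B": ["C"], "C": ["A", "GOAL"]}

def flaky_tool(rng: random.Random, failure_rate: float) -> bool:
    """Hypothetical tool call: returns False (failure) with probability failure_rate."""
    return rng.random() >= failure_rate

def naive_run(failure_rate: float, max_steps: int, seed: int = 0) -> Tuple[List[str], bool]:
    """Naive policy: at C, try the tool; on failure take the 'safe-looking' edge C -> A."""
    rng = random.Random(seed)
    node, path = "A", ["A"]
    for _ in range(max_steps):  # step cap only so the demo terminates
        if node == "GOAL":
            return path, True
        if node == "C":
            node = "GOAL" if flaky_tool(rng, failure_rate) else "A"
        else:
            node = GRAPH[node][0]
        path.append(node)
    return path, node == "GOAL"

path, reached = naive_run(failure_rate=1.0, max_steps=12)
print(reached, " -> ".join(path))
# False A -> B -> C -> A -> B -> C -> A -> B -> C -> A -> B -> C -> A
```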

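Such a loop not only wastes valuable compute but can also interrupt business processes, leave data inconsistent, and even bring down the whole system. Designing a general "escape mechanism" that can recognize and break such loops is therefore key to building robust, adaptive Agent systems.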

2. Core Design Principles of the Escape Mechanism

An effective, general escape mechanism must follow these core principles:

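  • Observability: every step of the Agent's behavior, including visited nodes, called tools, tool inputs and outputs, and any errors, must be recorded in detail. This is the foundation for diagnosis and loop detection.
  • State Awareness: the escape mechanism needs access to the Agent's current state and must understand its path history and operational context. Its interventions should be grounded in the Agent's environment and past behavior rather than applied blindly.
  • Loop Detection: this is the mechanism's core function. It needs algorithms that recognize whether the Agent is repeating the same behavior pattern or visiting the same set of nodes frequently within a short period.
  • Intelligent Intervention: once a loop is detected, the mechanism should not simply stop the Agent. Depending on the nature of the error, the depth of the loop, and the Agent's goal, it should apply a layered series of interventions to help the Agent get unstuck.
  • Adaptation & Learning: an advanced escape mechanism should learn from past failures, tune its detection thresholds and intervention strategies, and even feed back to the Agent so it avoids similar traps in the future.
  • Generality & Decoupling: the escape mechanism should be decoupled from the specific Agent's business logic and tool implementations. It should be an independent, pluggable component able to monitor any Agent that conforms to a given interface.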
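The decoupling principle can be expressed as a structural interface: the monitor depends only on a small contract, and any Agent that happens to provide those methods can be plugged in without inheriting from a framework class. A minimal sketch using `typing.Protocol`; the names `MonitorableAgent`, `EscapeMonitor`, `ToyAgent`, and the `(action_type, details)` signature are assumptions for illustration, simpler than the ABC-based `AbstractAgent` defined in section 4.3.

```python
from typing import Any, Dict, List, Protocol, runtime_checkable

@runtime_checkable
class MonitorableAgent(Protocol):
    """Hypothetical minimal contract the escape mechanism relies on."""
    def get_current_state(self) -> Dict[str, Any]: ...
    def get_available_paths(self) -> List[str]: ...
    def apply_escape_action(self, action_type: str, details: Dict[str, Any]) -> None: ...

class EscapeMonitor:
    """Knows only the protocol, never a concrete Agent class."""
    def __init__(self, agent: MonitorableAgent) -> None:
        # runtime_checkable isinstance() checks that the methods exist, not their signatures.
        if not isinstance(agent, MonitorableAgent):
            raise TypeError("agent does not implement MonitorableAgent")
        self.agent = agent

    def force_detour(self) -> bool:
        """Ask the agent to take its first alternative path, if there is one."""
        paths = self.agent.get_available_paths()
        if not paths:
            return False
        self.agent.apply_escape_action("EXPLORE_PATH", {"target": paths[0]})
        return True

class ToyAgent:  # satisfies the protocol structurally; no inheritance needed
    def __init__(self) -> None:
        self.node = "A"
        self.received: List[str] = []

    def get_current_state(self) -> Dict[str, Any]:
        return {"current_node_id": self.node}

    def get_available_paths(self) -> List[str]:
        return ["B", "C"]

    def apply_escape_action(self, action_type: str, details: Dict[str, Any]) -> None:
        self.received.append(action_type)
        self.node = details.get("target", self.node)

agent = ToyAgent()
print(EscapeMonitor(agent).force_detour(), agent.node)  # True B
```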

3. Key Components of the Escape Mechanism

Based on these principles, the escape mechanism decomposes into several cooperating components:

3.1 Behavior History Tracker

This is the eyes and ears of the escape mechanism. It captures all relevant events and states during the Agent's execution.

Purpose: record in detail the Agent's trajectory, its tool calls and their results (successes and failures), and any error information.

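Data structure: history is usually stored in a fixed-length ring buffer or a growable list. A fixed-length buffer bounds memory use but may lose older history; a growable list keeps everything, so memory consumption must be watched. For generality, we usually store an abstract AgentEvent object.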
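The trade-off is easy to see with `collections.deque(maxlen=...)`, which is also what `HistoryTracker` below uses as its ring buffer. A bounded buffer also caps what a sequence-based loop detector can see: to observe a loop of length k repeated r times, the buffer must hold at least k × r events.

```python
from collections import deque

# Fixed-size ring buffer: once full, each append silently evicts the oldest item.
ring = deque(maxlen=3)
for event_id in range(5):
    ring.append(f"event-{event_id}")
print(list(ring))  # ['event-2', 'event-3', 'event-4']

# Unbounded list: nothing is lost, but memory grows with the length of the run.
unbounded = [f"event-{event_id}" for event_id in range(5)]
print(len(unbounded))  # 5
```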

Stored content: each event record should contain at least the following fields:

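| Field | Type | Description |
|---|---|---|
| timestamp | datetime | When the event occurred |
| event_type | str | Event type (e.g., 'NODE_VISIT', 'TOOL_CALL', 'TOOL_SUCCESS', 'TOOL_FAILURE') |
| agent_state | dict | Snapshot of the Agent's state when the event occurred (e.g., current_node_id, goal) |
| tool_name | str | Tool name, for tool-call events |
| tool_input | dict | Input parameters of the tool call |
| tool_output | dict | Output of the tool call (on success) |
| error_details | dict | Error details (on failure, e.g., error_code, message, stack_trace) |
| path_id | str | Unique identifier of the Agent's current path (if the Agent has such a concept) |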
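As an alternative to the class hierarchy used in the code below, the table's fields can be captured in one flat record. A minimal sketch with a standard-library dataclass; the name `EventRecord` and the rule that a `TOOL_FAILURE` must carry `error_details` are assumptions, not part of the original design.

```python
import datetime
import json
from dataclasses import asdict, dataclass, field
from typing import Any, Dict, Optional

@dataclass
class EventRecord:
    """Hypothetical flat record holding the fields listed in the table."""
    event_type: str                        # e.g. 'NODE_VISIT', 'TOOL_CALL', 'TOOL_SUCCESS', 'TOOL_FAILURE'
    agent_state: Dict[str, Any]            # snapshot, e.g. {"current_node_id": ..., "goal": ...}
    timestamp: datetime.datetime = field(default_factory=datetime.datetime.now)
    tool_name: Optional[str] = None
    tool_input: Optional[Dict[str, Any]] = None
    tool_output: Optional[Dict[str, Any]] = None    # set on success
    error_details: Optional[Dict[str, Any]] = None  # set on failure
    path_id: Optional[str] = None

    def __post_init__(self) -> None:
        # Assumed invariant: a failure without details is useless to an error classifier.
        if self.event_type == "TOOL_FAILURE" and self.error_details is None:
            raise ValueError("TOOL_FAILURE events must carry error_details")

    def to_json(self) -> str:
        data = asdict(self)
        data["timestamp"] = self.timestamp.isoformat()  # datetime is not JSON-serializable
        return json.dumps(data, sort_keys=True)

rec = EventRecord("TOOL_FAILURE", {"current_node_id": "node_B", "goal": "goal_X"},
                  timestamp=datetime.datetime(2024, 1, 1, 12, 0),
                  tool_name="tool_P",
                  error_details={"code": 500, "message": "Internal Server Error"})
print(rec.to_json())
```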

Code example: HistoryTracker

import collections
import collections.abc  # needed for collections.abc.Mapping in EscapePolicyManager
import datetime
import hashlib
import json
from typing import Dict, Any, List, Optional, Callable

class AgentState:
    """Representing the Agent's state at a given point."""
    def __init__(self, current_node_id: str, goal: str, **kwargs):
        self.current_node_id = current_node_id
        self.goal = goal
        self.custom_state = kwargs

    def to_dict(self) -> Dict[str, Any]:
        return {
            "current_node_id": self.current_node_id,
            "goal": self.goal,
            **self.custom_state
        }

    def __hash__(self) -> int:
        return hash(json.dumps(self.to_dict(), sort_keys=True))

    def __eq__(self, other: Any) -> bool:
        if not isinstance(other, AgentState):
            return NotImplemented
        return self.to_dict() == other.to_dict()

    def __repr__(self) -> str:
        return f"AgentState(node='{self.current_node_id}', goal='{self.goal}', custom={self.custom_state})"

class AgentEvent:
    """Base class for all agent events."""
    def __init__(self, timestamp: datetime.datetime, agent_state: AgentState):
        self.timestamp = timestamp
        self.agent_state = agent_state

    def to_dict(self) -> Dict[str, Any]:
        return {
            "timestamp": self.timestamp.isoformat(),
            "agent_state": self.agent_state.to_dict(),
            "event_type": self.__class__.__name__
        }

class NodeVisitEvent(AgentEvent):
    """Event representing an agent visiting a node."""
    def __init__(self, timestamp: datetime.datetime, agent_state: AgentState, node_id: str):
        super().__init__(timestamp, agent_state)
        self.node_id = node_id

    def to_dict(self) -> Dict[str, Any]:
        data = super().to_dict()
        data["node_id"] = self.node_id
        return data

class ToolCallEvent(AgentEvent):
    """Event representing an agent calling a tool."""
    def __init__(self, timestamp: datetime.datetime, agent_state: AgentState,
                 tool_name: str, tool_input: Dict[str, Any]):
        super().__init__(timestamp, agent_state)
        self.tool_name = tool_name
        self.tool_input = tool_input

    def to_dict(self) -> Dict[str, Any]:
        data = super().to_dict()
        data["tool_name"] = self.tool_name
        data["tool_input"] = self.tool_input
        return data

class ToolResultEvent(AgentEvent):
    """Event representing the result of a tool call."""
    def __init__(self, timestamp: datetime.datetime, agent_state: AgentState,
                 tool_name: str, tool_output: Optional[Dict[str, Any]] = None,
                 error_details: Optional[Dict[str, Any]] = None, success: bool = True):
        super().__init__(timestamp, agent_state)
        self.tool_name = tool_name
        self.tool_output = tool_output
        self.error_details = error_details
        self.success = success

    def to_dict(self) -> Dict[str, Any]:
        data = super().to_dict()
        data["tool_name"] = self.tool_name
        data["tool_output"] = self.tool_output
        data["error_details"] = self.error_details
        data["success"] = self.success
        return data

class HistoryTracker:
    """Manages the history of agent events."""
    def __init__(self, max_history_size: int = 100):
        self.history: collections.deque[AgentEvent] = collections.deque(maxlen=max_history_size)
        self.max_history_size = max_history_size

    def record_event(self, event: AgentEvent):
        """Records an event in the history."""
        self.history.append(event)
        # print(f"Recorded event: {event.to_dict()}")

    def get_history(self) -> List[AgentEvent]:
        """Returns the current history as a list."""
        return list(self.history)

    def get_last_n_events(self, n: int) -> List[AgentEvent]:
        """Returns the last n events."""
        return list(collections.deque(self.history, maxlen=n))

    def clear(self):
        """Clears the history."""
        self.history.clear()

# Example Usage:
# tracker = HistoryTracker(max_history_size=10)
# agent_state_1 = AgentState("node_A", "goal_X")
# tracker.record_event(NodeVisitEvent(datetime.datetime.now(), agent_state_1, "node_A"))
# tracker.record_event(ToolCallEvent(datetime.datetime.now(), agent_state_1, "tool_P", {"param": 1}))
# tracker.record_event(ToolResultEvent(datetime.datetime.now(), agent_state_1, "tool_P", error_details={"code": 500}, success=False))
# agent_state_2 = AgentState("node_B", "goal_X")
# tracker.record_event(NodeVisitEvent(datetime.datetime.now(), agent_state_2, "node_B"))
# print(tracker.get_history())

3.2 Loop Detector

With a history in hand, the next step is to analyze it and decide whether the Agent is caught in a loop.

Purpose: analyze the Agent's behavior history to determine whether a repeating pattern that amounts to an infinite loop exists.

Methods

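  1. Node Visit Frequency: simple but effective. If the Agent visits the same node or set of nodes frequently within a short span (for example, within the last N events), that may signal a loop.
  2. Action Sequence Matching: more precise. The Agent not only revisits nodes but repeats the same series of tool calls and decisions, for example NodeA -> ToolX(fail) -> NodeB -> ToolY(fail) -> NodeA.
  3. State Hashing: if the Agent's full state can be hashed (via the __hash__ method), a repeating sequence of state hashes, especially back-to-back repeats, is a strong loop signal. This requires AgentState to have a good hashable representation.
  4. Time-Windowed Detection: look for repeated patterns only within a specific time window.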
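Methods 1 and 3 can be sketched in a few lines with the standard library. This is an illustrative sketch, not the `LoopDetector` API below: `visit_counts`, `state_fingerprint`, and `repeated_states` are hypothetical helpers. The fingerprint uses SHA-256 over canonical JSON rather than Python's built-in `hash()`, because string hashing is salted per process (see `PYTHONHASHSEED`), so `hash()` values cannot be compared across runs or in persisted histories.

```python
import hashlib
import json
from collections import Counter
from typing import Any, Dict, List

def visit_counts(node_ids: List[str], last_n: int) -> Counter:
    """Method 1: visit frequency of each node among the last `last_n` visits."""
    return Counter(node_ids[-last_n:]) if last_n > 0 else Counter()

def state_fingerprint(state: Dict[str, Any]) -> str:
    """Method 3: stable, process-independent hash of a JSON-serializable state."""
    canonical = json.dumps(state, sort_keys=True, separators=(",", ":"))
    return hashlib.sha256(canonical.encode("utf-8")).hexdigest()

def repeated_states(states: List[Dict[str, Any]], min_count: int) -> List[str]:
    """Fingerprints that occur at least `min_count` times in the given states."""
    counts = Counter(state_fingerprint(s) for s in states)
    return [fp for fp, c in counts.items() if c >= min_count]

visits = ["A", "B", "C", "A", "B", "C", "A"]
print(visit_counts(visits, last_n=7).most_common(1))  # [('A', 3)]
```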

Thresholds and sensitivity

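  • Loop Length: how long a sequence must be to count as a loop.
  • Repetition Count: how many times a sequence must repeat to be considered a loop.
  • Time Threshold: the time span within which the repetitions must occur to count as a loop.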
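The three thresholds map directly onto the parameters of a tail-only check. Unlike the exhaustive search in `LoopDetector` below, which looks for a repeating block anywhere in the window, the hypothetical `detect_tail_loop` sketched here only asks whether the most recent events end in a repeated block, which is closer to "the Agent is looping right now", and its comparison count depends on the thresholds rather than on history size (apart from the time-window filter). Event tokens such as `"NODE:A"` mirror the labels used by `_hash_event_sequence` below; the function itself is an assumption.

```python
import datetime
from typing import List, Optional, Sequence, Tuple

def detect_tail_loop(events: Sequence[Tuple[datetime.datetime, str]],
                     now: datetime.datetime,
                     min_loop_length: int = 2,
                     max_loop_length: int = 10,
                     min_repetitions: int = 2,
                     time_window_seconds: float = 60.0) -> Optional[List[str]]:
    """Return the shortest block (length >= min_loop_length) that the most recent
    events repeat back-to-back at least min_repetitions times, else None.

    events: (timestamp, token) pairs, oldest first; a token is an abstract label
    such as "NODE:A" or "TOOL_RESULT:ToolX:False".
    """
    # Time threshold: ignore events older than the window.
    recent = [tok for ts, tok in events
              if (now - ts).total_seconds() <= time_window_seconds]
    for k in range(min_loop_length, max_loop_length + 1):   # loop length
        span = k * min_repetitions                          # repetition count
        if span > len(recent):
            break
        tail = recent[-span:]
        block = tail[-k:]
        if all(tail[i] == block[i % k] for i in range(span)):
            return block
    return None

t0 = datetime.datetime(2024, 1, 1, 12, 0, 0)
tokens = ["NODE:A", "NODE:B", "TOOL_RESULT:T:False"] * 2
events = [(t0 + datetime.timedelta(seconds=i), tok) for i, tok in enumerate(tokens)]
print(detect_tail_loop(events, now=t0 + datetime.timedelta(seconds=10)))
# ['NODE:A', 'NODE:B', 'TOOL_RESULT:T:False']
```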

Code example: LoopDetector

class LoopDetector:
    """Detects loops in agent's behavior history."""
    def __init__(self,
                 history_tracker: HistoryTracker,
                 min_loop_length: int = 3,       # Minimum length of a sequence to consider as a loop
                 min_repetitions: int = 2,       # Minimum number of times the sequence must repeat
                 time_window_seconds: int = 60): # Time window for detection
        self.history_tracker = history_tracker
        self.min_loop_length = min_loop_length
        self.min_repetitions = min_repetitions
        self.time_window_seconds = time_window_seconds

    def _get_relevant_events(self) -> List[AgentEvent]:
        """Filters events within the time window."""
        now = datetime.datetime.now()
        return [event for event in self.history_tracker.get_history()
                if (now - event.timestamp).total_seconds() <= self.time_window_seconds]

    def _hash_event_sequence(self, sequence: List[AgentEvent]) -> str:
        """Generates a hash for an event sequence, focusing on key attributes."""
        # For simplicity, we'll hash the current_node_id and tool_name/success status
        # A more robust hash would include tool_input/output or full state.
        seq_data = []
        for event in sequence:
            if isinstance(event, NodeVisitEvent):
                seq_data.append(f"NODE:{event.node_id}")
            elif isinstance(event, ToolCallEvent):
                seq_data.append(f"TOOL_CALL:{event.tool_name}")
            elif isinstance(event, ToolResultEvent):
                seq_data.append(f"TOOL_RESULT:{event.tool_name}:{event.success}")
        return hashlib.sha256("->".join(seq_data).encode('utf-8')).hexdigest()

    def detect_node_repetition(self, node_id: str, threshold: int = 5) -> bool:
        """
        Detects if a specific node has been visited too many times recently.
        Args:
            node_id: The ID of the node to check.
            threshold: Number of visits within the time window at which the node is flagged.
        Returns:
            True if the node has been visited at least `threshold` times.
        """
        relevant_events = self._get_relevant_events()
        visit_count = sum(1 for event in relevant_events
                          if isinstance(event, NodeVisitEvent) and event.node_id == node_id)
        return visit_count >= threshold

    def detect_action_sequence_loop(self) -> Optional[List[AgentEvent]]:
        """
        Detects if a sequence of actions (nodes visited, tools called, results) is repeating.
        This is a more sophisticated detection method.
        """
        relevant_events = self._get_relevant_events()
        if len(relevant_events) < self.min_loop_length * self.min_repetitions:
            return None

        # Try candidate start positions from the most recent backwards, down to index 0
        for i in range(len(relevant_events) - self.min_loop_length, -1, -1):
            for j in range(self.min_loop_length, (len(relevant_events) - i) // 2 + 1):
                # Candidate sequence: events from i to i + j - 1
                candidate_sequence = relevant_events[i : i + j]
                candidate_hash = self._hash_event_sequence(candidate_sequence)

                repetitions = 1
                current_idx = i + j
                while current_idx + j <= len(relevant_events):
                    next_sequence = relevant_events[current_idx : current_idx + j]
                    if self._hash_event_sequence(next_sequence) == candidate_hash:
                        repetitions += 1
                        current_idx += j
                    else:
                        break

                if repetitions >= self.min_repetitions:
                    print(f"Detected loop: sequence repeated {repetitions} times.")
                    return candidate_sequence # Return the repeating sequence

        return None

    def detect_state_loop(self) -> Optional[List[AgentState]]:
        """
        Detects if the agent's state is repeating in a cycle.
        Requires AgentState to be hashable and comparable.
        """
        relevant_events = self._get_relevant_events()
        if len(relevant_events) < self.min_loop_length * self.min_repetitions:
            return None

        state_sequence = [event.agent_state for event in relevant_events]

        # Include start index 0 so a loop that begins at the oldest event is found
        for i in range(len(state_sequence) - self.min_loop_length, -1, -1):
            for j in range(self.min_loop_length, (len(state_sequence) - i) // 2 + 1):
                candidate_state_seq = state_sequence[i : i + j]

                # Check for exact state sequence repetition
                repetitions = 1
                current_idx = i + j
                while current_idx + j <= len(state_sequence):
                    next_state_seq = state_sequence[current_idx : current_idx + j]
                    if candidate_state_seq == next_state_seq: # Uses AgentState.__eq__
                        repetitions += 1
                        current_idx += j
                    else:
                        break

                if repetitions >= self.min_repetitions:
                    print(f"Detected state loop: sequence repeated {repetitions} times.")
                    return candidate_state_seq

        return None

    def is_in_loop(self) -> bool:
        """Checks if the agent is currently in any detected loop."""
        if self.detect_action_sequence_loop() is not None:
            return True
        if self.detect_state_loop() is not None:
            return True
        # You could add node repetition checks here for the current node
        # if self.history_tracker.history:
        #     current_node = self.history_tracker.history[-1].agent_state.current_node_id
        #     if self.detect_node_repetition(current_node, threshold=self.min_repetitions + 1):
        #         return True
        return False

# Example Usage:
# tracker = HistoryTracker(max_history_size=20)
# detector = LoopDetector(tracker, min_loop_length=3, min_repetitions=2, time_window_seconds=300)
#
# # Simulate a loop: A -> B -> C -> A -> B -> C
# for _ in range(3):
#     s_a = AgentState("NodeA", "GoalX")
#     tracker.record_event(NodeVisitEvent(datetime.datetime.now(), s_a, "NodeA"))
#     tracker.record_event(ToolCallEvent(datetime.datetime.now(), s_a, "ToolX", {"val": 1}))
#     tracker.record_event(ToolResultEvent(datetime.datetime.now(), s_a, "ToolX", success=True))
#
#     s_b = AgentState("NodeB", "GoalX")
#     tracker.record_event(NodeVisitEvent(datetime.datetime.now(), s_b, "NodeB"))
#     tracker.record_event(ToolCallEvent(datetime.datetime.now(), s_b, "ToolY", {"val": 2}))
#     tracker.record_event(ToolResultEvent(datetime.datetime.now(), s_b, "ToolY", success=False, error_details={"code": 500})) # Tool error
#
#     s_c = AgentState("NodeC", "GoalX")
#     tracker.record_event(NodeVisitEvent(datetime.datetime.now(), s_c, "NodeC"))
#     tracker.record_event(ToolCallEvent(datetime.datetime.now(), s_c, "ToolZ", {"val": 3}))
#     tracker.record_event(ToolResultEvent(datetime.datetime.now(), s_c, "ToolZ", success=True))
#
#     print(f"Is in loop? {detector.is_in_loop()}")
#
# # Output would eventually be True

3.3 Error Classifier

Random errors are the direct cause of the loop, and distinguishing error types is essential for choosing the right intervention.

Purpose: classify tool errors as transient, persistent, or unknown based on the error information (error code, message, stack trace).

Methods

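  1. Rule-based Matching: the most direct approach. Define a set of regular expressions or keywords and match them against error codes or messages.
    • Transient errors: Timeout, Connection Refused, Too Many Requests (429), Internal Server Error (500) (sometimes transient).
    • Persistent errors: Unauthorized (401), Forbidden (403), Not Found (404), Bad Request (400), Invalid Configuration.
  2. Statistical Analysis: if a tool's error recurs repeatedly within a short period, it may indicate a deeper persistent problem even when the error code looks transient (for example, the service is up but continuously overloaded).
  3. External Knowledge Base/ML: more sophisticated systems can train models on historical data to recognize error patterns automatically.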
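Method 2 can be approximated with a per-tool sliding window of failure timestamps: a tool whose "transient" errors keep recurring gets relabelled as persistent. A minimal sketch; the class name `TransientEscalator`, its thresholds, and the injectable `clock` (used so the behavior can be checked without waiting) are assumptions.

```python
import time
from collections import defaultdict, deque
from typing import Callable, Deque, Dict

class TransientEscalator:
    """Relabel a tool's TRANSIENT errors as PERSISTENT once it has failed more
    than max_failures times within the last window_seconds (hypothetical policy)."""

    def __init__(self, max_failures: int = 5, window_seconds: float = 60.0,
                 clock: Callable[[], float] = time.monotonic) -> None:
        self.max_failures = max_failures
        self.window_seconds = window_seconds
        self.clock = clock
        self._failures: Dict[str, Deque[float]] = defaultdict(deque)

    def observe(self, tool_name: str, rule_based_label: str) -> str:
        """Record one failure of tool_name and return the possibly escalated label."""
        now = self.clock()
        times = self._failures[tool_name]
        times.append(now)
        while now - times[0] > self.window_seconds:   # slide the window
            times.popleft()
        if rule_based_label == "TRANSIENT" and len(times) > self.max_failures:
            return "PERSISTENT"
        return rule_based_label

fake_now = [0.0]
esc = TransientEscalator(max_failures=2, window_seconds=10, clock=lambda: fake_now[0])
for t in (0.0, 1.0, 2.0):
    fake_now[0] = t
    print(esc.observe("db_query", "TRANSIENT"))
# TRANSIENT
# TRANSIENT
# PERSISTENT
```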

Code example: ErrorClassifier

class ErrorClassifier:
    """Classifies errors into transient, persistent, or unknown."""
    TRANSIENT_PATTERNS = [
        "timeout", "connection refused", "network error", "service unavailable",
        "too many requests", "rate limit exceeded", "internal server error",
        "429", "500", "502", "503", "504" # Common HTTP transient codes
    ]
    PERSISTENT_PATTERNS = [
        "unauthorized", "forbidden", "not found", "bad request",
        "invalid credentials", "permission denied", "configuration error",
        "400", "401", "403", "404", "405", "406", "409", "410" # Common HTTP persistent codes
    ]

    def classify_error(self, error_details: Dict[str, Any]) -> str:
        """
        Classifies an error based on its details.
        Returns 'TRANSIENT', 'PERSISTENT', or 'UNKNOWN'.
        """
        if not error_details:
            return "UNKNOWN"

        error_message = str(error_details.get("message", "")).lower()
        error_code = str(error_details.get("code", "")).strip().lower()

        # Patterns are matched as substrings of the message, but the code field must
        # equal a pattern exactly, so that e.g. code 1500 is not mistaken for 500.
        # Transient patterns are checked first.
        for pattern in self.TRANSIENT_PATTERNS:
            if pattern in error_message or pattern == error_code:
                return "TRANSIENT"

        # Check for persistent patterns
        for pattern in self.PERSISTENT_PATTERNS:
            if pattern in error_message or pattern == error_code:
                return "PERSISTENT"

        return "UNKNOWN"

    def is_transient(self, error_details: Dict[str, Any]) -> bool:
        return self.classify_error(error_details) == "TRANSIENT"

    def is_persistent(self, error_details: Dict[str, Any]) -> bool:
        return self.classify_error(error_details) == "PERSISTENT"

# Example Usage:
# classifier = ErrorClassifier()
# print(classifier.classify_error({"code": 503, "message": "Service Unavailable"})) # TRANSIENT
# print(classifier.classify_error({"code": 401, "message": "Unauthorized access"})) # PERSISTENT
# print(classifier.classify_error({"message": "Unknown error type"})) # UNKNOWN

3.4 Intelligent Intervention Engine

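This is the decision center of the escape mechanism. Based on the results of loop detection and error classification, it selects and carries out the most appropriate escape strategy.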

Purpose: when the Agent is stuck in an infinite loop, apply a series of predefined strategies to guide it out.

Strategy types

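  1. Backoff & Retry

    • When to use: mainly for transient errors.
    • Mechanism: after each failed retry, wait a progressively longer interval (Exponential Backoff), optionally adding random Jitter to avoid the "thundering herd" effect.
    • Limit: the number of retries must be capped to avoid retrying forever.
  2. Path Exploration

    • When to use: when the Agent keeps failing at a node or on a path and no clear persistent error blocks progress.
    • Mechanism: force the Agent to try the current node's outgoing edges (next_node) that it has not visited, or has not tried recently, even if they are not its current best choice. This may require the Agent to expose a get_available_paths() or get_alternative_actions() interface.
    • Risk: the Agent may drift away from its goal into more distant, unknown states.
  3. Tool Substitution

    • When to use: when a specific tool keeps failing and a functionally similar backup tool exists.
    • Mechanism: the intervention engine tells the Agent to call the backup tool. For example, if the primary database connection fails, try a read-only replica; if the primary API service fails, try a mirror API.
    • Requirement: the Agent or environment must register backup tools in advance.
  4. State Rollback

    • When to use: when the Agent's state has become confused or inconsistent because of the loop.
    • Mechanism: restore the Agent's state to a known safe point from before the loop began. This requires the Agent's state to be serializable and restorable.
    • Challenge: rollback may require undoing external side effects, which is usually very complex.
  5. Circuit Breaker Pattern

    • When to use: persistent tool errors.
    • Mechanism: when a tool's error rate reaches a threshold, the breaker "opens" and blocks the Agent from calling that tool for a period. During that period, every call to the tool fails immediately instead of waiting for the tool to actually run. Afterwards, the breaker enters a "half-open" state and lets a small number of requests through to test whether the tool has recovered.
    • Benefit: prevents the Agent from wasting resources on futile calls to a known-faulty tool.
  6. Human Intervention

    • When to use: when all automated strategies have failed, or the problem is complex enough to need human judgment.
    • Mechanism: raise an alert (email, SMS, logs), provide the Agent's history and current state, and request human handling. The Agent can pause or enter a restricted mode.
  7. Emergency Stop

    • When to use: when the system is in an unrecoverable loop or may cause serious damage.
    • Mechanism: forcibly terminate the Agent's execution. This is the last resort.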
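Strategy 1 can be sketched as a bounded retry loop with exponential backoff and "full jitter" (each delay drawn uniformly between 0 and the capped exponential bound, one common jitter variant). A minimal sketch under stated assumptions: `TransientToolError` is a hypothetical marker exception standing in for errors the classifier labels transient, and `sleep` is injectable so tests do not actually wait. Exhausting the budget re-raises instead of retrying again, which is the hand-off point to the next strategy.

```python
import random
import time
from typing import Callable, List, Optional, TypeVar

T = TypeVar("T")

class TransientToolError(Exception):
    """Hypothetical marker for failures the classifier labels TRANSIENT."""

def backoff_delays(max_retries: int, base: float = 0.5, cap: float = 30.0,
                   rng: Optional[random.Random] = None) -> List[float]:
    """Full-jitter schedule: delay n is uniform in [0, min(cap, base * 2**n)]."""
    rng = rng or random.Random()
    return [rng.uniform(0, min(cap, base * 2 ** n)) for n in range(max_retries)]

def call_with_retry(fn: Callable[[], T], max_retries: int = 3,
                    sleep: Callable[[float], None] = time.sleep,
                    rng: Optional[random.Random] = None) -> T:
    """Call fn, retrying TransientToolError at most max_retries times.
    Other exceptions propagate at once; so does the last transient error."""
    delays = backoff_delays(max_retries, rng=rng)
    for attempt in range(max_retries + 1):
        try:
            return fn()
        except TransientToolError:
            if attempt == max_retries:
                raise                      # budget spent: escalate, do not loop
            sleep(delays[attempt])
    raise AssertionError("unreachable")
```

Passing `sleep=slept.append` (a list's `append`) in tests records the chosen delays without waiting.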

Priority and combination: intervention strategies are usually combined in priority order, for example:

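  1. Transient error: backoff and retry.
  2. After repeated retry failures, or when a persistent error is detected: circuit breaker plus path exploration.
  3. If those fail and the loop persists: tool substitution or state rollback.
  4. Once all automated strategies are exhausted: human intervention or emergency stop.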
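The priority list reads naturally as an escalation ladder that climbs one rung each time the loop survives the current rung and resets only once the Agent makes progress. A hypothetical sketch; `Rung`, `EscalationLadder`, and its rules are assumptions, not the `InterventionEngine` below.

```python
from enum import IntEnum

class Rung(IntEnum):
    RETRY_WITH_BACKOFF = 1       # step 1: transient errors
    BREAKER_AND_EXPLORE = 2      # step 2: retries exhausted or persistent error
    SUBSTITUTE_OR_ROLLBACK = 3   # step 3: the loop survived step 2
    HUMAN_OR_STOP = 4            # step 4: automation exhausted

class EscalationLadder:
    """Climb one rung whenever the loop survives the current one; reset() on progress."""

    def __init__(self, max_retries: int = 3) -> None:
        self.max_retries = max_retries
        self.rung = Rung.RETRY_WITH_BACKOFF
        self.retries = 0

    def reset(self) -> None:
        """Call when the Agent makes progress (e.g. a successful step)."""
        self.rung = Rung.RETRY_WITH_BACKOFF
        self.retries = 0

    def next_rung(self, error_type: str, loop_persists: bool) -> Rung:
        if self.rung == Rung.RETRY_WITH_BACKOFF:
            if error_type == "TRANSIENT" and self.retries < self.max_retries:
                self.retries += 1
                return self.rung
            # Retries used up, or the error is not transient: go to step 2.
            self.rung = Rung.BREAKER_AND_EXPLORE
            return self.rung
        if loop_persists and self.rung < Rung.HUMAN_OR_STOP:
            self.rung = Rung(self.rung + 1)
        return self.rung

ladder = EscalationLadder(max_retries=2)
print([ladder.next_rung("TRANSIENT", loop_persists=True).name for _ in range(5)])
# ['RETRY_WITH_BACKOFF', 'RETRY_WITH_BACKOFF', 'BREAKER_AND_EXPLORE', 'SUBSTITUTE_OR_ROLLBACK', 'HUMAN_OR_STOP']
```

Keeping the rung as explicit state is the design point: an engine that re-derives its decision from the latest error alone can keep prescribing "retry" indefinitely, which is the very loop it is meant to break.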

Code example: InterventionEngine

class EscapeAction:
    """Represents a proposed action by the escape mechanism."""
    def __init__(self, action_type: str, details: Optional[Dict[str, Any]] = None):
        self.action_type = action_type # e.g., "RETRY_WITH_BACKOFF", "EXPLORE_PATH", "CIRCUIT_BREAKER_OPEN", "HUMAN_INTERVENTION", "STOP"
        self.details = details if details is not None else {}

    def __repr__(self):
        return f"EscapeAction(type='{self.action_type}', details={self.details})"

class InterventionEngine:
    """Decides and suggests escape actions based on detected loops and errors."""
    def __init__(self,
                 error_classifier: ErrorClassifier,
                 max_retries_on_transient: int = 3,
                 circuit_breaker_threshold: int = 5, # Consecutive failures
                 circuit_breaker_timeout_sec: int = 30):
        self.error_classifier = error_classifier
        self.max_retries_on_transient = max_retries_on_transient
        self.circuit_breaker_threshold = circuit_breaker_threshold
        self.circuit_breaker_timeout_sec = circuit_breaker_timeout_sec
        self.tool_failure_counts: Dict[str, int] = collections.defaultdict(int)
        self.circuit_breakers: Dict[str, "CircuitBreaker"] = {} # Tool name -> CircuitBreaker instance

    def _get_circuit_breaker(self, tool_name: str) -> "CircuitBreaker":
        if tool_name not in self.circuit_breakers:
            self.circuit_breakers[tool_name] = CircuitBreaker(
                failure_threshold=self.circuit_breaker_threshold,
                reset_timeout_seconds=self.circuit_breaker_timeout_sec
            )
        return self.circuit_breakers[tool_name]

    def reset_tool_failure_count(self, tool_name: str):
        self.tool_failure_counts[tool_name] = 0
        if tool_name in self.circuit_breakers:
            self.circuit_breakers[tool_name].reset()

    def propose_escape_action(self,
                              agent_state: AgentState,
                              last_tool_result: Optional[ToolResultEvent],
                              is_loop_detected: bool,
                              available_paths: Optional[List[str]] = None) -> List[EscapeAction]:
        """
        Proposes a list of ordered escape actions.
        Args:
            agent_state: Current state of the agent.
            last_tool_result: The result of the last tool call, or None if no tool was called.
            is_loop_detected: True if a loop has been detected by the LoopDetector.
            available_paths: A list of alternative node IDs or path identifiers the agent can take.
        Returns:
            A list of EscapeAction objects, ordered by priority.
        """
        actions: List[EscapeAction] = []

        # 1. Handle direct tool failures
        if last_tool_result and not last_tool_result.success:
            tool_name = last_tool_result.tool_name
            error_details = last_tool_result.error_details
            error_type = self.error_classifier.classify_error(error_details)

            cb = self._get_circuit_breaker(tool_name)
            cb.record_failure() # Record failure for circuit breaker

            if cb.is_open():
                print(f"Circuit breaker for {tool_name} is OPEN. Suggesting alternative/stop.")
                actions.append(EscapeAction("CIRCUIT_BREAKER_OPEN", {"tool_name": tool_name}))
                # If CB is open, don't try this tool. Suggest path exploration or human intervention.
            elif error_type == "TRANSIENT":
                self.tool_failure_counts[tool_name] += 1
                if self.tool_failure_counts[tool_name] <= self.max_retries_on_transient:
                    actions.append(EscapeAction("RETRY_WITH_BACKOFF", {"tool_name": tool_name, "attempt": self.tool_failure_counts[tool_name]}))
                else:
                    # Too many transient failures, treat as persistent for this cycle
                    print(f"Tool {tool_name} had too many transient failures. Escalating.")
                    actions.append(EscapeAction("CONSIDER_PERSISTENT_ERROR", {"tool_name": tool_name}))
            elif error_type == "PERSISTENT":
                actions.append(EscapeAction("PERSISTENT_ERROR", {"tool_name": tool_name, "details": error_details}))
                # For persistent errors, immediately suggest alternative paths or tool replacement

            # The breaker has just moved to half-open (reset timeout elapsed); if nothing
            # more specific was chosen, suggest probing the tool with a test request.
            if cb.is_half_open() and not actions:
                actions.append(EscapeAction("TEST_TOOL_HALF_OPEN", {"tool_name": tool_name}))

        # 2. Loop-level interventions, only when a loop was actually detected
        #    (a healthy step with no tool error must not trigger path exploration).
        if is_loop_detected:
            # Prioritize exploring new paths if available
            if available_paths:
                actions.append(EscapeAction("EXPLORE_PATH", {"current_node": agent_state.current_node_id, "alternatives": available_paths}))

            # Suggest tool substitution if applicable (requires external mapping)
            # Example: if tool_name in tool_substitution_map:
            #   actions.append(EscapeAction("TOOL_SUBSTITUTION", {"original": tool_name, "substitute": tool_substitution_map[tool_name]}))

            # If agent state is trackable and reversible, consider rollback
            # actions.append(EscapeAction("STATE_ROLLBACK", {"target_state_hash": "previous_safe_hash"}))

            # As a last resort for automated actions
            if not actions and is_loop_detected: # If no other specific action and loop is detected
                actions.append(EscapeAction("HUMAN_INTERVENTION", {"reason": "Persistent loop detected, automated strategies exhausted."}))
                actions.append(EscapeAction("EMERGENCY_STOP", {"reason": "Unresolvable loop, potential resource drain."}))

        return actions

class CircuitBreaker:
    """Implements the Circuit Breaker pattern."""
    STATE_CLOSED = "CLOSED"
    STATE_OPEN = "OPEN"
    STATE_HALF_OPEN = "HALF_OPEN"

    def __init__(self, failure_threshold: int = 5, reset_timeout_seconds: int = 30):
        self.failure_threshold = failure_threshold
        self.reset_timeout_seconds = reset_timeout_seconds
        self.state = self.STATE_CLOSED
        self.failures = 0
        self.last_failure_time: Optional[datetime.datetime] = None

    def record_failure(self):
        if self.state == self.STATE_CLOSED:
            self.failures += 1
            self.last_failure_time = datetime.datetime.now()
            if self.failures >= self.failure_threshold:
                self.open()
        elif self.state == self.STATE_HALF_OPEN:
            # If a test request fails in half-open, go back to open
            self.open()

    def record_success(self):
        if self.state == self.STATE_HALF_OPEN:
            self.reset() # If success in half-open, close the breaker
        elif self.state == self.STATE_CLOSED:
            self.failures = 0 # Reset failures on success

    def open(self):
        self.state = self.STATE_OPEN
        self.last_failure_time = datetime.datetime.now()
        print(f"Circuit Breaker OPENED. Failures: {self.failures}")

    def half_open(self):
        self.state = self.STATE_HALF_OPEN
        print("Circuit Breaker HALF_OPEN.")

    def reset(self):
        self.state = self.STATE_CLOSED
        self.failures = 0
        self.last_failure_time = None
        print("Circuit Breaker CLOSED.")

    def is_open(self) -> bool:
        if self.state == self.STATE_OPEN:
            if (datetime.datetime.now() - self.last_failure_time).total_seconds() > self.reset_timeout_seconds:
                self.half_open()
                return False # Allow one test request
            return True
        return False

    def is_half_open(self) -> bool:
        return self.state == self.STATE_HALF_OPEN

    def allows_request(self) -> bool:
        """Determines if a request is allowed to pass through the breaker."""
        if self.state == self.STATE_CLOSED:
            return True
        elif self.state == self.STATE_HALF_OPEN:
            return True # Allow a single test request
        elif self.state == self.STATE_OPEN:
            if (datetime.datetime.now() - self.last_failure_time).total_seconds() > self.reset_timeout_seconds:
                self.half_open()
                return True # Allow the first request after timeout to be a test
            return False
        return False # Default to not allowing

# Example Usage:
# cb = CircuitBreaker(failure_threshold=3, reset_timeout_seconds=5)
# print(f"Initial state: {cb.state}")
# for i in range(5):
#     if not cb.allows_request():
#         print(f"Request blocked by CB at attempt {i+1}")
#         continue
#     # Simulate tool call
#     if i < 3: # First 3 fail
#         cb.record_failure()
#         print(f"Tool failed. CB state: {cb.state}")
#     else: # Then success
#         cb.record_success()
#         print(f"Tool succeeded. CB state: {cb.state}")
#
# # The breaker opens on the third failure and blocks attempts 4 and 5. To see the
# # HALF_OPEN transition, call time.sleep(6) (longer than reset_timeout_seconds) first.

3.5 Policy Manager

To make the escape mechanism general and configurable, we need a policy manager that centralizes its parameters and behaviors.

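Purpose: provide a single interface for configuring loop-detection thresholds, error-classification rules, and the priorities and parameters of intervention strategies.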

Configurability

  • Adjust max_history_size dynamically.
  • Configure min_loop_length, min_repetitions, and time_window_seconds.
  • Update TRANSIENT_PATTERNS and PERSISTENT_PATTERNS.
  • Modify max_retries_on_transient, circuit_breaker_threshold, and circuit_breaker_timeout_sec.
  • Define the tool substitution map.
  • Configure how human intervention is notified.

Code example: EscapePolicyManager

class EscapePolicyManager:
    """Manages all configurable policies and parameters for the escape mechanism."""
    def __init__(self):
        self.config = {
            "history_tracker": {
                "max_history_size": 100
            },
            "loop_detector": {
                "min_loop_length": 3,
                "min_repetitions": 2,
                "time_window_seconds": 60
            },
            "error_classifier": {
                # Copies, so that editing the config never mutates the class-level defaults
                "transient_patterns": list(ErrorClassifier.TRANSIENT_PATTERNS),
                "persistent_patterns": list(ErrorClassifier.PERSISTENT_PATTERNS)
            },
            "intervention_engine": {
                "max_retries_on_transient": 3,
                "circuit_breaker_threshold": 5,
                "circuit_breaker_timeout_sec": 30,
                "tool_substitution_map": {} # e.g., {"primary_db": "read_replica_db"}
            }
        }

    def update_config(self, new_config: Dict[str, Any]):
        """Recursively updates the configuration."""
        def _update(d, u):
            for k, v in u.items():
                if isinstance(v, collections.abc.Mapping):
                    d[k] = _update(d.get(k, {}), v)
                else:
                    d[k] = v
            return d
        self.config = _update(self.config, new_config)
        print("Escape policies updated.")

    def get_config(self) -> Dict[str, Any]:
        return self.config

# Example Usage:
# policy_manager = EscapePolicyManager()
# new_settings = {
#     "loop_detector": {
#         "min_loop_length": 2,
#         "time_window_seconds": 120
#     },
#     "intervention_engine": {
#         "max_retries_on_transient": 5
#     }
# }
# policy_manager.update_config(new_settings)
# print(policy_manager.get_config())

4. Integration: A General Escape Mechanism Framework

Now let us combine all the components into a general escape mechanism framework.

4.1 Architecture

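The escape mechanism should be an independent, pluggable module, decoupled from the Agent's core business logic. It works by observing the Agent's event stream.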

Agent interface and tool wrapping
For generality, we assume both Agent and Tool follow certain interface conventions.

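  • AbstractAgent
    • get_current_state() -> AgentState: returns the Agent's current state.
    • execute_action(action_name: str, action_params: Dict[str, Any]) -> Optional[ToolResultEvent]: performs an action, which may involve a tool call, and returns the tool result.
    • get_available_paths() -> List[str]: returns alternative paths or node IDs the Agent can explore.
    • apply_escape_action(action: EscapeAction): the Agent receives and carries out an instruction from the escape mechanism.
    • is_goal_reached() -> bool and is_terminated() -> bool: report whether the Agent has reached its goal or has been terminated (e.g., by an emergency stop).
  • AbstractTool
    • execute(input: Dict[str, Any]) -> Dict[str, Any]: runs the tool's business logic.
    • Either handles errors internally, or raises exceptions that the caller catches and converts into a ToolResultEvent.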
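The second `AbstractTool` option, converting exceptions at the boundary, can be sketched as a small wrapper. This is a hedged sketch: `ToolResult` is a self-contained stand-in for the article's `ToolResultEvent`, and the `error_details` keys (`type`, `message`, optional `code`) are assumptions chosen to fit the rule-based classifier's `message`/`code` lookups.

```python
import datetime
from dataclasses import dataclass
from typing import Any, Callable, Dict, Optional

@dataclass
class ToolResult:
    """Self-contained stand-in for the article's ToolResultEvent."""
    tool_name: str
    success: bool
    timestamp: datetime.datetime
    tool_output: Optional[Dict[str, Any]] = None
    error_details: Optional[Dict[str, Any]] = None

def run_tool(tool_name: str,
             fn: Callable[[Dict[str, Any]], Dict[str, Any]],
             tool_input: Dict[str, Any]) -> ToolResult:
    """Run a tool so that no exception escapes: success and failure both become
    a uniform result record that the history tracker can store."""
    started = datetime.datetime.now()
    try:
        output = fn(tool_input)
    except Exception as exc:  # Exception, not BaseException: Ctrl-C still propagates
        details: Dict[str, Any] = {"type": type(exc).__name__, "message": str(exc)}
        code = getattr(exc, "code", None)   # some exception types carry a status code
        if code is not None:
            details["code"] = code
        return ToolResult(tool_name, False, started, error_details=details)
    return ToolResult(tool_name, True, started, tool_output=output)

def lookup_user(inp: Dict[str, Any]) -> Dict[str, Any]:
    raise TimeoutError("connection timeout after 5s")  # simulated transient failure

print(run_tool("lookup_user", lookup_user, {"id": 42}).error_details)
# {'type': 'TimeoutError', 'message': 'connection timeout after 5s'}
```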

4.2 Runtime Flow

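  1. The Agent takes a step: based on its internal logic, the Agent chooses an action (e.g., move to a new node, call a tool).
  2. Record history: before or after the Agent acts, HistoryTracker records a NodeVisitEvent or ToolCallEvent.
  3. Tool call and result: if the action involves a tool, the Agent runs it. Whether it succeeds or fails, HistoryTracker records a ToolResultEvent.
  4. Loop/error detection: LoopDetector checks the history for loops; ErrorClassifier analyzes the latest tool error.
  5. Intervention decision: InterventionEngine proposes a list of EscapeActions based on the outputs of LoopDetector and ErrorClassifier and on the Agent's current state.
  6. Apply the intervention: on receiving an EscapeAction, the Agent adjusts its behavior accordingly. This may mean:
    • Waiting (backoff).
    • Retrying.
    • Choosing a non-optimal but viable alternative path.
    • Stopping use of a faulty tool (circuit breaker).
    • Rolling back state.
    • Raising an alert and pausing.
  7. The Agent resumes or terminates: after carrying out the intervention, the Agent tries to continue its task or, if instructed, terminates.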
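The seven steps can be exercised end to end on a toy problem. The sketch below is self-contained and deliberately simplified: `GRAPH`, `loop_detected`, and `run` are hypothetical; the "circuit breaker" is just a set of banned edges; and the Agent's naive recovery (restart at A) is what creates the loop. It shows the layering described above: detection opens a breaker on the most-failing edge so that path exploration can take a detour, human intervention is requested when no unbanned edge remains, and a hard step budget acts as the emergency stop.

```python
import random
from typing import Dict, List, Set, Tuple

Edge = Tuple[str, str]

# Hypothetical toy world; neighbor lists are in the Agent's preference order.
# The preferred exit C -> GOAL depends on a tool; C -> D -> GOAL is a detour.
GRAPH: Dict[str, List[str]] = {"A": ["B"], "B": ["C"], "C": ["GOAL", "D"], "D": ["GOAL"]}

def loop_detected(path: List[str], max_k: int = 4, reps: int = 2) -> bool:
    """Step 4: do the latest visits end in a block of length 2..max_k repeated `reps` times?"""
    for k in range(2, max_k + 1):
        span = k * reps
        if len(path) >= span:
            tail = path[-span:]
            if all(tail[i] == tail[span - k + i % k] for i in range(span)):
                return True
    return False

def run(failure_rates: Dict[Edge, float], max_steps: int = 50,
        seed: int = 0) -> Tuple[List[str], str]:
    rng = random.Random(seed)
    node, path = "A", ["A"]
    banned: Set[Edge] = set()        # edges whose "circuit breaker" is open
    failures: Dict[Edge, int] = {}   # per-edge tool failure counts (the history)
    for _ in range(max_steps):       # hard step budget = emergency-stop backstop
        if node == "GOAL":
            return path, "GOAL_REACHED"
        # Step 1: the Agent takes its preferred edge that is not banned.
        options = [n for n in GRAPH[node] if (node, n) not in banned]
        if not options:
            return path, "HUMAN_INTERVENTION"  # no automated way forward
        nxt = options[0]
        # Steps 2-3: the tool guarding this edge may fail at random; record it.
        if rng.random() < failure_rates.get((node, nxt), 0.0):
            failures[(node, nxt)] = failures.get((node, nxt), 0) + 1
            nxt = "A"                          # naive recovery: restart at A
        node = nxt
        path.append(node)
        # Steps 4-6: on a detected loop, open the breaker on the worst unbanned edge.
        if loop_detected(path):
            candidates = {e: c for e, c in failures.items() if e not in banned}
            if not candidates:
                return path, "HUMAN_INTERVENTION"
            banned.add(max(candidates, key=lambda e: candidates[e]))
    return path, "EMERGENCY_STOP"

print(run({("C", "GOAL"): 1.0}))
# (['A', 'B', 'C', 'A', 'B', 'C', 'D', 'GOAL'], 'GOAL_REACHED')
```

With both exits down (`{("C", "GOAL"): 1.0, ("D", "GOAL"): 1.0}`), the same run bans both edges in turn and ends with `HUMAN_INTERVENTION`; with a tiny `max_steps` it ends with `EMERGENCY_STOP`.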

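Table: component responsibilities and interactions

| Component | Main responsibility | Input | Output | Depends on |
|---|---|---|---|---|
| Agent | Business logic, action execution, state management | EscapeAction | AgentState, ToolResultEvent | EscapeMechanismOrchestrator |
| Tool | Provides concrete functionality, performs operations | Input from the Agent | Tool output or error | Agent |
| HistoryTracker | Records all Agent actions and events | AgentEvent | List[AgentEvent] | None |
| LoopDetector | Analyzes history, detects loop patterns | List[AgentEvent] (from Tracker) | bool (is_in_loop), Optional[List[AgentEvent]] | HistoryTracker |
| ErrorClassifier | Classifies tool errors | Dict[str, Any] (error_details) | str (error_type) | None |
| InterventionEngine | Decides escape strategies | AgentState, ToolResultEvent, bool (is_loop), List[str] (paths) | List[EscapeAction] | ErrorClassifier |
| EscapePolicyManager | Manages all configuration and policy parameters | Dict[str, Any] (new config) | Dict[str, Any] (current config) | None |
| EscapeMechanismOrchestrator | Coordinates all components, drives the flow | The Agent's event stream | EscapeAction | All other escape mechanism components, Agent |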

4.3 EscapeMechanismOrchestrator

This class coordinates the entire escape mechanism. It wires all the components together and gives the Agent a single, unified interface.

import abc
import collections
import datetime
import random
import time
from typing import Any, Dict, List, Optional

# Assuming AgentState, AgentEvent classes, HistoryTracker, LoopDetector, ErrorClassifier,
# InterventionEngine, EscapeAction, EscapePolicyManager, CircuitBreaker are defined as above.

# --- Agent and Tool Interfaces for integration ---
class AbstractAgent(abc.ABC):
    """Abstract base class for an Agent that can be monitored and controlled."""
    @abc.abstractmethod
    def get_current_state(self) -> AgentState:
        pass

    @abc.abstractmethod
    def execute_action(self, action_name: str, action_params: Dict[str, Any]) -> Optional[ToolResultEvent]:
        """
        Executes an action, which might involve calling a tool.
        Returns ToolResultEvent or None if no tool was called/result recorded directly.
        """
        pass

    @abc.abstractmethod
    def get_available_paths(self) -> List[str]:
        """Returns a list of alternative node IDs or paths the agent can take."""
        pass

    @abc.abstractmethod
    def apply_escape_action(self, action: EscapeAction):
        """Agent applies the escape action suggested by the mechanism."""
        pass

    @abc.abstractmethod
    def is_goal_reached(self) -> bool:
        """Checks if the agent has reached its goal."""
        pass

    @abc.abstractmethod
    def is_terminated(self) -> bool:
        """Checks if the agent has terminated (e.g., due to emergency stop)."""
        pass

class AbstractTool(abc.ABC):
    """Abstract base class for a Tool used by the Agent."""
    @abc.abstractmethod
    def execute(self, input_params: Dict[str, Any]) -> Dict[str, Any]:
        """Executes the tool logic. Should raise exceptions on failure."""
        pass

# --- EscapeMechanismOrchestrator ---
class EscapeMechanismOrchestrator:
    """
    Coordinates the escape mechanism components and interacts with the Agent.
    """
    def __init__(self, policy_manager: EscapePolicyManager):
        self.policy_manager = policy_manager
        config = self.policy_manager.get_config()

        self.history_tracker = HistoryTracker(
            max_history_size=config["history_tracker"]["max_history_size"]
        )
        self.error_classifier = ErrorClassifier()
        self.error_classifier.TRANSIENT_PATTERNS = config["error_classifier"]["transient_patterns"]
        self.error_classifier.PERSISTENT_PATTERNS = config["error_classifier"]["persistent_patterns"]

        self.loop_detector = LoopDetector(
            history_tracker=self.history_tracker,
            min_loop_length=config["loop_detector"]["min_loop_length"],
            min_repetitions=config["loop_detector"]["min_repetitions"],
            time_window_seconds=config["loop_detector"]["time_window_seconds"]
        )
        self.intervention_engine = InterventionEngine(
            error_classifier=self.error_classifier,
            max_retries_on_transient=config["intervention_engine"]["max_retries_on_transient"],
            circuit_breaker_threshold=config["intervention_engine"]["circuit_breaker_threshold"],
            circuit_breaker_timeout_sec=config["intervention_engine"]["circuit_breaker_timeout_sec"]
        )
        # Add tool substitution map to intervention engine if needed
        self.intervention_engine.tool_substitution_map = config["intervention_engine"].get("tool_substitution_map", {})

        self._last_tool_call_timestamp: Dict[str, datetime.datetime] = {}
        self._retry_delays: Dict[str, float] = collections.defaultdict(lambda: 0.1) # Initial delay for backoff

    def record_agent_event(self, event: AgentEvent):
        """Records any event from the agent."""
        self.history_tracker.record_event(event)
        if isinstance(event, ToolResultEvent) and event.success:
            self.intervention_engine.reset_tool_failure_count(event.tool_name)
            self._retry_delays[event.tool_name] = 0.1 # Reset delay on success
        elif isinstance(event, ToolCallEvent):
            self._last_tool_call_timestamp[event.tool_name] = event.timestamp

    def get_backoff_delay(self, tool_name: str, attempt: int) -> float:
        """Calculates exponential backoff delay with jitter, capped at 60 seconds."""
        # Grow from the per-tool base delay. The base is deliberately not overwritten;
        # storing the previous delay as the new base would compound growth on every attempt.
        base_delay = self._retry_delays[tool_name]
        delay = base_delay * (2 ** max(attempt - 1, 0))
        delay += random.uniform(0, 0.1 * delay)  # add up to 10% jitter
        return min(delay, 60.0)  # cap to prevent excessively long delays

    def check_and_intervene(self, agent: AbstractAgent, last_tool_result: Optional[ToolResultEvent]) -> bool:
        """
        Checks for loops/errors and proposes/applies interventions.
        Returns True if an intervention was applied and agent's state might have changed.
        """
        current_agent_state = agent.get_current_state()
        is_loop_detected = self.loop_detector.is_in_loop()
        available_paths = agent.get_available_paths()

        proposed_actions = self.intervention_engine.propose_escape_action(
            agent_state=current_agent_state,
            last_tool_result=last_tool_result,
            is_loop_detected=is_loop_detected,
            available_paths=available_paths
        )

        if not proposed_actions:
            return False # No intervention needed

        # Apply the highest priority action
        action_applied = False
        for action in proposed_actions:
            print(f"Applying escape action: {action}")
            if action.action_type == "RETRY_WITH_BACKOFF":
                tool_name = action.details["tool_name"]
                attempt = action.details["attempt"]
                delay = self.get_backoff_delay(tool_name, attempt)
                print(f"Backing off for {delay:.2f} seconds before retrying {tool_name} (attempt {attempt}).")
                time.sleep(delay)
                # Agent's next action should be a retry
                agent.apply_escape_action(action) # Inform agent to retry
                action_applied = True
                break
            elif action.action_type == "EXPLORE_PATH":
                if available_paths:
                    # Choose a path that hasn't been tried recently or is not part of the detected loop
                    # For simplicity, pick a random one here. A real implementation would be smarter.
                    chosen_path = random.choice(available_paths)
                    print(f"Exploring alternative path: {chosen_path}")
                    agent.apply_escape_action(EscapeAction("SET_NEXT_NODE", {"node_id": chosen_path}))
                    action_applied = True
                    break
            elif action.action_type == "CIRCUIT_BREAKER_OPEN":
                tool_name = action.details["tool_name"]
                print(f"Tool '{tool_name}' circuit breaker is OPEN. Agent should avoid this tool.")
                # Agent needs to be instructed to avoid this tool for a while
                agent.apply_escape_action(action)
                action_applied = True
                break
            elif action.action_type == "HUMAN_INTERVENTION":
                print("--- ALERT: Human Intervention Required! ---")
                print(f"Reason: {action.details.get('reason', 'Unknown')}")
                print(f"Current Agent State: {current_agent_state.to_dict()}")
                print(f"History: {[e.to_dict() for e in self.history_tracker.get_last_n_events(10)]}")
                # In a real system, send email/SMS, log to monitoring system
                agent.apply_escape_action(action) # Agent might pause or await human input
                action_applied = True
                break
            elif action.action_type == "EMERGENCY_STOP":
                print("--- CRITICAL: Emergency Stop Initiated! ---")
                print(f"Reason: {action.details.get('reason', 'Unresolvable issue.')}")
                agent.apply_escape_action(action) # Agent should terminate
                action_applied = True
                break
            # Add other action types (TOOL_SUBSTITUTION, STATE_ROLLBACK etc.)

        return action_applied
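The EXPLORE_PATH branch above picks a neighbor at random and notes that a real implementation would be smarter. One possible heuristic is sketched here as a hypothetical helper, `choose_escape_path`. It prefers neighbors that are not part of the detected loop and, among those, the ones visited least often in recent history. The inputs are assumed to be plain lists of node IDs, for example taken from the most recent NodeVisitEvents.

```python
import random
from typing import Optional, Sequence, Tuple


def choose_escape_path(
    available: Sequence[str],
    recent_nodes: Sequence[str],
    loop_nodes: Sequence[str] = (),
    rng: Optional[random.Random] = None,
) -> Optional[str]:
    """Prefer neighbors outside the detected loop, then the least-visited ones."""
    if not available:
        return None
    if rng is None:
        rng = random.Random()
    loop_set = set(loop_nodes)

    def score(node: str) -> Tuple[bool, int]:
        # Tuples compare element-wise and False < True, so non-loop nodes win first.
        return (node in loop_set, recent_nodes.count(node))

    best = min(score(n) for n in available)
    candidates = [n for n in available if score(n) == best]
    return rng.choice(candidates)  # break any remaining ties randomly


recent = ["NodeA", "NodeB", "NodeC", "NodeA", "NodeB", "NodeC"]
print(choose_escape_path(["NodeA", "NodeD"], recent, loop_nodes=["NodeA", "NodeB", "NodeC"]))  # NodeD
```

Passing a seeded `random.Random` makes tie-breaking reproducible in tests.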

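5. Advanced Considerations and Optimizations

A general-purpose escape mechanism is not something you build once and forget. The following advanced issues also need attention:

  • Performance overhead: the HistoryTracker's memory footprint and the LoopDetector's computational cost. History depth must be balanced against detection speed. Hashing events into compact fingerprints and using fixed-length queues are common optimizations.
  • False positives and false negatives: overly sensitive loop detection triggers unnecessary interventions (false positives), while insufficiently sensitive detection lets the Agent spin in a loop for too long (false negatives). Tuning thresholds and combining several detection signals help strike a balance.
  • The Agent's cognitive capabilities: if the Agent understands the graph structure, its tools' capabilities and its own goal more deeply, it can cooperate with the escape mechanism more intelligently. For example, it can proactively report "I'm stuck", or use the mechanism's suggestions to choose alternative paths more precisely.
  • Distributed Agents: when several Agents work together, the escape mechanism must coordinate across them. One Agent's infinite loop can affect other Agents, or even the whole system. This requires either a centralized escape service or an inter-Agent coordination protocol.
  • Learning and adaptation: through reinforcement learning, the Agent or the escape mechanism can learn which interventions work best in which situations. For example, it can record the Agent's state, the intervention applied and the outcome each time a loop occurs, then train a model to predict the best intervention.
  • Visualization and monitoring: provide a real-time dashboard that shows the Agent's current state, path history, tool error rates, loop-detection status and interventions. This is essential for debugging and for human intervention.
  • Idempotency and transactions: state rollback and retries require the Agent and the tools it calls to be idempotent (repeating an operation causes no additional side effects) and transactional (an operation either fully succeeds or fully fails). This is a major challenge in distributed systems.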
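The first two bullets can be illustrated with a small sketch. Each step is reduced to a short hash of a (node, tool, error) triple. A fixed-length window with an incrementally updated counter then flags a step once it repeats `threshold` times. The choice of fields, the 16-character digest and the `WindowedRepeatDetector` class are assumptions for illustration. This is not the LoopDetector used elsewhere in this article.

```python
import collections
import hashlib
import json
from typing import Deque


def fingerprint(node_id: str, tool_name: str, error: str = "") -> str:
    """Short, stable hash of a (node, tool, error) step; sort_keys keeps the JSON canonical."""
    raw = json.dumps({"node": node_id, "tool": tool_name, "error": error}, sort_keys=True)
    return hashlib.sha256(raw.encode("utf-8")).hexdigest()[:16]


class WindowedRepeatDetector:
    """Fixed-length window plus an incrementally maintained Counter: O(1) work per event."""

    def __init__(self, window: int = 20, threshold: int = 3) -> None:
        self.window: Deque[str] = collections.deque(maxlen=window)
        self.counts: collections.Counter = collections.Counter()
        self.threshold = threshold

    def observe(self, fp: str) -> bool:
        """Add one step; return True once fp occurs `threshold` times inside the window."""
        if len(self.window) == self.window.maxlen:
            oldest = self.window[0]  # about to be evicted by append()
            self.counts[oldest] -= 1
            if self.counts[oldest] == 0:
                del self.counts[oldest]
        self.window.append(fp)
        self.counts[fp] += 1
        return self.counts[fp] >= self.threshold


fail = fingerprint("NodeB", "ToolB", "timeout")
ok = fingerprint("NodeC", "ToolC")
steps = [fail, ok, fail, ok, fail]
wide = WindowedRepeatDetector(window=5, threshold=3)
narrow = WindowedRepeatDetector(window=4, threshold=3)
print([wide.observe(s) for s in steps])    # [False, False, False, False, True]
print([narrow.observe(s) for s in steps])  # [False, False, False, False, False]
```

Given the same event stream, the window of 5 fires on the third repeated failure. The window of 4 never fires, because the first failure has already been evicted by then. This is the false-positive versus false-negative trade-off from the second bullet, expressed as two tunable numbers.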

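6. Worked Example and Code Walkthrough

Let's walk through a simplified simulated Agent to show how these components are wired together and how the escape mechanism is triggered.

Suppose we have an Agent whose goal is to move from start_node to end_node. Along the way it may need to call tools. We simulate a cyclic graph in which one tool fails at random.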


# Re-import necessary modules in case this is run standalone
import collections
import datetime
import hashlib
import json
import time
import random
import abc
from typing import Dict, Any, List, Optional, Callable, Tuple

# Assume all previous classes (AgentState, AgentEvent, NodeVisitEvent, ToolCallEvent, ToolResultEvent,
# HistoryTracker, LoopDetector, ErrorClassifier, EscapeAction, InterventionEngine, CircuitBreaker,
# EscapePolicyManager, AbstractAgent, AbstractTool) are defined and available here.
# For brevity, I'll just include the custom Agent and Tool for the simulation.

# --- Custom Agent Implementation for Simulation ---
class MyAgent(AbstractAgent):
    def __init__(self, agent_id: str, start_node: str, end_node: str, graph: Dict[str, List[str]], tools: Dict[str, AbstractTool]):
        self.agent_id = agent_id
        self.current_node = start_node
        self.goal_node = end_node
        self.graph = graph # Adjacency list representation: {node_id: [neighbor_node_ids]}
        self.tools = tools
        self.path_history: List[str] = [start_node]
        self.current_goal = "Reach " + end_node
        self._terminated = False
        self._next_forced_node: Optional[str] = None # For EXPLORE_PATH action
        self._avoid_tools: collections.deque[str] = collections.deque(maxlen=5) # Tools to avoid temporarily
        self._retry_tool_name: Optional[str] = None # Tool name to retry after backoff

    def get_current_state(self) -> AgentState:
        return AgentState(self.current_node, self.current_goal, path=self.path_history[-5:])

    def execute_action(self, action_name: str, action_params: Dict[str, Any]) -> Optional[ToolResultEvent]:
        current_state = self.get_current_state()
        if action_name == "MOVE":
            next_node = action_params["node_id"]
            if next_node in self.graph.get(self.current_node, []):
                self.current_node = next_node
                self.path_history.append(next_node)
                print(f"Agent {self.agent_id} moved to node: {self.current_node}")
                return None # No tool result for simple move
            else:
                print(f"Agent {self.agent_id} tried to move to invalid node: {next_node}")
                # This could be a tool result indicating failure of pathfinding/move
                return ToolResultEvent(datetime.datetime.now(), current_state, "move_tool", success=False, error_details={"code": 400, "message": "Invalid move"})
        elif action_name == "CALL_TOOL":
            tool_name = action_params["tool_name"]
            tool_input = action_params["tool_input"]

            if tool_name in self._avoid_tools:
                print(f"Agent {self.agent_id} tried to call avoided tool {tool_name}. Skipping.")
                return ToolResultEvent(datetime.datetime.now(), current_state, tool_name, success=False, error_details={"code": 503, "message": f"Tool {tool_name} temporarily avoided by escape mechanism"})

            if tool_name not in self.tools:
                print(f"Agent {self.agent_id} tried to call unknown tool: {tool_name}")
                return ToolResultEvent(datetime.datetime.now(), current_state, tool_name, success=False, error_details={"code": 404, "message": "Tool not found"})

            print(f"Agent {self.agent_id} calling tool: {tool_name} with {tool_input}")
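            try:
                tool_output = self.tools[tool_name].execute(tool_input)
                # Simulation rule: a successful tool call for a neighboring node
                # completes the move to that node (e.g. ToolB gates NodeA -> NodeB).
                target_node = tool_input.get("node_data")
                if target_node in self.graph.get(self.current_node, []):
                    self.current_node = target_node
                    self.path_history.append(target_node)
                    print(f"Agent {self.agent_id} moved to node: {self.current_node}")
                return ToolResultEvent(datetime.datetime.now(), current_state, tool_name, tool_output=tool_output, success=True)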
            except Exception as e:
                print(f"Agent {self.agent_id} tool {tool_name} failed: {e}")
                return ToolResultEvent(datetime.datetime.now(), current_state, tool_name, success=False, error_details={"code": 500, "message": str(e)})
        else:
            print(f"Agent {self.agent_id} received unknown action: {action_name}")
            return None

    def get_available_paths(self) -> List[str]:
        """Returns neighbor nodes as available paths for exploration."""
        return self.graph.get(self.current_node, [])

    def apply_escape_action(self, action: EscapeAction):
        """Agent adjusts its behavior based on escape action."""
        if action.action_type == "SET_NEXT_NODE":
            self._next_forced_node = action.details["node_id"]
            print(f"Agent {self.agent_id} forced to consider node: {self._next_forced_node}")
        elif action.action_type == "CIRCUIT_BREAKER_OPEN":
            tool_name = action.details["tool_name"]
            self._avoid_tools.append(tool_name)
            print(f"Agent {self.agent_id} instructed to avoid tool: {tool_name}")
        elif action.action_type == "RETRY_WITH_BACKOFF":
            self._retry_tool_name = action.details["tool_name"]
            print(f"Agent {self.agent_id} instructed to retry tool {self._retry_tool_name} after delay.")
        elif action.action_type == "HUMAN_INTERVENTION":
            print(f"Agent {self.agent_id} pausing for human intervention.")
            # In a real system, Agent might enter a suspended state.
            pass
        elif action.action_type == "EMERGENCY_STOP":
            self._terminated = True
            print(f"Agent {self.agent_id} terminated by emergency stop.")
        else:
            print(f"Agent {self.agent_id} received unhandled escape action: {action.action_type}")

    def is_goal_reached(self) -> bool:
        return self.current_node == self.goal_node

    def is_terminated(self) -> bool:
        return self._terminated

    def decide_next_step(self) -> Tuple[str, Dict[str, Any]]:
        """Agent's internal logic to decide the next action."""
        # If forced to a node by escape mechanism
        if self._next_forced_node:
            node_to_visit = self._next_forced_node
            self._next_forced_node = None # Consume the forced action
            return "MOVE", {"node_id": node_to_visit}

        # If instructed to retry a tool
        if self._retry_tool_name:
            tool_name = self._retry_tool_name
            self._retry_tool_name = None # Consume the retry instruction
            return "CALL_TOOL", {"tool_name": tool_name, "tool_input": {"data": f"retry_data_{tool_name}"}}

        # Default Agent Logic:
        # 1. If at the goal, do nothing.
        if self.is_goal_reached():
            return "IDLE", {}

        # 2. Try to move towards the goal (simplified: just pick any neighbor for now)
        neighbors = self.graph.get(self.current_node, [])
        if neighbors:
            # Simple logic: prioritize moving away from current if it's not the goal
            # Or just randomly pick a neighbor
            next_node_options = [n for n in neighbors if n != self.current_node]
            if next_node_options:
                next_node = random.choice(next_node_options)
                # Assume moving to NodeB requires ToolB. Move to NodeC requires ToolC.
                # Here we simulate specific tool calls for specific nodes.
                if next_node == "NodeB":
                    return "CALL_TOOL", {"tool_name": "ToolB", "tool_input": {"node_data": next_node}}
                elif next_node == "NodeC":
                    return "CALL_TOOL", {"tool_name": "ToolC", "tool_input": {"node_data": next_node}}
                else:
                    return "MOVE", {"node_id": next_node}
            else:
                # If no other options, maybe call a generic tool or stay
                return "CALL_TOOL", {"tool_name": "GenericProcessTool", "tool_input": {"node_data": self.current_node}}

        # If no neighbors or goal not reached, try a generic processing tool
        return "CALL_TOOL", {"tool_name": "GenericProcessTool", "tool_input": {"node_data": self.current_node}}

# --- Custom Tool Implementation for Simulation ---
class MyTool(AbstractTool):
    def __init__(self, name: str, fail_probability: float = 0.3, is_critical: bool = False):
        self.name = name
        self.fail_probability = fail_probability
        self.is_critical = is_critical # If critical, more likely to cause loops if it fails

    def execute(self, input_params: Dict[str, Any]) -> Dict[str, Any]:
        print(f"  Executing tool '{self.name}' with input: {input_params}")
        if random.random() < self.fail_probability:
            if self.is_critical:
                raise Exception(f"Critical tool '{self.name}' failed randomly (simulated transient error).")
            else:
                raise Exception(f"Tool '{self.name}' failed randomly (simulated error).")
        return {"status": "success", "tool_name": self.name, "processed_data": input_params}

# --- Main Simulation Loop ---
def run_simulation():
    # 1. Setup Environment
    # A circular graph: A -> B -> C -> A
    graph = {
        "NodeA": ["NodeB"],
        "NodeB": ["NodeC"],
        "NodeC": ["NodeA", "NodeD"], # NodeD is the escape path
        "NodeD": ["NodeE"],
        "NodeE": [] # Goal node
    }

    # Define Tools
    tools = {
        "ToolB": MyTool("ToolB", fail_probability=0.6, is_critical=True), # High failure rate tool
        "ToolC": MyTool("ToolC", fail_probability=0.1),
        "GenericProcessTool": MyTool("GenericProcessTool", fail_probability=0.2),
    }

    # Agent
    agent = MyAgent("Agent001", "NodeA", "NodeE", graph, tools)

    # Escape Mechanism Setup
    policy_manager = EscapePolicyManager()
    policy_manager.update_config({
        "history_tracker": {"max_history_size": 30},
        "loop_detector": {"min_loop_length": 3, "min_repetitions": 2, "time_window_seconds": 120},
        "error_classifier": {
            # Custom patterns for the simulation; they should match the messages MyAgent puts in error_details
            "transient_patterns": ["failed randomly", "critical tool"],
            "persistent_patterns": ["tool not found", "invalid move"]
        },
        "intervention_engine": {
            "max_retries_on_transient": 2,
            "circuit_breaker_threshold": 3, # Lower threshold for faster CB open
            "circuit_breaker_timeout_sec": 10
        }
    })
    orchestrator = EscapeMechanismOrchestrator(policy_manager)

    print("--- Simulation Started ---")
    step_count = 0
    max_steps = 100 # Prevent truly infinite loops in simulation

    while not agent.is_goal_reached() and not agent.is_terminated() and step_count < max_steps:
        step_count += 1
        print(f"\n--- Step {step_count} ---")
        current_agent_state = agent.get_current_state()
        orchestrator.record_agent_event(NodeVisitEvent(datetime.datetime.now(), current_agent_state, current_agent_state.current_node_id))

        # Agent decides what to do next
        action_name, action_params = agent.decide_next_step()
        print(f"Agent decided to: {action_name} {action_params}")

        last_tool_result = None
        if action_name != "IDLE":
            last_tool_result = agent.execute_action(action_name, action_params)
            if last_tool_result:
                orchestrator.record_agent_event(last_tool_result)

        # Escape Mechanism checks and intervenes
        intervened = orchestrator.check_and_intervene(agent, last_tool_result)

        # If agent terminated by intervention, break loop
        if agent.is_terminated():
            break

        # If agent was instructed to backoff, it would have slept in orchestrator.
        # Otherwise, a small delay to simulate real-world processing.
        if not intervened:
            time.sleep(0.1) # Simulate some processing time

    print("\n--- Simulation Finished ---")
    if agent.is_goal_reached():
        print(f"Agent {agent.agent_id} reached goal '{agent.goal_node}' in {step_count} steps.")
    elif agent.is_terminated():
        print(f"Agent {agent.agent_id} terminated by escape mechanism.")
    else:
        print(f"Agent {agent.agent_id} did not reach goal or terminate within {max_steps} steps.")

    print("Final Agent State:", agent.get_current_state().to_dict())
    print("Last 10 Events:")
    for event in orchestrator.history_tracker.get_last_n_events(10):
        print(event.to_dict())


if __name__ == "__main__":
    run_simulation()
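There is one gap in the simulation. MyAgent's `_avoid_tools` deque only forgets a tool once five newer entries push it out, so the `circuit_breaker_timeout_sec` configured for the orchestrator never takes effect on the Agent side. A time-based avoid list is one way to close that gap. The `ExpiringAvoidList` class below is a hypothetical replacement with an injectable clock, so it can be tested without sleeping. After the timeout it lets the next call through, roughly like the half-open state of a circuit breaker.

```python
import time
from typing import Callable, Dict


class ExpiringAvoidList:
    """Avoid a tool until its deadline passes, then allow a trial call again."""

    def __init__(self, timeout_sec: float, clock: Callable[[], float] = time.monotonic) -> None:
        self.timeout_sec = timeout_sec
        self._clock = clock  # injectable so tests can use a fake clock
        self._until: Dict[str, float] = {}

    def avoid(self, tool_name: str) -> None:
        # Avoiding an already avoided tool simply extends its deadline.
        self._until[tool_name] = self._clock() + self.timeout_sec

    def __contains__(self, tool_name: str) -> bool:
        deadline = self._until.get(tool_name)
        if deadline is None:
            return False
        if self._clock() >= deadline:
            del self._until[tool_name]  # timeout elapsed: let the next call through
            return False
        return True


now = [0.0]  # fake clock value
avoided = ExpiringAvoidList(timeout_sec=10, clock=lambda: now[0])
avoided.avoid("ToolB")
print("ToolB" in avoided)  # True
now[0] = 10.0
print("ToolB" in avoided)  # False
```

Because the class implements `__contains__`, the existing `tool_name in self._avoid_tools` check would keep working. Only the `append` call in `apply_escape_action` would need to become `avoid`, with the timeout taken from `circuit_breaker_timeout_sec`.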
