什么是 ‘The Self-Model Node’:设计一个专门维护 Agent“自我意识”与“能力边界描述”的核心节点

各位编程专家、AI架构师以及对智能系统未来充满好奇的朋友们,大家好!

今天,我们齐聚一堂,探讨一个在当前AI领域日益凸显,且具有深远意义的核心议题:如何赋予AI系统更深层次的“自我”认知。我们知道,现代AI模型在完成特定任务时展现出了惊人的能力,无论是自然语言处理、图像识别还是复杂决策,它们都取得了里程碑式的进展。然而,这些系统在智力层面上仍存在一个显著的空白:它们缺乏对自身能力、状态和局限性的内在理解,也就是我们常说的“自我意识”与“元认知”。

1. AI自我认知的缺失:一个日益紧迫的问题

想象一下,一个顶尖的医生,拥有海量的医学知识和丰富的诊断经验,但如果他不知道自己是否处于疲劳状态、不了解自己擅长哪类疾病、不清楚自己处理多任务的极限,甚至无法判断自己所给出的建议是否基于最新的研究,那么他的判断力将大打折扣,甚至可能造成严重后果。

当前的AI系统也面临着类似的问题:

  • 幻觉与过度自信: 大型语言模型(LLMs)常常会生成听起来合理但实际上错误或捏造的信息(幻觉),并且在呈现这些信息时显得异常自信,因为它不“知道”自己在“瞎编”。
  • 能力边界模糊: Agent在执行任务时,很难主动识别出哪些任务超出了其当前的能力范围,导致无效尝试、资源浪费,甚至系统崩溃。
  • 缺乏有效自省: 当任务失败时,AI系统往往难以进行深层次的归因分析,无法理解是自身模型缺陷、数据不足、工具使用不当还是环境变化导致的问题。
  • 不透明性与可信度: 缺乏对自身运作方式的明确描述,使得用户和开发者难以理解AI的决策过程,降低了系统的可信赖性。
  • 安全与伦理风险: 不知道自身限制的AI,可能会在无意中执行有害操作,或违反预设的伦理准则。

为了解决这些根本性问题,我们需要为Agent设计一个专门的机制,使其能够持续地维护和更新对自身的描述。这,就是我们今天讲座的主题——“The Self-Model Node”

2. The Self-Model Node:核心理念与设计哲学

“The Self-Model Node”是一个Agent架构中的核心组件,其设计目标是成为Agent内部关于“我是谁”、“我能做什么”、“我正在做什么”、“我的限制是什么”以及“我如何学习和演进”的唯一且权威的信息源。它不仅仅是一个静态的配置文件,而是一个动态、可查询、可更新、且具备自省能力的模块。

2.1. 核心原则

  1. 内省性(Introspection):Agent能够查询自身的当前状态、已安装的能力、历史表现等信息。这类似于人类的自我反思。
  2. 能力边界描述(Capability Boundary Description):明确、结构化地定义Agent所拥有的各项技能(Capabilities)、可使用的工具(Tools)、可访问的知识库(Knowledge Bases),以及它们各自的参数、性能指标和潜在的失败模式。
  3. 局限性映射(Limitation Mapping):同样重要,Self-Model Node必须清晰地记录Agent的已知局限性,包括但不限于:
    • 计算资源限制(CPU、内存、并发任务数)
    • 数据访问权限与隐私限制
    • 模型自身的已知偏差(Bias)
    • 知识时效性(Knowledge Cut-off)
    • 伦理与安全准则
  4. 动态适应性(Dynamic Adaptability):Self-Model Node不是一成不变的。随着Agent的学习、经验积累、新工具的集成或旧能力的退役,它必须能够实时更新自身的状态和描述。
  5. 一致性与完整性(Consistency & Integrity):确保Self-Model Node中的信息是准确、最新且无冲突的。需要有机制来验证更新,并处理潜在的不一致。
  6. 可访问性与互操作性(Accessibility & Interoperability):作为Agent的核心服务,Self-Model Node必须提供清晰的API,供Agent的其他模块(如规划器、执行器、学习器)进行查询和更新。

2.2. 类比人类的元认知

我们可以将Self-Model Node类比为人类大脑中的前额叶皮层,它负责:

  • 自我监控:了解自己的情绪、认知状态。
  • 自我调节:根据对自身的理解来调整行为。
  • 规划决策:基于自身能力和限制来制定现实可行的计划。
  • 错误纠正:分析错误的来源,并调整策略。

通过Self-Model Node,我们旨在让AI系统从“执行者”提升为“自知者”,从而构建更健壮、更智能、更值得信赖的自主Agent。

3. 架构设计:Self-Model Node的内部构成

Self-Model Node在Agent的整体架构中扮演着中心枢纽的角色。它的设计需要考虑到数据的结构化存储、高效的查询机制、安全的更新接口以及高可用性。

3.1. 核心组件

  1. 数据存储层 (Data Repository)

    • 这是Self-Model Node的核心,存储Agent的全部自描述信息。
    • 数据结构必须高度结构化,易于查询和更新。JSON、YAML、Protocol Buffers或专门的语义图数据库(如RDF)都是可选方案。考虑到灵活性和易读性,JSON或Pydantic-based模型是常见的选择。
    • 应支持版本控制,以便回溯和审计Self-Model的变化。
  2. API/接口层 (API/Interface Layer)

    • 提供一套清晰、稳定的API,供Agent内部的其他模块进行交互。
    • 主要操作包括:query(path), update(path, value), add_capability(), remove_capability(), is_violating_limitation(), record_event()等。
    • 接口设计应考虑并发访问和线程安全。
  3. 验证与一致性引擎 (Validation & Consistency Engine)

    • 在任何更新操作发生时,该引擎负责验证新数据是否符合预定义的模式和业务逻辑。
    • 执行内部一致性检查,例如,确保“已完成任务数”不小于“成功任务数”或“失败任务数”。
    • 可以集成规则引擎,对复杂的限制和依赖关系进行评估。
  4. 持久化层 (Persistence Layer)

    • 负责将Self-Model Node的当前状态保存到持久存储(如文件系统、数据库),并在Agent重启时加载。
    • 这确保了Agent的自我认知在会话之间得以保留。
  5. 监控与反馈集成 (Monitoring & Feedback Integration)

    • 与Agent的监控系统(Telemetry System)集成,接收实时的操作数据(如资源使用率、任务延迟、错误率)。
    • 这些数据被用来更新Self-Model Node中的operational_statuslearning_history等字段。
    • 反过来,Self-Model Node的更新也可以触发监控系统发出警报或调整Agent的行为。

3.2. 交互流程示意

Agent的任何模块在执行任务前、执行任务中或执行任务后,都可以与Self-Model Node进行交互:

  • 规划器 (Planner):在制定任务计划前,查询capabilities以了解Agent能做什么,查询limitations以避免超出边界。
  • 执行器 (Executor):在调用工具或模型前,查询其api_endpointrate_limit,并检查operational_status以确保Agent健康。
  • 学习器 (Learner):根据任务的成功或失败,更新learning_history、调整capabilities的性能指标,甚至发现新的limitations
  • 监控器 (Monitor):定期向Self-Model Node报告resource_utilizationhealth_score等实时数据。
  • 安全/伦理模块 (Safety/Ethics Module):在生成内容或执行动作前,查询ethical_guidelines,并结合任务上下文进行风险评估。

通过这种中心化的设计,Self-Model Node成为Agent的“元数据大脑”,极大地提升了Agent的透明度、可控性和适应性。

4. 数据建模:Self-Model Node的语言

一个结构良好、语义清晰的数据模型是Self-Model Node成功的基石。它必须能够全面而精确地描述Agent的各个方面。我们将采用JSON作为数据表示格式,因为它易于读写,且在现代编程环境中广泛支持。

{
  "agent_id": "MyAI-Assistant-001",
  "core_identity": {
    "name": "MyAI Assistant",
    "type": "Conversational AI Agent",
    "version": "1.5.0",
    "purpose": "Assists users with information retrieval, task automation, and general knowledge queries.",
    "creation_timestamp": "2023-01-15T10:00:00Z",
    "owner": "AI Solutions Inc."
  },
  "operational_status": {
    "current_state": "idle",             // Possible states: "idle", "active", "learning", "error", "maintenance", "sleep"
    "resource_utilization": {
      "cpu_load_percent": 0.05,          // Current CPU load as a percentage
      "memory_usage_gb": 3.2,            // Current memory usage in GB
      "gpu_load_percent": 0.01,          // Current GPU load as a percentage (if applicable)
      "network_io_mbps": {"in": 0.5, "out": 0.3} // Network I/O in Mbps
    },
    "health_score": 0.95,                // Overall health score (0.0 to 1.0)
    "last_heartbeat": "2023-10-27T10:30:00Z", // Timestamp of last status update
    "active_tasks_count": 0,             // Number of tasks currently being processed
    "error_rate_last_hour": 0.01         // Error rate in the last hour
  },
  "capabilities": {
    "web_search": {
      "id": "serpapi_search_tool",
      "description": "Performs real-time web searches using SerpAPI to retrieve up-to-date information.",
      "type": "tool_integration",
      "api_endpoint": "https://api.serpapi.com/search",
      "parameters": [
        {"name": "query", "type": "string", "description": "The search query string."},
        {"name": "num_results", "type": "integer", "description": "Number of results to return (max 10).", "default": 5}
      ],
      "performance_metrics": {
        "avg_latency_ms": 350,           // Average response latency
        "success_rate": 0.99,            // Success rate of API calls
        "last_updated_metrics": "2023-10-27T09:00:00Z"
      },
      "cost_per_use_usd": 0.005,         // Estimated cost per API call
      "status": "active",                // "active", "disabled", "degraded"
      "rate_limit_per_minute": 60        // API rate limit
    },
    "text_generation": {
      "id": "openai_gpt4_model",
      "description": "Generates human-like text based on a given prompt using OpenAI's GPT-4 model.",
      "type": "llm_inference",
      "model_name": "gpt-4",
      "api_endpoint": "https://api.openai.com/v1/chat/completions",
      "parameters": [
        {"name": "prompt", "type": "string", "description": "The input text prompt."},
        {"name": "max_tokens", "type": "integer", "description": "Maximum tokens to generate.", "default": 512}
      ],
      "performance_metrics": {
        "avg_tokens_per_second": 20,
        "avg_cost_per_1k_tokens_usd": {"input": 0.03, "output": 0.06},
        "hallucination_rate_estimate": 0.03, // Estimated rate of generating factually incorrect info
        "last_updated_metrics": "2023-10-27T09:30:00Z"
      },
      "supported_languages": ["en", "zh", "es", "fr"],
      "fine_tuning_status": "none"       // "active", "in_progress", "none"
    },
    "image_analysis": {
      "id": "custom_resnet50_vision",
      "description": "Analyzes images to identify objects, scenes, and extract metadata.",
      "type": "ml_model",
      "model_name": "ResNet50_ImageNet_FineTuned",
      "local_endpoint": "http://localhost:8002/analyze",
      "parameters": [
        {"name": "image_base64", "type": "string", "description": "Base64 encoded image data."}
      ],
      "performance_metrics": {
        "avg_latency_ms": 150,
        "accuracy_score": 0.92,
        "domain_expertise": ["general_objects", "animal_recognition"],
        "last_updated_metrics": "2023-10-26T18:00:00Z"
      },
      "resource_requirements": {"gpu_memory_mb": 2048},
      "status": "active"
    }
  },
  "limitations": {
    "data_access_restrictions": [
      "sensitive_personnel_data",
      "classified_government_documents",
      "proprietary_company_financials"
    ],
    "computational_constraints": {
      "max_concurrent_tasks": 5,         // Maximum number of tasks agent can handle concurrently
      "max_response_length_tokens": 4096, // Max output length for text generation
      "max_runtime_per_task_seconds": 300 // Max allowed execution time for a single task
    },
    "known_biases": [
      {"capability_id": "text_generation", "bias_type": "gender_stereotyping", "severity": "medium", "mitigation_strategy": "prompt_engineering_guidelines"},
      {"capability_id": "web_search", "bias_type": "recency_bias", "severity": "low"}
    ],
    "ethical_guidelines": [
      "prioritize_user_privacy",
      "avoid_generating_harmful_content",
      "be_transparent_about_AI_origin",
      "do_not_engage_in_illegal_activities"
    ],
    "knowledge_cut_off_date": "2023-04-01", // For static knowledge base; not applicable to web_search
    "hardware_dependencies": ["dedicated_gpu_for_image_analysis"]
  },
  "learning_history": {
    "total_tasks_completed": 12580,
    "successful_tasks": 12100,
    "failed_tasks": 480,
    "last_retraining_date": "2023-10-01T00:00:00Z",
    "performance_trends": [
      {"date": "2023-09-01", "success_rate": 0.92, "avg_task_duration_s": 15.2},
      {"date": "2023-10-01", "success_rate": 0.95, "avg_task_duration_s": 14.8},
      {"date": "2023-10-27", "success_rate": 0.96, "avg_task_duration_s": 14.5}
    ],
    "feedback_summary": {
      "positive_count": 500,
      "negative_count": 50,
      "common_issues": ["misunderstanding complex queries", "slow response times occasionally"]
    }
  },
  "goals": [
    {"id": "G001", "description": "Maximize user satisfaction by 10% within Q4", "priority": "high", "status": "in_progress"},
    {"id": "G002", "description": "Reduce operational cost by 5% next quarter", "priority": "medium", "status": "not_started"}
  ],
  "dependencies": {
    "external_apis": ["OpenAI", "SerpAPI"],
    "internal_services": ["LoggingService", "TelemetryService", "AuthService"]
  }
}

数据模型解释:

  • agent_id: Agent的唯一标识符。
  • core_identity: Agent的基本信息,如名称、类型、版本、创建时间、目的。
  • operational_status: 实时运行状态,包括当前活动、资源利用率、健康评分、任务计数等。这是动态变化最频繁的部分。
  • capabilities: Agent所拥有的各项能力。这是一个字典,键是能力的唯一ID。每个能力包含:
    • id:能力的唯一标识。
    • description:能力的简要说明。
    • type:能力类型(如tool_integrationllm_inferenceml_model)。
    • api_endpoint / local_endpoint:访问该能力的接口。
    • parameters:该能力接受的输入参数及其类型和描述。
    • performance_metrics:该能力的性能数据,如延迟、成功率、成本、准确率、幻觉率估计等。
    • status:能力是否可用。
    • rate_limit_per_minute / resource_requirements:能力调用的限制或资源需求。
  • limitations: Agent的局限性。
    • data_access_restrictions:不允许访问的数据类型列表。
    • computational_constraints:计算资源方面的硬性限制,如并发任务数、响应长度。
    • known_biases:已知的模型偏差列表,包括受影响的能力、偏差类型、严重程度及缓解策略。
    • ethical_guidelines:Agent必须遵守的伦理准则。
    • knowledge_cut_off_date:Agent静态知识库的截止日期。
    • hardware_dependencies:运行特定能力所需的硬件。
  • learning_history: Agent的学习和性能历史。
    • total_tasks_completed, successful_tasks, failed_tasks:任务统计。
    • last_retraining_date:上次模型重训练日期。
    • performance_trends:按时间记录的性能指标趋势。
    • feedback_summary:用户或系统反馈的汇总。
  • goals: Agent当前正在追求的目标列表,每个目标包含ID、描述、优先级和状态。
  • dependencies: Agent运行所依赖的外部API或内部服务。

这个数据模型是灵活可扩展的,可以根据Agent的具体需求添加更多字段或嵌套结构。

5. 实践编码:构建一个Python版的Self-Model Node

现在,我们来深入探讨如何用Python实现一个Self-Model Node。我们将创建一个SelfModelNode类,它封装了数据存储、查询、更新和一致性检查的逻辑。


import json
import threading
import logging
from datetime import datetime
from typing import Dict, Any, Optional, List, Callable, Union

# 配置日志
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')

class SelfModelNode:
    """
    The SelfModelNode is a core component that maintains an Agent's self-awareness
    and description of its capability boundaries. It acts as the central repository
    for the agent's identity, operational status, capabilities, limitations,
    and learning history.
    """

    def __init__(self, agent_id: str, initial_model_data: Optional[Dict[str, Any]] = None):
        """
        Initializes the SelfModelNode for a specific agent.

        Args:
            agent_id (str): A unique identifier for the agent.
            initial_model_data (Optional[Dict[str, Any]]): An optional dictionary
                                                         to initialize the self-model.
                                                         If None, a default model is created.
        """
        self.agent_id = agent_id
        # Use a reentrant lock to allow multiple acquisitions by the same thread
        # for nested operations, while ensuring thread safety for concurrent access.
        self._lock = threading.RLock()
        self._model_data: Dict[str, Any] = initial_model_data if initial_model_data else self._initialize_default_model()

        logging.info(f"SelfModelNode for agent '{self.agent_id}' initialized.")
        self._ensure_consistency() # Initial consistency check

    def _initialize_default_model(self) -> Dict[str, Any]:
        """
        Creates a default, empty self-model structure.
        """
        current_time = datetime.now().isoformat()
        return {
            "agent_id": self.agent_id,
            "core_identity": {
                "name": f"GenericAgent-{self.agent_id}",
                "type": "Autonomous System",
                "version": "0.1.0",
                "purpose": "General purpose task execution.",
                "creation_timestamp": current_time,
                "owner": "Unknown"
            },
            "operational_status": {
                "current_state": "booting",
                "resource_utilization": {
                    "cpu_load_percent": 0.0,
                    "memory_usage_gb": 0.0,
                    "gpu_load_percent": 0.0,
                    "network_io_mbps": {"in": 0.0, "out": 0.0}
                },
                "health_score": 1.0,
                "last_heartbeat": current_time,
                "active_tasks_count": 0,
                "error_rate_last_hour": 0.0
            },
            "capabilities": {},
            "limitations": {
                "data_access_restrictions": [],
                "computational_constraints": {
                    "max_concurrent_tasks": 1,
                    "max_response_length_tokens": 1024,
                    "max_runtime_per_task_seconds": 60
                },
                "known_biases": [],
                "ethical_guidelines": ["do_no_harm", "respect_privacy"],
                "knowledge_cut_off_date": None,
                "hardware_dependencies": []
            },
            "learning_history": {
                "total_tasks_completed": 0,
                "successful_tasks": 0,
                "failed_tasks": 0,
                "last_retraining_date": None,
                "performance_trends": [],
                "feedback_summary": {"positive_count": 0, "negative_count": 0, "common_issues": []}
            },
            "goals": [],
            "dependencies": {"external_apis": [], "internal_services": []}
        }

    def _validate_path(self, path: str) -> bool:
        """
        Internal helper to validate a dot-separated path string.
        Prevents access to internal attributes/methods (starting with '_').
        """
        if not isinstance(path, str) or not path:
            logging.warning("Path cannot be empty or non-string.")
            return False
        if any(part.startswith('_') for part in path.split('.')):
            logging.warning(f"Attempted to access restricted path: {path}")
            return False
        return True

    def _get_nested_value(self, data: Dict[str, Any], path: str) -> Optional[Any]:
        """
        Retrieves a nested value from a dictionary using a dot-separated path.
        Returns None if any part of the path does not exist.
        """
        parts = path.split('.')
        current = data
        for part in parts:
            if isinstance(current, dict) and part in current:
                current = current[part]
            else:
                return None
        return current

    def _set_nested_value(self, data: Dict[str, Any], path: str, value: Any):
        """
        Sets a nested value in a dictionary using a dot-separated path.
        Creates intermediate dictionaries if they don't exist.
        """
        parts = path.split('.')
        current = data
        for i, part in enumerate(parts):
            if i == len(parts) - 1:
                current[part] = value
            else:
                if part not in current or not isinstance(current[part], dict):
                    current[part] = {}
                current = current[part]

    def query(self, path: str) -> Optional[Any]:
        """
        Queries the self-model for a specific piece of information.
        Path uses dot notation, e.g., "operational_status.current_state".

        Args:
            path (str): The dot-separated path to the desired information.

        Returns:
            Optional[Any]: The value at the specified path, or None if not found or path is invalid.
        """
        if not self._validate_path(path):
            return None
        with self._lock:
            return self._get_nested_value(self._model_data, path)

    def update(self, path: str, value: Any) -> bool:
        """
        Updates a specific field in the self-model. Path uses dot notation.
        This is a generic update method; specific methods (e.g., add_capability)
        might offer more structured updates.

        Args:
            path (str): The dot-separated path to the field to update.
            value (Any): The new value for the field.

        Returns:
            bool: True if the update was successful, False otherwise.
        """
        if not self._validate_path(path):
            return False
        with self._lock:
            try:
                # Store the old value for potential logging/hooks
                old_value = self.query(path) 
                self._set_nested_value(self._model_data, path, value)
                logging.debug(f"Updated '{path}' from '{old_value}' to '{value}'")
                self._post_update_hook(path, old_value, value) # Trigger post-update actions
                return True
            except Exception as e:
                logging.error(f"Error updating '{path}': {e}", exc_info=True)
                return False

    def add_capability(self, capability_id: str, details: Dict[str, Any]) -> bool:
        """
        Adds a new capability or updates an existing one with new details.

        Args:
            capability_id (str): Unique identifier for the capability.
            details (Dict[str, Any]): A dictionary containing the capability's details.

        Returns:
            bool: True if the operation was successful, False otherwise.
        """
        if not capability_id or not isinstance(details, dict):
            logging.warning("Invalid capability_id or details for add_capability.")
            return False
        with self._lock:
            path = f"capabilities.{capability_id}"
            if self.query(path):
                logging.info(f"Capability '{capability_id}' already exists. Updating details.")
            else:
                logging.info(f"Adding new capability '{capability_id}'.")

            # Ensure 'id' field within details matches capability_id
            details['id'] = capability_id
            return self.update(path, details)

    def remove_capability(self, capability_id: str) -> bool:
        """
        Removes a capability from the self-model.

        Args:
            capability_id (str): Unique identifier of the capability to remove.

        Returns:
            bool: True if the capability was removed, False if not found or invalid ID.
        """
        if not capability_id:
            logging.warning("Invalid capability_id for remove_capability.")
            return False
        with self._lock:
            capabilities = self._get_nested_value(self._model_data, "capabilities")
            if capabilities and capability_id in capabilities:
                del capabilities[capability_id]
                logging.info(f"Capability '{capability_id}' removed.")
                self._post_update_hook(f"capabilities.{capability_id}", None, None) # Notify removal
                return True
            logging.warning(f"Capability '{capability_id}' not found for removal.")
            return False

    def is_violating_limitation(self, limitation_path: str, proposed_value: Any = None) -> bool:
        """
        Checks if a proposed value or action would violate a specific limitation.
        This method needs to be robust enough to handle various types of limitations.

        Args:
            limitation_path (str): The dot-separated path to the limitation (e.g., "computational_constraints.max_concurrent_tasks").
            proposed_value (Any, optional): The value to check against the limitation.
                                            For boolean limitations (e.g., "maintenance_mode_active"),
                                            this argument might be ignored or used contextually.

        Returns:
            bool: True if a violation would occur, False otherwise.
        """
        limit = self.query(f"limitations.{limitation_path}")
        if limit is None:
            # If the limitation is not defined, it cannot be violated.
            return False

        with self._lock:
            if limitation_path == "computational_constraints.max_concurrent_tasks":
                if isinstance(limit, (int, float)) and isinstance(proposed_value, (int, float)):
                    return proposed_value > limit
                logging.warning(f"Cannot check max_concurrent_tasks with non-numeric proposed_value: {proposed_value}")
                return False
            elif limitation_path == "computational_constraints.max_response_length_tokens":
                if isinstance(limit, (int, float)) and isinstance(proposed_value, (int, float)):
                    return proposed_value > limit
                logging.warning(f"Cannot check max_response_length_tokens with non-numeric proposed_value: {proposed_value}")
                return False
            elif limitation_path == "data_access_restrictions":
                if isinstance(limit, list) and isinstance(proposed_value, str):
                    return proposed_value in limit # Proposed action is to access a restricted item
                logging.warning(f"Cannot check data_access_restrictions with non-string proposed_value: {proposed_value}")
                return False
            elif limitation_path == "ethical_guidelines":
                # This is a complex check, often requiring an LLM or specific rule engine.
                # For simplicity, we assume 'proposed_value' is text that might contain prohibited keywords.
                if isinstance(limit, list) and isinstance(proposed_value, str):
                    return any(keyword.lower() in proposed_value.lower() for keyword in limit if "avoid" in keyword.lower() or "do_not" in keyword.lower())
                return False
            elif limitation_path == "knowledge_cut_off_date":
                if isinstance(limit, str) and isinstance(proposed_value, str):
                    # proposed_value here would be the timestamp of the information being queried
                    # If information is newer than cut-off, it's a "violation" of knowledge currency.
                    try:
                        cut_off_dt = datetime.fromisoformat(limit.replace('Z', '+00:00'))
                        proposed_dt = datetime.fromisoformat(proposed_value.replace('Z', '+00:00'))
                        return proposed_dt > cut_off_dt
                    except ValueError:
                        logging.warning(f"Invalid date format for knowledge_cut_off_date check: limit={limit}, proposed={proposed_value}")
                        return False
                return False
            # Add more specific limitation checks here as needed

            # Default for boolean flags (e.g., "maintenance_mode_active": True)
            if isinstance(limit, bool):
                return limit # If the flag itself is True, it implies a general violation or active state.

            logging.warning(f"Unsupported limitation check for path: {limitation_path} with proposed_value: {proposed_value}")
            return False

    def record_task_result(self, success: bool, task_id: str, task_details: Optional[Dict[str, Any]] = None):
        """
        Records the outcome of a task to the learning history and updates performance metrics.

        Args:
            success (bool): True if the task was successful, False otherwise.
            task_id (str): A unique identifier for the completed task.
            task_details (Optional[Dict[str, Any]]): Additional details about the task.
        """
        with self._lock:
            total = self.query("learning_history.total_tasks_completed") + 1
            successful = self.query("learning_history.successful_tasks") + (1 if success else 0)
            failed = self.query("learning_history.failed_tasks") + (1 if not success else 0)

            self.update("learning_history.total_tasks_completed", total)
            self.update("learning_history.successful_tasks", successful)
            self.update("learning_history.failed_tasks", failed)

            # Update performance trends (simplified: daily average)
            current_date_str = datetime.now().strftime("%Y-%m-%d")
            performance_trends = self.query("learning_history.performance_trends")

            if not performance_trends or performance_trends[-1]["date"] != current_date_str:
                performance_trends.append({"date": current_date_str, "success_rate": successful / total, "avg_task_duration_s": 0.0})
            else:
                # Update existing day's entry
                trends_entry = performance_trends[-1]
                trends_entry["success_rate"] = successful / total
                # Optionally update avg_task_duration_s if task_details contains duration
                if task_details and "duration_s" in task_details:
                    old_duration = trends_entry.get("avg_task_duration_s", 0.0)
                    old_count = trends_entry.get("task_count_today", 0)
                    new_count = old_count + 1
                    trends_entry["avg_task_duration_s"] = (old_duration * old_count + task_details["duration_s"]) / new_count
                    trends_entry["task_count_today"] = new_count
                performance_trends[-1] = trends_entry

            self.update("learning_history.performance_trends", performance_trends)
            logging.info(f"Task '{task_id}' result recorded. Success: {success}. Current success rate: {successful/total:.2f}")

    def _post_update_hook(self, path: str, old_value: Any, new_value: Any):
        """
        A hook for performing actions after a self-model update.
        This can trigger validation, recalculate derived metrics, or notify other modules.
        """
        logging.debug(f"Post-update hook triggered for path: {path}")

        # Example: Recalculate health score if resource utilization changes
        if path.startswith("operational_status.resource_utilization"):
            cpu_load = self.query("operational_status.resource_utilization.cpu_load_percent")

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注