深度挑战:设计一个能够自动发现并调用‘从未见过的 REST API’(仅提供 API 根路径)的自主 Agent 架构

各位技术同仁,下午好!

今天,我们将共同踏上一段充满挑战的旅程,探索一个在软件工程领域极具前瞻性和实践意义的课题——设计一个能够自动发现并调用‘从未见过的 REST API’的自主 Agent 架构。我们假设,我们所拥有的仅仅是这些API的根路径。这是一个典型的“黑盒探索”问题,它要求我们的Agent不仅要智能,更要具备强大的学习和适应能力。

作为一名编程专家,我深知这项任务的复杂性。它触及了自动化、机器学习、自然语言处理(尽管初期可能不直接使用,但其潜力巨大)、网络通信、以及智能决策等多个交叉领域。但我相信,通过系统性的架构设计和精妙的算法组合,我们能够构建出这样一个具备初步智能的Agent。


一、 挑战的深度与广度

在深入架构设计之前,我们首先需要清晰地认识到这个任务所面临的挑战。一个“从未见过的 REST API”,意味着我们缺乏:

  1. API Schema/Specification: 没有 OpenAPI (Swagger), RAML, API Blueprint 等标准定义文件。我们不知道有哪些端点,每个端点支持哪些HTTP方法。
  2. Endpoint Discovery: 除了根路径,我们不知道任何具体的资源路径(例如 /users, /products/{id}, /orders)。
  3. Parameter Inference: 对于一个已发现的端点,我们不知道它需要哪些查询参数、路径参数或请求体参数。也不知道这些参数的名称、类型、是否必需、以及有效值的范围。
  4. HTTP Method Guessing: 对于一个路径,我们不知道它支持 GET, POST, PUT, DELETE, PATCH 中的哪些方法。
  5. Authentication/Authorization: 如何进行身份验证?是 API Key、OAuth2、Bearer Token 还是其他方式?密钥在哪里?
  6. Rate Limiting/Throttling: 如何避免因为请求过多而被服务器拒绝服务或封禁?
  7. Statefulness & Side Effects: POST, PUT, DELETE 等方法通常会改变服务器状态。Agent如何判断操作的后果?如何处理依赖性?例如,要删除一个资源,可能需要先创建一个。
  8. Data Type & Format: 请求和响应的数据格式是什么?JSON、XML、Form Data?字段的数据类型是什么?
  9. Error Handling: 如何识别和解析不同类型的错误响应,并从中学习?
  10. Semantic Understanding: 如何理解API的业务含义?例如,/users 是获取用户列表,/products 是商品列表。这对于智能调用至关重要。

这些挑战使得我们的Agent不能仅仅是一个简单的爬虫,它必须是一个能“思考”和“学习”的实体。


二、 自主 Agent 的总体架构

为了应对上述挑战,我将提出一个分层、模块化的自主 Agent 架构。这个架构的设计理念是:探索-推理-执行-学习 的闭环迭代。

模块名称 核心功能 输入 输出
Agent Core 任务编排,控制流,决策引擎 目标,知识库状态 操作指令,任务优先级
探索者 (Explorer) 发现新的 API 路径和潜在的 HTTP 方法 API 根路径,已知端点,待探索队列 新发现的端点,初步的 HTTP 方法建议
推理者 (Inferer) 推断端点的参数(名称、类型、位置)、请求体结构、认证机制等 端点信息,历史请求/响应数据,启发式规则 完整的 API 调用蓝图(Endpoint, Method, Params, Body, Headers, Auth)
执行者 (Executor) 根据调用蓝图发送 HTTP 请求,处理网络通信 API 调用蓝图 HTTP 响应(状态码,头部,体),网络错误
学习与反馈者 (Learner & Feedback) 解析响应,评估调用结果,从成功和失败中提取信息,更新知识库,优化后续策略 HTTP 响应,调用蓝图,Agent 目标 知识库更新,新的探索线索,参数优化建议
知识库 (Knowledge Base) 存储 Agent 的所有已知信息:已发现端点、推断的参数、认证凭证、成功/失败历史、API 结构图等 学习与反馈者,外部配置 供其他模块查询的数据
安全与资源管理器 (Security & Resource Manager) 处理认证凭证,管理请求频率,记录和监控 API 状态 认证配置,请求历史 认证凭证,请求延迟,API 状态报告

架构图(概念性):

+------------------------------------------------------------------+
|                           Agent Core                             |
|                           (Orchestrator, Decision Engine)        |
+------------------------------------+-----------------------------+
|                                    |                             |
|  +--------------------------+      |      +--------------------------+
|  |       探索者 (Explorer)  |<------------>|      知识库 (Knowledge Base) |
|  | (Path/Method Discovery)  |      |      | (Endpoints, Schemas, Auth, History) |
|  +--------------------------+      |      +--------------------------+
|            ^                       |                 ^
|            |                       |                 |
|            v                       |                 v
|  +--------------------------+      |      +--------------------------+
|  |      推理者 (Inferer)    |<------------>| 安全与资源管理器         |
|  | (Parameter/Schema Guessing) |      |      | (Auth, Rate Limiting)    |
|  +--------------------------+      |      +--------------------------+
|            ^                       |                 ^
|            |                       |                 |
|            v                       |                 v
|  +--------------------------+      |      +--------------------------+
|  |      执行者 (Executor)   |------------->| 学习与反馈者             |
|  | (HTTP Request Sender)    |<-------------|(Response Parser, Learner)|
|  +--------------------------+      |      +--------------------------+
|                                    |                             |
+------------------------------------+-----------------------------+

三、 核心模块详解与代码示例

我们将使用 Python 作为实现语言,因为它拥有丰富的库生态系统,非常适合快速原型开发和数据处理。

3.1 知识库 (Knowledge Base)

知识库是 Agent 的大脑和记忆。它存储了Agent在探索过程中获取的所有信息。

import json
import os
from collections import defaultdict
from typing import Dict, Any, List, Optional

class AgentKnowledgeBase:
    """
    Agent的知识库,存储所有已发现的端点、推断的Schema、认证信息和操作历史。
    """
    def __init__(self, api_root_url: str, persistence_file: str = "knowledge_base.json"):
        self.api_root_url = api_root_url.rstrip('/')
        self.persistence_file = persistence_file

        # 存储已发现的端点及其元数据
        # {
        #   "/users": {
        #     "methods": {
        #       "GET": {"inferred_params": {...}, "response_schema": {...}, "successful_calls": [...]},
        #       "POST": {...}
        #     },
        #     "discovery_source": ["root_path", "/api/v1/links"]
        #   },
        #   "/products/{id}": {...}
        # }
        self.endpoints: Dict[str, Any] = defaultdict(lambda: {"methods": defaultdict(dict), "discovery_source": []})

        # 存储通用认证信息
        self.auth_config: Dict[str, Any] = {}

        # 存储API通用信息,如版本,服务器类型等
        self.api_meta: Dict[str, Any] = {}

        # 存储待探索的URL和方法组合队列
        # [(url, method, params, body, headers), ...]
        self.exploration_queue: List[Dict[str, Any]] = [] 

        self._load_from_disk()

    def _load_from_disk(self):
        """从文件加载知识库状态"""
        if os.path.exists(self.persistence_file):
            try:
                with open(self.persistence_file, 'r', encoding='utf-8') as f:
                    data = json.load(f)
                    self.endpoints.update(data.get("endpoints", {}))
                    self.auth_config.update(data.get("auth_config", {}))
                    self.api_meta.update(data.get("api_meta", {}))
                    self.exploration_queue.extend(data.get("exploration_queue", []))
                print(f"知识库从 {self.persistence_file} 加载成功。")
            except json.JSONDecodeError:
                print(f"警告: 无法解析知识库文件 {self.persistence_file}。将初始化一个空知识库。")
        else:
            print(f"知识库文件 {self.persistence_file} 不存在,初始化空知识库。")

        # 初始探索任务:从根路径开始
        if not self.exploration_queue:
            self.add_to_exploration_queue(self.api_root_url, "GET", {}) # 默认先GET根路径
            self.add_to_exploration_queue(self.api_root_url, "OPTIONS", {}) # 尝试OPTIONS获取支持方法

    def save_to_disk(self):
        """将知识库状态保存到文件"""
        data = {
            "api_root_url": self.api_root_url,
            "endpoints": self._convert_defaultdicts_to_dicts(self.endpoints),
            "auth_config": self.auth_config,
            "api_meta": self.api_meta,
            "exploration_queue": self.exploration_queue
        }
        with open(self.persistence_file, 'w', encoding='utf-8') as f:
            json.dump(data, f, indent=4, ensure_ascii=False)
        print(f"知识库保存到 {self.persistence_file}。")

    def _convert_defaultdicts_to_dicts(self, obj):
        """递归地将defaultdict转换为普通dict以便JSON序列化"""
        if isinstance(obj, defaultdict):
            obj = dict(obj)
        if isinstance(obj, dict):
            return {k: self._convert_defaultdicts_to_dicts(v) for k, v in obj.items()}
        elif isinstance(obj, list):
            return [self._convert_defaultdicts_to_dicts(elem) for elem in obj]
        return obj

    def add_endpoint(self, path: str, method: str, inferred_params: Dict[str, Any], response_schema: Dict[str, Any], source: str):
        """添加或更新端点信息"""
        relative_path = path.replace(self.api_root_url, '', 1) if path.startswith(self.api_root_url) else path
        if not relative_path.startswith('/'):
            relative_path = '/' + relative_path

        self.endpoints[relative_path]["methods"][method]["inferred_params"] = inferred_params
        self.endpoints[relative_path]["methods"][method]["response_schema"] = response_schema
        if source not in self.endpoints[relative_path]["discovery_source"]:
            self.endpoints[relative_path]["discovery_source"].append(source)

        if "successful_calls" not in self.endpoints[relative_path]["methods"][method]:
             self.endpoints[relative_path]["methods"][method]["successful_calls"] = []

    def record_successful_call(self, path: str, method: str, request_params: Dict[str, Any], response_data: Dict[str, Any]):
        """记录成功的API调用"""
        relative_path = path.replace(self.api_root_url, '', 1) if path.startswith(self.api_root_url) else path
        if not relative_path.startswith('/'):
            relative_path = '/' + relative_path

        if method in self.endpoints[relative_path]["methods"]:
            self.endpoints[relative_path]["methods"][method]["successful_calls"].append({
                "request_params": request_params,
                "response_data": response_data
            })

    def get_endpoint_info(self, path: str, method: str) -> Optional[Dict[str, Any]]:
        """获取特定端点和方法的详细信息"""
        relative_path = path.replace(self.api_root_url, '', 1) if path.startswith(self.api_root_url) else path
        if not relative_path.startswith('/'):
            relative_path = '/' + relative_path

        if relative_path in self.endpoints and method in self.endpoints[relative_path]["methods"]:
            return self.endpoints[relative_path]["methods"][method]
        return None

    def add_to_exploration_queue(self, url: str, method: str, params: Dict[str, Any], body: Optional[Dict[str, Any]] = None, headers: Optional[Dict[str, Any]] = None):
        """添加待探索任务到队列,并去重"""
        task = {"url": url, "method": method, "params": params, "body": body, "headers": headers}
        if task not in self.exploration_queue:
            self.exploration_queue.append(task)

    def pop_exploration_task(self) -> Optional[Dict[str, Any]]:
        """从队列中取出一个探索任务"""
        if self.exploration_queue:
            return self.exploration_queue.pop(0)
        return None

    def update_auth_config(self, config: Dict[str, Any]):
        """更新认证配置"""
        self.auth_config.update(config)

    def get_auth_config(self) -> Dict[str, Any]:
        """获取认证配置"""
        return self.auth_config

    def get_all_endpoints(self) -> Dict[str, Any]:
        """获取所有已发现的端点"""
        return self.endpoints

3.2 安全与资源管理器 (Security & Resource Manager)

负责认证凭证管理、请求频率控制和API状态监控。

import time
import random
from typing import Dict, Any, Optional

class SecurityAndResourceManager:
    """
    负责管理认证凭证、控制请求频率和处理API状态。
    """
    def __init__(self, knowledge_base: AgentKnowledgeBase, rate_limit_interval: float = 0.5):
        self.knowledge_base = knowledge_base
        self.rate_limit_interval = rate_limit_interval # 请求间隔秒数
        self._last_request_time = 0.0

        # 常见的认证头部和参数名称
        self.common_auth_headers = ["Authorization", "X-API-Key", "X-Auth-Token"]
        self.common_auth_params = ["api_key", "token", "auth_token"]

    def get_auth_headers(self) -> Dict[str, str]:
        """根据知识库中的认证配置生成HTTP头部"""
        auth_config = self.knowledge_base.get_auth_config()
        headers = {}
        if auth_config:
            auth_type = auth_config.get("type")
            if auth_type == "Bearer Token":
                token = auth_config.get("token")
                if token:
                    headers["Authorization"] = f"Bearer {token}"
            elif auth_type == "API Key Header":
                key_name = auth_config.get("key_name", "X-API-Key")
                api_key = auth_config.get("api_key")
                if api_key:
                    headers[key_name] = api_key
            # 可以扩展支持其他认证类型
        return headers

    def get_auth_params(self) -> Dict[str, str]:
        """根据知识库中的认证配置生成URL查询参数"""
        auth_config = self.knowledge_base.get_auth_config()
        params = {}
        if auth_config:
            auth_type = auth_config.get("type")
            if auth_type == "API Key Param":
                key_name = auth_config.get("key_name", "api_key")
                api_key = auth_config.get("api_key")
                if api_key:
                    params[key_name] = api_key
        return params

    def apply_rate_limit(self):
        """应用请求频率限制"""
        current_time = time.time()
        time_since_last_request = current_time - self._last_request_time
        if time_since_last_request < self.rate_limit_interval:
            sleep_time = self.rate_limit_interval - time_since_last_request
            # 引入一点随机性,避免请求模式过于规律
            time.sleep(sleep_time + random.uniform(0, 0.1)) 
        self._last_request_time = time.time()

    def infer_auth_strategy(self, response_headers: Dict[str, str], response_body: Dict[str, Any], status_code: int):
        """
        根据响应头和状态码尝试推断认证策略。
        例如:401 Unauthorized, 403 Forbidden 响应可能提供认证提示。
        """
        if status_code in [401, 403]:
            # 检查WWW-Authenticate头部
            if "WWW-Authenticate" in response_headers:
                auth_scheme = response_headers["WWW-Authenticate"].lower()
                if "bearer" in auth_scheme:
                    print("检测到可能需要Bearer Token认证。")
                    # 这里需要人工干预提供Token或从其他API获取
                    self.knowledge_base.update_auth_config({"type": "Bearer Token", "token": None})
                elif "basic" in auth_scheme:
                    print("检测到可能需要Basic认证。")
                    # 这里需要人工干预提供用户名密码
                    # self.knowledge_base.update_auth_config({"type": "Basic", "username": None, "password": None})

            # 检查响应体中的错误信息
            if isinstance(response_body, dict):
                error_message = json.dumps(response_body).lower()
                if "api key" in error_message or "x-api-key" in error_message:
                    print("检测到可能需要API Key认证。")
                    self.knowledge_base.update_auth_config({"type": "API Key Header", "key_name": "X-API-Key", "api_key": None}) # 假设默认是X-API-Key

        # 如果知识库中尚未有认证配置,可以尝试从环境变量或配置文件加载常见的API Key
        if not self.knowledge_base.get_auth_config():
            for header_name in self.common_auth_headers:
                if os.getenv(f"API_KEY_{header_name.replace('-', '_').upper()}"):
                    self.knowledge_base.update_auth_config({"type": "API Key Header", "key_name": header_name, "api_key": os.getenv(f"API_KEY_{header_name.replace('-', '_').upper()}")})
                    print(f"从环境变量加载API Key: {header_name}")
                    break
            if not self.knowledge_base.get_auth_config(): # 如果头部没找到,尝试参数
                 for param_name in self.common_auth_params:
                    if os.getenv(f"API_KEY_{param_name.upper()}"):
                        self.knowledge_base.update_auth_config({"type": "API Key Param", "key_name": param_name, "api_key": os.getenv(f"API_KEY_{param_name.upper()}")})
                        print(f"从环境变量加载API Key: {param_name}")
                        break

3.3 执行者 (Executor)

负责发送HTTP请求。它将使用requests库,并集成安全与资源管理器的功能。

import requests
from requests.exceptions import RequestException
from typing import Dict, Any, Tuple, Optional

class APIExecutor:
    """
    负责发送HTTP请求并处理响应。
    """
    def __init__(self, security_manager: SecurityAndResourceManager):
        self.security_manager = security_manager

    def execute_request(self, url: str, method: str, params: Optional[Dict[str, Any]] = None, 
                        body: Optional[Dict[str, Any]] = None, headers: Optional[Dict[str, Any]] = None, 
                        timeout: int = 10) -> Tuple[int, Dict[str, str], Any, Optional[str]]:
        """
        发送HTTP请求。
        返回:(status_code, response_headers, response_json_or_text, error_message)
        """
        self.security_manager.apply_rate_limit()

        merged_headers = self.security_manager.get_auth_headers()
        if headers:
            merged_headers.update(headers)

        merged_params = self.security_manager.get_auth_params()
        if params:
            merged_params.update(params)

        try:
            print(f"执行请求: {method} {url} (params: {merged_params}, body: {body})")
            response = requests.request(
                method,
                url,
                params=merged_params,
                json=body if body else None, # 优先使用json作为请求体
                headers=merged_headers,
                timeout=timeout,
                allow_redirects=False # 不自动跟随重定向,有时重定向是学习机会
            )

            response_json = None
            try:
                response_json = response.json()
            except json.JSONDecodeError:
                response_json = response.text # 如果不是JSON,则返回文本

            return response.status_code, dict(response.headers), response_json, None

        except RequestException as e:
            print(f"请求 {url} 失败: {e}")
            return 0, {}, {}, str(e) # 返回0表示网络错误或请求异常
        except Exception as e:
            print(f"未知错误发生: {e}")
            return 0, {}, {}, str(e)

3.4 探索者 (Explorer)

负责发现新的API路径和支持的HTTP方法。它将结合多种策略:

  1. 路径猜测 (Path Fuzzing): 基于常见名词和动词。
  2. HATEOAS/Link Extraction: 从JSON响应中提取超媒体链接。
  3. URL分析: 解析URL中的路径参数。
  4. OPTIONS方法: 尝试使用OPTIONS方法来获取端点支持的HTTP方法。
import re
from urllib.parse import urljoin, urlparse
from typing import Dict, Any, List, Set, Optional

class Explorer:
    """
    负责发现新的API路径和支持的HTTP方法。
    """
    def __init__(self, knowledge_base: AgentKnowledgeBase):
        self.knowledge_base = knowledge_base
        self.common_paths = [
            "users", "products", "items", "orders", "customers", "auth", "login", "register",
            "api", "v1", "v2", "status", "health", "info", "data", "admin", "config", "settings",
            "search", "reports", "files", "images", "docs", "schemas", "metadata"
        ]
        self.common_methods = ["GET", "POST", "PUT", "DELETE", "PATCH", "HEAD", "OPTIONS"]
        self.path_separator = "/"
        self._visited_urls: Set[str] = set() # 记录已访问的完整URL,避免重复探索

    def _normalize_url(self, url: str) -> str:
        """规范化URL,去除末尾斜杠,处理查询参数等"""
        parsed = urlparse(url)
        path = parsed.path.rstrip('/')
        return urljoin(parsed.scheme + "://" + parsed.netloc, path)

    def discover_from_response(self, base_url: str, response_json: Any):
        """从JSON响应中提取潜在的API路径"""
        if isinstance(response_json, dict):
            # 检查HATEOAS风格的链接 (_links, links)
            for key in ["_links", "links"]:
                if key in response_json and isinstance(response_json[key], dict):
                    for rel, link_info in response_json[key].items():
                        if isinstance(link_info, dict) and "href" in link_info:
                            full_path = urljoin(base_url, link_info["href"])
                            if full_path.startswith(self.knowledge_base.api_root_url) and full_path not in self._visited_urls:
                                self.knowledge_base.add_to_exploration_queue(full_path, "GET", {}) # 默认以GET探索
                                self._visited_urls.add(full_path)
                                print(f"发现新路径 (HATEOAS): {full_path}")

            # 递归搜索所有字符串值,查找看起来像URL的字符串
            for key, value in response_json.items():
                if isinstance(value, str):
                    try:
                        parsed_url = urlparse(value)
                        if parsed_url.scheme and parsed_url.netloc and parsed_url.netloc == urlparse(base_url).netloc:
                            full_path = urljoin(base_url, value)
                            if full_path.startswith(self.knowledge_base.api_root_url) and full_path not in self._visited_urls:
                                self.knowledge_base.add_to_exploration_queue(full_path, "GET", {})
                                self._visited_urls.add(full_path)
                                print(f"发现新路径 (String Value): {full_path}")
                    except ValueError:
                        pass # 不是有效的URL
                elif isinstance(value, (dict, list)):
                    self.discover_from_response(base_url, value) # 递归

        elif isinstance(response_json, list):
            for item in response_json:
                self.discover_from_response(base_url, item)

    def generate_path_guesses(self, base_url: str):
        """生成基于常见词汇的路径猜测"""
        parsed_base = urlparse(base_url)
        current_path_segments = [s for s in parsed_base.path.split('/') if s]

        # 尝试在当前路径下添加常见子路径
        for common_path in self.common_paths:
            new_path = urljoin(base_url, common_path)
            if new_path.startswith(self.knowledge_base.api_root_url) and new_path not in self._visited_urls:
                self.knowledge_base.add_to_exploration_queue(new_path, "GET", {})
                self._visited_urls.add(new_path)
                print(f"生成路径猜测: {new_path}")

        # 尝试在更高级别路径下添加常见子路径(例如 /api/v1 -> /api/v1/users)
        if len(current_path_segments) > 0:
            parent_path = urljoin(self.knowledge_base.api_root_url, "/".join(current_path_segments[:-1]))
            for common_path in self.common_paths:
                new_path = urljoin(parent_path + self.path_separator if parent_path else self.knowledge_base.api_root_url, common_path)
                if new_path.startswith(self.knowledge_base.api_root_url) and new_path not in self._visited_urls:
                    self.knowledge_base.add_to_exploration_queue(new_path, "GET", {})
                    self._visited_urls.add(new_path)
                    print(f"生成路径猜测 (Parent): {new_path}")

    def discover_methods_for_path(self, url: str, executor: Any): # executor类型可以是APIExecutor
        """尝试使用OPTIONS请求发现某个URL支持的HTTP方法"""
        print(f"尝试使用OPTIONS发现 {url} 支持的方法...")
        status_code, headers, _, _ = executor.execute_request(url, "OPTIONS")

        if status_code == 200 and "Allow" in headers:
            allowed_methods = [m.strip().upper() for m in headers["Allow"].split(',')]
            print(f"OPTIONS请求发现 {url} 支持的方法: {allowed_methods}")
            # 将这些方法添加到知识库和探索队列
            for method in allowed_methods:
                if method in self.common_methods: # 仅考虑常见的REST方法
                    relative_path = url.replace(self.knowledge_base.api_root_url, '', 1)
                    if not relative_path.startswith('/'): relative_path = '/' + relative_path

                    # 仅当知识库中没有该方法信息时才添加,避免重复
                    if not self.knowledge_base.get_endpoint_info(url, method):
                        self.knowledge_base.add_endpoint(url, method, {}, {}, "OPTIONS_discovery")
                        self.knowledge_base.add_to_exploration_queue(url, method, {})
        elif status_code == 405: # Method Not Allowed
            print(f"OPTIONS方法被 {url} 拒绝 (405)。")
        else:
            print(f"OPTIONS方法对 {url} 返回状态码 {status_code}。")

    def analyze_path_for_parameters(self, path: str) -> str:
        """
        分析路径,识别潜在的路径参数,并将其转换为通用形式。
        例如:/users/123 -> /users/{id}
        """
        # 简单的启发式:如果路径段是数字或UUID,则可能是参数
        segments = path.split('/')
        transformed_segments = []
        for segment in segments:
            if re.fullmatch(r'd+', segment): # 纯数字
                transformed_segments.append("{id}")
            elif re.fullmatch(r'[0-9a-fA-F]{8}-[0-9a-fA-F]{4}-[0-9a-fA-F]{4}-[0-9a-fA-F]{4}-[0-9a-fA-F]{12}', segment): # UUID
                transformed_segments.append("{uuid}")
            else:
                transformed_segments.append(segment)

        generic_path = '/'.join(transformed_segments)
        # 确保根路径是 / 而不是空字符串
        if not generic_path.startswith('/'):
            generic_path = '/' + generic_path

        # 将根路径 / 规范化
        if generic_path == "//":
            generic_path = "/"

        # 尝试从已经发现的端点中进行匹配,如果一个新路径和某个已知的通用路径匹配,则使用通用路径
        for known_path_pattern in self.knowledge_base.get_all_endpoints().keys():
            if '{' in known_path_pattern: # 仅检查包含参数的已知模式
                # 将已知模式转换为正则表达式
                regex_pattern = re.sub(r'{[^}]+}', r'[^/]+', known_path_pattern)
                if re.fullmatch(regex_pattern, path):
                    return known_path_pattern # 匹配成功,使用已知模式

        return generic_path

3.5 推理者 (Inferer)

这是Agent最核心也是最困难的部分。它试图根据已有的信息和启发式规则,猜测API的参数和请求体结构。

from typing import Dict, Any, List, Union, Optional
import json

class Inferer:
    """
    负责推断API端点的参数、请求体结构和数据类型。
    """
    def __init__(self, knowledge_base: AgentKnowledgeBase):
        self.knowledge_base = knowledge_base
        self.common_query_params = ["page", "limit", "offset", "sort_by", "order", "q", "search"]
        self.common_body_fields = ["name", "description", "title", "content", "value", "status", "is_active"]
        self.common_data_types = {
            int: 1,  # 示例值
            str: "string_value",
            bool: True,
            list: [],
            dict: {}
        }

    def _infer_type_from_value(self, value: Any) -> str:
        """根据Python值的类型推断其JSON类型"""
        if isinstance(value, int): return "integer"
        if isinstance(value, float): return "number"
        if isinstance(value, str): return "string"
        if isinstance(value, bool): return "boolean"
        if isinstance(value, list): return "array"
        if isinstance(value, dict): return "object"
        return "unknown"

    def _build_schema_from_json(self, json_data: Any) -> Dict[str, Any]:
        """递归地从JSON数据构建一个简单的Schema"""
        if isinstance(json_data, dict):
            properties = {}
            for key, value in json_data.items():
                properties[key] = {"type": self._infer_type_from_value(value)}
                if isinstance(value, (dict, list)):
                    properties[key].update(self._build_schema_from_json(value))
            return {"type": "object", "properties": properties}
        elif isinstance(json_data, list) and json_data:
            # 假设列表中的所有元素都具有相同的Schema
            items_schema = self._build_schema_from_json(json_data[0])
            return {"type": "array", "items": items_schema}
        elif isinstance(json_data, list) and not json_data: # 空列表
            return {"type": "array", "items": {}} # 无法推断具体类型,留空
        else:
            return {"type": self._infer_type_from_value(json_data)}

    def infer_parameters_from_path(self, path: str) -> Dict[str, Any]:
        """从通用路径字符串中提取路径参数"""
        params = {}
        path_segments = path.split('/')
        for segment in path_segments:
            if segment.startswith('{') and segment.endswith('}'):
                param_name = segment[1:-1]
                # 默认路径参数为字符串类型,并提供一个示例值
                params[param_name] = {"in": "path", "type": "string", "example": "1"} # 默认给个1
        return params

    def infer_from_get_response(self, path: str, method: str, response_json: Any):
        """
        从GET请求的成功响应中推断潜在的POST/PUT请求体结构。
        如果GET /users 返回 [{id: 1, name: "Alice"}], 那么POST /users 可能需要 {name: "Bob"}
        """
        current_endpoint_info = self.knowledge_base.get_endpoint_info(path, method)
        if not current_endpoint_info:
            return

        # 更新响应Schema
        response_schema = self._build_schema_from_json(response_json)
        self.knowledge_base.add_endpoint(path, method, current_endpoint_info.get("inferred_params", {}), response_schema, "response_schema_update")

        # 如果是获取列表的GET请求,且响应是列表,尝试推断创建/更新的POST/PUT请求体
        if method == "GET" and isinstance(response_json, list) and response_json:
            sample_item = response_json[0]
            if isinstance(sample_item, dict):
                # 移除可能的ID字段,作为POST/PUT的请求体
                post_body_schema = self._build_schema_from_json({k: v for k, v in sample_item.items() if k not in ["id", "_id", "uuid", "created_at", "updated_at"]})

                # 查找对应的POST/PUT方法
                for m in ["POST", "PUT"]:
                    if m in self.knowledge_base.endpoints[path]["methods"]:
                         # 如果已经有推断的body schema,进行合并或更新
                        existing_body_schema = self.knowledge_base.endpoints[path]["methods"][m].get("request_body_schema", {})
                        # 这里可以实现更复杂的schema合并逻辑,目前简单覆盖
                        self.knowledge_base.endpoints[path]["methods"][m]["request_body_schema"] = post_body_schema
                        print(f"从GET响应推断 {path} {m} 的请求体Schema。")

                        # 为POST/PUT方法生成一个待探索任务
                        self.knowledge_base.add_to_exploration_queue(
                            url=self.knowledge_base.api_root_url + path, 
                            method=m, 
                            params={}, 
                            body=self._generate_sample_body(post_body_schema)
                        )

    def _generate_sample_body(self, schema: Dict[str, Any]) -> Dict[str, Any]:
        """根据Schema生成一个示例请求体"""
        if schema.get("type") == "object" and "properties" in schema:
            sample_body = {}
            for prop, prop_schema in schema["properties"].items():
                prop_type = prop_schema.get("type")
                if prop_type == "string":
                    sample_body[prop] = f"sample_{prop}"
                elif prop_type == "integer":
                    sample_body[prop] = 123
                elif prop_type == "boolean":
                    sample_body[prop] = True
                elif prop_type == "array":
                    sample_body[prop] = [] # 简单起见,不生成复杂数组项
                elif prop_type == "object":
                    sample_body[prop] = {} # 简单起见,不生成复杂嵌套对象
            return sample_body
        return {}

    def get_inferred_request_blueprint(self, path: str, method: str) -> Dict[str, Any]:
        """
        获取给定路径和方法的最优调用蓝图。
        这包括路径参数、查询参数、请求体和认证头部。
        """
        blueprint = {
            "url": self.knowledge_base.api_root_url + path,
            "method": method,
            "params": {},
            "body": None,
            "headers": {}
        }

        endpoint_info = self.knowledge_base.get_endpoint_info(path, method)
        if endpoint_info:
            # 1. 路径参数
            path_params = self.infer_parameters_from_path(path) # 从通用路径中提取
            # 替换路径中的占位符
            for param_name, param_info in path_params.items():
                # 尝试从成功的调用历史中获取真实值,否则用示例值
                example_value = param_info.get("example")
                if "successful_calls" in endpoint_info and endpoint_info["successful_calls"]:
                    # 寻找一个成功的GET请求,其URL中包含当前路径参数的值
                    for call in endpoint_info["successful_calls"]:
                        # 这是一个简化的逻辑,更复杂的情况需要URL匹配和参数提取
                        if method == "GET": # GET请求通常返回ID,可用于后续PUT/DELETE
                            response_data = call["response_data"]
                            if isinstance(response_data, list) and response_data:
                                if "id" in response_data[0]:
                                    example_value = response_data[0]["id"]
                                    break
                            elif isinstance(response_data, dict) and "id" in response_data:
                                example_value = response_data["id"]
                                break

                if example_value is not None:
                    # 替换路径中的 {param_name}
                    blueprint["url"] = blueprint["url"].replace(f"{{{param_name}}}", str(example_value))

            # 2. 查询参数(从知识库或通用参数猜测)
            inferred_params = endpoint_info.get("inferred_params", {})
            for param_name, param_details in inferred_params.items():
                if param_details.get("in") == "query":
                    # 尝试从成功调用的历史中获取有效值,否则用示例
                    blueprint["params"][param_name] = param_details.get("example", "test_value")

            # 3. 请求体 (仅适用于POST, PUT, PATCH)
            if method in ["POST", "PUT", "PATCH"]:
                request_body_schema = endpoint_info.get("request_body_schema")
                if request_body_schema:
                    blueprint["body"] = self._generate_sample_body(request_body_schema)
                else:
                    # 如果没有具体的请求体Schema,但方法是POST/PUT,尝试发送一个简单的通用体
                    blueprint["body"] = {"field1": "value1", "field2": 123} # 兜底策略

        # 4. 通用查询参数猜测(如果知识库中没有该端点的特定查询参数)
        if not blueprint["params"] and method == "GET":
            for common_param in self.common_query_params:
                blueprint["params"][common_param] = "1" # 默认值

        return blueprint

3.6 学习与反馈者 (Learner & Feedback)

这个模块是Agent智能的核心。它分析API调用的结果,从中学习,并更新知识库。

from typing import Dict, Any, List, Tuple
import json

class LearnerAndFeedbackProcessor:
    """
    解析HTTP响应,评估调用结果,从成功和失败中提取信息,更新知识库,优化后续策略。
    """
    def __init__(self, knowledge_base: AgentKnowledgeBase, explorer: Explorer, inferer: Inferer, security_manager: SecurityAndResourceManager):
        self.knowledge_base = knowledge_base
        self.explorer = explorer
        self.inferer = inferer
        self.security_manager = security_manager

    def process_feedback(self, url: str, method: str, request_params: Dict[str, Any], 
                         request_body: Optional[Dict[str, Any]], 
                         status_code: int, headers: Dict[str, str], response_data: Any, 
                         error_message: Optional[str]):
        """
        处理API调用的反馈,更新知识库。
        """
        relative_path = self.explorer.analyze_path_for_parameters(url.replace(self.knowledge_base.api_root_url, '', 1))
        if not relative_path.startswith('/'): relative_path = '/' + relative_path

        print(f"处理反馈:{method} {url} -> 状态码: {status_code}")

        # 1. 认证策略推断
        self.security_manager.infer_auth_strategy(headers, response_data if isinstance(response_data, dict) else {}, status_code)

        # 2. 状态码分析与知识库更新
        if 200 <= status_code < 300: # 成功响应
            print(f"成功调用: {method} {url}")
            # 更新知识库中的端点信息
            self.knowledge_base.add_endpoint(url, method, request_params, self.inferer._build_schema_from_json(response_data), "successful_call")
            self.knowledge_base.record_successful_call(url, method, request_params, response_data)

            # 从成功的JSON响应中发现新的URL
            if isinstance(response_data, (dict, list)):
                self.explorer.discover_from_response(url, response_data)

            # 如果是GET请求,尝试推断POST/PUT的请求体
            self.inferer.infer_from_get_response(relative_path, method, response_data)

        elif status_code == 404: # 未找到
            print(f"路径 {url} 不存在 (404)。将从知识库中移除此无效路径(如果存在)。")
            # 实际上不直接移除,而是标记为无效,避免重复尝试
            # 可以在知识库中增加一个字段来标记无效路径

        elif status_code == 405: # 方法不允许
            print(f"路径 {url} 不支持方法 {method} (405)。")
            # 知识库应记录此方法不被支持
            endpoint_info = self.knowledge_base.get_endpoint_info(url, method)
            if endpoint_info:
                endpoint_info["supported"] = False # 标记为不支持

        elif status_code == 400: # 错误的请求
            print(f"请求 {url} {method} 参数错误 (400)。响应: {response_data}")
            # 从错误响应中尝试学习正确的参数格式
            if isinstance(response_data, dict):
                error_message = json.dumps(response_data).lower()
                # 简单的启发式:如果错误信息提到“missing parameter X”,则尝试添加X
                for common_param in self.inferer.common_query_params + self.inferer.common_body_fields:
                    if f"missing parameter {common_param}" in error_message or f"field {common_param} is required" in error_message:
                        print(f"从错误信息推断需要参数: {common_param}")
                        # 需要更新推理者的逻辑,使其能将此参数添加到蓝图
                        # 暂时先手动添加到知识库中(模拟推理者的更新)
                        self.knowledge_base.endpoints[relative_path]["methods"][method].setdefault("inferred_params", {})[common_param] = {"in": "query_or_body", "type": "string", "example": "required_value"}
                        # 重新将任务添加回队列,带上新参数
                        self.knowledge_base.add_to_exploration_queue(
                            url, method, 
                            {**request_params, common_param: "test_value"}, 
                            {**request_body if request_body else {}, common_param: "test_value"} if method in ["POST", "PUT", "PATCH"] else None
                        )

        elif status_code >= 500: # 服务器错误
            print(f"服务器错误: {method} {url} -> {status_code}。响应: {response_data}")
            # 记录错误,但此时Agent能做的有限,可能需要人工介入

        elif status_code >= 300 and status_code < 400: # 重定向
            location_header = headers.get("Location")
            if location_header:
                redirect_url = urljoin(url, location_header)
                if redirect_url.startswith(self.knowledge_base.api_root_url):
                    print(f"发现重定向到: {redirect_url}")
                    self.knowledge_base.add_to_exploration_queue(redirect_url, "GET", {}) # 默认GET探索重定向目标

        # 3. 更新知识库持久化
        self.knowledge_base.save_to_disk()

3.7 Agent Core (Orchestrator)

这是Agent的控制中心,它将所有模块连接起来,执行探索-推理-执行-学习的循环。

import time
from urllib.parse import urljoin

class AutonomousAgent:
    """
    自主Agent的核心编排器。
    """
    def __init__(self, api_root_url: str, persistence_file: str = "knowledge_base.json"):
        self.knowledge_base = AgentKnowledgeBase(api_root_url, persistence_file)
        self.security_manager = SecurityAndResourceManager(self.knowledge_base)
        self.executor = APIExecutor(self.security_manager)
        self.explorer = Explorer(self.knowledge_base)
        self.inferer = Inferer(self.knowledge_base)
        self.learner = LearnerAndFeedbackProcessor(self.knowledge_base, self.explorer, self.inferer, self.security_manager)

        # 将根路径加入探索队列,以便首次GET和OPTIONS
        initial_root_url = self.knowledge_base.api_root_url
        if not any(t["url"] == initial_root_url for t in self.knowledge_base.exploration_queue):
             self.knowledge_base.add_to_exploration_queue(initial_root_url, "GET", {})
             self.knowledge_base.add_to_exploration_queue(initial_root_url, "OPTIONS", {})

    def run(self, max_iterations: int = 100):
        """
        运行Agent的主循环。
        """
        print(f"自主Agent启动,探索API根路径: {self.knowledge_base.api_root_url}")

        iteration_count = 0
        while iteration_count < max_iterations:
            task = self.knowledge_base.pop_exploration_task()
            if not task:
                print("探索队列为空,Agent进入休眠或完成任务。")
                break

            url = task["url"]
            method = task["method"]
            params = task.get("params", {})
            body = task.get("body")
            headers = task.get("headers", {})

            # 规范化URL,确保只处理API根路径下的URL
            if not url.startswith(self.knowledge_base.api_root_url):
                print(f"跳过非目标API域的URL: {url}")
                continue

            # 使用推理者获取完整的请求蓝图,特别是对于已发现的端点
            # 注意:这里的逻辑需要优化,确保不会重复探索已完成的任务
            # 目前,如果任务来自队列,就直接执行。蓝图主要用于生成新的、更智能的请求。
            # 为了简化,我们暂时让inferer直接返回给定task的蓝图
            request_blueprint = self.inferer.get_inferred_request_blueprint(
                self.explorer.analyze_path_for_parameters(url.replace(self.knowledge_base.api_root_url, '', 1)),
                method
            )

            # 合并task中自带的参数和蓝图中推断的参数
            final_params = {**request_blueprint["params"], **params}
            final_body = request_blueprint["body"] if request_blueprint["body"] else body
            final_headers = {**request_blueprint["headers"], **headers}

            # 执行请求
            status_code, response_headers, response_data, error_message = 
                self.executor.execute_request(url, method, final_params, final_body, final_headers)

            # 处理反馈并学习
            self.learner.process_feedback(url, method, final_params, final_body, status_code, response_headers, response_data, error_message)

            # 额外的探索步骤:对当前URL尝试OPTIONS方法以发现更多支持方法
            if method != "OPTIONS": # 避免无限循环
                self.explorer.discover_methods_for_path(url, self.executor)

            # 额外的探索步骤:生成路径猜测
            self.explorer.generate_path_guesses(url)

            iteration_count += 1
            print(f"n--- 迭代 {iteration_count} 完成 ---")
            time.sleep(1) # 礼貌性间隔

        print("n自主Agent运行结束。")
        self.knowledge_base.save_to_disk()
        print("最终知识库内容:")
        print(json.dumps(self.knowledge_base.get_all_endpoints(), indent=4, ensure_ascii=False))

# 示例运行
if __name__ == "__main__":
    # 假设一个测试API,你可以用一个本地的Flask/Node.js应用模拟
    # 或者用一些公开的测试API,例如 JSONPlaceholder (https://jsonplaceholder.typicode.com)
    # 但请注意,JSONPlaceholder不支持POST/PUT/DELETE的实际数据存储,仅模拟响应

    # 真实场景下,你需要提供一个可写且安全的API根路径
    # 例如:agent = AutonomousAgent("http://localhost:8000/api")

    # 为了演示,我们使用JSONPlaceholder并只做GET请求的探索
    # 请注意,由于JSONPlaceholder不支持实际的HATEOAS链接发现和动态创建资源,
    # 探索和推理的很多高级功能将不会被完全触发。
    # 这是一个演示架构,实际部署需要更真实的API环境。

    # 假设API根路径
    api_root = "https://jsonplaceholder.typicode.com" 
    # 可以设置环境变量,例如 export API_KEY_X_API_KEY="your_api_key_here"

    agent = AutonomousAgent(api_root, persistence_file="jsonplaceholder_knowledge.json")
    agent.run(max_iterations=20)

四、 进阶考虑与未来方向

上述架构提供了一个坚实的基础,但要使其在现实世界中更加强大和鲁棒,我们还需要考虑以下进阶特性:

  1. AI/ML 增强:

    • LLM (大型语言模型) 集成: 利用LLM理解API文档(如果存在)、错误消息、API路径和参数的语义。例如,LLM可以根据/users/{id}/posts推断出{id}是用户ID,并生成一个合理的示例值。它还可以帮助生成更智能的请求体。
    • 强化学习: 将API探索视为一个强化学习问题,Agent通过尝试不同的请求、观察响应来学习最佳的探索策略。
    • 模式识别: 利用机器学习识别API的常见模式(如分页、过滤、排序参数),以及认证模式。
  2. 数据持久化与可恢复性: 将知识库存储在更健壮的数据库(如SQLite、PostgreSQL)中,支持断点续传和大规模数据管理。

  3. 并发与分布式探索: 为了提高探索效率,Agent可以并行执行多个请求,甚至可以部署为分布式系统。

  4. 沙盒环境与安全性: 对于未知API,尤其是有写操作(POST/PUT/DELETE)的,应在一个隔离的沙盒环境中运行Agent,避免对生产系统造成意外破坏。Agent应具备识别和避免高风险操作的能力。

  5. 人机协作 (Human-in-the-Loop): 在Agent无法自动解决某些复杂问题(如认证凭证缺失、复杂业务逻辑)时,能够暂停并寻求人类专家的帮助,然后根据人类的输入继续运行。

  6. 高级数据类型推理: 识别日期、时间、枚举值、地理坐标等特殊数据类型,并生成符合这些类型的数据。

  7. API依赖链构建: 自动识别API之间的依赖关系。例如,要调用/orders/{order_id},首先需要通过/orders的POST方法创建一个订单并获取其order_id

  8. OpenAPI/Swagger 生成: 最终目标之一可以是根据探索到的信息,自动生成或完善API的OpenAPI规范。


五、 展望

我们今天探讨的自主 Agent 架构,从一个API根路径出发,通过一系列智能化的探索、推理、执行和学习循环,能够逐步构建出对“从未见过”的REST API的理解。这不仅仅是一个技术上的挑战,更是一扇通向未来自动化和智能软件开发的大门。

想象一下,一个能够自主学习和适应的Agent,可以极大地加速新服务的集成、提高API文档的质量、甚至帮助发现潜在的安全漏洞。它代表了软件工程从被动适应到主动探索的范式转变。虽然前路漫漫,充满了技术难题与伦理考量,但其潜力无疑是巨大的。这趟深度挑战之旅,才刚刚开始。

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注