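Deep Dive Challenge: Designing an Autonomous Agent That Discovers and Calls Never-Before-Seen APIs Given Only Their Swagger Documentation

Good afternoon, everyone. Today we will take a close look at a challenging and promising area: designing an autonomous agent that can automatically discover and call APIs it has never seen before, given nothing but a Swagger or OpenAPI document. This is more than a thought experiment. It is a key step toward building truly intelligent, adaptive systems that can keep up with an exploding API ecosystem.

Picture this: an enterprise needs to integrate hundreds of external services, each with its own API. The traditional approach is to read the documentation by hand, write the integration code, then test and maintain it. That process is slow, error-prone, and expensive. If our agent could behave like an experienced developer, reading the API documentation, understanding what each endpoint does, and generating the calling code automatically, it would be a leap in productivity.

In this talk I will take a practitioner's perspective and walk through the core techniques, design decisions, and open challenges involved in building such an agent. We will go down to the code level and look at how abstract documentation becomes executable operations.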

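1. The Core Problem and Its Challenges

The central challenge in building an agent that handles "never-before-seen" APIs is generalization. We cannot hard-code logic for any particular API; instead we need a general framework that can understand and adapt to any API described by a conforming OpenAPI document.

The main challenges are:

  1. Document understanding and parsing: How do we turn a structured YAML/JSON document into a data model the agent can operate on?
  2. Semantic understanding and intent mapping: Given a high-level goal (e.g. "create a user" or "check product inventory"), how does the agent pick the most relevant operation, or sequence of operations, from the many the API offers?
  3. Parameter generation and validation: For a given operation, how do we automatically produce valid request parameters that satisfy its structure, types, and constraints? This is one of the hardest parts.
  4. State management and chained calls: Many business workflows need several API calls working together, where the output of one call feeds the input of the next. How does the agent maintain that context?
  5. Error handling and recovery: API calls can fail, so the agent needs robust error detection, logging, and appropriate recovery strategies.
  6. Security and authentication: How do we safely handle the various authentication mechanisms (API keys, OAuth2, etc.)?

To address these challenges, the agent needs a modular design in which each module focuses on one specific problem.

2. Overall Architecture of the Autonomous Agent

Internally, an autonomous agent that discovers and calls APIs can be thought of as a set of cooperating modules that together carry a request from a high-level goal all the way to concrete API calls.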

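The modules, their main responsibilities, and the key technologies or considerations for each:

  • API documentation parser: parses the Swagger/OpenAPI document and converts it into the agent's unified internal data structures. Key technologies: pyyaml, json, openapi-spec-validator, Pydantic models.
  • Semantic understanding and planner: maps high-level user or system goals to available API operations and plans the order in which they run. Key technologies: LLMs (large language models), vector databases (embeddings), rule engines, state machines.
  • Parameter generator: produces valid request parameters from an operation's parameter definitions (types, constraints, examples, etc.) and the current context. Key technologies: LLMs, heuristic rules, data type validation, dynamic data sources (e.g. values taken from a previous step).
  • API call executor: builds the HTTP request (URL, method, headers, body), sends it, and receives the response. Key technologies: requests (Python), HTTP client libraries, authentication management.
  • Response processor and state manager: parses API responses, extracts key information, updates the agent's internal state, and feeds the results back to the planner. Key technologies: JSON/XML parsing, response schema validation, an internal knowledge graph or context store.
  • Error and retry handling: detects errors during API calls and, depending on the error type, retries, falls back, or reports. Key technologies: exponential backoff retries, error classification (network, client, server), circuit breakers.
  • Authentication manager: stores and manages credentials for different APIs and applies them automatically at call time. Key technologies: credential storage (environment variables, key management services), OAuth2 flow management.

Now let's look at each module in turn.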

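3. The API Documentation Parser: Understanding the API's "Language"

Swagger (now more commonly referred to as the OpenAPI Specification, OAS) is the de facto standard for describing HTTP APIs. It provides a language-agnostic, machine-readable interface description format. The agent's first job is to be able to "read" and understand this document.

The parser's task is to load the OpenAPI document (YAML or JSON) into memory and convert it into an object model the agent can work with easily. That model should capture all the key information about the API: paths, operations (GET, POST, etc.), parameters (name, location, type, whether required, schema), request bodies, responses, and security definitions.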
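
One detail worth making explicit when building this model: OpenAPI lets a Path Item declare parameters that apply to every operation under that path, and an operation may override one by redefining a parameter with the same name and location ("in"). The APIPath model in the parser below covers five methods and has no path-level parameters field, so here is a minimal sketch on raw, already-$ref-resolved dicts. flatten_operations is a hypothetical helper, not part of any library.

```python
from typing import Any, Dict, List

# Operation keys allowed in an OpenAPI 3.x Path Item object.
HTTP_METHODS = ("get", "put", "post", "delete", "options", "head", "patch", "trace")

def flatten_operations(paths: Dict[str, Any]) -> List[Dict[str, Any]]:
    """Hypothetical helper: flatten a resolved OAS 'paths' object into one record per operation."""
    records: List[Dict[str, Any]] = []
    for path, item in paths.items():
        shared = item.get("parameters", [])  # path-level parameters apply to every operation
        for method in HTTP_METHODS:
            op = item.get(method)
            if op is None:
                continue
            # A parameter is identified by (name, in); operation-level entries
            # override path-level ones that have the same key.
            merged = {(p["name"], p["in"]): p for p in shared}
            merged.update({(p["name"], p["in"]): p for p in op.get("parameters", [])})
            records.append({
                "path": path,
                "method": method.upper(),
                "operation_id": op.get("operationId"),
                "summary": op.get("summary"),
                "parameters": list(merged.values()),
                "request_body": op.get("requestBody"),
                "responses": op.get("responses", {}),
            })
    return records
```

Keying the merge on (name, in) rather than name alone matters because a query parameter and a header may legitimately share a name.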

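Core steps:

  1. Load the document: read the YAML/JSON content from a file path or URL.
  2. Validate the document: use a tool such as openapi-spec-validator to check that the document conforms to the OpenAPI Specification.
  3. Convert to a data model: map the parsed dict/JSON structure onto Pydantic models or custom Python classes for type hints and attribute access.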
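
Step 2 is best done with a real validator such as openapi-spec-validator. Purely to show what such a check looks at, here is a much weaker, hypothetical structural check (sanity_check_spec is an illustrative name) over an already-loaded dict. It is based on fields that OAS 3.0 marks as required: openapi, info (with title and version), paths whose keys start with "/", and responses on every operation. OAS 3.1 relaxes some of these rules, so treat this as a sketch, not a validator.

```python
from typing import Any, Dict, List

# Operation keys allowed in an OpenAPI 3.x Path Item object.
HTTP_METHODS = {"get", "put", "post", "delete", "options", "head", "patch", "trace"}

def sanity_check_spec(spec: Dict[str, Any]) -> List[str]:
    """Hypothetical helper: list problems found in a loaded spec dict (OAS 3.0 rules)."""
    problems: List[str] = []
    version = spec.get("openapi")
    if not (isinstance(version, str) and version.startswith("3.")):
        if "swagger" in spec:
            problems.append("Swagger 2.0 document: convert it to OpenAPI 3.x first")
        else:
            problems.append("missing or unsupported 'openapi' version field")
    info = spec.get("info")
    if not isinstance(info, dict) or "title" not in info or "version" not in info:
        problems.append("'info' must contain 'title' and 'version'")
    paths = spec.get("paths")
    if not isinstance(paths, dict):
        problems.append("'paths' must be an object")
        return problems
    for path, item in paths.items():
        if not path.startswith("/"):
            problems.append(f"path '{path}' must start with '/'")
        for method, op in (item or {}).items():
            # Only HTTP-method keys are operations; skip 'parameters', 'summary', etc.
            if method in HTTP_METHODS and "responses" not in (op or {}):
                problems.append(f"{method.upper()} {path}: missing 'responses'")
    return problems
```

Returning a list of problems instead of raising on the first one lets the agent report everything wrong with a spec in a single pass.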

Code example: a basic parser

import yaml
import json
from typing import Dict, Any, List, Optional
from pydantic import BaseModel, Field

# 定义 Agent 内部表示 API 结构的数据模型
class ParameterSchema(BaseModel):
    type: Optional[str] = None
    format: Optional[str] = None
    enum: Optional[List[Any]] = None  # enum values may be strings, numbers, booleans, ...
    items: Optional[Dict[str, Any]] = None # For array types
    properties: Optional[Dict[str, Any]] = None # For object types
    required: Optional[List[str]] = None # For object properties

class APIParameter(BaseModel):
    name: str
    in_: str = Field(alias='in') # path, query, header, or cookie ('body' exists only in Swagger 2.0)
    description: Optional[str] = None
    required: bool = False
    schema_: ParameterSchema = Field(alias='schema', default_factory=ParameterSchema) # Use default_factory for mutable defaults

class APIRequestBodyContent(BaseModel):
    schema_: ParameterSchema = Field(alias='schema', default_factory=ParameterSchema)

class APIRequestBody(BaseModel):
    description: Optional[str] = None
    required: bool = False
    content: Dict[str, APIRequestBodyContent] # e.g., {'application/json': APIRequestBodyContent}

class APIResponse(BaseModel):
    description: str
    content: Optional[Dict[str, APIRequestBodyContent]] = None # Similar to request body content

class APIOperation(BaseModel):
    operationId: Optional[str] = None
    summary: Optional[str] = None
    description: Optional[str] = None
    parameters: Optional[List[APIParameter]] = None
    requestBody: Optional[APIRequestBody] = None
    responses: Dict[str, APIResponse] # e.g., {'200': APIResponse}
    tags: Optional[List[str]] = None
    security: Optional[List[Dict[str, List[str]]]] = None

class APIPath(BaseModel):
    get: Optional[APIOperation] = None
    post: Optional[APIOperation] = None
    put: Optional[APIOperation] = None
    delete: Optional[APIOperation] = None
    patch: Optional[APIOperation] = None

class APISchema(BaseModel):
    openapi: str
    info: Dict[str, Any]
    paths: Dict[str, APIPath]
    components: Optional[Dict[str, Any]] = None # For reusable schemas, parameters, etc.
    security: Optional[List[Dict[str, List[str]]]] = None # Global security schemes

class SwaggerParser:
    def __init__(self, spec_path: str):
        self.spec_path = spec_path
        self.api_schema: Optional[APISchema] = None

    def load_and_parse(self) -> APISchema:
        with open(self.spec_path, 'r', encoding='utf-8') as f:
            if self.spec_path.endswith('.yaml') or self.spec_path.endswith('.yml'):
                spec_data = yaml.safe_load(f)
            elif self.spec_path.endswith('.json'):
                spec_data = json.load(f)
            else:
                raise ValueError("Unsupported file format. Must be .yaml, .yml, or .json")

        # Resolve $ref references - a crucial step for real-world OAS
        # This simplified version recurses, but has no cycle detection: a self-referencing schema would recurse until RecursionError.
        resolved_data = self._resolve_refs(spec_data)

        self.api_schema = APISchema.model_validate(resolved_data)
        print(f"Successfully parsed OpenAPI spec from {self.spec_path}")
        return self.api_schema

    def _resolve_refs(self, data: Dict[str, Any], root_data: Optional[Dict[str, Any]] = None) -> Dict[str, Any]:
        """
        A simplified recursive function to resolve $ref references.
        In a real system, this would be more robust, handling external refs,
        and caching resolutions.
        """
        if root_data is None:
            root_data = data

        if isinstance(data, dict):
            if '$ref' in data:
                ref_path = data['$ref']
                if ref_path.startswith('#/'):
                    parts = ref_path[2:].split('/')
                    resolved = root_data
                    for part in parts:
                        resolved = resolved.get(part)
                        if resolved is None:
                            raise ValueError(f"Could not resolve reference: {ref_path}")
                    # Recursively resolve any refs within the resolved component itself
                    return self._resolve_refs(resolved, root_data)
                else:
                    # External references are not handled in this basic example
                    print(f"Warning: External reference '{ref_path}' not resolved.")
                    return data
            else:
                return {k: self._resolve_refs(v, root_data) for k, v in data.items()}
        elif isinstance(data, list):
            return [self._resolve_refs(item, root_data) for item in data]
        else:
            return data

    def get_operations(self) -> List[Dict[str, Any]]:
        if not self.api_schema:
            raise ValueError("API schema not loaded. Call load_and_parse() first.")

        operations = []
        for path, path_obj in self.api_schema.paths.items():
            for method in ['get', 'post', 'put', 'delete', 'patch']:
                op = getattr(path_obj, method, None)
                if op:
                    operations.append({
                        "path": path,
                        "method": method.upper(),
                        "operation_id": op.operationId,
                        "summary": op.summary,
                        "description": op.description,
                        "parameters": op.parameters,
                        "request_body": op.requestBody,
                        "responses": op.responses,
                        "tags": op.tags,
                        "security": op.security
                    })
        return operations

# Example usage (requires an actual swagger.yaml/json file).
# Assume we have a simple petstore.yaml file; create one on disk for testing.
with open("petstore.yaml", "w", encoding="utf-8") as f:
    f.write("""
openapi: 3.0.0
info:
  title: Pet Store API
  version: 1.0.0
paths:
  /pets:
    get:
      summary: List all pets
      operationId: listPets
      parameters:
        - name: limit
          in: query
          description: How many pets to return at one time (max 100)
          required: false
          schema:
            type: integer
            format: int32
      responses:
        '200':
          description: A paged array of pets
          content:
            application/json:
              schema:
                type: array
                items:
                  $ref: '#/components/schemas/Pet'
    post:
      summary: Create a pet
      operationId: createPet
      requestBody:
        description: Pet to create
        required: true
        content:
          application/json:
            schema:
              $ref: '#/components/schemas/NewPet'
      responses:
        '201':
          description: Created
        '400':
          description: Bad request
  /pets/{petId}:
    get:
      summary: Info for a specific pet
      operationId: showPetById
      parameters:
        - name: petId
          in: path
          required: true
          description: The id of the pet to retrieve
          schema:
            type: string
      responses:
        '200':
          description: Expected response to a valid request
          content:
            application/json:
              schema:
                $ref: '#/components/schemas/Pet'
components:
  schemas:
    Pet:
      type: object
      required:
        - id
        - name
      properties:
        id:
          type: integer
          format: int64
        name:
          type: string
        tag:
          type: string
    NewPet:
      type: object
      required:
        - name
      properties:
        name:
          type: string
        tag:
          type: string
""")

# Test the parser
parser = SwaggerParser("petstore.yaml")
api_spec = parser.load_and_parse()

print("\n--- Extracted Operations ---")
for op in parser.get_operations():
    print(f"Path: {op['path']}, Method: {op['method']}, Operation ID: {op['operation_id']}")
    if op['parameters']:
        print(f"  Parameters:")
        for param in op['parameters']:
            print(f"    - Name: {param.name}, In: {param.in_}, Type: {param.schema_.type}, Required: {param.required}")
    if op['request_body']:
        print(f"  Request Body Content Types: {list(op['request_body'].content.keys())}")
        if 'application/json' in op['request_body'].content and op['request_body'].content['application/json'].schema_.properties:
            print(f"    Body Properties: {list(op['request_body'].content['application/json'].schema_.properties.keys())}")

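Notes on the parser:

  • We define a set of Pydantic models (APIOperation, APIParameter, APIRequestBody, etc.) that mirror the structures of the OpenAPI specification, which gives us type safety and data validation. These models target OpenAPI 3.x; a Swagger 2.0 document (with a top-level "swagger": "2.0" field, in: body parameters, and definitions instead of components/schemas) would have to be converted to 3.x first.
  • The SwaggerParser class loads the YAML/JSON file and handles $ref references with the simplified _resolve_refs method. In real-world specs, $ref resolution is essential, because it is how documents reuse definitions such as the models under components/schemas.
  • get_operations walks every path and HTTP method and extracts each API operation together with its details, preparing the ground for later planning and invocation.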
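
The parser's comments note that real $ref resolution needs cycle detection. Recursive schemas (a Node whose next field points back to Node) are common, and _resolve_refs as written would recurse until Python raises RecursionError. Below is a minimal sketch of local-reference resolution that leaves a $ref unexpanded when it is already being expanded further up the same chain, and that unescapes JSON Pointer tokens (~1 for "/", ~0 for "~") as RFC 6901 requires. External references remain out of scope; resolve_refs and _pointer_get are illustrative names.

```python
from typing import Any, Dict, FrozenSet

def _pointer_get(root: Dict[str, Any], ref: str) -> Any:
    """Follow a local JSON Pointer such as '#/components/schemas/Pet'."""
    node: Any = root
    for token in ref[2:].split("/"):
        # RFC 6901 unescaping order: '~1' -> '/', then '~0' -> '~'
        token = token.replace("~1", "/").replace("~0", "~")
        if isinstance(node, list):
            node = node[int(token)]
        elif isinstance(node, dict) and token in node:
            node = node[token]
        else:
            raise KeyError(f"unresolvable $ref: {ref}")
    return node

def resolve_refs(node: Any, root: Dict[str, Any], active: FrozenSet[str] = frozenset()) -> Any:
    """Hypothetical helper: inline local $refs, keeping cyclic ones as plain references.

    `active` holds the refs currently being expanded on this branch; meeting one of
    them again means we are inside a cycle.
    """
    if isinstance(node, list):
        return [resolve_refs(item, root, active) for item in node]
    if not isinstance(node, dict):
        return node
    ref = node.get("$ref")
    if isinstance(ref, str) and ref.startswith("#/"):
        if ref in active:
            return {"$ref": ref}  # cycle: stop here instead of recursing forever
        return resolve_refs(_pointer_get(root, ref), root, active | {ref})
    # Builds new containers, so the input spec is never mutated.
    return {key: resolve_refs(value, root, active) for key, value in node.items()}
```

Tracking only the refs on the current branch, rather than every ref seen so far, still lets a schema that is referenced from two unrelated places be expanded in both.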

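4. The Semantic Understanding Module and Planner: The Agent's "Brain"

This is the most intelligent module of the agent. It turns a high-level user request (for example, "I want to order 5 units of the product with ID 123") into a sequence of concrete API calls. This usually involves:

  1. Intent recognition: understanding what the user wants to do.
  2. API matching: finding one or more API operations that can fulfill that intent.
  3. Operation sequencing: if several API calls are needed, determining their correct order and data flow.

Technology options:

  • Keyword matching / rule engines: for simple, well-defined APIs, operations can be selected by matching keywords in the operation ID, summary, or description. For example, "list pets" might match the listPets operation.
  • Embedding-based similarity search: convert the user's intent and each operation's text (summary, description, operationId, parameter names) into vector embeddings, then pick the best match by similarity. This is more robust than keyword matching because it can match synonyms and paraphrases.
  • Large language models (LLMs): generally the most capable approach today. An LLM can interpret complex natural-language intents and, based on the API documentation it is given (via few-shot prompting or fine-tuning), select and plan API calls. It can even produce the parameter structures the calls need.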
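
To make the embedding-based option concrete without depending on a particular embedding model, here is a toy stand-in: it represents the goal and each operation's text as bag-of-words vectors (splitting camelCase identifiers such as listPets) and ranks operations by cosine similarity. A real system would replace _tokens with calls to an embedding model and a vector index; the scoring and top-K selection would stay the same. This toy version cannot tell that "pet" and "pets" are related, which is one of the gaps real embeddings are meant to close. rank_operations and its helpers are hypothetical.

```python
import math
import re
from collections import Counter
from typing import Dict, List, Optional, Tuple

def _tokens(text: str) -> Counter:
    """Bag of lowercase words; camelCase identifiers are split (listPets -> list, pets)."""
    text = re.sub(r"([a-z])([A-Z])", r"\1 \2", text)
    return Counter(re.findall(r"[a-z0-9]+", text.lower()))

def _cosine(a: Counter, b: Counter) -> float:
    # Counter returns 0 for missing keys, so only shared tokens contribute.
    dot = sum(count * b[token] for token, count in a.items())
    norm = math.sqrt(sum(v * v for v in a.values())) * math.sqrt(sum(v * v for v in b.values()))
    return dot / norm if norm else 0.0

def rank_operations(goal: str, operations: List[Dict[str, Optional[str]]],
                    top_k: int = 3) -> List[Tuple[str, float]]:
    """Hypothetical helper: rank operations against the goal, return top_k (operation_id, score)."""
    goal_vec = _tokens(goal)
    scored = []
    for op in operations:
        # The fields the text suggests embedding: operationId, summary, description.
        text = " ".join(filter(None, [op.get("operation_id"), op.get("summary"), op.get("description")]))
        scored.append((op.get("operation_id") or "", _cosine(goal_vec, _tokens(text))))
    return sorted(scored, key=lambda pair: pair[1], reverse=True)[:top_k]
```

The top-K result is what a retrieval step would hand to the LLM planner, keeping the prompt small even for large catalogs.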

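We will focus on the LLM-driven approach, since it offers the strongest generalization and the most flexible behavior.

How the planner works:

  1. Receive the goal: the agent receives a goal expressed in natural language.
  2. Consult the API catalog: the agent looks up its internal API catalog (produced by the parser).
  3. LLM decision: the user's goal and the API catalog (or a summary of it) are given to the LLM as context. The LLM's job is to:
    • identify the most relevant API operations;
    • determine the order in which they run;
    • state which parameters each operation needs and where each value comes from (user input, the result of a previous API call, a default value, etc.).
  4. Produce an execution plan: the LLM returns a structured plan, for example as JSON, describing the sequence of API calls to make.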
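
Because the plan comes from a model, it is worth checking it against the parsed catalog before anything is executed. Below is a minimal sketch, assuming a hypothetical catalog shape that maps each operation_id to the set of parameter names the spec declares for it (path/query/header parameters plus top-level body properties). It rejects unknown operations, parameter names the spec does not declare, and $response references to steps that have not run yet; it also unwraps a {"plan": [...]} wrapper. validate_plan is an illustrative name.

```python
from typing import Any, Dict, List, Set

def validate_plan(plan: Any, catalog: Dict[str, Set[str]]) -> List[str]:
    """Hypothetical helper: list problems in an LLM-produced plan; empty means it looks sane."""
    if isinstance(plan, dict) and isinstance(plan.get("plan"), list):
        plan = plan["plan"]  # unwrap {"plan": [...]}
    if not isinstance(plan, list):
        return ["plan must be a JSON array of steps"]
    problems: List[str] = []
    executed: Set[str] = set()
    for i, step in enumerate(plan):
        if not isinstance(step, dict):
            problems.append(f"step {i}: not an object")
            continue
        op_id = step.get("operation_id")
        if not isinstance(op_id, str) or op_id not in catalog:
            problems.append(f"step {i}: unknown operation_id {op_id!r}")
            continue
        params = step.get("parameters") or {}
        for name, value in params.items():
            if name not in catalog[op_id]:
                problems.append(f"step {i}: {op_id} has no parameter {name!r}")
            # "$response.<op_id>.<field>" may only point at an operation that already ran.
            if isinstance(value, str) and value.startswith("$response."):
                parts = value.split(".")
                if len(parts) < 3 or parts[1] not in executed:
                    problems.append(f"step {i}: {value!r} refers to a step that has not run yet")
        executed.add(op_id)
    return problems
```

An empty result does not prove the plan achieves the goal; it only rules out plans that cannot possibly execute against this spec, which is cheap to check before spending any real API calls.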

Code example: a simplified LLM-based planner

import os
from openai import OpenAI # Or other LLM providers like Anthropic, Google GenAI
import json
from typing import Dict, Any, List

# Ensure you have your OpenAI API key set as an environment variable
# os.environ["OPENAI_API_KEY"] = "YOUR_OPENAI_API_KEY"

class AgentPlanner:
    def __init__(self, api_catalog: List[Dict[str, Any]]):
        self.api_catalog = api_catalog
        self.client = OpenAI() # Initialize OpenAI client

        # Pre-process API catalog for LLM input
        self.api_summaries = self._generate_api_summaries()

    def _generate_api_summaries(self) -> str:
        """
        Generates a concise summary of available API operations for the LLM.
        In a real scenario, this might involve more sophisticated summarization
        or embedding lookup for large catalogs.
        """
        summaries = []
        for op in self.api_catalog:
            summary_parts = [f"Operation ID: {op.get('operation_id', 'N/A')}",
                             f"Path: {op['path']}",
                             f"Method: {op['method']}",
                             f"Summary: {op.get('summary', 'N/A')}"]
            if op.get('parameters'):
                param_names = [p.name for p in op['parameters']]
                summary_parts.append(f"Parameters: {', '.join(param_names)}")
            if op.get('request_body'):
                summary_parts.append(f"Requires a request body.")
            summaries.append(" - ".join(summary_parts))
        return "\n".join(summaries)

    def plan_api_calls(self, user_goal: str, current_context: Dict[str, Any] = None) -> List[Dict[str, Any]]:
        """
        Uses an LLM to plan a sequence of API calls to achieve the user's goal.
        """
        context_str = json.dumps(current_context, indent=2) if current_context else "No specific context."

        prompt = f"""
        You are an autonomous agent designed to interact with various APIs based on a given OpenAPI specification.
        Your goal is to fulfill user requests by planning a sequence of API calls.

        Here is a list of available API operations:
        {self.api_summaries}

        Current context / available data:
        {context_str}

        User's goal: "{user_goal}"

        Based on the user's goal and available APIs, generate a JSON array of API calls.
        Each item in the array should be an object with the following structure:
        {{
            "operation_id": "string", // The operation ID of the API to call
            "parameters": {{ // A dictionary of parameters for the API call
                "param_name_1": "value_or_source_1",
                "param_name_2": "value_or_source_2",
                // ...
            }},
            "description": "A brief explanation of why this step is needed."
        }}

        For parameter values, if they are directly available from the user's goal or current context, use them.
        If a parameter is required but not directly available, indicate its type and let the parameter generator handle it.
        If a parameter's value should come from a previous API call's response, specify it like: "$response.operation_id.field_name".
        If a parameter needs to be generated (e.g., a new resource ID), specify "$generate.type" (e.g., "$generate.uuid").

        Example plan for "Create a pet named 'Buddy' with tag 'dog'":
        [
            {{
                "operation_id": "createPet",
                "parameters": {{
                    "name": "Buddy",
                    "tag": "dog"
                }},
                "description": "Calling createPet to add a new pet."
            }}
        ]

        Example plan for "List pets and then show details for pet with ID 123":
        [
            {{
                "operation_id": "listPets",
                "parameters": {{}},
                "description": "First, list all pets to see available options."
            }},
            {{
                "operation_id": "showPetById",
                "parameters": {{
                    "petId": "123" 
                }},
                "description": "Then, retrieve details for pet with ID 123."
            }}
        ]

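        Respond with a JSON object of the form {{"plan": [ ...steps... ]}}, where each step follows the structure above.
        Your JSON plan: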
        """

        try:
            response = self.client.chat.completions.create(
                model="gpt-4o", # Or other suitable model
                messages=[
                    {"role": "system", "content": "You are a helpful assistant that plans API calls."},
                    {"role": "user", "content": prompt}
                ],
                response_format={"type": "json_object"},
                temperature=0.0 # Make it deterministic for planning
            )

            plan_json_str = response.choices[0].message.content
            plan = json.loads(plan_json_str)
            if not isinstance(plan, list):
                # Sometimes LLMs might wrap the list in an outer object
                if isinstance(plan, dict) and 'plan' in plan and isinstance(plan['plan'], list):
                    plan = plan['plan']
                else:
                    raise ValueError(f"LLM did not return a list as expected: {plan_json_str}")
            return plan
        except Exception as e:
            print(f"Error during planning: {e}")
            print(f"LLM raw output: {response.choices[0].message.content if 'response' in locals() else 'N/A'}")
            return []

# Test the planner
# Re-use the parser from before to get the catalog
parser = SwaggerParser("petstore.yaml")
api_spec = parser.load_and_parse()
api_catalog_for_planner = parser.get_operations()

planner = AgentPlanner(api_catalog_for_planner)

user_goal_1 = "Create a new cat named 'Whiskers' with tag 'feline'."
plan_1 = planner.plan_api_calls(user_goal_1)
print(f"\nPlan for '{user_goal_1}':\n{json.dumps(plan_1, indent=2)}")

user_goal_2 = "List all pets."
plan_2 = planner.plan_api_calls(user_goal_2)
print(f"\nPlan for '{user_goal_2}':\n{json.dumps(plan_2, indent=2)}")

user_goal_3 = "Show me details for the pet with ID 456 after listing them."
plan_3 = planner.plan_api_calls(user_goal_3)
print(f"\nPlan for '{user_goal_3}':\n{json.dumps(plan_3, indent=2)}")

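Notes on the planner:

  • AgentPlanner holds an api_catalog, the list of API operations extracted by SwaggerParser.
  • _generate_api_summaries turns the catalog into a compact format the LLM can read easily. Large catalogs call for more advanced techniques, such as including only the top-K operation summaries most relevant to the user's goal, i.e. a RAG (Retrieval-Augmented Generation) pattern.
  • plan_api_calls is the core. It builds a detailed prompt that instructs the LLM to produce a structured JSON plan, defining the output format and the conventions for parameter sources ($response.op_id.field, $generate.type).
  • response_format={"type": "json_object"} enables OpenAI's JSON mode, which constrains the model to emit a valid JSON object. It does not guarantee that the object has the requested structure, and it does not produce a bare top-level array, which is why the code also accepts a plan wrapped as {"plan": [...]}. Schema-level guarantees require OpenAI's Structured Outputs (a response_format of type "json_schema").
  • temperature=0.0 makes the LLM's output more nearly deterministic, which suits planning, although identical outputs across calls are still not strictly guaranteed.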
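
JSON mode is provider-specific. If the planner is pointed at a model or provider without it (the import comment mentions alternatives), the reply may arrive wrapped in prose or in a markdown code fence. A minimal best-effort extractor using only the standard library could look like this; extract_json is an illustrative name, and its fallback (taking the span from the first "[" or "{" to the last "]" or "}") is a heuristic that can still fail on unusual output.

```python
import json
import re
from typing import Any

# Matches a markdown code fence (three backticks, optional "json" tag) and captures its body.
_FENCE = re.compile(r"`{3}(?:json)?\s*(.*?)`{3}", re.DOTALL)

def extract_json(text: str) -> Any:
    """Hypothetical helper: best-effort extraction of one JSON value from LLM output."""
    match = _FENCE.search(text)
    candidate = match.group(1) if match else text
    try:
        return json.loads(candidate)
    except json.JSONDecodeError:
        pass
    # Heuristic fallback: take the span from the first '[' or '{' to the last ']' or '}'.
    starts = [i for i in (candidate.find("["), candidate.find("{")) if i != -1]
    if not starts:
        raise ValueError("no JSON found in model output")
    end = max(candidate.rfind("]"), candidate.rfind("}"))
    return json.loads(candidate[min(starts):end + 1])
```

The result can then go through the same {"plan": [...]} unwrapping that plan_api_calls already performs.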

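5. The Parameter Generator: Filling In the API's "Blanks"

The planner tells us which API to call and which parameters it needs. The parameter generator's job is to combine that information with the agent's internal state and the API's schema definitions to produce actual, valid parameter values. This is the most complex and error-prone part of the whole agent.

Challenges:

  • Data type matching: strings, integers, booleans, arrays, objects, dates, and so on.
  • Complex structures: nested objects and objects inside arrays.
  • Constraints: minimum/maximum, minLength/maxLength, pattern (regex), enum, format (email, uuid, date-time).
  • Dynamic values: taken from user input, from a previous API call's response, or generated by the system (UUIDs, timestamps).
  • Defaults and examples: use the default and example values provided in the Swagger document.

Strategies:

  1. Take it from the planner: if the planner already supplied a concrete value, use it directly.
  2. Take it from agent state: look up the context the agent maintains (for example, a resource_id returned by a previous API call).
  3. Generate it from the schema:
    • Primitive types: string, integer, and boolean values can be generated at random within the limits set by format and enum, or taken from defaults.
    • Arrays: if items specifies a type, generate an array with a few elements of that type.
    • Objects: recursively generate a value for each property, making sure every required field is filled.
    • References ($ref): resolve the definition in components/schemas.
  4. LLM-assisted generation: for complex string parameters that need semantic understanding (for example, a descriptive text field), consult the LLM again.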
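
Strategy 3 is where the basic generator in this section is thinnest: it ignores numeric bounds and string lengths. Here is a minimal sketch of constraint-aware scalar generation over a raw schema dict, assuming OAS 3.0 semantics (exclusiveMinimum/exclusiveMaximum as booleans; OAS 3.1 uses numbers instead, which this sketch does not handle) and integer-valued bounds for integer schemas. It prefers default, then example, then enum, before generating anything; pattern and format are ignored. generate_scalar is a hypothetical helper.

```python
import random
import string
from typing import Any, Dict, Optional

def generate_scalar(schema: Dict[str, Any], rng: Optional[random.Random] = None) -> Any:
    """Hypothetical helper: a value for a scalar schema dict, honouring common OAS 3.0 constraints."""
    rng = rng or random.Random()
    # Values supplied by the spec author beat anything we could invent.
    if "default" in schema:
        return schema["default"]
    if "example" in schema:
        return schema["example"]
    if schema.get("enum"):
        return rng.choice(schema["enum"])
    kind = schema.get("type")
    if kind == "integer":
        low = schema.get("minimum", 1)
        high = schema.get("maximum", low + 99)
        if "minimum" not in schema:
            low = min(low, high)  # e.g. only "maximum: 0" was given
        if schema.get("exclusiveMinimum") is True:  # OAS 3.0 boolean form
            low += 1
        if schema.get("exclusiveMaximum") is True:
            high -= 1
        return rng.randint(low, high)
    if kind == "number":
        return rng.uniform(schema.get("minimum", 0.0), schema.get("maximum", 100.0))
    if kind == "boolean":
        return rng.choice([True, False])
    if kind == "string":
        min_len = schema.get("minLength", 1)
        max_len = schema.get("maxLength", max(min_len, 12))
        if "minLength" not in schema:
            min_len = min(min_len, max_len)
        length = rng.randint(min_len, max_len)
        return "".join(rng.choice(string.ascii_lowercase) for _ in range(length))
    return None  # arrays and objects are handled by recursive callers
```

Passing a seeded random.Random makes generated test data reproducible, which helps when replaying a failed call.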

Code example: a basic parameter generator

import uuid
import datetime
import random
from typing import Dict, Any, List, Optional

class ParameterGenerator:
    def __init__(self, api_schema: APISchema, agent_state: Dict[str, Any]):
        self.api_schema = api_schema
        self.agent_state = agent_state # Store current state for dynamic parameter values

    def _get_schema_by_ref(self, ref: str) -> Optional[ParameterSchema]:
        """Resolves a $ref to a schema definition in components."""
        if not ref.startswith('#/components/schemas/'):
            print(f"Warning: Non-component schema reference not supported: {ref}")
            return None

        schema_name = ref.replace('#/components/schemas/', '')
        component_schema = (self.api_schema.components or {}).get('schemas', {}).get(schema_name)
        if component_schema:
            return ParameterSchema.model_validate(component_schema)
        return None

    def _generate_value_for_schema(self, schema: ParameterSchema, context: str = "", path: str = "") -> Any:
        """Recursively generates a value based on a given schema."""
        if schema.enum:
            return random.choice(schema.enum)

        if schema.type == 'string':
            if schema.format == 'uuid':
                return str(uuid.uuid4())
            elif schema.format == 'date-time':
                return datetime.datetime.now(datetime.timezone.utc).isoformat()
            elif schema.format == 'email':
                return f"test_{uuid.uuid4().hex[:8]}@example.com"
            else:
                # Basic string generation, could be enhanced with LLM for semantic context
                if "name" in path.lower(): return "GeneratedName"
                if "description" in path.lower(): return "Generated description for " + context
                if "tag" in path.lower(): return "default_tag"
                return "generated_string_" + uuid.uuid4().hex[:4]

        elif schema.type == 'integer':
            # Could add min/max constraint handling
            return random.randint(1, 100)

        elif schema.type == 'boolean':
            return random.choice([True, False])

        elif schema.type == 'array':
            if schema.items:
                # Generate a small number of items for the array
                return [self._generate_value_for_schema(ParameterSchema.model_validate(schema.items), context, path + "[i]") for _ in range(1)]
            return [] # Empty array if items schema is missing

        elif schema.type == 'object':
            generated_object = {}
            if schema.properties:
                for prop_name, prop_schema_dict in schema.properties.items():
                    prop_schema = ParameterSchema.model_validate(prop_schema_dict)
                    # Handle nested $ref
                    if '$ref' in prop_schema_dict:
                        resolved_prop_schema = self._get_schema_by_ref(prop_schema_dict['$ref'])
                        if resolved_prop_schema:
                            prop_schema = resolved_prop_schema

                    # Only generate required properties by default, or if explicitly requested/needed
                    is_required = schema.required and prop_name in schema.required
                    if is_required: # For simplicity, always generate required
                        generated_object[prop_name] = self._generate_value_for_schema(prop_schema, context, path + "." + prop_name)
            return generated_object

        return None

    def generate_parameters(self, operation: APIOperation, planned_params: Dict[str, Any]) -> Dict[str, Any]:
        """
        Generates final parameters for an API operation based on its schema
        and the planned_params from the planner.
        """
        final_params = {}
        request_body_data = {}

        # Handle path, query, header, cookie parameters
        if operation.parameters:
            for param_def in operation.parameters:
                param_name = param_def.name
                param_in = param_def.in_
                param_schema = param_def.schema_

                value = planned_params.get(param_name)

                if value is None: # Value not provided by planner
                    if param_def.required:
                        # Attempt to generate based on schema or state
                        value = self._generate_value_for_schema(param_schema, param_name, f"parameter.{param_name}")
                        if value is None:
                            raise ValueError(f"Required parameter '{param_name}' (in: {param_in}) could not be generated and was not provided by planner.")
                    else:
                        continue # Optional parameter, no value provided, so skip

                # Check for dynamic values from state
                if isinstance(value, str) and value.startswith('$response.'):
                    parts = value.split('.')
                    if len(parts) == 3: # $response.operation_id.field_name
                        op_id = parts[1]
                        field_name = parts[2]
                        if op_id in self.agent_state and field_name in self.agent_state[op_id]:
                            value = self.agent_state[op_id][field_name]
                        else:
                            raise ValueError(f"Dynamic parameter source '{value}' not found in agent state.")
                    else:
                        raise ValueError(f"Invalid dynamic parameter format: {value}")
                elif isinstance(value, str) and value.startswith('$generate.'):
                    # For simplicity, we only have $generate.uuid handled in _generate_value_for_schema
                    gen_type = value.split('.')[1]
                    if gen_type == 'uuid':
                        value = str(uuid.uuid4())
                    else:
                        raise ValueError(f"Unsupported generation type: {gen_type}")

                final_params[f"{param_in}_{param_name}"] = value # Prefix to distinguish parameter types

        # Handle request body
        if operation.requestBody:
            # Assume application/json for now
            json_content = operation.requestBody.content.get('application/json')
            if json_content and json_content.schema_:
                body_schema = json_content.schema_

                # The body schema needs no $ref handling here: SwaggerParser._resolve_refs has already inlined it.

                # Copy only the planned values that are body properties, so the planner's dict is
                # not mutated and path/query values do not leak into the body. If the schema
                # declares no properties, pass every planned value through.
                if body_schema.properties:
                    request_body_data = {k: v for k, v in planned_params.items() if k in body_schema.properties}
                else:
                    request_body_data = dict(planned_params)

                # Fill in missing required properties based on schema
                if body_schema.properties:
                    for prop_name, prop_schema_dict in body_schema.properties.items():
                        prop_schema = ParameterSchema.model_validate(prop_schema_dict)
                        # Resolve nested $ref within body properties
                        if '$ref' in prop_schema_dict:
                            resolved_prop_schema = self._get_schema_by_ref(prop_schema_dict['$ref'])
                            if resolved_prop_schema:
                                prop_schema = resolved_prop_schema

                        is_required = body_schema.required and prop_name in body_schema.required
                        if is_required and prop_name not in request_body_data:
                            generated_value = self._generate_value_for_schema(prop_schema, prop_name, f"body.{prop_name}")
                            if generated_value is None:
                                raise ValueError(f"Required body property '{prop_name}' could not be generated.")
                            request_body_data[prop_name] = generated_value

            final_params['request_body'] = request_body_data

        return final_params

# Test the parameter generator
# Assume we have an agent_state (empty for now)
current_agent_state = {}

# Get an operation from the parser
parser = SwaggerParser("petstore.yaml")
api_spec = parser.load_and_parse()
operations = parser.get_operations()

# Find the createPet operation
create_pet_op = next((op for op in operations if op['operation_id'] == 'createPet'), None)
if create_pet_op:
    # Take the parsed APIOperation model straight from the schema. Re-validating the
    # get_operations() dict would silently drop requestBody, because that dict uses
    # snake_case keys ('request_body', 'operation_id') rather than the model's field names.
    create_pet_operation_model = getattr(api_spec.paths[create_pet_op['path']], create_pet_op['method'].lower())

    print("\n--- Testing ParameterGenerator for createPet ---")
    # Scenario 1: the planner provides all needed parameters
    planned_params_1 = {"name": "Buddy", "tag": "dog"}
    generated_params_1 = ParameterGenerator(api_spec, current_agent_state).generate_parameters(create_pet_operation_model, planned_params_1)
    print(f"Generated params for 'createPet' with full plan:\n{json.dumps(generated_params_1, indent=2)}")

    # Scenario 2: the planner provides only 'tag'. 'name' is required, so the generator
    # fills it with its name heuristic ("GeneratedName") instead of raising.
    planned_params_2 = {"tag": "fish"}
    generated_params_2 = ParameterGenerator(api_spec, current_agent_state).generate_parameters(create_pet_operation_model, planned_params_2)
    print(f"Generated params for 'createPet' with partial plan:\n{json.dumps(generated_params_2, indent=2)}")

    # Scenario 3: an operation with query parameters, e.g. listPets
    list_pets_op = next((op for op in operations if op['operation_id'] == 'listPets'), None)
    if list_pets_op:
        list_pets_operation_model = getattr(api_spec.paths[list_pets_op['path']], list_pets_op['method'].lower())
        planned_params_3 = {"limit": 50}
        generated_params_3 = ParameterGenerator(api_spec, current_agent_state).generate_parameters(list_pets_operation_model, planned_params_3)
        print(f"\nGenerated params for 'listPets' with limit 50:\n{json.dumps(generated_params_3, indent=2)}")

        planned_params_4 = {} # No limit provided; it's optional
        generated_params_4 = ParameterGenerator(api_spec, current_agent_state).generate_parameters(list_pets_operation_model, planned_params_4)
        print(f"Generated params for 'listPets' with no limit specified:\n{json.dumps(generated_params_4, indent=2)}")

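Notes on the parameter generator:

  • ParameterGenerator takes an APISchema and the agent_state.
  • The _get_schema_by_ref helper resolves references into components/schemas.
  • _generate_value_for_schema is the core recursive function: it produces a value from a parameter's type, format, enum, and related keywords. It handles primitive types, arrays, and objects, and supports simple context awareness (for example, if the path contains "name", it generates "GeneratedName").
  • The generate_parameters method walks every parameter of an API operation and combines planned_params with agent_state to decide each final value. It also handles the requestBody.
  • Dynamic values such as $response.operation_id.field_name are looked up in agent_state.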
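As a minimal sketch of that last point, assume the $response.operation_id.field_name form described above and an agent_state that maps each operation ID to a dict of extracted fields (the layout the response processor in section 7 produces). The lookup could then look like this. The function names resolve_reference and resolve_all, and the support for nested dotted fields, are illustrative assumptions rather than part of ParameterGenerator.

```python
# Hypothetical helpers (sketch): resolve "$response.<operation_id>.<field>" placeholders.
from typing import Any, Dict

PREFIX = "$response."


def resolve_reference(value: Any, agent_state: Dict[str, Any]) -> Any:
    """Resolve '$response.<operation_id>.<field>[.<subfield>...]' against agent_state.

    Values that are not such placeholders are returned unchanged.
    Raises KeyError if the operation or a field along the path is missing.
    """
    if not isinstance(value, str) or not value.startswith(PREFIX):
        return value
    operation_id, *field_path = value[len(PREFIX):].split(".")
    if not field_path:
        raise KeyError(f"Reference {value!r} does not name a field")
    current: Any = agent_state[operation_id]  # results are stored per operation ID
    for field in field_path:
        current = current[field]
    return current


def resolve_all(planned: Dict[str, Any], agent_state: Dict[str, Any]) -> Dict[str, Any]:
    """Resolve every placeholder in a flat dict of planned parameter values."""
    return {name: resolve_reference(value, agent_state) for name, value in planned.items()}


if __name__ == "__main__":
    state = {"createPet": {"id": 123, "name": "TestPet"}}
    print(resolve_all({"petId": "$response.createPet.id", "verbose": True}, state))
    # {'petId': 123, 'verbose': True}
```

Failing loudly with KeyError on a missing reference seems preferable to silently sending a literal "$response..." string to the API.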

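6. API Invoker: Interacting with the Outside World

Once the parameter generator has produced a complete, valid set of request parameters, the API invoker builds and sends the actual HTTP request, then receives the response and performs a first check on it.

Core functions:

  • URL construction: substitute actual values for path parameters (such as {petId} in /pets/{petId}).
  • Request headers: set Content-Type, Accept, Authorization, and so on.
  • Request body: serialize the body data (usually JSON).
  • HTTP method: use the correct method, such as GET, POST, PUT, or DELETE.
  • Sending the request: use an HTTP client library (such as Python's requests).
  • Basic response check: inspect the HTTP status code.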
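The URL-construction step is easy to get subtly wrong, so here it is in isolation as a sketch. build_url is a hypothetical helper: it percent-encodes each path value with urllib.parse.quote and concatenates the base URL with the path instead of calling urljoin, because urljoin discards the base path when the template starts with "/".

```python
# Hypothetical helper (sketch): fill an OpenAPI path template and prepend the base URL.
from typing import Any, Dict
from urllib.parse import quote, urljoin


def build_url(base_url: str, path_template: str, path_params: Dict[str, Any]) -> str:
    """Fill '{name}' placeholders in an OpenAPI path template and prepend the base URL."""
    path = path_template
    for name, value in path_params.items():
        # safe="" also encodes '/', so a value cannot inject extra path segments
        path = path.replace("{" + name + "}", quote(str(value), safe=""))
    if "{" in path or "}" in path:
        raise ValueError(f"Unfilled path parameter in {path!r}")
    # Concatenate rather than urljoin(), which would drop a base path like '/api/v1'
    return base_url.rstrip("/") + "/" + path.lstrip("/")


if __name__ == "__main__":
    base = "http://localhost:8080/api/v1"
    print(build_url(base, "/pets/{petId}", {"petId": 42}))  # http://localhost:8080/api/v1/pets/42
    print(urljoin(base, "/pets"))                            # http://localhost:8080/pets
```

The second print shows the pitfall: urljoin treats "/pets" as an absolute path and silently drops "/api/v1".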

Code example: a generic API invoker

import requests
from typing import Any, Dict, List, Optional
from urllib.parse import quote

class APIInvoker:
    def __init__(self, base_url: str, auth_manager: Any = None): # auth_manager for future integration
        self.base_url = base_url.rstrip('/')
        self.session = requests.Session()
        self.auth_manager = auth_manager # Placeholder for authentication manager

    def invoke(self, operation: APIOperation, generated_params: Dict[str, Any]) -> requests.Response:
        """
        Constructs and sends an HTTP request based on the API operation and generated parameters.
        """
        # The HTTP method and the original path template are assumed to have been attached
        # to the operation from the parser output (the test code below attaches `path` this way)
        method = getattr(operation, 'method', None)
        if not method:
            raise ValueError(f"No HTTP method attached to operation {operation.operationId}")
        method = method.upper()
        path = operation.path

        # 1. Substitute path parameters ('path_petId' -> '{petId}'), URL-encoding each value
        for k, v in generated_params.items():
            if k.startswith('path_'):
                path = path.replace(f"{{{k[len('path_'):]}}}", quote(str(v), safe=''))

        # 2. Construct URL. urljoin() would drop a base path such as '/api/v1' when the
        #    template starts with '/', so the two parts are concatenated instead
        url = self.base_url + '/' + path.lstrip('/')

        # 3. Extract query parameters (strip only the leading prefix)
        query_params = {k[len('query_'):]: v for k, v in generated_params.items() if k.startswith('query_')}

        # 4. Extract header parameters
        headers = {k[len('header_'):]: str(v) for k, v in generated_params.items() if k.startswith('header_')}
        headers.setdefault('Accept', 'application/json') # Default accept JSON

        # 5. Handle authentication (simplified)
        if self.auth_manager:
            # This would integrate with a dedicated authentication manager
            # For now, let's assume it can add an Authorization header
            auth_header = self.auth_manager.get_auth_header(operation.security) # Pass security requirements
            if auth_header:
                headers.update(auth_header)

        # 6. Prepare request body ('is not None' so an empty JSON object is still sent)
        json_data = generated_params.get('request_body')
        if json_data is not None:
            headers.setdefault('Content-Type', 'application/json')

        print("\n--- Invoking API ---")
        print(f"Method: {method}, URL: {url}")
        print(f"Query Params: {query_params}")
        print(f"Headers: {headers}")
        print(f"JSON Body: {json_data}")

        try:
            response = self.session.request(
                method,
                url,
                params=query_params,
                headers=headers,
                json=json_data,
                timeout=30 # Add a timeout
            )
            response.raise_for_status() # Raise HTTPError for bad responses (4xx or 5xx)
            print(f"API Call Success! Status: {response.status_code}")
            return response
        except requests.exceptions.HTTPError as e:
            print(f"HTTP Error during API invocation: {e.response.status_code} - {e.response.text}")
            raise
        except requests.exceptions.ConnectionError as e:
            print(f"Connection Error during API invocation: {e}")
            raise
        except requests.exceptions.Timeout as e:
            print(f"Timeout Error during API invocation: {e}")
            raise
        except Exception as e:
            print(f"An unexpected error occurred during API invocation: {e}")
            raise

# --- Mock Authentication Manager for demonstration ---
class MockAuthManager:
    def __init__(self, api_key: str):
        self.api_key = api_key

    def get_auth_header(self, security_schemes: Optional[List[Dict[str, List[str]]]] = None) -> Dict[str, str]:
        # A very basic example: if any security scheme is present, assume API Key auth
        if security_schemes:
            # In a real scenario, you'd parse components.securitySchemes (OpenAPI 3) or
            # securityDefinitions (Swagger 2.0) and match them to the operation's 'security' field
            return {"X-API-KEY": self.api_key}
        return {}

# Test the invoker
# Re-use parser and parameter generator setup
parser = SwaggerParser("petstore.yaml")
api_spec = parser.load_and_parse()
operations = parser.get_operations()

# Find createPet operation
create_pet_op_raw = next((op for op in operations if op['operation_id'] == 'createPet'), None)
create_pet_op_model = APIOperation.model_validate(create_pet_op_raw)
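create_pet_op_model.path = create_pet_op_raw['path'] # Attach original path for URL construction
create_pet_op_model.method = create_pet_op_raw.get('method', 'post') # Attach HTTP method (assumes the parser stores it under 'method')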

# Find listPets operation
list_pets_op_raw = next((op for op in operations if op['operation_id'] == 'listPets'), None)
list_pets_op_model = APIOperation.model_validate(list_pets_op_raw)
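list_pets_op_model.path = list_pets_op_raw['path']
list_pets_op_model.method = list_pets_op_raw.get('method', 'get') # Assumes the parser stores the HTTP method under 'method'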

# Initialize components
agent_state = {}
param_gen = ParameterGenerator(api_spec, agent_state)
auth_mgr = MockAuthManager(api_key="your-super-secret-api-key") # Replace with a real key
invoker = APIInvoker(base_url="http://localhost:8080/api/v1", auth_manager=auth_mgr) # Assume a local petstore API is running

# Scenario: Create a pet
try:
    planned_params_create = {"name": "TestPet", "tag": "test"}
    generated_params_create = param_gen.generate_parameters(create_pet_op_model, planned_params_create)
    # response_create = invoker.invoke(create_pet_op_model, generated_params_create)
    # print(f"Response from createPet: {response_create.json()}")
    # For demonstration, we'll just print the prepared params and pretend to invoke
    print("\n(Skipping actual API invocation for createPet for demo purposes, see commented line)")
    print(f"Would invoke createPet with: {json.dumps(generated_params_create, indent=2)}")
except Exception as e:
    print(f"Failed to invoke createPet: {e}")

# Scenario: List pets
try:
    planned_params_list = {"limit": 10}
    generated_params_list = param_gen.generate_parameters(list_pets_op_model, planned_params_list)
    # response_list = invoker.invoke(list_pets_op_model, generated_params_list)
    # print(f"Response from listPets: {response_list.json()}")
    print("\n(Skipping actual API invocation for listPets for demo purposes, see commented line)")
    print(f"Would invoke listPets with: {json.dumps(generated_params_list, indent=2)}")
except Exception as e:
    print(f"Failed to invoke listPets: {e}")

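Notes on the API invoker:

  • APIInvoker uses a requests.Session, which reuses connections and is therefore more efficient when making several consecutive calls.
  • The invoke method turns an APIOperation object plus generated_params into a complete HTTP request.
  • It handles path-parameter substitution, query parameters, request headers, and the JSON request body.
  • auth_manager is a placeholder for the more complex authentication logic. In a real system, it would apply API keys, OAuth2 tokens, and so on dynamically, according to the securitySchemes defined in the OpenAPI document.
  • response.raise_for_status() is a convenience method of the requests library that automatically raises an HTTPError when the status code is 4xx or 5xx.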
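To make the auth_manager placeholder more concrete, here is a sketch of how an operation's security requirements could be mapped onto OpenAPI 3 components.securitySchemes. The class name SchemeAuthManager and its credentials dictionary are hypothetical. The sketch only covers apiKey (in header or query) and http bearer/basic, and ignores the top-level default security. Unlike the get_auth_header interface above, it returns query parameters separately, so the invoker would need to merge them into params.

```python
# Hypothetical auth helper (sketch) driven by OpenAPI 3 components.securitySchemes.
import base64
from typing import Any, Dict, List, Optional, Tuple


class SchemeAuthManager:
    """credentials maps a scheme name to its secret; for http basic it is (user, password)."""

    def __init__(self, security_schemes: Dict[str, Dict[str, Any]], credentials: Dict[str, Any]):
        self.security_schemes = security_schemes
        self.credentials = credentials

    def apply(self, security: Optional[List[Dict[str, List[str]]]]) -> Tuple[Dict[str, str], Dict[str, str]]:
        """Return (headers, query_params) for the first requirement we can satisfy.

        List entries are alternatives (OR); schemes inside one entry are all required (AND).
        """
        if not security:
            return {}, {}
        for requirement in security:
            if not all(name in self.credentials for name in requirement):
                continue  # missing a credential: try the next alternative
            headers: Dict[str, str] = {}
            query: Dict[str, str] = {}
            for name in requirement:
                scheme = self.security_schemes.get(name, {})
                kind, secret = scheme.get("type"), self.credentials[name]
                http_scheme = str(scheme.get("scheme", "")).lower()
                if kind == "apiKey" and scheme.get("in") == "header":
                    headers[scheme["name"]] = secret
                elif kind == "apiKey" and scheme.get("in") == "query":
                    query[scheme["name"]] = secret
                elif kind == "http" and http_scheme == "bearer":
                    headers["Authorization"] = f"Bearer {secret}"
                elif kind == "http" and http_scheme == "basic":
                    user, password = secret
                    token = base64.b64encode(f"{user}:{password}".encode()).decode()
                    headers["Authorization"] = f"Basic {token}"
                else:
                    raise NotImplementedError(f"Unsupported security scheme {name!r}: {scheme}")
            return headers, query
        raise LookupError("No credentials satisfy any of the operation's security requirements")
```

For example, with a scheme api_key declared as type apiKey, in header, name X-API-KEY, calling apply([{"api_key": []}]) yields ({"X-API-KEY": <secret>}, {}). OAuth2 is deliberately left out because it requires the token lifecycle discussed in section 9.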

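7. Response Processor and State Manager: Learning and Adapting

Once an API call succeeds, the agent has to process the response: parse the data, check that it matches expectations, extract the information useful for later operations, and update its internal state.

Core functions:

  • Response parsing: parse JSON or XML responses into Python objects.
  • Response validation: (optional but strongly recommended) validate the structure and data types of the response against the response schema defined in the OpenAPI document.
  • Information extraction: following the planner's instructions or preset rules, extract key data from the response (for example, the ID of a newly created resource or a status update).
  • State update: store the extracted information in the agent's internal state or context for use by later API calls.
  • Progress reporting: feed the result of the current operation back to the planner.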
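Schema validation is skipped in the code below for brevity. As a rough illustration of what it involves, this sketch checks a parsed response against a small subset of JSON Schema keywords (type, required, properties, items, enum), assuming any $ref has already been resolved (for example by a helper like _get_schema_by_ref). The validate function is hypothetical. A production agent would rather use a complete JSON Schema validator library, keeping in mind that OpenAPI 3.0 schemas differ slightly from standard JSON Schema (nullable, for instance).

```python
# Hypothetical validator (sketch) for a small subset of JSON Schema keywords.
from typing import Any, Dict, List

# Python types accepted for each JSON Schema "type" (bool is excluded from numbers)
_TYPE_CHECKS = {
    "string": lambda v: isinstance(v, str),
    "integer": lambda v: isinstance(v, int) and not isinstance(v, bool),
    "number": lambda v: isinstance(v, (int, float)) and not isinstance(v, bool),
    "boolean": lambda v: isinstance(v, bool),
    "array": lambda v: isinstance(v, list),
    "object": lambda v: isinstance(v, dict),
}


def validate(data: Any, schema: Dict[str, Any], path: str = "$") -> List[str]:
    """Return human-readable violations; an empty list means the data is valid.

    Covers only type, required, properties, items and enum; $ref must already be resolved.
    """
    expected = schema.get("type")
    check = _TYPE_CHECKS.get(expected)
    if check is not None and not check(data):
        return [f"{path}: expected {expected}, got {type(data).__name__}"]
    errors: List[str] = []
    if "enum" in schema and data not in schema["enum"]:
        errors.append(f"{path}: {data!r} not in {schema['enum']}")
    if isinstance(data, dict):
        for key in schema.get("required", []):
            if key not in data:
                errors.append(f"{path}: missing required field '{key}'")
        for key, sub_schema in schema.get("properties", {}).items():
            if key in data:
                errors.extend(validate(data[key], sub_schema, f"{path}.{key}"))
    if isinstance(data, list) and "items" in schema:
        for i, item in enumerate(data):
            errors.extend(validate(item, schema["items"], f"{path}[{i}]"))
    return errors
```

Returning a list of violations, rather than raising on the first one, gives the planner (or an LLM) the full picture when it has to decide whether a response is usable.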

Code example: response processor and state manager

from typing import Dict, Any, Optional
import json
import requests

class ResponseProcessor:
    def __init__(self, api_schema: APISchema, agent_state: Dict[str, Any]):
        self.api_schema = api_schema
        self.agent_state = agent_state

    def process_response(self, operation: APIOperation, response: requests.Response) -> Dict[str, Any]:
        """
        Processes an API response, extracts relevant data, and updates agent state.
        """
        response_data = {}

        try:
            # 1. Parse response body (assuming JSON)
            if 'application/json' in response.headers.get('Content-Type', ''):
                response_data = response.json()
            else:
                response_data = {"raw_text": response.text} # Fallback for non-JSON responses
        except ValueError: # json.JSONDecodeError and requests' own JSONDecodeError both subclass ValueError
            print(f"Warning: Could not decode JSON from response for operation {operation.operationId}. Raw text: {response.text[:200]}...")
            response_data = {"raw_text": response.text}

        # 2. (Optional) Validate response against schema
        # This part is complex and typically requires a dedicated library or robust recursive validation.
        # For brevity, we'll skip detailed schema validation here but acknowledge its importance.
        # If response schema is available for the given status code, one would compare response_data
        # against operation.responses[str(response.status_code)].content['application/json'].schema_

        # 3. Extract key information and update agent state
        # The agent needs to know *what* to extract. This can be pre-defined,
        # or driven by LLM instructions (e.g., "extract the 'id' field from the response").
        extracted_info = {}

        if operation.operationId == 'createPet':
            # After creating a pet, we expect an 'id'
            if 'id' in response_data:
                extracted_info['id'] = response_data['id']
                print(f"Extracted pet ID: {response_data['id']}")
            if 'name' in response_data:
                extracted_info['name'] = response_data['name']
        elif operation.operationId == 'listPets':
            # After listing pets, we might want to store the list
            if isinstance(response_data, list):
                extracted_info['pets'] = response_data
                print(f"Extracted {len(response_data)} pets from list.")
        elif operation.operationId == 'showPetById':
            # After getting pet details, store that specific pet's info
            if 'id' in response_data:
                extracted_info[f"pet_{response_data['id']}"] = response_data
                print(f"Extracted details for pet ID {response_data['id']}.")

        # Update agent_state with results, typically prefixed by operation ID
        self.agent_state[operation.operationId] = extracted_info
        print(f"Agent state updated for {operation.operationId}: {extracted_info}")

        return extracted_info

# Test the response processor
# Mock a response object
class MockResponse:
    def __init__(self, status_code, content_type, text_content):
        self.status_code = status_code
        self._content_type = content_type
        self._text_content = text_content

    @property
    def headers(self):
        return {'Content-Type': self._content_type}

    def json(self):
        return json.loads(self._text_content)

    @property
    def text(self):
        return self._text_content

    def raise_for_status(self):
        if 400 <= self.status_code < 600:
            raise requests.exceptions.HTTPError(f"HTTP Error: {self.status_code}", response=self)

# Re-use parser setup
parser = SwaggerParser("petstore.yaml")
api_spec = parser.load_and_parse()
operations = parser.get_operations()

# Find createPet operation
create_pet_op_raw = next((op for op in operations if op['operation_id'] == 'createPet'), None)
create_pet_op_model = APIOperation.model_validate(create_pet_op_raw)
create_pet_op_model.path = create_pet_op_raw['path']

# Find listPets operation
list_pets_op_raw = next((op for op in operations if op['operation_id'] == 'listPets'), None)
list_pets_op_model = APIOperation.model_validate(list_pets_op_raw)
list_pets_op_model.path = list_pets_op_raw['path']

agent_state = {}
response_proc = ResponseProcessor(api_spec, agent_state)

# Scenario 1: Process a successful createPet response
print("\n--- Processing createPet response ---")
mock_create_response = MockResponse(201, 'application/json', '{"id": 123, "name": "TestPet", "tag": "test"}')
extracted_create = response_proc.process_response(create_pet_op_model, mock_create_response)
print(f"Current Agent State: {json.dumps(agent_state, indent=2)}")

# Scenario 2: Process a successful listPets response
print("\n--- Processing listPets response ---")
mock_list_response = MockResponse(200, 'application/json', '[{"id": 123, "name": "TestPet", "tag": "test"}, {"id": 456, "name": "OtherPet", "tag": "wild"}]')
extracted_list = response_proc.process_response(list_pets_op_model, mock_list_response)
print(f"Current Agent State: {json.dumps(agent_state, indent=2)}")

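Notes on the response processor:

  • ResponseProcessor likewise holds the api_schema and agent_state.
  • process_response first tries to parse the body as JSON (when the Content-Type says so) and falls back to the raw text.
  • Information extraction is the key part of this module. The example uses hard-coded if operation.operationId == '...' branches to demonstrate it. In a truly autonomous agent, this extraction logic would be generated dynamically by an LLM, or specified explicitly by the planner at planning time (for example, "extract the id field from the createPet response and store it as new_pet_id").
  • The extracted information is stored in agent_state keyed by operation ID, which makes it easy to look up later (for example, when the parameter generator resolves $response.operation_id.field_name).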
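One way to replace the hard-coded if/elif branches is to let the planner (or an LLM) emit declarative extraction rules that map a state key to a dotted path in the response. The rule format and the extract_by_rules function below are assumptions for illustration.

```python
# Hypothetical rule-driven extractor (sketch): replaces per-operation if/elif branches.
from typing import Any, Dict


def extract_by_rules(response_data: Any, rules: Dict[str, str]) -> Dict[str, Any]:
    """Apply rules like {"new_pet_id": "id"} to a parsed JSON response.

    Each rule maps a target key to a dotted path; purely numeric segments index
    into lists. Rules whose path does not resolve are skipped rather than raising.
    """
    extracted: Dict[str, Any] = {}
    for target_key, dotted_path in rules.items():
        current = response_data
        for segment in dotted_path.split("."):
            if isinstance(current, dict) and segment in current:
                current = current[segment]
            elif isinstance(current, list) and segment.isdigit() and int(segment) < len(current):
                current = current[int(segment)]
            else:
                break  # path does not resolve: skip this rule
        else:
            extracted[target_key] = current  # loop finished without break
    return extracted
```

The result could then be stored as before, e.g. agent_state['createPet'] = extract_by_rules(response_data, {'id': 'id', 'name': 'name'}), which keeps $response.createPet.id resolvable without any operation-specific code.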

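8. Error Handling and Retries: Building Robustness

Even a perfectly formed API call can fail, whether because of network problems, temporary server overload, or an invalid request. A robust agent must handle these errors gracefully.

Strategies:

  1. Error classification:
    • Network errors (ConnectionError, Timeout): usually transient and good candidates for retrying.
    • Client errors (4xx HTTP status codes): usually mean the request itself is wrong (for example, invalid parameters or failed authentication). They are rarely worth retrying automatically; the agent may need to replan or report to the user.
    • Server errors (5xx HTTP status codes): indicate a server-side problem and are usually worth retrying.
  2. Retry mechanism:
    • Exponential backoff: the wait before each retry grows (for example 1 s, 2 s, 4 s), so the agent does not add load to a struggling server.
    • Maximum retries: cap the number of retries to prevent infinite loops.
    • Jitter: add randomness on top of exponential backoff so that clients do not all retry at the same moment.
  3. Circuit breaker: if an API keeps failing, temporarily stop calling it to avoid wasting resources.
  4. Failure reporting and fallback: for unrecoverable errors, report the failure to the planner, which may adjust the plan or escalate the problem to a human.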
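The retry decorator below covers point 2 but not point 3. A minimal single-threaded circuit breaker could look like the following; the names CircuitBreaker and CircuitOpenError and the default thresholds are hypothetical and chosen only for illustration, and the injectable clock exists so the behavior can be tested without sleeping.

```python
# Hypothetical circuit breaker (sketch): single-threaded, one instance per API.
import time
from typing import Any, Callable, Optional


class CircuitOpenError(Exception):
    """Raised instead of calling the API while the circuit is open."""


class CircuitBreaker:
    """closed -> open after failure_threshold consecutive failures; after
    reset_timeout seconds the state becomes half-open and a trial call is allowed."""

    def __init__(self, failure_threshold: int = 5, reset_timeout: float = 30.0,
                 clock: Callable[[], float] = time.monotonic):
        self.failure_threshold = failure_threshold
        self.reset_timeout = reset_timeout
        self.clock = clock  # injectable so tests need not sleep
        self.failures = 0
        self.opened_at: Optional[float] = None

    @property
    def state(self) -> str:
        if self.opened_at is None:
            return "closed"
        if self.clock() - self.opened_at >= self.reset_timeout:
            return "half-open"
        return "open"

    def call(self, func: Callable[..., Any], *args: Any, **kwargs: Any) -> Any:
        if self.state == "open":
            raise CircuitOpenError("Circuit open: skipping call to failing API")
        try:
            result = func(*args, **kwargs)
        except Exception:
            self.failures += 1
            # Open on reaching the threshold; a failed half-open trial also lands here
            # (the counter was never reset), which restarts the timeout
            if self.failures >= self.failure_threshold:
                self.opened_at = self.clock()
            raise
        # Any success closes the circuit and resets the failure count
        self.failures = 0
        self.opened_at = None
        return result
```

In the agent, a CircuitOpenError would be reported to the planner under point 4 rather than retried, since retrying is exactly what the breaker is meant to stop.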

Code example: a retry decorator with exponential backoff

import time
import random
import requests
from functools import wraps
from typing import Callable, Any

def retry_with_exponential_backoff(
    max_retries: int = 3,
    initial_delay: float = 1.0,
    backoff_factor: float = 2.0,
    jitter: bool = True,
    catch_exceptions: tuple = (
        requests.exceptions.ConnectionError,
        requests.exceptions.Timeout,
        requests.exceptions.HTTPError
    )
) -> Callable:
    """
    A decorator to retry a function call with exponential backoff.
    Suitable for transient network errors and server-side issues (5xx).
    """
    def decorator(func: Callable) -> Callable:
        @wraps(func)
        def wrapper(*args, **kwargs) -> Any:
            delay = initial_delay
            for i in range(max_retries + 1):
                try:
                    return func(*args, **kwargs)
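                except catch_exceptions as e:
                    # Client errors (4xx) mean the request itself is wrong: re-raise without retrying
                    status = getattr(getattr(e, 'response', None), 'status_code', None)
                    if isinstance(e, requests.exceptions.HTTPError) and status is not None and status < 500:
                        raise

                    if i == max_retries:
                        print(f"Max retries ({max_retries}) reached. Raising exception: {e}")
                        raise

                    sleep_time = delay
                    if jitter:
                        # Scale the delay by a random factor in [0.5, 1.0)
                        sleep_time = delay * (0.5 + random.random() * 0.5)

                    print(f"Attempt {i+1}/{max_retries+1} failed with error: {type(e).__name__} - {e}. Retrying in {sleep_time:.2f} seconds...")
                    time.sleep(sleep_time)
                    delay *= backoff_factor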
        return wrapper
    return decorator

# Integrate with APIInvoker
class APIInvokerWithRetry(APIInvoker): # Inherit from our previous Invoker
    @retry_with_exponential_backoff(max_retries=3)
    def invoke(self, operation: APIOperation, generated_params: Dict[str, Any]) -> requests.Response:
        # Call the original invoke method from the parent class
        return super().invoke(operation, generated_params)

# Test the invoker with retry
# We need to make the mock response fail sometimes to test retry
class FailingMockResponse(MockResponse):
    _call_count = 0
    def __init__(self, status_code, content_type, text_content, fail_until_attempt: int = 1):
        super().__init__(status_code, content_type, text_content)
        self.fail_until_attempt = fail_until_attempt
        self._current_attempt = 0

    def raise_for_status(self):
        FailingMockResponse._call_count += 1
        self._current_attempt = FailingMockResponse._call_count

        if self._current_attempt <= self.fail_until_attempt:
            print(f"  (Mocking failure on attempt {self._current_attempt} / Fail until {self.fail_until_attempt})")
            # Simulate a 500 error for retries
            raise requests.exceptions.HTTPError(f"Mocked 500 Server Error on attempt {self._current_attempt}", response=MockResponse(500, 'text/plain', 'Internal Server Error'))
        else:
            print(f"  (Mocking success on attempt {self._current_attempt})")
            super().raise_for_status() # Call parent's raise_for_status for actual success/failure logic

# Reset call count for subsequent tests
FailingMockResponse._call_count = 0 

# Re-use parser setup
parser = SwaggerParser("petstore.yaml")
api_spec = parser.load_and_parse()
operations = parser.get_operations()
create_pet_op_raw = next((op for op in operations if op['operation_id'] == 'createPet'), None)
create_pet_op_model = APIOperation.model_validate(create_pet_op_raw)
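create_pet_op_model.path = create_pet_op_raw['path'] # Attach original path for URL construction
create_pet_op_model.method = create_pet_op_raw.get('method', 'post') # Attach HTTP method (assumes the parser stores it under 'method')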

agent_state = {}
param_gen = ParameterGenerator(api_spec, agent_state)
auth_mgr = MockAuthManager(api_key="your-super-secret-api-key")

# We need to mock the actual requests.Session.request method to test the retry logic without hitting a real API
class MockRequestsSession:
    def __init__(self, mock_response_instance):
        self._mock_response = mock_response_instance

    def request(self, method, url, params=None, headers=None, json=None, timeout=None):
        print(f"  [Mock Session] Calling method: {method}, URL: {url}")
        # The mock response will internally handle the failure/success logic
        return self._mock_response

# Instantiate the invoker with the mock session
mock_failing_response = FailingMockResponse(201, 'application/json', '{"id": 124, "name": "RetryPet", "tag": "retry"}', fail_until_attempt=2)
mock_session = MockRequestsSession(mock_failing_response)

invoker_with_retry = APIInvokerWithRetry(base_url="http://localhost:8080/api/v1", auth_manager=auth_mgr)
invoker_with_retry.session = mock_session # Override the session with our mock

print("\n--- Testing APIInvokerWithRetry (should succeed after 2 retries) ---")
try:
    planned_params_retry = {"name": "RetryPet", "tag": "test"}
    generated_params_retry = param_gen.generate_parameters(create_pet_op_model, planned_params_retry)
    response_retry = invoker_with_retry.invoke(create_pet_op_model, generated_params_retry)
    print(f"Final response status: {response_retry.status_code}")
except Exception as e:
    print(f"Failed after retries: {e}")

# Reset call count for next test
FailingMockResponse._call_count = 0 
mock_failing_response_max_fail = FailingMockResponse(201, 'application/json', '{"id": 125, "name": "MaxRetryPet", "tag": "max"}', fail_until_attempt=4) # Fails 4 times, max_retries is 3
mock_session_max_fail = MockRequestsSession(mock_failing_response_max_fail)
invoker_with_retry_max_fail = APIInvokerWithRetry(base_url="http://localhost:8080/api/v1", auth_manager=auth_mgr)
invoker_with_retry_max_fail.session = mock_session_max_fail

print("\n--- Testing APIInvokerWithRetry (should fail after max retries) ---")
try:
    planned_params_max_retry = {"name": "MaxRetryPet", "tag": "test"}
    generated_params_max_retry = param_gen.generate_parameters(create_pet_op_model, planned_params_max_retry)
    response_max_retry = invoker_with_retry_max_fail.invoke(create_pet_op_model, generated_params_max_retry)
    print(f"Final response status: {response_max_retry.status_code}")
except Exception as e:
    print(f"Successfully failed after max retries (as expected): {e}")

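Notes on error handling and retries:

  • retry_with_exponential_backoff is a generic Python decorator that can be applied to any function that may fail.
  • It is configured through max_retries, initial_delay, backoff_factor, and jitter.
  • catch_exceptions specifies which exceptions trigger a retry. In general, only network errors and server-side (5xx) errors should be retried.
  • The APIInvokerWithRetry class inherits from APIInvoker and enhances its invoke method with the @retry_with_exponential_backoff decorator.
  • The test code uses MockRequestsSession and FailingMockResponse to simulate different numbers of failures and verify the retry logic.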
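To see what these parameters mean in practice, the waits the decorator performs can be computed up front. The hypothetical helper below mirrors the decorator's arithmetic: the delay starts at initial_delay, is multiplied by backoff_factor after each failed attempt, and is optionally scaled by a random factor in [0.5, 1.0).

```python
# Hypothetical helper (sketch): the wait schedule of retry_with_exponential_backoff.
import random
from typing import List, Optional


def backoff_schedule(max_retries: int = 3, initial_delay: float = 1.0,
                     backoff_factor: float = 2.0,
                     rng: Optional[random.Random] = None) -> List[float]:
    """Seconds waited before each retry, using the decorator's arithmetic.

    Pass a random.Random instance to apply the same jitter (scale by [0.5, 1.0)).
    """
    delays: List[float] = []
    delay = initial_delay
    for _ in range(max_retries):  # max_retries waits for max_retries + 1 attempts
        delays.append(delay * (0.5 + rng.random() * 0.5) if rng is not None else delay)
        delay *= backoff_factor
    return delays


if __name__ == "__main__":
    plain = backoff_schedule()
    print(plain, sum(plain))  # [1.0, 2.0, 4.0] 7.0
```

With the defaults (max_retries=3, initial_delay=1.0, backoff_factor=2.0) the agent waits 1, 2 and 4 seconds, at most 7 seconds across 4 attempts; with jitter enabled the total falls between 3.5 and 7 seconds.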

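9. Advanced Considerations and Open Challenges

We now have a capable agent skeleton, but bringing it to a truly "autonomous" and "intelligent" level in the real world raises many further considerations and challenges:

  1. Authentication and authorization:
    • OAuth2 flows: many modern APIs use OAuth2. The agent needs to run the authorization code flow, the client credentials flow, and others; this involves redirects and obtaining and refreshing tokens, which amounts to a complex state machine.
    • Dynamic credential management: how are credentials for different APIs stored and retrieved securely? Integration with a key management system is necessary.
  2. Asynchronous operations and webhooks:
    • Some API operations are asynchronous: they return a job ID and deliver the result later through polling or a webhook notification. The agent must handle this pattern, including registering and listening for webhooks.
  3. Rate limiting and quota management:
    • APIs usually enforce rate limits. The agent must understand and respect them, pacing its requests with a token bucket or leaky bucket algorithm to avoid being blocked.
  4. Data flow and transformation:
    • In complex call chains, one API's output may need to be transformed before it can serve as another API's input. For example, one API returns user_id while another expects userIdentifier. LLMs can help here by recognizing and performing the necessary transformations.
  5. Agent memory and learning:
    • The agent should learn from successful operations to improve future planning. For example, it may discover that a particular combination of APIs reliably and efficiently completes a given task.
    • Long-term memory: persist important entities (such as created IDs and configuration) so that the agent can remember them across sessions.
  6. Human-in-the-loop:
    • When the agent cannot solve a problem, runs into ambiguity, or faces a critical decision, it should be able to pause and ask a human for help, presenting clear context and options.
  7. Cost optimization:
    • LLM API calls are expensive. Prompt length and call frequency should be optimized, and smaller, specialized models can be considered for specific tasks.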
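For point 3, a token bucket is a common way to pace requests. The sketch below is a minimal, single-threaded version with hypothetical names. Its rate and capacity would typically come from the API provider's documented limits. Before each request, the invoker could call try_acquire(), or sleep for wait_time() when no token is available.

```python
# Hypothetical rate limiter (sketch): single-threaded token bucket.
import time
from typing import Callable


class TokenBucket:
    """Refills at `rate` tokens per second and holds at most `capacity` tokens."""

    def __init__(self, rate: float, capacity: float,
                 clock: Callable[[], float] = time.monotonic):
        self.rate = rate
        self.capacity = capacity
        self.clock = clock              # injectable for tests
        self.tokens = float(capacity)   # start full, allowing an initial burst
        self.last_refill = clock()

    def _refill(self) -> None:
        now = self.clock()
        elapsed = now - self.last_refill
        self.tokens = min(self.capacity, self.tokens + elapsed * self.rate)
        self.last_refill = now

    def try_acquire(self, tokens: float = 1.0) -> bool:
        """Take `tokens` if available; return False instead of blocking."""
        self._refill()
        if self.tokens >= tokens:
            self.tokens -= tokens
            return True
        return False

    def wait_time(self, tokens: float = 1.0) -> float:
        """Seconds until `tokens` would be available (0.0 if they already are)."""
        self._refill()
        return max(0.0, (tokens - self.tokens) / self.rate)


if __name__ == "__main__":
    bucket = TokenBucket(rate=2.0, capacity=3)  # e.g. 2 requests/s, bursts of 3
    for i in range(5):
        while not bucket.try_acquire():
            time.sleep(bucket.wait_time())
        print(f"request {i} sent")
```

A token bucket tolerates short bursts up to capacity while enforcing the long-run rate; a leaky bucket would instead smooth requests to a constant pace.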

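10. Looking Ahead

The autonomous agent we explored today represents an important direction in software automation. Moving from hand-written integration code to an agent that understands and calls APIs on its own is a major leap. Significant challenges remain, especially in semantic understanding, complex parameter generation, and state management, but as LLMs and other AI techniques advance, we are steadily getting closer to truly general, intelligent agents for interacting with APIs.

Such agents would not only greatly improve development efficiency but also make complex API services accessible to non-technical users, opening up entirely new applications. This is more than technical progress; it is a profound change in how humans and machines collaborate. We are standing at the threshold of an exciting era and witnessing the future of software engineering.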
