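Good afternoon, colleagues. Today we will take a close look at a challenging and promising problem: designing an autonomous agent that can discover and call APIs it has never seen before, given nothing but a Swagger or OpenAPI document. This is more than a theoretical exercise. It is a key step toward intelligent, adaptive systems that can keep up with a rapidly growing API ecosystem.
Picture an enterprise that needs to integrate hundreds of external services, each with its own API. The traditional approach is to read the documentation by hand, write the code, then test and maintain it, which is slow, error-prone, and expensive. If an agent could read API documentation the way an experienced developer does, understand what each operation does, and generate the calling code automatically, the productivity gain would be substantial.
In this talk I will walk through, from a programmer's perspective, the core techniques, design ideas, and challenges involved in building such an agent. We will work at the code level and look at how an abstract document becomes executable operations.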
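1. The Core Problem and Its Challenges
The central difficulty in building an agent for "never-before-seen" APIs is generalization. We cannot hard-code logic for any particular API; we need a general framework that can understand and adapt to any API that conforms to the OpenAPI specification.
The main challenges are:
- Document understanding and parsing: how do we turn a structured YAML/JSON document into a data model the agent can operate on internally?
- Semantic understanding and intent mapping: given a high-level goal (for example "create a user" or "check product inventory"), how does the agent pick the most relevant operation, or sequence of operations, from a large set?
- Parameter generation and validation: for a given operation, how do we automatically produce request parameters that satisfy its structure, types, and constraints? This is one of the hardest parts.
- State management and chained calls: many business flows need several API calls working together, where the output of one call is the input of the next. How does the agent keep track of that context?
- Error handling and recovery: API calls can fail, so the agent needs robust error detection, logging, and a sensible recovery strategy.
- Security and authentication: how do we handle the various authentication mechanisms (API key, OAuth2, and so on) safely?
To meet these challenges, the agent needs a modular design in which each module focuses on one specific problem.
2. Overall Architecture of the Autonomous Agent
Internally, an agent that discovers and calls APIs on its own can be viewed as a set of cooperating modules. Together they cover the whole path from a high-level goal to a concrete API call.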
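| Module | Main function | Key technologies / considerations |
|---|---|---|
| API document parser | Parses the Swagger/OpenAPI document and converts it into the agent's unified internal data structure. | pyyaml, json, openapi-spec-validator, Pydantic models |
| Semantic understanding and planner | Maps a high-level goal from the user or system to the available API operations and plans the order in which they run. | LLMs (large language models), vector databases (embeddings), rule engines, state machines |
| Parameter generator | Produces valid request parameters from the operation's parameter definitions (types, constraints, examples) and the current context. | LLMs, heuristic rules, data type validation, dynamic data sources (such as the output of a previous step) |
| API invoker | Builds the HTTP request (URL, method, headers, body), sends it, and receives the response. | requests (Python), HTTP client libraries, authentication management |
| Response processor and state manager | Parses the API response, extracts key information, updates the agent's internal state, and reports the result back to the planner. | JSON/XML parsing, response schema validation, internal knowledge graph / context store |
| Error and retry mechanism | Detects errors during API calls and, depending on the error type, retries, falls back, or reports. | Exponential backoff retries, error classification (network, client, server), circuit breaking |
| Authentication management | Stores and manages credentials for different APIs and applies them automatically at call time. | Credential storage (environment variables, secret management services), OAuth2 flow management |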
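Now let us go through these modules one by one.
3. The API Document Parser: Understanding the API's "Language"
Swagger (now more widely known as the OpenAPI Specification, OAS) is the de facto standard for describing APIs. It provides a language-agnostic, machine-readable format for interface descriptions. The agent's first job is to "read" and understand this document.
The parser loads an OpenAPI document in YAML or JSON into memory and converts it into an object model the agent can work with easily. That model should clearly represent all the key information about the API: paths, operations (GET, POST, and so on), parameters (name, location, type, whether required, schema), request bodies, responses, and security definitions.
Core steps:
- Load the document: read the YAML/JSON content from a file path or URL.
- Validate the document: use a tool such as openapi-spec-validator to check that the document conforms to the OAS.
- Convert to a data model: map the parsed dictionary/JSON structure onto Pydantic models or custom Python objects, which gives type hints and attribute access.
Code example: a basic parser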
import yaml
import json
from typing import Dict, Any, List, Optional
from pydantic import BaseModel, Field

# Data models for the agent's internal representation of the API structure
class ParameterSchema(BaseModel):
    type: Optional[str] = None
    format: Optional[str] = None
    enum: Optional[List[Any]] = None  # Enum values may be strings, numbers, or booleans
    items: Optional[Dict[str, Any]] = None  # For array types
    properties: Optional[Dict[str, Any]] = None  # For object types
    required: Optional[List[str]] = None  # For object properties

class APIParameter(BaseModel):
    name: str
    in_: str = Field(alias='in')  # path, query, header, or cookie (OpenAPI 3 moved "body" to requestBody)
    description: Optional[str] = None
    required: bool = False
    schema_: ParameterSchema = Field(alias='schema', default_factory=ParameterSchema)  # default_factory avoids a shared mutable default

class APIRequestBodyContent(BaseModel):
    schema_: ParameterSchema = Field(alias='schema', default_factory=ParameterSchema)

class APIRequestBody(BaseModel):
    description: Optional[str] = None
    required: bool = False
    content: Dict[str, APIRequestBodyContent]  # e.g., {'application/json': APIRequestBodyContent}

class APIResponse(BaseModel):
    description: str
    content: Optional[Dict[str, APIRequestBodyContent]] = None  # Same shape as request body content

class APIOperation(BaseModel):
    operationId: Optional[str] = None
    summary: Optional[str] = None
    description: Optional[str] = None
    parameters: Optional[List[APIParameter]] = None
    requestBody: Optional[APIRequestBody] = None
    responses: Dict[str, APIResponse]  # e.g., {'200': APIResponse}
    tags: Optional[List[str]] = None
    security: Optional[List[Dict[str, List[str]]]] = None

class APIPath(BaseModel):
    get: Optional[APIOperation] = None
    post: Optional[APIOperation] = None
    put: Optional[APIOperation] = None
    delete: Optional[APIOperation] = None
    patch: Optional[APIOperation] = None

class APISchema(BaseModel):
    openapi: str
    info: Dict[str, Any]
    paths: Dict[str, APIPath]
    components: Optional[Dict[str, Any]] = None  # Reusable schemas, parameters, etc.
    security: Optional[List[Dict[str, List[str]]]] = None  # Global security requirements
class SwaggerParser:
    def __init__(self, spec_path: str):
        self.spec_path = spec_path
        self.api_schema: Optional[APISchema] = None

    def load_and_parse(self) -> APISchema:
        with open(self.spec_path, 'r', encoding='utf-8') as f:
            if self.spec_path.endswith('.yaml') or self.spec_path.endswith('.yml'):
                spec_data = yaml.safe_load(f)
            elif self.spec_path.endswith('.json'):
                spec_data = json.load(f)
            else:
                raise ValueError("Unsupported file format. Must be .yaml, .yml, or .json")
        # Resolve $ref references - a crucial step for real-world OAS documents.
        # This is a simplified version: it recurses, but it has no cycle detection.
        resolved_data = self._resolve_refs(spec_data)
        self.api_schema = APISchema.model_validate(resolved_data)
        print(f"Successfully parsed OpenAPI spec from {self.spec_path}")
        return self.api_schema

    def _resolve_refs(self, data: Any, root_data: Optional[Dict[str, Any]] = None) -> Any:
        """
        A simplified recursive function to resolve $ref references.
        A real system would be more robust: it would handle external refs,
        detect reference cycles, and cache resolutions.
        """
        if root_data is None:
            root_data = data
        if isinstance(data, dict):
            if '$ref' in data:
                ref_path = data['$ref']
                if ref_path.startswith('#/'):
                    parts = ref_path[2:].split('/')
                    resolved = root_data
                    for part in parts:
                        resolved = resolved.get(part) if isinstance(resolved, dict) else None
                        if resolved is None:
                            raise ValueError(f"Could not resolve reference: {ref_path}")
                    # Recursively resolve any refs within the resolved component itself
                    return self._resolve_refs(resolved, root_data)
                else:
                    # External references are not handled in this basic example
                    print(f"Warning: External reference '{ref_path}' not resolved.")
                    return data
            else:
                return {k: self._resolve_refs(v, root_data) for k, v in data.items()}
        elif isinstance(data, list):
            return [self._resolve_refs(item, root_data) for item in data]
        else:
            return data

    def get_operations(self) -> List[Dict[str, Any]]:
        if not self.api_schema:
            raise ValueError("API schema not loaded. Call load_and_parse() first.")
        operations = []
        for path, path_obj in self.api_schema.paths.items():
            for method in ['get', 'post', 'put', 'delete', 'patch']:
                op = getattr(path_obj, method, None)
                if op:
                    operations.append({
                        "path": path,
                        "method": method.upper(),
                        "operation_id": op.operationId,
                        "summary": op.summary,
                        "description": op.description,
                        "parameters": op.parameters,
                        "request_body": op.requestBody,
                        "responses": op.responses,
                        "tags": op.tags,
                        "security": op.security
                    })
        return operations
# Example usage (needs an actual swagger.yaml/json file)
# Assume we have a simple petstore.yaml file
# Create a petstore.yaml file for testing
with open("petstore.yaml", "w", encoding="utf-8") as f:
    f.write("""
openapi: 3.0.0
info:
  title: Pet Store API
  version: 1.0.0
paths:
  /pets:
    get:
      summary: List all pets
      operationId: listPets
      parameters:
        - name: limit
          in: query
          description: How many pets to return at one time (max 100)
          required: false
          schema:
            type: integer
            format: int32
      responses:
        '200':
          description: A paged array of pets
          content:
            application/json:
              schema:
                type: array
                items:
                  $ref: '#/components/schemas/Pet'
    post:
      summary: Create a pet
      operationId: createPet
      requestBody:
        description: Pet to create
        required: true
        content:
          application/json:
            schema:
              $ref: '#/components/schemas/NewPet'
      responses:
        '201':
          description: Created
        '400':
          description: Bad request
  /pets/{petId}:
    get:
      summary: Info for a specific pet
      operationId: showPetById
      parameters:
        - name: petId
          in: path
          required: true
          description: The id of the pet to retrieve
          schema:
            type: string
      responses:
        '200':
          description: Expected response to a valid request
          content:
            application/json:
              schema:
                $ref: '#/components/schemas/Pet'
components:
  schemas:
    Pet:
      type: object
      required:
        - id
        - name
      properties:
        id:
          type: integer
          format: int64
        name:
          type: string
        tag:
          type: string
    NewPet:
      type: object
      required:
        - name
      properties:
        name:
          type: string
        tag:
          type: string
""")
# Test the parser
parser = SwaggerParser("petstore.yaml")
api_spec = parser.load_and_parse()
print("\n--- Extracted Operations ---")
for op in parser.get_operations():
    print(f"Path: {op['path']}, Method: {op['method']}, Operation ID: {op['operation_id']}")
    if op['parameters']:
        print("  Parameters:")
        for param in op['parameters']:
            print(f"    - Name: {param.name}, In: {param.in_}, Type: {param.schema_.type}, Required: {param.required}")
    if op['request_body']:
        print(f"  Request Body Content Types: {list(op['request_body'].content.keys())}")
        if 'application/json' in op['request_body'].content and op['request_body'].content['application/json'].schema_.properties:
            print(f"  Body Properties: {list(op['request_body'].content['application/json'].schema_.properties.keys())}")
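Notes on the parser:
- We define a set of Pydantic models that map onto the structures in the OpenAPI specification, such as APIOperation, APIParameter, and APIRequestBody. This gives us type safety and data validation.
- The SwaggerParser class loads the YAML/JSON file and uses the _resolve_refs method (a simplified version) to handle $ref references. In real documents, $ref resolution is essential, because it is what lets an API document reuse definitions such as the models under components/schemas.
- The get_operations method walks every path and HTTP method and extracts each API operation with its details, ready for the planning and invocation steps that follow.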
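The simplified resolver has no protection against schemas that refer to themselves (a tree node with a child of the same type, for example), and it does not decode JSON Pointer escapes. The following is a minimal sketch of one way to handle both, under the assumption that only local references (starting with `#/`) need to be supported. The function names `resolve_pointer` and `resolve_refs` are illustrative and not part of the parser above.

```python
from typing import Any, Dict, Tuple


def resolve_pointer(root: Dict[str, Any], ref: str) -> Any:
    """Follow a local JSON Pointer such as '#/components/schemas/Pet'."""
    if not ref.startswith("#/"):
        raise ValueError(f"Only local references are handled in this sketch: {ref}")
    node: Any = root
    for raw in ref[2:].split("/"):
        # JSON Pointer escapes: '~1' stands for '/', '~0' stands for '~'.
        key = raw.replace("~1", "/").replace("~0", "~")
        if isinstance(node, dict) and key in node:
            node = node[key]
        else:
            raise ValueError(f"Could not resolve reference: {ref}")
    return node


def resolve_refs(node: Any, root: Dict[str, Any], active: Tuple[str, ...] = ()) -> Any:
    """Inline local $ref targets; keep the $ref in place when it would recurse."""
    if isinstance(node, dict):
        ref = node.get("$ref")
        if isinstance(ref, str):
            if ref in active:
                # Cycle detected: leave the pointer instead of recursing forever.
                return {"$ref": ref}
            return resolve_refs(resolve_pointer(root, ref), root, active + (ref,))
        return {k: resolve_refs(v, root, active) for k, v in node.items()}
    if isinstance(node, list):
        return [resolve_refs(item, root, active) for item in node]
    return node
```

With this approach a self-referential schema is expanded once and then left as a `$ref`, so downstream code has to be prepared to meet an unresolved pointer at that depth.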
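4. Semantic Understanding and the Planner: The Agent's "Brain"
This is the module where most of the agent's intelligence lives. It turns a high-level user request (for example "I want to order product ID 123, quantity 5") into a series of concrete API calls. That usually involves:
- Intent recognition: understanding what the user wants to do.
- API matching: finding one or more API operations that can fulfil that intent.
- Sequence planning: if several API calls are needed, determining their order and the data flow between them.
Technique choices:
- Keyword matching / rule engine: for simple, unambiguous APIs, an operation can be chosen by matching keywords in its operation ID, summary, and description. For example, "list pets" would likely match the listPets operation.
- Embedding-based similarity search: convert both the user's intent and each operation's description (summary, description, operationId, parameter names) into vector embeddings, then use similarity to find the best match. This is more robust than keyword matching.
- Large language models (LLMs): generally the most capable approach today. LLMs can interpret complex natural-language intent and, given the API documentation (through few-shot prompting or fine-tuning), select and plan API calls. They can even produce the parameter structure a call needs.
We will focus on the LLM-driven approach, because it offers the most generalization.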
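Before bringing in an LLM, the first two options can be approximated with very little code. The sketch below ranks operations by cosine similarity over bag-of-words counts. This is a stand-in for real embeddings (it only matches exact tokens, so "pet" and "pets" do not match), and the catalog shape with `operation_id`, `summary`, and `description` keys is an assumption for illustration.

```python
import math
import re
from collections import Counter
from typing import Dict, List, Tuple


def _tokens(text: str) -> Counter:
    """Lower-case word counts; camelCase identifiers are split into words first."""
    spaced = re.sub(r"([a-z0-9])([A-Z])", r"\1 \2", text)
    return Counter(re.findall(r"[a-z0-9]+", spaced.lower()))


def _cosine(a: Counter, b: Counter) -> float:
    dot = sum(a[t] * b[t] for t in a.keys() & b.keys())
    norm = math.sqrt(sum(v * v for v in a.values())) * math.sqrt(sum(v * v for v in b.values()))
    return dot / norm if norm else 0.0


def rank_operations(goal: str, catalog: List[Dict[str, str]], top_k: int = 3) -> List[Tuple[float, str]]:
    """Return up to top_k (score, operation_id) pairs, best match first."""
    goal_vec = _tokens(goal)
    scored = []
    for op in catalog:
        text = " ".join(str(op.get(k) or "") for k in ("operation_id", "summary", "description"))
        scored.append((_cosine(goal_vec, _tokens(text)), op["operation_id"]))
    scored.sort(key=lambda pair: pair[0], reverse=True)
    return scored[:top_k]
```

A ranking like this can also serve as a pre-filter, so that only the top few operations are passed to the LLM when the catalog is large.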
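The planner's workflow:
- Receive the goal: the agent receives a goal expressed in natural language.
- Query the API catalog: the agent consults its internal API catalog (provided by the parser).
- LLM decision: the user's goal and the API catalog (or a summary of it) are given to the LLM as context. The LLM's task is to:
  - identify the most relevant API operations;
  - determine the order in which they should run;
  - state the parameters each operation needs and where they come from (user input, the result of a previous API call, default values, and so on).
- Produce an execution plan: the LLM returns a structured plan, for example in JSON, describing the sequence of API calls to execute.
Code example: a simplified LLM-based planner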
import os
from openai import OpenAI  # Or other LLM providers like Anthropic, Google GenAI
import json
from typing import Dict, Any, List, Optional

# Ensure you have your OpenAI API key set as an environment variable
# os.environ["OPENAI_API_KEY"] = "YOUR_OPENAI_API_KEY"

class AgentPlanner:
    def __init__(self, api_catalog: List[Dict[str, Any]]):
        self.api_catalog = api_catalog
        self.client = OpenAI()  # Initialize OpenAI client
        # Pre-process API catalog for LLM input
        self.api_summaries = self._generate_api_summaries()

    def _generate_api_summaries(self) -> str:
        """
        Generates a concise summary of available API operations for the LLM.
        In a real scenario, this might involve more sophisticated summarization
        or embedding lookup for large catalogs.
        """
        summaries = []
        for op in self.api_catalog:
            summary_parts = [f"Operation ID: {op.get('operation_id', 'N/A')}",
                             f"Path: {op['path']}",
                             f"Method: {op['method']}",
                             f"Summary: {op.get('summary', 'N/A')}"]
            if op.get('parameters'):
                param_names = [p.name for p in op['parameters']]
                summary_parts.append(f"Parameters: {', '.join(param_names)}")
            if op.get('request_body'):
                summary_parts.append("Requires a request body.")
            summaries.append(" - ".join(summary_parts))
        return "\n".join(summaries)

    def plan_api_calls(self, user_goal: str, current_context: Optional[Dict[str, Any]] = None) -> List[Dict[str, Any]]:
        """
        Uses an LLM to plan a sequence of API calls to achieve the user's goal.
        """
        context_str = json.dumps(current_context, indent=2) if current_context else "No specific context."
        prompt = f"""
You are an autonomous agent designed to interact with various APIs based on a given OpenAPI specification.
Your goal is to fulfill user requests by planning a sequence of API calls.
Here is a list of available API operations:
{self.api_summaries}
Current context / available data:
{context_str}
User's goal: "{user_goal}"
Based on the user's goal and available APIs, generate a JSON object with a single key "plan" whose value is an array of API calls.
Each item in the array should be an object with the following structure:
{{
  "operation_id": "string", // The operation ID of the API to call
  "parameters": {{ // A dictionary of parameters for the API call
    "param_name_1": "value_or_source_1",
    "param_name_2": "value_or_source_2",
    // ...
  }},
  "description": "A brief explanation of why this step is needed."
}}
For parameter values, if they are directly available from the user's goal or current context, use them.
If a parameter is required but not directly available, indicate its type and let the parameter generator handle it.
If a parameter's value should come from a previous API call's response, specify it like: "$response.operation_id.field_name".
If a parameter needs to be generated (e.g., a new resource ID), specify "$generate.type" (e.g., "$generate.uuid").
Example plan for "Create a pet named 'Buddy' with tag 'dog'":
{{
  "plan": [
    {{
      "operation_id": "createPet",
      "parameters": {{
        "name": "Buddy",
        "tag": "dog"
      }},
      "description": "Calling createPet to add a new pet."
    }}
  ]
}}
Example plan for "List pets and then show details for pet with ID 123":
{{
  "plan": [
    {{
      "operation_id": "listPets",
      "parameters": {{}},
      "description": "First, list all pets to see available options."
    }},
    {{
      "operation_id": "showPetById",
      "parameters": {{
        "petId": "123"
      }},
      "description": "Then, retrieve details for pet with ID 123."
    }}
  ]
}}
Your JSON plan:
"""
        raw_output = None
        try:
            response = self.client.chat.completions.create(
                model="gpt-4o",  # Or other suitable model
                messages=[
                    {"role": "system", "content": "You are a helpful assistant that plans API calls."},
                    {"role": "user", "content": prompt}
                ],
                response_format={"type": "json_object"},
                temperature=0.0  # Reduce randomness for planning
            )
            raw_output = response.choices[0].message.content
            plan = json.loads(raw_output)
            # JSON mode returns an object, so the list normally sits under the "plan" key
            if isinstance(plan, dict) and isinstance(plan.get('plan'), list):
                plan = plan['plan']
            if not isinstance(plan, list):
                raise ValueError(f"LLM did not return a list as expected: {raw_output}")
            return plan
        except Exception as e:
            print(f"Error during planning: {e}")
            print(f"LLM raw output: {raw_output if raw_output is not None else 'N/A'}")
            return []

# Test the planner
# Re-use the parser from before to get the catalog
parser = SwaggerParser("petstore.yaml")
api_spec = parser.load_and_parse()
api_catalog_for_planner = parser.get_operations()
planner = AgentPlanner(api_catalog_for_planner)
user_goal_1 = "Create a new cat named 'Whiskers' with tag 'feline'."
plan_1 = planner.plan_api_calls(user_goal_1)
print(f"\nPlan for '{user_goal_1}':\n{json.dumps(plan_1, indent=2)}")
user_goal_2 = "List all pets."
plan_2 = planner.plan_api_calls(user_goal_2)
print(f"\nPlan for '{user_goal_2}':\n{json.dumps(plan_2, indent=2)}")
user_goal_3 = "Show me details for the pet with ID 456 after listing them."
plan_3 = planner.plan_api_calls(user_goal_3)
print(f"\nPlan for '{user_goal_3}':\n{json.dumps(plan_3, indent=2)}")
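Notes on the planner:
- AgentPlanner holds an api_catalog, the list of API operations extracted by SwaggerParser.
- _generate_api_summaries turns the catalog into a concise format the LLM can digest. For a large catalog, more advanced techniques may be needed, such as passing only the top K summaries relevant to the user's goal, or using a RAG (Retrieval Augmented Generation) pattern.
- plan_api_calls is the core method. It builds a detailed prompt that instructs the LLM to produce a structured JSON plan. The prompt defines the output format and the conventions for parameter sources ($response.op_id.field, $generate.type).
- response_format={"type": "json_object"} is an OpenAI API feature (JSON mode) that constrains the model to emit valid JSON. Because JSON mode yields an object rather than a bare array, plan_api_calls also accepts a list wrapped in an outer object under a plan key.
- temperature=0.0 makes the LLM's output more deterministic, which suits planning tasks.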
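A plan produced by an LLM should be treated as untrusted input. The following is a minimal sketch of a structural check that could run before any step is executed. It assumes the plan format described above and a catalog whose entries carry an `operation_id` key; the function name `validate_plan` is hypothetical.

```python
from typing import Any, Dict, List, Set


def validate_plan(plan: Any, catalog: List[Dict[str, Any]]) -> List[str]:
    """Return a list of problems; an empty list means the plan looks usable."""
    if not isinstance(plan, list):
        return ["plan must be a list of steps"]
    known = {op["operation_id"] for op in catalog}
    problems: List[str] = []
    already_run: Set[str] = set()
    for i, step in enumerate(plan):
        if not isinstance(step, dict):
            problems.append(f"step {i}: not an object")
            continue
        op_id = step.get("operation_id")
        if op_id not in known:
            problems.append(f"step {i}: unknown operation_id {op_id!r}")
            continue
        params = step.get("parameters", {})
        if not isinstance(params, dict):
            problems.append(f"step {i}: parameters must be an object")
            params = {}
        for name, value in params.items():
            # A '$response.<op>.<field>' source is only valid if <op> ran earlier.
            if isinstance(value, str) and value.startswith("$response."):
                parts = value.split(".")
                source = parts[1] if len(parts) > 2 else ""
                if source not in already_run:
                    problems.append(f"step {i}: {name} refers to {source!r}, which has not run yet")
        already_run.add(op_id)
    return problems
```

If the list of problems is not empty, one option is to feed it back to the LLM and ask for a corrected plan rather than failing outright.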
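5. The Parameter Generator: Filling in the API's "Blanks"
The planner tells us which API to call and which parameters it needs. The parameter generator takes that information, together with the agent's internal state and the API's schema definitions, and produces actual, valid parameter values. This is the most complex and error-prone part of the whole agent.
Challenges:
- Data type matching: strings, integers, booleans, arrays, objects, dates, and so on.
- Complex structures: nested objects, objects inside arrays.
- Constraints: minimum/maximum, minLength/maxLength, pattern (regex), enum, format (email, uuid, date-time).
- Dynamic values: values that come from user input, from the response of a previous API call, or from the system (UUIDs, timestamps).
- Default and example values: making use of the default or example provided in the Swagger document.
Strategies:
- Take the value from the planner: if the planner already supplied a concrete value, use it directly.
- Take the value from agent state: look it up in the context the agent maintains (for example, the resource_id returned by a previous API call).
- Generate from the schema:
  - Primitive types: string, integer, and boolean values can be generated as random but legal values based on format and enum, or taken from the default.
  - Arrays: if items specifies a type, generate an array containing a few elements of that type.
  - Objects: recursively generate a value for each property, making sure the required fields are filled.
  - References ($ref): resolve the definition under components/schemas.
- LLM-assisted generation: for string parameters that are complex or need semantic understanding (for example, a descriptive text), consult the LLM again.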
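As an illustration of schema-driven generation that respects a few of these constraints, here is a minimal sketch operating on plain schema dictionaries. It assumes OpenAPI 3.0 style keywords (`minimum`, `maximum`, `minLength`, `maxLength`, `minItems`, `enum`, `default`, `example`) and deliberately ignores `pattern`, `format`, and exclusive bounds. The function name `generate_constrained` is hypothetical.

```python
import random
import string
from typing import Any, Dict, Optional


def generate_constrained(schema: Dict[str, Any], rng: Optional[random.Random] = None) -> Any:
    """Generate a value for a schema dict, honoring a small subset of constraints."""
    rng = rng or random.Random()
    # Values supplied by the document author are the safest choice.
    for key in ("default", "example"):
        if key in schema:
            return schema[key]
    if schema.get("enum"):
        return rng.choice(schema["enum"])
    kind = schema.get("type")
    if kind == "integer":
        low = int(schema.get("minimum", 0))
        high = int(schema.get("maximum", low + 100))
        return rng.randint(low, high)
    if kind == "string":
        min_len = schema.get("minLength", 1)
        max_len = schema.get("maxLength", max(min_len, 12))
        length = rng.randint(min_len, max_len)
        return "".join(rng.choice(string.ascii_lowercase) for _ in range(length))
    if kind == "boolean":
        return rng.choice([True, False])
    if kind == "array":
        count = schema.get("minItems", 1)
        return [generate_constrained(schema.get("items", {}), rng) for _ in range(count)]
    if kind == "object":
        props = schema.get("properties", {})
        # Only required properties are filled in; optional ones are left out.
        return {name: generate_constrained(props.get(name, {}), rng) for name in schema.get("required", [])}
    return None
```

Passing a seeded `random.Random` makes the generated values reproducible, which helps when a failing API call needs to be replayed.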
Code example: a basic parameter generator
import json
import uuid
import datetime
import random
from typing import Dict, Any, List, Optional

class ParameterGenerator:
    def __init__(self, api_schema: APISchema, agent_state: Dict[str, Any]):
        self.api_schema = api_schema
        self.agent_state = agent_state  # Store current state for dynamic parameter values

    def _get_schema_by_ref(self, ref: str) -> Optional[ParameterSchema]:
        """Resolves a $ref to a schema definition in components."""
        if not ref.startswith('#/components/schemas/'):
            print(f"Warning: Non-component schema reference not supported: {ref}")
            return None
        schema_name = ref.replace('#/components/schemas/', '')
        components = self.api_schema.components or {}  # components is optional in the spec
        component_schema = components.get('schemas', {}).get(schema_name)
        if component_schema:
            return ParameterSchema.model_validate(component_schema)
        return None

    def _generate_value_for_schema(self, schema: ParameterSchema, context: str = "", path: str = "") -> Any:
        """Recursively generates a value based on a given schema."""
        if schema.enum:
            return random.choice(schema.enum)
        if schema.type == 'string':
            if schema.format == 'uuid':
                return str(uuid.uuid4())
            elif schema.format == 'date-time':
                return datetime.datetime.now(datetime.timezone.utc).isoformat()
            elif schema.format == 'email':
                return f"test_{uuid.uuid4().hex[:8]}@example.com"
            else:
                # Basic string generation, could be enhanced with LLM for semantic context
                if "name" in path.lower(): return "GeneratedName"
                if "description" in path.lower(): return "Generated description for " + context
                if "tag" in path.lower(): return "default_tag"
                return "generated_string_" + uuid.uuid4().hex[:4]
        elif schema.type == 'integer':
            # Could add minimum/maximum constraint handling
            return random.randint(1, 100)
        elif schema.type == 'boolean':
            return random.choice([True, False])
        elif schema.type == 'array':
            if schema.items:
                # Generate a single item for the array
                return [self._generate_value_for_schema(ParameterSchema.model_validate(schema.items), context, path + "[i]") for _ in range(1)]
            return []  # Empty array if items schema is missing
        elif schema.type == 'object':
            generated_object = {}
            if schema.properties:
                for prop_name, prop_schema_dict in schema.properties.items():
                    prop_schema = ParameterSchema.model_validate(prop_schema_dict)
                    # Handle nested $ref
                    if '$ref' in prop_schema_dict:
                        resolved_prop_schema = self._get_schema_by_ref(prop_schema_dict['$ref'])
                        if resolved_prop_schema:
                            prop_schema = resolved_prop_schema
                    # Only required properties are generated; optional ones are left out
                    is_required = schema.required and prop_name in schema.required
                    if is_required:
                        generated_object[prop_name] = self._generate_value_for_schema(prop_schema, context, path + "." + prop_name)
            return generated_object
        return None

    def generate_parameters(self, operation: APIOperation, planned_params: Dict[str, Any]) -> Dict[str, Any]:
        """
        Generates final parameters for an API operation based on its schema
        and the planned_params from the planner.
        """
        final_params = {}
        request_body_data = {}
        # Handle path, query, header, cookie parameters
        if operation.parameters:
            for param_def in operation.parameters:
                param_name = param_def.name
                param_in = param_def.in_
                param_schema = param_def.schema_
                value = planned_params.get(param_name)
                if value is None:  # Value not provided by planner
                    if param_def.required:
                        # Attempt to generate based on schema or state
                        value = self._generate_value_for_schema(param_schema, param_name, f"parameter.{param_name}")
                        if value is None:
                            raise ValueError(f"Required parameter '{param_name}' (in: {param_in}) could not be generated and was not provided by planner.")
                    else:
                        continue  # Optional parameter, no value provided, so skip
                # Check for dynamic values from state
                if isinstance(value, str) and value.startswith('$response.'):
                    parts = value.split('.')
                    if len(parts) == 3:  # $response.operation_id.field_name
                        op_id = parts[1]
                        field_name = parts[2]
                        if op_id in self.agent_state and field_name in self.agent_state[op_id]:
                            value = self.agent_state[op_id][field_name]
                        else:
                            raise ValueError(f"Dynamic parameter source '{value}' not found in agent state.")
                    else:
                        raise ValueError(f"Invalid dynamic parameter format: {value}")
                elif isinstance(value, str) and value.startswith('$generate.'):
                    # For simplicity, only $generate.uuid is supported here
                    gen_type = value.split('.')[1]
                    if gen_type == 'uuid':
                        value = str(uuid.uuid4())
                    else:
                        raise ValueError(f"Unsupported generation type: {gen_type}")
                final_params[f"{param_in}_{param_name}"] = value  # Prefix to distinguish parameter types
        # Handle request body
        if operation.requestBody:
            # Assume application/json for now
            json_content = operation.requestBody.content.get('application/json')
            if json_content and json_content.schema_:
                # The parser has already inlined any top-level $ref, so the schema can be used directly
                body_schema = json_content.schema_
                # For simplicity, assume every planned value that is not a path/query/header/cookie
                # parameter is a body property. Copy instead of aliasing, so the planner's dict
                # is not mutated when generated values are added below.
                non_body_names = {p.name for p in (operation.parameters or [])}
                request_body_data = {k: v for k, v in planned_params.items() if k not in non_body_names}
                # Fill in missing required properties based on schema
                if body_schema.properties:
                    for prop_name, prop_schema_dict in body_schema.properties.items():
                        prop_schema = ParameterSchema.model_validate(prop_schema_dict)
                        # Resolve nested $ref within body properties
                        if '$ref' in prop_schema_dict:
                            resolved_prop_schema = self._get_schema_by_ref(prop_schema_dict['$ref'])
                            if resolved_prop_schema:
                                prop_schema = resolved_prop_schema
                        is_required = body_schema.required and prop_name in body_schema.required
                        if is_required and prop_name not in request_body_data:
                            generated_value = self._generate_value_for_schema(prop_schema, prop_name, f"body.{prop_name}")
                            if generated_value is None:
                                raise ValueError(f"Required body property '{prop_name}' could not be generated.")
                            request_body_data[prop_name] = generated_value
                final_params['request_body'] = request_body_data
        return final_params
# Test the parameter generator
# Assume we have an agent_state (empty for now)
current_agent_state = {}
# Get an operation from the parser
parser = SwaggerParser("petstore.yaml")
api_spec = parser.load_and_parse()
operations = parser.get_operations()
# Find the createPet operation
create_pet_op = next((op for op in operations if op['operation_id'] == 'createPet'), None)
if create_pet_op:
    # The catalog entry uses snake_case keys, so fetch the original Pydantic model from the parsed spec
    create_pet_operation_model = getattr(api_spec.paths[create_pet_op['path']], create_pet_op['method'].lower())
    print("\n--- Testing ParameterGenerator for createPet ---")
    # Scenario 1: Planner provides all needed parameters
    planned_params_1 = {"name": "Buddy", "tag": "dog"}
    generated_params_1 = ParameterGenerator(api_spec, current_agent_state).generate_parameters(create_pet_operation_model, planned_params_1)
    print(f"Generated params for 'createPet' with full plan:\n{json.dumps(generated_params_1, indent=2)}")
    # Scenario 2: Planner provides only 'tag'. 'name' is required, so the generator
    # fills it in with the heuristic placeholder "GeneratedName".
    planned_params_2 = {"tag": "fish"}
    generated_params_2 = ParameterGenerator(api_spec, current_agent_state).generate_parameters(create_pet_operation_model, planned_params_2)
    print(f"Generated params for 'createPet' with partial plan:\n{json.dumps(generated_params_2, indent=2)}")
# Scenario 3: Test with an operation that has query parameters, e.g., listPets
list_pets_op = next((op for op in operations if op['operation_id'] == 'listPets'), None)
if list_pets_op:
    list_pets_operation_model = getattr(api_spec.paths[list_pets_op['path']], list_pets_op['method'].lower())
    planned_params_3 = {"limit": 50}
    generated_params_3 = ParameterGenerator(api_spec, current_agent_state).generate_parameters(list_pets_operation_model, planned_params_3)
    print(f"\nGenerated params for 'listPets' with limit 50:\n{json.dumps(generated_params_3, indent=2)}")
    planned_params_4 = {}  # No limit provided, it's optional
    generated_params_4 = ParameterGenerator(api_spec, current_agent_state).generate_parameters(list_pets_operation_model, planned_params_4)
    print(f"Generated params for 'listPets' with no limit specified:\n{json.dumps(generated_params_4, indent=2)}")
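Notes on the parameter generator:
- ParameterGenerator takes an APISchema and an agent_state.
- The helper _get_schema_by_ref resolves references into components/schemas.
- _generate_value_for_schema is the core recursive function. It generates values from a parameter's type, format, enum, and so on. It handles primitive types, arrays, and objects, and supports a simple form of context awareness (for example, if the path contains "name", it returns "GeneratedName").
- generate_parameters walks all parameters of an API operation and combines planned_params with agent_state to decide the final values. It also handles the requestBody.
- Dynamic values such as $response.operation_id.field_name are looked up in agent_state.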
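The three-part `$response.operation_id.field_name` form only reaches top-level fields. Real responses are often nested, so one possible extension is to let the placeholder carry a longer dotted path with list indices. The sketch below assumes `agent_state` maps each operation ID to its parsed response; the function name `resolve_placeholder` and the extended path syntax are illustrative assumptions.

```python
from typing import Any, Dict


def resolve_placeholder(value: Any, agent_state: Dict[str, Any]) -> Any:
    """Resolve '$response.<operation_id>.<dotted.path>' against stored responses."""
    if not (isinstance(value, str) and value.startswith("$response.")):
        return value  # Literal values pass through unchanged.
    parts = value.split(".")[1:]
    if len(parts) < 2:
        raise ValueError(f"Invalid dynamic parameter format: {value}")
    node: Any = agent_state
    for key in parts:
        if isinstance(node, list):
            # Numeric segments index into arrays, e.g. 'items.0.id'.
            try:
                node = node[int(key)]
            except (ValueError, IndexError):
                raise ValueError(f"Dynamic parameter source '{value}' not found in agent state.")
        elif isinstance(node, dict) and key in node:
            node = node[key]
        else:
            raise ValueError(f"Dynamic parameter source '{value}' not found in agent state.")
    return node
```

For example, with a stored `listPets` response of `{"items": [{"id": 7}]}`, the placeholder `$response.listPets.items.0.id` resolves to `7`.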
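6. The API Invoker: Interacting with the Outside World
Once the parameter generator has produced complete, valid request parameters, the API invoker builds and sends the actual HTTP request, then receives the response and does some initial processing.
Core functions:
- URL construction: replace path parameters (such as {petId} in /pets/{petId}) with actual values.
- Request headers: set Content-Type, Accept, Authorization, and so on.
- Request body: serialize the body data (usually JSON).
- HTTP method: use the correct GET, POST, PUT, DELETE, or other method.
- Sending the request: use an HTTP client library (such as Python's requests).
- Basic response check: check the HTTP status code.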
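URL construction has two easy-to-miss pitfalls: path parameter values need percent-encoding, and `urllib.parse.urljoin` discards the base URL's path (for example `/api/v1`) when the second argument starts with `/`, as OpenAPI paths always do. The following is a minimal sketch of a helper that avoids both. The function name `build_url` is hypothetical.

```python
import re
from typing import Any, Dict
from urllib.parse import quote, urlencode


def build_url(base_url: str, path_template: str,
              path_params: Dict[str, Any], query_params: Dict[str, Any]) -> str:
    """Fill an OpenAPI path template and append the query string."""

    def substitute(match: re.Match) -> str:
        name = match.group(1)
        if name not in path_params:
            raise ValueError(f"Missing path parameter: {name}")
        # safe="" also encodes '/', so a value cannot introduce extra path segments.
        return quote(str(path_params[name]), safe="")

    path = re.sub(r"\{([^{}]+)\}", substitute, path_template)
    # Plain concatenation keeps a base path such as /api/v1.
    url = base_url.rstrip("/") + "/" + path.lstrip("/")
    if query_params:
        # doseq=True expands list values into repeated keys (?tag=a&tag=b).
        url += "?" + urlencode(query_params, doseq=True)
    return url
```

Raising on a missing path parameter, rather than leaving `{petId}` in the URL, surfaces planning mistakes before a request is sent.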
Code example: a generic API invoker
import json
import requests
from typing import Any, Dict, List, Optional

class APIInvoker:
    def __init__(self, base_url: str, auth_manager: Any = None):  # auth_manager for future integration
        self.base_url = base_url
        self.session = requests.Session()
        self.auth_manager = auth_manager  # Placeholder for authentication manager

    def invoke(self, method: str, path: str, operation: APIOperation, generated_params: Dict[str, Any]) -> requests.Response:
        """
        Constructs and sends an HTTP request based on the API operation and generated parameters.
        The HTTP method and path template come from the parser's operation catalog,
        because APIOperation itself does not store them.
        """
        method = method.upper()
        # 1. Construct URL. Plain concatenation keeps a base path such as /api/v1;
        #    urljoin would discard it because OpenAPI paths start with '/'.
        url = self.base_url.rstrip('/') + '/' + path.lstrip('/')
        # 2. Extract and apply path parameters
        path_params = {k[len('path_'):]: v for k, v in generated_params.items() if k.startswith('path_')}
        for param_name, param_value in path_params.items():
            url = url.replace(f"{{{param_name}}}", str(param_value))
        # 3. Extract query parameters
        query_params = {k[len('query_'):]: v for k, v in generated_params.items() if k.startswith('query_')}
        # 4. Extract header parameters
        headers = {k[len('header_'):]: str(v) for k, v in generated_params.items() if k.startswith('header_')}
        headers.setdefault('Accept', 'application/json')  # Default accept JSON
        # 5. Handle authentication (simplified)
        if self.auth_manager:
            # This would integrate with a dedicated authentication manager
            # For now, let's assume it can add an Authorization header
            auth_header = self.auth_manager.get_auth_header(operation.security)  # Pass security requirements
            if auth_header:
                headers.update(auth_header)
        # 6. Prepare request body
        json_data = generated_params.get('request_body')
        if json_data:
            headers.setdefault('Content-Type', 'application/json')
        print("\n--- Invoking API ---")
        print(f"Method: {method}, URL: {url}")
        print(f"Query Params: {query_params}")
        print(f"Headers: {headers}")
        print(f"JSON Body: {json_data}")
        try:
            response = self.session.request(
                method,
                url,
                params=query_params,
                headers=headers,
                json=json_data,
                timeout=30  # Add a timeout
            )
            response.raise_for_status()  # Raise HTTPError for bad responses (4xx or 5xx)
            print(f"API Call Success! Status: {response.status_code}")
            return response
        except requests.exceptions.HTTPError as e:
            print(f"HTTP Error during API invocation: {e.response.status_code} - {e.response.text}")
            raise
        except requests.exceptions.ConnectionError as e:
            print(f"Connection Error during API invocation: {e}")
            raise
        except requests.exceptions.Timeout as e:
            print(f"Timeout Error during API invocation: {e}")
            raise
        except Exception as e:
            print(f"An unexpected error occurred during API invocation: {e}")
            raise

# --- Mock Authentication Manager for demonstration ---
class MockAuthManager:
    def __init__(self, api_key: str):
        self.api_key = api_key

    def get_auth_header(self, security_schemes: Optional[List[Dict[str, List[str]]]] = None) -> Dict[str, str]:
        # A very basic example: if any security requirement is present, assume API Key auth
        if security_schemes:
            # In a real scenario, you'd parse components.securitySchemes from the OpenAPI spec
            # and match them to the operation's 'security' field
            return {"X-API-KEY": self.api_key}
        return {}

# Test the invoker
# Re-use parser and parameter generator setup
parser = SwaggerParser("petstore.yaml")
api_spec = parser.load_and_parse()
operations = parser.get_operations()
# Find createPet operation. The catalog entry keeps the path and method;
# the Pydantic model is fetched from the parsed spec.
create_pet_op_raw = next((op for op in operations if op['operation_id'] == 'createPet'), None)
create_pet_op_model = getattr(api_spec.paths[create_pet_op_raw['path']], create_pet_op_raw['method'].lower())
# Find listPets operation
list_pets_op_raw = next((op for op in operations if op['operation_id'] == 'listPets'), None)
list_pets_op_model = getattr(api_spec.paths[list_pets_op_raw['path']], list_pets_op_raw['method'].lower())
# Initialize components
agent_state = {}
param_gen = ParameterGenerator(api_spec, agent_state)
auth_mgr = MockAuthManager(api_key="your-super-secret-api-key")  # Replace with a real key
invoker = APIInvoker(base_url="http://localhost:8080/api/v1", auth_manager=auth_mgr)  # Assume a local petstore API is running
# Scenario: Create a pet
try:
    planned_params_create = {"name": "TestPet", "tag": "test"}
    generated_params_create = param_gen.generate_parameters(create_pet_op_model, planned_params_create)
    # response_create = invoker.invoke(create_pet_op_raw['method'], create_pet_op_raw['path'], create_pet_op_model, generated_params_create)
    # print(f"Response from createPet: {response_create.json()}")
    # For demonstration, we'll just print the prepared params and pretend to invoke
    print("\n(Skipping actual API invocation for createPet for demo purposes, see commented line)")
    print(f"Would invoke createPet with: {json.dumps(generated_params_create, indent=2)}")
except Exception as e:
    print(f"Failed to invoke createPet: {e}")
# Scenario: List pets
try:
    planned_params_list = {"limit": 10}
    generated_params_list = param_gen.generate_parameters(list_pets_op_model, planned_params_list)
    # response_list = invoker.invoke(list_pets_op_raw['method'], list_pets_op_raw['path'], list_pets_op_model, generated_params_list)
    # print(f"Response from listPets: {response_list.json()}")
    print("\n(Skipping actual API invocation for listPets for demo purposes, see commented line)")
    print(f"Would invoke listPets with: {json.dumps(generated_params_list, indent=2)}")
except Exception as e:
    print(f"Failed to invoke listPets: {e}")
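Notes on the API invoker:
- APIInvoker uses requests.Session to reuse connections, which is more efficient when making several consecutive calls.
- The invoke method turns an APIOperation object and generated_params into a complete HTTP request.
- It handles path-parameter substitution, query parameters, request headers, and the JSON request body.
- auth_manager is a placeholder for more complex authentication logic. In a real system it would apply API keys, OAuth2 tokens, and similar credentials dynamically, based on the securitySchemes defined in the OpenAPI document.
- response.raise_for_status() is a convenience method of the requests library that raises HTTPError when a 4xx or 5xx status code is received.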
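As a rough illustration of what a non-placeholder auth_manager might do, the sketch below applies a credential according to one OpenAPI 3 securitySchemes entry. The helper name apply_security_scheme and the way the credential is passed in are assumptions made for this example. Only the apiKey scheme (in a header or query string) and the http bearer scheme are covered; cookie-based keys and OAuth2 flows are deliberately left out.

```python
from typing import Any, Dict, Tuple


def apply_security_scheme(
    scheme: Dict[str, Any],
    credential: str,
    headers: Dict[str, str],
    query: Dict[str, str],
) -> Tuple[Dict[str, str], Dict[str, str]]:
    """Return copies of headers/query with the credential applied.

    `scheme` is one entry of components.securitySchemes in an OpenAPI 3 document.
    Hypothetical helper: it is not part of the APIInvoker shown earlier.
    """
    headers, query = dict(headers), dict(query)
    scheme_type = scheme.get("type")

    if scheme_type == "apiKey":
        location, name = scheme["in"], scheme["name"]
        if location == "header":
            headers[name] = credential
        elif location == "query":
            query[name] = credential
        else:
            # "cookie" is valid in OpenAPI but not handled in this sketch
            raise ValueError(f"Unsupported apiKey location: {location}")
    elif scheme_type == "http" and scheme.get("scheme", "").lower() == "bearer":
        headers["Authorization"] = f"Bearer {credential}"
    else:
        raise ValueError(f"Unsupported security scheme: {scheme}")
    return headers, query


# Example scheme; the header name X-API-Key is made up for the demo
api_key_scheme = {"type": "apiKey", "in": "header", "name": "X-API-Key"}
new_headers, new_query = apply_security_scheme(api_key_scheme, "demo-key", {}, {})
print(new_headers)
```

Returning copies rather than mutating the inputs keeps the credential out of any parameter dictionaries the Agent may later log or store in its state.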
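7. Response Processor and State Manager: Learning and Adapting
After a successful API call, the Agent has to process the response: parse the data, check that it matches expectations, extract whatever is useful for later operations, and update its internal state.
Core functions:
- Response parsing: Parse the JSON or XML response into Python objects.
- Response validation: (Optional but strongly recommended) Validate the structure and data types of the response against the response schema defined in the OpenAPI document.
- Information extraction: Following the planner's instructions or preset rules, extract key data from the response (for example, the ID of a newly created resource, or status updates).
- State update: Store the extracted information in the Agent's internal state or context so that later API calls can use it.
- Progress reporting: Feed the result of the current operation back to the planner.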
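The response-validation item can be sketched without extra dependencies. The validator below is a deliberately small example built on assumptions: validate_against_schema is a hypothetical helper that checks only type, required, properties, and items, and it ignores $ref, formats, and combinators such as oneOf. A production Agent would more likely delegate this to a full JSON Schema validator.

```python
from typing import Any, Dict, List

_TYPE_MAP = {
    "string": str,
    "integer": int,
    "number": (int, float),
    "boolean": bool,
    "array": list,
    "object": dict,
}


def validate_against_schema(data: Any, schema: Dict[str, Any], path: str = "$") -> List[str]:
    """Return a list of human-readable violations (an empty list means valid)."""
    errors: List[str] = []
    expected = schema.get("type")
    if expected in _TYPE_MAP:
        # bool is a subclass of int in Python, so reject it explicitly for numeric types
        is_bool_as_number = isinstance(data, bool) and expected in ("integer", "number")
        if is_bool_as_number or not isinstance(data, _TYPE_MAP[expected]):
            return [f"{path}: expected {expected}, got {type(data).__name__}"]

    if expected == "object":
        for name in schema.get("required", []):
            if name not in data:
                errors.append(f"{path}.{name}: required property is missing")
        for name, sub_schema in schema.get("properties", {}).items():
            if name in data:
                errors.extend(validate_against_schema(data[name], sub_schema, f"{path}.{name}"))
    elif expected == "array" and "items" in schema:
        for index, item in enumerate(data):
            errors.extend(validate_against_schema(item, schema["items"], f"{path}[{index}]"))
    return errors


# Illustrative schema resembling a Pet object (assumed, not read from petstore.yaml)
pet_schema = {
    "type": "object",
    "required": ["id", "name"],
    "properties": {
        "id": {"type": "integer"},
        "name": {"type": "string"},
        "tag": {"type": "string"},
    },
}
valid_errors = validate_against_schema({"id": 123, "name": "TestPet"}, pet_schema)
invalid_errors = validate_against_schema({"id": "123"}, pet_schema)
print(valid_errors)
print(invalid_errors)
```

Returning a list of violations instead of raising on the first one gives the planner a complete picture of how the response deviates from the documented schema.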
Code example: response processor and state manager
from typing import Dict, Any, Optional
import json
import requests


class ResponseProcessor:
    def __init__(self, api_schema: APISchema, agent_state: Dict[str, Any]):
        self.api_schema = api_schema
        self.agent_state = agent_state

    def process_response(self, operation: APIOperation, response: requests.Response) -> Dict[str, Any]:
        """
        Processes an API response, extracts relevant data, and updates agent state.
        """
        response_data = {}
        try:
            # 1. Parse response body (assuming JSON)
            if 'application/json' in response.headers.get('Content-Type', ''):
                response_data = response.json()
            else:
                response_data = {"raw_text": response.text}  # Fallback for non-JSON responses
        except ValueError:
            # json.JSONDecodeError and requests' own JSONDecodeError both derive from ValueError
            print(f"Warning: Could not decode JSON from response for operation {operation.operationId}. Raw text: {response.text[:200]}...")
            response_data = {"raw_text": response.text}

        # 2. (Optional) Validate response against schema
        # This part is complex and typically requires a dedicated library or robust recursive validation.
        # For brevity, we'll skip detailed schema validation here but acknowledge its importance.
        # If response schema is available for the given status code, one would compare response_data
        # against operation.responses[str(response.status_code)].content['application/json'].schema_

        # 3. Extract key information and update agent state
        # The agent needs to know *what* to extract. This can be pre-defined,
        # or driven by LLM instructions (e.g., "extract the 'id' field from the response").
        extracted_info = {}
        if operation.operationId == 'createPet':
            # After creating a pet, we expect an 'id'
            if 'id' in response_data:
                extracted_info['id'] = response_data['id']
                print(f"Extracted pet ID: {response_data['id']}")
            if 'name' in response_data:
                extracted_info['name'] = response_data['name']
        elif operation.operationId == 'listPets':
            # After listing pets, we might want to store the list
            if isinstance(response_data, list):
                extracted_info['pets'] = response_data
                print(f"Extracted {len(response_data)} pets from list.")
        elif operation.operationId == 'showPetById':
            # After getting pet details, store that specific pet's info
            if 'id' in response_data:
                extracted_info[f"pet_{response_data['id']}"] = response_data
                print(f"Extracted details for pet ID {response_data['id']}.")

        # Update agent_state with results, keyed by operation ID
        self.agent_state[operation.operationId] = extracted_info
        print(f"Agent state updated for {operation.operationId}: {extracted_info}")
        return extracted_info
# Test the response processor
# Mock a response object
class MockResponse:
    def __init__(self, status_code, content_type, text_content):
        self.status_code = status_code
        self._content_type = content_type
        self._text_content = text_content

    @property
    def headers(self):
        return {'Content-Type': self._content_type}

    def json(self):
        return json.loads(self._text_content)

    @property
    def text(self):
        return self._text_content

    def raise_for_status(self):
        if 400 <= self.status_code < 600:
            raise requests.exceptions.HTTPError(f"HTTP Error: {self.status_code}", response=self)
# Re-use parser setup
parser = SwaggerParser("petstore.yaml")
api_spec = parser.load_and_parse()
operations = parser.get_operations()
# Find createPet operation
create_pet_op_raw = next((op for op in operations if op['operation_id'] == 'createPet'), None)
create_pet_op_model = APIOperation.model_validate(create_pet_op_raw)
create_pet_op_model.path = create_pet_op_raw['path']
# Find listPets operation
list_pets_op_raw = next((op for op in operations if op['operation_id'] == 'listPets'), None)
list_pets_op_model = APIOperation.model_validate(list_pets_op_raw)
list_pets_op_model.path = list_pets_op_raw['path']
agent_state = {}
response_proc = ResponseProcessor(api_spec, agent_state)
# Scenario 1: Process a successful createPet response
print("\n--- Processing createPet response ---")
mock_create_response = MockResponse(201, 'application/json', '{"id": 123, "name": "TestPet", "tag": "test"}')
extracted_create = response_proc.process_response(create_pet_op_model, mock_create_response)
print(f"Current Agent State: {json.dumps(agent_state, indent=2)}")
# Scenario 2: Process a successful listPets response
print("\n--- Processing listPets response ---")
mock_list_response = MockResponse(200, 'application/json', '[{"id": 123, "name": "TestPet", "tag": "test"}, {"id": 456, "name": "OtherPet", "tag": "wild"}]')
extracted_list = response_proc.process_response(list_pets_op_model, mock_list_response)
print(f"Current Agent State: {json.dumps(agent_state, indent=2)}")
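Notes on the response processor:
- ResponseProcessor also holds api_schema and agent_state.
- process_response first tries to parse the body as JSON.
- Information extraction is the key part of this module. The example uses hard-coded if operation.operationId == '...' branches to demonstrate it. In a truly autonomous Agent, the extraction logic would be generated dynamically by an LLM, or spelled out by the planner at planning time (for example, "extract the id field from the createPet response and store it as new_pet_id").
- The extracted information is stored in agent_state, keyed by operation ID, so that later steps can look it up (for example, when the parameter generator resolves $response.operation_id.field_name).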
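To make the last point concrete, here is one way a $response.operation_id.field_name reference could be resolved against agent_state. The helper name resolve_reference and the exact handling of nested fields are assumptions for illustration; the parameter generator shown earlier may resolve references differently.

```python
from typing import Any, Dict

_PREFIX = "$response."


def resolve_reference(value: Any, agent_state: Dict[str, Any]) -> Any:
    """Resolve '$response.<operation_id>.<field>[.<nested>...]' against agent_state.

    Values that are not reference strings are returned unchanged.
    """
    if not isinstance(value, str) or not value.startswith(_PREFIX):
        return value
    operation_id, *field_path = value[len(_PREFIX):].split(".")
    if operation_id not in agent_state:
        raise KeyError(f"No stored result for operation '{operation_id}'")
    current = agent_state[operation_id]
    for key in field_path:
        if not isinstance(current, dict) or key not in current:
            raise KeyError(f"Field '{key}' not found while resolving '{value}'")
        current = current[key]
    return current


# State as the response processor would leave it after a createPet call
state = {"createPet": {"id": 123, "name": "TestPet"}}
planned = {"petId": "$response.createPet.id", "limit": 10}
resolved = {name: resolve_reference(v, state) for name, v in planned.items()}
print(resolved)  # {'petId': 123, 'limit': 10}
```

Raising KeyError on a missing operation or field, rather than silently passing the raw reference string to the API, lets the planner notice that a prerequisite call has not run yet.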
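8. Errors and Retries: Building Robustness
Even a flawless API call can fail, whether because of network problems, a temporarily overloaded server, an invalid request, or something else. A robust Agent must handle these errors gracefully.
Strategies:
- Error classification:
  - Network errors (ConnectionError, Timeout): usually transient and suitable for retry.
  - Client errors (4xx HTTP status codes): usually mean the request itself is at fault (for example, invalid parameters or failed authentication). They are rarely suitable for automatic retry and may call for replanning or a report to the user.
  - Server errors (5xx HTTP status codes): indicate a server-side problem and are usually suitable for retry.
- Retry mechanisms:
  - Exponential backoff: The wait before each retry grows progressively, to avoid adding load to the server.
  - Maximum retries: Cap the number of retries to prevent infinite loops.
  - Jitter: Add randomness on top of exponential backoff so that clients do not all retry at the same moment.
- Circuit breaker: If an API keeps failing, stop calling it for a while to avoid wasting resources.
- Failure reporting and fallback: For unrecoverable errors, report the failure to the planner, which may need to adjust the plan or escalate the problem to a human.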
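The classification above can be expressed as a small decision function. This is a sketch under assumptions: the FailureAction names are invented for this example, and treating 408 and 429 as retryable is a common convention rather than something the list above prescribes. To stay dependency-free it checks Python's built-in ConnectionError and TimeoutError; with the requests library one would check requests.exceptions.ConnectionError and requests.exceptions.Timeout instead.

```python
from enum import Enum
from typing import Optional


class FailureAction(Enum):
    RETRY = "retry"        # transient: try again with backoff
    REPLAN = "replan"      # request is wrong: hand back to the planner
    REAUTH = "reauth"      # credentials rejected: refresh them first
    ESCALATE = "escalate"  # unknown situation: report to a human


# Assumption: these 4xx codes are treated as transient
RETRYABLE_CLIENT_CODES = {408, 429}


def classify_failure(status_code: Optional[int] = None,
                     exc: Optional[BaseException] = None) -> FailureAction:
    """Map an HTTP status code or a raised exception to a recovery action."""
    if isinstance(exc, (ConnectionError, TimeoutError)):
        return FailureAction.RETRY
    if status_code is None:
        return FailureAction.ESCALATE
    if status_code >= 500 or status_code in RETRYABLE_CLIENT_CODES:
        return FailureAction.RETRY
    if status_code == 401:
        return FailureAction.REAUTH
    if 400 <= status_code < 500:
        return FailureAction.REPLAN
    return FailureAction.ESCALATE


for code in (503, 429, 401, 422):
    print(code, classify_failure(code).value)
```

Keeping the decision separate from the retry loop means the same classification can feed both the retry logic and the planner's replanning step.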
Code example: a retry decorator with exponential backoff
import time
import random
import requests
from functools import wraps
from typing import Callable, Any, Dict


def retry_with_exponential_backoff(
    max_retries: int = 3,
    initial_delay: float = 1.0,
    backoff_factor: float = 2.0,
    jitter: bool = True,
    catch_exceptions: tuple = (
        requests.exceptions.ConnectionError,
        requests.exceptions.Timeout,
        requests.exceptions.HTTPError
    )
) -> Callable:
    """
    A decorator to retry a function call with exponential backoff.
    Suitable for transient network errors and server-side issues (5xx).
    """
    def decorator(func: Callable) -> Callable:
        @wraps(func)
        def wrapper(*args, **kwargs) -> Any:
            delay = initial_delay
            for i in range(max_retries + 1):
                try:
                    return func(*args, **kwargs)
                except catch_exceptions as e:
                    # Client errors (4xx) mean the request itself is wrong; retrying will not help
                    status = getattr(getattr(e, "response", None), "status_code", None)
                    if status is not None and 400 <= status < 500:
                        raise
                    if i == max_retries:
                        print(f"Max retries ({max_retries}) reached. Raising exception: {e}")
                        raise
                    print(f"Attempt {i+1}/{max_retries+1} failed with error: {type(e).__name__} - {e}. Retrying in {delay:.2f} seconds...")
                    sleep_time = delay
                    if jitter:
                        # Scale the delay to a random 50-100% of its nominal value
                        sleep_time = delay * (0.5 + random.random() * 0.5)
                    time.sleep(sleep_time)
                    delay *= backoff_factor
        return wrapper
    return decorator
# Integrate with APIInvoker
class APIInvokerWithRetry(APIInvoker):  # Inherit from our previous Invoker
    @retry_with_exponential_backoff(max_retries=3)
    def invoke(self, operation: APIOperation, generated_params: Dict[str, Any]) -> requests.Response:
        # Call the original invoke method from the parent class
        return super().invoke(operation, generated_params)
# Test the invoker with retry
# We need to make the mock response fail sometimes to test retry
class FailingMockResponse(MockResponse):
    _call_count = 0  # Shared across instances; reset between tests

    def __init__(self, status_code, content_type, text_content, fail_until_attempt: int = 1):
        super().__init__(status_code, content_type, text_content)
        self.fail_until_attempt = fail_until_attempt
        self._current_attempt = 0

    def raise_for_status(self):
        FailingMockResponse._call_count += 1
        self._current_attempt = FailingMockResponse._call_count
        if self._current_attempt <= self.fail_until_attempt:
            print(f" (Mocking failure on attempt {self._current_attempt} / Fail until {self.fail_until_attempt})")
            # Simulate a 500 error for retries
            raise requests.exceptions.HTTPError(f"Mocked 500 Server Error on attempt {self._current_attempt}", response=MockResponse(500, 'text/plain', 'Internal Server Error'))
        else:
            print(f" (Mocking success on attempt {self._current_attempt})")
            super().raise_for_status()  # Call parent's raise_for_status for actual success/failure logic
# Reset call count for subsequent tests
FailingMockResponse._call_count = 0
# Re-use parser setup
parser = SwaggerParser("petstore.yaml")
api_spec = parser.load_and_parse()
operations = parser.get_operations()
create_pet_op_raw = next((op for op in operations if op['operation_id'] == 'createPet'), None)
create_pet_op_model = APIOperation.model_validate(create_pet_op_raw)
create_pet_op_model.path = create_pet_op_raw['path'] # Attach original path for URL construction
agent_state = {}
param_gen = ParameterGenerator(api_spec, agent_state)
auth_mgr = MockAuthManager(api_key="your-super-secret-api-key")
# We need to mock the actual requests.Session.request method to test the retry logic without hitting a real API
class MockRequestsSession:
    def __init__(self, mock_response_instance):
        self._mock_response = mock_response_instance

    def request(self, method, url, params=None, headers=None, json=None, timeout=None):
        print(f" [Mock Session] Calling method: {method}, URL: {url}")
        # The mock response will internally handle the failure/success logic
        return self._mock_response
# Instantiate the invoker with the mock session
mock_failing_response = FailingMockResponse(201, 'application/json', '{"id": 124, "name": "RetryPet", "tag": "retry"}', fail_until_attempt=2)
mock_session = MockRequestsSession(mock_failing_response)
invoker_with_retry = APIInvokerWithRetry(base_url="http://localhost:8080/api/v1", auth_manager=auth_mgr)
invoker_with_retry.session = mock_session # Override the session with our mock
print("\n--- Testing APIInvokerWithRetry (should succeed after 2 retries) ---")
try:
    planned_params_retry = {"name": "RetryPet", "tag": "test"}
    generated_params_retry = param_gen.generate_parameters(create_pet_op_model, planned_params_retry)
    response_retry = invoker_with_retry.invoke(create_pet_op_model, generated_params_retry)
    print(f"Final response status: {response_retry.status_code}")
except Exception as e:
    print(f"Failed after retries: {e}")
# Reset call count for next test
FailingMockResponse._call_count = 0
mock_failing_response_max_fail = FailingMockResponse(201, 'application/json', '{"id": 125, "name": "MaxRetryPet", "tag": "max"}', fail_until_attempt=4) # Fails 4 times, max_retries is 3
mock_session_max_fail = MockRequestsSession(mock_failing_response_max_fail)
invoker_with_retry_max_fail = APIInvokerWithRetry(base_url="http://localhost:8080/api/v1", auth_manager=auth_mgr)
invoker_with_retry_max_fail.session = mock_session_max_fail
print("\n--- Testing APIInvokerWithRetry (should fail after max retries) ---")
try:
    planned_params_max_retry = {"name": "MaxRetryPet", "tag": "test"}
    generated_params_max_retry = param_gen.generate_parameters(create_pet_op_model, planned_params_max_retry)
    response_max_retry = invoker_with_retry_max_fail.invoke(create_pet_op_model, generated_params_max_retry)
    print(f"Final response status: {response_max_retry.status_code}")
except Exception as e:
    print(f"Successfully failed after max retries (as expected): {e}")
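Notes on the error and retry mechanism:
- retry_with_exponential_backoff is a general-purpose Python decorator that can be applied to any function that may fail.
- It is configured through max_retries, initial_delay, backoff_factor, and jitter.
- catch_exceptions specifies which exceptions trigger a retry. Normally only network errors and server-side errors (5xx) should be retried.
- The APIInvokerWithRetry class inherits from APIInvoker and uses the @retry_with_exponential_backoff decorator to wrap its invoke method.
- The test code uses MockRequestsSession and FailingMockResponse to simulate different numbers of failures and verify the retry logic.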
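The retry decorator does not cover the circuit-breaker strategy listed earlier in this section. A minimal sketch of one follows. The class name, the thresholds, and the injectable clock are assumptions chosen for the example, and the implementation is not thread-safe.

```python
import time
from typing import Any, Callable, Optional


class CircuitOpenError(RuntimeError):
    """Raised when a call is rejected because the circuit is open."""


class CircuitBreaker:
    def __init__(self, failure_threshold: int = 3, reset_timeout: float = 30.0,
                 clock: Callable[[], float] = time.monotonic):
        self.failure_threshold = failure_threshold
        self.reset_timeout = reset_timeout
        self._clock = clock  # injectable so tests do not need to sleep
        self._failures = 0
        self._opened_at: Optional[float] = None

    def call(self, func: Callable[..., Any], *args: Any, **kwargs: Any) -> Any:
        if self._opened_at is not None:
            if self._clock() - self._opened_at < self.reset_timeout:
                raise CircuitOpenError("Circuit is open; skipping call")
            # Timeout elapsed: let one trial call through (half-open state)
        try:
            result = func(*args, **kwargs)
        except Exception:
            self._failures += 1
            if self._opened_at is not None or self._failures >= self.failure_threshold:
                self._opened_at = self._clock()  # open (or re-open) the circuit
            raise
        # A success closes the circuit and clears the failure count
        self._failures = 0
        self._opened_at = None
        return result
```

In use, the Agent would keep one breaker per API (or per host) and route each call through it, for example breaker.call(invoker.invoke, operation, params). A CircuitOpenError then tells the planner that the API is currently unavailable without spending another request on it.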
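9. Advanced Considerations and Future Challenges
We have built a capable Agent skeleton, but reaching genuinely "autonomous" and "intelligent" behavior in the real world raises a number of further considerations and challenges:
- Authentication and authorization:
  - OAuth2 flows: Many modern APIs use OAuth2. The Agent needs to be able to run the authorization code flow, the client credentials flow, and so on. This involves redirects as well as obtaining and refreshing tokens, and amounts to a complex state machine.
  - Dynamic credential management: How can credentials for different APIs be stored and retrieved securely? Integration with a key management system is necessary.
- Asynchronous operations and webhooks:
  - Some API operations are asynchronous: they return a job ID and deliver the result later through polling or a webhook. The Agent has to handle this pattern, including registering and listening for webhooks.
- Rate limits and quota management:
  - APIs usually enforce rate limits. The Agent needs to understand and respect them, pacing its requests with a token bucket or leaky bucket algorithm to avoid being blocked.
- Data flow and transformation:
  - In complex call chains, the output of one API may need to be transformed before it can serve as input to another. For example, one API returns user_id while another expects userIdentifier. An LLM can help here by identifying and performing the necessary transformation.
- Agent memory and learning:
  - The Agent should be able to learn from successful operations and improve its future planning. For example, it may discover that a particular combination of APIs reliably completes a given task efficiently.
  - Long-term memory: Persist important entities (such as created IDs and configuration) so the Agent can remember them across sessions.
- Human-in-the-loop:
  - When the Agent cannot solve a problem, runs into ambiguity, or faces a critical decision, it should be able to pause and ask a human for help, presenting clear context and options.
- Cost optimization:
  - LLM API calls are expensive. Prompt length and call frequency need to be optimized, and smaller, specialized models should be considered for specific tasks.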
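Of the items above, rate limiting is the easiest to show in code. The following token-bucket sketch is an illustration only: the class name, its parameters, and the injectable clock are assumptions, and a real Agent would also want to honor server hints such as a Retry-After header.

```python
import time
from typing import Callable


class TokenBucket:
    """Allow bursts of up to `capacity` requests, refilled at `rate` tokens per second."""

    def __init__(self, rate: float, capacity: float,
                 clock: Callable[[], float] = time.monotonic):
        self.rate = rate
        self.capacity = capacity
        self._clock = clock  # injectable so tests do not need to sleep
        self._tokens = capacity
        self._last = clock()

    def _refill(self) -> None:
        # Add tokens for the time elapsed since the last check, capped at capacity
        now = self._clock()
        self._tokens = min(self.capacity, self._tokens + (now - self._last) * self.rate)
        self._last = now

    def try_acquire(self, tokens: float = 1.0) -> bool:
        """Take tokens if available; return False instead of blocking."""
        self._refill()
        if self._tokens >= tokens:
            self._tokens -= tokens
            return True
        return False

    def wait_time(self, tokens: float = 1.0) -> float:
        """Seconds until `tokens` will be available (0.0 if available now)."""
        self._refill()
        return max(0.0, (tokens - self._tokens) / self.rate)
```

Before each API call the Agent would call try_acquire() and, if it returns False, sleep for wait_time() seconds before trying again. One bucket per API keeps a slow provider from throttling calls to the others.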
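10. Looking Ahead
The autonomous Agent we have discussed today represents an important direction in software automation. Moving from hand-written integration code to an Agent that can understand and call APIs on its own is a huge leap. The challenges are considerable, especially in semantic understanding, complex parameter generation, and state management, but as LLMs and other AI technologies advance, we are steadily getting closer to a truly general, intelligent Agent for API interaction.
Such an Agent can greatly improve developer productivity, and it can also make complex API services more accessible to non-technical users, opening up entirely new application scenarios. This is more than a technical advance; it is a profound change in how humans and machines collaborate. We stand at the threshold of an exciting era and are witnessing the future of software engineering.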