各位同仁,各位对未来智能系统充满憧憬的技术专家们,晚上好。
今天,我们将深入探讨一个正日益成为人工智能领域核心议题的范式:Agent Micro-payments,即代理微支付。随着AI模型能力的飞跃,我们正从简单的工具使用迈向构建一个由自主智能体(Autonomous Agents)构成的生态系统。在这个系统中,代理不再仅仅是执行预设任务的程序,它们将具备目标设定、规划、执行、反思,乃至相互协作的能力。而要实现高效、可扩展、去中心化的协作,一个健全的内部经济系统——特别是围绕微支付的机制——是不可或缺的。
我们将从编程专家的视角出发,剖析如何设计一个支持Agent之间相互购买推理算力、知识片段或特定服务能力的内部经济系统。这不仅仅是技术挑战,更是经济学与计算机科学的深度融合。
1. 智能体经济的黎明:为何需要Agent微支付?
想象一下,一个由数百甚至数千个智能体组成的数字城市。有些智能体擅长数据分析,有些是语言专家,有些掌握着独特的API接口,有些则拥有强大的计算资源。当一个宏大任务被提出时,比如“预测未来五年全球气候变化对某地区农业生产的影响”,单一智能体可能无法独立完成。它需要:
- 从“数据收集代理”那里购买最新的卫星图像和气象数据。
- 从“气候模型代理”那里购买气候模拟的推理算力。
- 从“经济分析代理”那里购买农业经济学的知识片段和市场分析。
- 从“语言生成代理”那里购买最终报告的撰写服务。
在这种协作模式下,资源是稀缺的,服务是有成本的。如果不引入经济机制,我们将面临:
- 资源滥用: 免费或无限制的资源使用会导致浪费和系统过载。
- 激励缺失: 代理提供高质量服务或分享稀有资源的动力不足。
- 效率低下: 难以在众多代理中自动选择最佳服务提供者。
- 信任问题: 如何确保服务提供者兑现承诺,请求者支付费用?
Agent微支付系统正是为了解决这些问题而生。它旨在建立一个内部市场,让代理能够:
- 量化服务价值: 将推理算力、知识、数据访问等抽象资源具象化为可交易的商品。
- 实现按需付费: 仅为实际使用的资源和提供的服务付费,避免预付和浪费。
- 激励优质服务: 表现良好的代理可以获得更多收入和更高的声誉。
- 促进动态资源分配: 市场机制可以根据供需关系自动调整资源价格和流向。
- 构建信任与透明: 通过可验证的交易记录和协议,增强代理间的信任。
我们的目标是设计一个不仅能处理交易,还能促进代理间协商、发现服务并最终实现复杂任务协作的经济骨架。
2. 核心概念与系统架构蓝图
在深入代码之前,我们必须先建立清晰的概念模型和高层架构。
2.1 核心概念
- 智能体 (Agent): 系统中的基本参与者,拥有独特的能力、资源和目标。它们是交易的买方和卖方。每个Agent拥有一个唯一的标识符(Agent ID)和一个数字钱包。
- 服务 (Service): Agent可以提供或消费的特定能力。例如:
LLM_Inference_GPT4_Turbo,Image_Recognition_YOLOv8,Data_Retrieval_WeatherAPI,Knowledge_Fragment_BioChem_Synthesis。每个服务都有一个定义好的接口和潜在的价格模型。 - 资源 (Resource): 智能体需要消耗或提供的基本要素。计算时间(CPU/GPU cycles)、内存、存储、API调用配额、特定数据集访问权、预训练模型权重、知识库访问权限等。
- 货币 (Currency): 内部经济系统的媒介。它可以是简单的整数计数(
Credits,Tokens),也可以是更复杂的、带有属性的数字资产。 - 交易 (Transaction): 两个或多个Agent之间关于资源或服务交换的原子操作,涉及货币的转移。
- 账本 (Ledger): 记录所有交易历史的系统,确保资金流动的透明性和可审计性。
- 声誉 (Reputation): 衡量Agent可靠性、服务质量和交易历史的指标,影响其在市场中的地位和潜在交易机会。
2.2 多层架构设计
为了构建一个健壮且可扩展的Agent经济系统,我们可以采用分层架构:
| 层级 | 职责 | 关键组件 | 协议/技术 |
|---|---|---|---|
| 应用层 | 代理行为逻辑,任务编排 | 任务管理器、规划器、领域特定智能体 | 代理内部协议、API |
| 服务层 | 代理服务发现、请求与提供 | 代理注册中心、服务提供者、服务消费者、协商器 | gRPC, REST, WebSocket, 自定义消息协议 |
| 经济核心层 | 货币、账户、交易管理 | 内部账本、交易处理器、支付渠道管理器、智能合约 | 内部数据库(SQLite/PostgreSQL)、P2P协议 |
| 信任与安全层 | 身份验证、授权、数据完整性、声誉管理 | 身份服务、签名验证、声誉管理器 | 非对称加密、哈希、时间戳 |
| 基础设施层 | 代理通信、持久化、计算资源 | 消息队列、数据库、容器编排(Kubernetes)、RPC | RabbitMQ, Redis, Kafka, Docker |
接下来,我们将聚焦于服务层和经济核心层,通过代码示例进行详细阐述。
3. 深入组件:代码驱动的实现
我们将使用Python作为主要的编程语言,因为它在AI和分布式系统开发中具有广泛的应用和良好的生态。
3.1 Agent身份与注册中心
每个Agent都需要一个唯一的身份,并能够注册其提供的服务。注册中心是Agent发现彼此并了解其能力的关键。
agent_identity.py
import uuid
import time
from typing import Dict, Any, List
class AgentID:
"""
Represent a unique identifier for an Agent.
"""
def __init__(self, agent_id: str = None):
self._id = agent_id if agent_id else str(uuid.uuid4())
@property
def id(self) -> str:
return self._id
def __eq__(self, other):
if not isinstance(other, AgentID):
return NotImplemented
return self._id == other._id
def __hash__(self):
return hash(self._id)
def __str__(self):
return f"AgentID({self._id[:8]}...)"
def __repr__(self):
return f"AgentID('{self._id}')"
class ServiceDescriptor:
"""
Describes a service offered by an agent.
"""
def __init__(self,
service_name: str,
description: str,
input_schema: Dict[str, Any],
output_schema: Dict[str, Any],
price_model: Dict[str, Any], # e.g., {"type": "fixed", "amount": 10}, {"type": "per_token", "cost_per_unit": 0.001}
capabilities: List[str] = None):
self.service_name = service_name
self.description = description
self.input_schema = input_schema
self.output_schema = output_schema
self.price_model = price_model
self.capabilities = capabilities if capabilities is not None else []
self.last_updated = time.time()
def to_dict(self):
return {
"service_name": self.service_name,
"description": self.description,
"input_schema": self.input_schema,
"output_schema": self.output_schema,
"price_model": self.price_model,
"capabilities": self.capabilities,
"last_updated": self.last_updated
}
@classmethod
def from_dict(cls, data: Dict[str, Any]):
desc = cls(
service_name=data["service_name"],
description=data["description"],
input_schema=data["input_schema"],
output_schema=data["output_schema"],
price_model=data["price_model"],
capabilities=data.get("capabilities", [])
)
desc.last_updated = data.get("last_updated", time.time())
return desc
class AgentInfo:
"""
Holds information about an agent registered in the system.
"""
def __init__(self, agent_id: AgentID, address: str, services: List[ServiceDescriptor] = None):
self.agent_id = agent_id
self.address = address # Network address (e.g., "localhost:50051")
self.services = {s.service_name: s for s in (services if services is not None else [])}
self.last_seen = time.time()
def add_service(self, service: ServiceDescriptor):
self.services[service.service_name] = service
self.last_seen = time.time()
def update_last_seen(self):
self.last_seen = time.time()
def to_dict(self):
return {
"agent_id": self.agent_id.id,
"address": self.address,
"services": {name: desc.to_dict() for name, desc in self.services.items()},
"last_seen": self.last_seen
}
@classmethod
def from_dict(cls, data: Dict[str, Any]):
agent_id = AgentID(data["agent_id"])
services = [ServiceDescriptor.from_dict(s_data) for s_data in data["services"].values()]
info = cls(agent_id, data["address"], services)
info.last_seen = data.get("last_seen", time.time())
return info
AgentID提供唯一标识,ServiceDescriptor详细描述服务,AgentInfo整合代理信息。
agent_registry.py
import threading
from typing import Dict, Optional, List
import json
import os
from agent_identity import AgentID, AgentInfo, ServiceDescriptor
class AgentRegistry:
"""
A centralized registry for agents and their services.
Provides discovery and health checking mechanisms.
"""
_instance = None
_lock = threading.Lock()
REGISTRY_FILE = "agent_registry.json"
def __new__(cls):
if cls._instance is None:
with cls._lock:
if cls._instance is None:
cls._instance = super(AgentRegistry, cls).__new__(cls)
cls._instance._init_registry()
return cls._instance
def _init_registry(self):
self.agents: Dict[AgentID, AgentInfo] = {}
self.lock = threading.Lock()
self._load_registry()
print(f"AgentRegistry initialized with {len(self.agents)} agents loaded.")
def _load_registry(self):
if os.path.exists(self.REGISTRY_FILE):
try:
with open(self.REGISTRY_FILE, 'r') as f:
data = json.load(f)
for agent_id_str, agent_data in data.items():
agent_info = AgentInfo.from_dict(agent_data)
self.agents[agent_info.agent_id] = agent_info
print(f"Loaded registry from {self.REGISTRY_FILE}")
except json.JSONDecodeError as e:
print(f"Error decoding registry file: {e}")
self.agents = {} # Reset if corrupted
else:
print(f"Registry file {self.REGISTRY_FILE} not found. Starting fresh.")
def _save_registry(self):
with open(self.REGISTRY_FILE, 'w') as f:
json.dump({aid.id: info.to_dict() for aid, info in self.agents.items()}, f, indent=4)
print(f"Saved registry to {self.REGISTRY_FILE}")
def register_agent(self, agent_info: AgentInfo):
"""Registers a new agent or updates an existing one."""
with self.lock:
self.agents[agent_info.agent_id] = agent_info
self._save_registry()
print(f"Agent {agent_info.agent_id} registered/updated at {agent_info.address}")
def deregister_agent(self, agent_id: AgentID):
"""Removes an agent from the registry."""
with self.lock:
if agent_id in self.agents:
del self.agents[agent_id]
self._save_registry()
print(f"Agent {agent_id} deregistered.")
return True
return False
def update_agent_heartbeat(self, agent_id: AgentID):
"""Updates an agent's last seen timestamp."""
with self.lock:
if agent_id in self.agents:
self.agents[agent_id].update_last_seen()
# print(f"Agent {agent_id} heartbeat updated.") # Too verbose
return True
return False
def find_agents_by_service(self, service_name: str) -> List[AgentInfo]:
"""Finds agents offering a specific service."""
found_agents = []
with self.lock:
for agent_info in self.agents.values():
if service_name in agent_info.services:
found_agents.append(agent_info)
return found_agents
def get_agent_info(self, agent_id: AgentID) -> Optional[AgentInfo]:
"""Retrieves information for a specific agent."""
with self.lock:
return self.agents.get(agent_id)
def list_all_agents(self) -> List[AgentInfo]:
"""Lists all registered agents."""
with self.lock:
return list(self.agents.values())
def list_all_services(self) -> Dict[str, List[AgentID]]:
"""Lists all services and the agents providing them."""
all_services: Dict[str, List[AgentID]] = {}
with self.lock:
for agent_id, agent_info in self.agents.items():
for service_name in agent_info.services:
all_services.setdefault(service_name, []).append(agent_id)
return all_services
# Example usage (for demonstration, normally accessed by other services)
if __name__ == "__main__":
registry = AgentRegistry()
# Create dummy agents
agent1_id = AgentID()
agent2_id = AgentID()
# Define services
service_llm = ServiceDescriptor(
service_name="LLM_Inference",
description="Provides general purpose LLM inference (e.g., GPT-3.5 equivalent).",
input_schema={"prompt": "string", "max_tokens": "integer"},
output_schema={"response": "string"},
price_model={"type": "per_token", "cost_per_unit": 0.0001, "currency": "CREDIT"}
)
service_image = ServiceDescriptor(
service_name="Image_Recognition",
description="Detects objects in an image.",
input_schema={"image_url": "string"},
output_schema={"detections": "array"},
price_model={"type": "fixed", "amount": 5, "currency": "CREDIT"}
)
service_data = ServiceDescriptor(
service_name="Weather_Data_API",
description="Provides real-time weather data for a given location.",
input_schema={"location": "string", "date": "string"},
output_schema={"temperature": "float", "humidity": "float"},
price_model={"type": "per_call", "cost_per_unit": 1, "currency": "CREDIT"}
)
# Create AgentInfo objects
agent1_info = AgentInfo(agent_id=agent1_id, address="localhost:50051", services=[service_llm, service_data])
agent2_info = AgentInfo(agent_id=agent2_id, address="localhost:50052", services=[service_image])
# Register agents
registry.register_agent(agent1_info)
registry.register_agent(agent2_info)
# Find agents
llm_providers = registry.find_agents_by_service("LLM_Inference")
print(f"nAgents providing LLM_Inference: {[str(a.agent_id) for a in llm_providers]}")
image_providers = registry.find_agents_by_service("Image_Recognition")
print(f"Agents providing Image_Recognition: {[str(a.agent_id) for a in image_providers]}")
# Update heartbeat
import time
time.sleep(1)
registry.update_agent_heartbeat(agent1_id)
agent1_updated_info = registry.get_agent_info(agent1_id)
print(f"Agent {agent1_id} last seen updated to {agent1_updated_info.last_seen}")
# List all services
all_services_map = registry.list_all_services()
print("nAll registered services:")
for svc_name, provider_ids in all_services_map.items():
print(f"- {svc_name}: {[str(aid) for aid in provider_ids]}")
# Deregister an agent
registry.deregister_agent(agent2_id)
image_providers_after_dereg = registry.find_agents_by_service("Image_Recognition")
print(f"Agents providing Image_Recognition after deregistration: {[str(a.agent_id) for a in image_providers_after_dereg]}")
# Re-initialize registry to show persistence
print("nRe-initializing registry to show persistence...")
del AgentRegistry._instance # Force re-initialization
new_registry = AgentRegistry()
print(f"Agents in new registry: {[str(a.agent_id) for a in new_registry.list_all_agents()]}")
注册中心使用单例模式确保全局唯一性,并支持Agent注册、注销、心跳更新以及按服务查找。它还包括简单的持久化机制,将注册信息保存到JSON文件。
3.2 内部账本系统
这是经济系统的核心。它负责追踪每个Agent的余额,并记录所有交易。我们选择一个简化的基于SQLite的账本,因为它轻量、易于嵌入,且足以处理内部微支付场景。
ledger_db.py
import sqlite3
import time
from typing import Optional, Dict, List, Any
DATABASE_FILE = "agent_ledger.db"
class LedgerDB:
"""
Manages the SQLite database for agent accounts and transactions.
"""
def __init__(self, db_file: str = DATABASE_FILE):
self.db_file = db_file
self._initialize_db()
def _initialize_db(self):
conn = self._get_connection()
cursor = conn.cursor()
# Create Accounts table
cursor.execute("""
CREATE TABLE IF NOT EXISTS accounts (
agent_id TEXT PRIMARY KEY,
balance INTEGER NOT NULL DEFAULT 0,
last_updated INTEGER
);
""")
# Create Transactions table
cursor.execute("""
CREATE TABLE IF NOT EXISTS transactions (
id TEXT PRIMARY KEY,
sender_id TEXT NOT NULL,
receiver_id TEXT NOT NULL,
amount INTEGER NOT NULL,
currency TEXT NOT NULL,
timestamp INTEGER NOT NULL,
description TEXT,
status TEXT NOT NULL DEFAULT 'PENDING', -- PENDING, COMPLETED, FAILED, REVERSED
FOREIGN KEY (sender_id) REFERENCES accounts(agent_id),
FOREIGN KEY (receiver_id) REFERENCES accounts(agent_id)
);
""")
conn.commit()
conn.close()
def _get_connection(self):
return sqlite3.connect(self.db_file)
def create_account(self, agent_id: str, initial_balance: int = 0) -> bool:
"""Creates a new account for an agent."""
conn = self._get_connection()
cursor = conn.cursor()
try:
timestamp = int(time.time())
cursor.execute("INSERT INTO accounts (agent_id, balance, last_updated) VALUES (?, ?, ?)",
(agent_id, initial_balance, timestamp))
conn.commit()
print(f"Account created for {agent_id} with initial balance {initial_balance}")
return True
except sqlite3.IntegrityError:
print(f"Account for {agent_id} already exists.")
return False
finally:
conn.close()
def get_account_balance(self, agent_id: str) -> Optional[int]:
"""Retrieves the balance of an agent's account."""
conn = self._get_connection()
cursor = conn.cursor()
cursor.execute("SELECT balance FROM accounts WHERE agent_id = ?", (agent_id,))
result = cursor.fetchone()
conn.close()
return result[0] if result else None
def update_account_balance(self, agent_id: str, amount: int) -> bool:
"""Updates an agent's account balance. Use carefully, usually part of a transaction."""
conn = self._get_connection()
cursor = conn.cursor()
timestamp = int(time.time())
try:
cursor.execute("UPDATE accounts SET balance = balance + ?, last_updated = ? WHERE agent_id = ?",
(amount, timestamp, agent_id))
conn.commit()
print(f"Updated balance for {agent_id} by {amount}")
return True
except sqlite3.Error as e:
print(f"Error updating balance for {agent_id}: {e}")
conn.rollback()
return False
finally:
conn.close()
def record_transaction(self, tx_id: str, sender_id: str, receiver_id: str, amount: int, currency: str, description: str = None, status: str = 'PENDING') -> bool:
"""Records a new transaction."""
conn = self._get_connection()
cursor = conn.cursor()
timestamp = int(time.time())
try:
cursor.execute("""
INSERT INTO transactions (id, sender_id, receiver_id, amount, currency, timestamp, description, status)
VALUES (?, ?, ?, ?, ?, ?, ?, ?)
""", (tx_id, sender_id, receiver_id, amount, currency, timestamp, description, status))
conn.commit()
print(f"Transaction {tx_id} recorded: {sender_id} -> {receiver_id}, Amount: {amount} {currency}")
return True
except sqlite3.IntegrityError:
print(f"Transaction {tx_id} already exists.")
return False
except sqlite3.Error as e:
print(f"Error recording transaction {tx_id}: {e}")
conn.rollback()
return False
finally:
conn.close()
def update_transaction_status(self, tx_id: str, status: str) -> bool:
"""Updates the status of a transaction."""
conn = self._get_connection()
cursor = conn.cursor()
try:
cursor.execute("UPDATE transactions SET status = ? WHERE id = ?", (status, tx_id))
conn.commit()
print(f"Transaction {tx_id} status updated to {status}")
return True
except sqlite3.Error as e:
print(f"Error updating transaction {tx_id} status: {e}")
conn.rollback()
return False
finally:
conn.close()
def get_transaction(self, tx_id: str) -> Optional[Dict[str, Any]]:
"""Retrieves a specific transaction by ID."""
conn = self._get_connection()
cursor = conn.fetchone("SELECT * FROM transactions WHERE id = ?", (tx_id,))
result = cursor.fetchone()
conn.close()
if result:
columns = [description[0] for description in cursor.description]
return dict(zip(columns, result))
return None
def get_transactions_for_agent(self, agent_id: str) -> List[Dict[str, Any]]:
"""Retrieves all transactions involving a specific agent."""
conn = self._get_connection()
cursor = conn.cursor()
cursor.execute("SELECT * FROM transactions WHERE sender_id = ? OR receiver_id = ? ORDER BY timestamp DESC",
(agent_id, agent_id))
results = cursor.fetchall()
columns = [description[0] for description in cursor.description]
conn.close()
return [dict(zip(columns, row)) for row in results]
# Example usage (for demonstration)
if __name__ == "__main__":
db = LedgerDB()
# Create dummy agents
agent_a = "agent_alpha"
agent_b = "agent_beta"
agent_c = "agent_gamma"
db.create_account(agent_a, 1000)
db.create_account(agent_b, 500)
db.create_account(agent_c, 200)
print(f"nBalance {agent_a}: {db.get_account_balance(agent_a)}")
print(f"Balance {agent_b}: {db.get_account_balance(agent_b)}")
# Simulate a transaction
tx_id1 = str(uuid.uuid4())
if db.record_transaction(tx_id1, agent_a, agent_b, 100, "CREDIT", "LLM Inference"):
# If transaction is recorded, update balances
db.update_account_balance(agent_a, -100)
db.update_account_balance(agent_b, 100)
db.update_transaction_status(tx_id1, 'COMPLETED')
print(f"nBalance {agent_a} after TX: {db.get_account_balance(agent_a)}")
print(f"Balance {agent_b} after TX: {db.get_account_balance(agent_b)}")
tx_id2 = str(uuid.uuid4())
if db.record_transaction(tx_id2, agent_b, agent_c, 50, "CREDIT", "Data Retrieval"):
db.update_account_balance(agent_b, -50)
db.update_account_balance(agent_c, 50)
db.update_transaction_status(tx_id2, 'COMPLETED')
print(f"nTransactions for {agent_a}:")
for tx in db.get_transactions_for_agent(agent_a):
print(tx)
print(f"nTransactions for {agent_b}:")
for tx in db.get_transactions_for_agent(agent_b):
print(tx)
LedgerDB提供了账户创建、余额查询、余额更新(通常在事务处理中进行)和事务记录及状态更新的功能。它使用SQL事务来确保操作的原子性,但在这里简化为手动更新。在生产环境中,一个专门的LedgerService会封装这些DB操作,提供更高级别的原子性转账方法。
3.3 经济核心服务:LedgerService
LedgerService将封装LedgerDB的底层操作,提供更安全的转账和查询API。
ledger_service.py
import uuid
import threading
from typing import Optional, Dict, List, Any
from agent_identity import AgentID
from ledger_db import LedgerDB
class LedgerService:
"""
Provides an API for managing agent accounts and processing transactions.
Ensures atomic transfers between accounts.
"""
_instance = None
_lock = threading.Lock()
def __new__(cls):
if cls._instance is None:
with cls._lock:
if cls._instance is None:
cls._instance = super(LedgerService, cls).__new__(cls)
cls._instance._init_service()
return cls._instance
def _init_service(self):
self.db = LedgerDB()
self.transaction_lock = threading.Lock() # Global lock for transfers for simplicity
def create_agent_account(self, agent_id: AgentID, initial_balance: int = 0) -> bool:
"""Creates an account for a new agent."""
return self.db.create_account(agent_id.id, initial_balance)
def get_agent_balance(self, agent_id: AgentID) -> Optional[int]:
"""Retrieves an agent's current balance."""
return self.db.get_account_balance(agent_id.id)
def deposit(self, agent_id: AgentID, amount: int) -> bool:
"""Deposits funds into an agent's account (e.g., from a system faucet)."""
if amount <= 0:
print("Deposit amount must be positive.")
return False
return self.db.update_account_balance(agent_id.id, amount)
def withdraw(self, agent_id: AgentID, amount: int) -> bool:
"""Withdraws funds from an agent's account."""
if amount <= 0:
print("Withdrawal amount must be positive.")
return False
# Check balance before withdrawal
current_balance = self.get_agent_balance(agent_id)
if current_balance is None or current_balance < amount:
print(f"Insufficient funds for withdrawal from {agent_id}. Current: {current_balance}, Requested: {amount}")
return False
return self.db.update_account_balance(agent_id.id, -amount)
def transfer(self, sender_id: AgentID, receiver_id: AgentID, amount: int, currency: str = "CREDIT", description: str = None) -> Optional[str]:
"""
Executes an atomic transfer of funds from sender to receiver.
Returns transaction ID on success, None on failure.
"""
if sender_id == receiver_id:
print("Cannot transfer to self.")
return None
if amount <= 0:
print("Transfer amount must be positive.")
return None
tx_id = str(uuid.uuid4())
with self.transaction_lock: # Ensure atomicity for the transfer operation
sender_balance = self.db.get_account_balance(sender_id.id)
if sender_balance is None:
print(f"Sender account {sender_id} not found.")
return None
if sender_balance < amount:
print(f"Insufficient funds for {sender_id}. Balance: {sender_balance}, Attempted transfer: {amount}")
return None
receiver_balance = self.db.get_account_balance(receiver_id.id)
if receiver_balance is None:
print(f"Receiver account {receiver_id} not found.")
return None
# Record transaction as PENDING
if not self.db.record_transaction(tx_id, sender_id.id, receiver_id.id, amount, currency, description, 'PENDING'):
return None
# Update balances
sender_update_success = self.db.update_account_balance(sender_id.id, -amount)
receiver_update_success = self.db.update_account_balance(receiver_id.id, amount)
if sender_update_success and receiver_update_success:
self.db.update_transaction_status(tx_id, 'COMPLETED')
print(f"Transfer {tx_id} completed: {sender_id} -> {receiver_id}, {amount} {currency}")
return tx_id
else:
# Rollback - this is a simplified rollback. In a real system,
# you'd need more robust transaction management (e.g., proper SQL transactions).
print(f"Transfer {tx_id} failed, attempting rollback.")
if sender_update_success: # If sender was debited, credit back
self.db.update_account_balance(sender_id.id, amount)
if receiver_update_success: # If receiver was credited, debit back
self.db.update_account_balance(receiver_id.id, -amount)
self.db.update_transaction_status(tx_id, 'FAILED')
return None
def get_transaction_details(self, tx_id: str) -> Optional[Dict[str, Any]]:
"""Retrieves details for a specific transaction."""
return self.db.get_transaction(tx_id)
def get_agent_transactions(self, agent_id: AgentID) -> List[Dict[str, Any]]:
"""Retrieves all transactions involving a specific agent."""
return self.db.get_transactions_for_agent(agent_id.id)
# Example usage
if __name__ == "__main__":
ledger = LedgerService()
agent_x_id = AgentID("agent_x")
agent_y_id = AgentID("agent_y")
agent_z_id = AgentID("agent_z")
# Ensure accounts exist or create them
ledger.create_agent_account(agent_x_id, 1000)
ledger.create_agent_account(agent_y_id, 500)
ledger.create_agent_account(agent_z_id, 200) # Should already exist from ledger_db test
print(f"nInitial Balances:")
print(f"Agent X: {ledger.get_agent_balance(agent_x_id)}")
print(f"Agent Y: {ledger.get_agent_balance(agent_y_id)}")
print(f"Agent Z: {ledger.get_agent_balance(agent_z_id)}")
# Test transfer
print("nAttempting transfer X -> Y (150 CREDIT)")
tx_id_xy = ledger.transfer(agent_x_id, agent_y_id, 150, "CREDIT", "Service Fee")
if tx_id_xy:
print(f"Transfer successful. TX ID: {tx_id_xy}")
else:
print("Transfer failed.")
print("nBalances after X -> Y transfer:")
print(f"Agent X: {ledger.get_agent_balance(agent_x_id)}")
print(f"Agent Y: {ledger.get_agent_balance(agent_y_id)}")
# Test insufficient funds
print("nAttempting transfer Y -> X (1000 CREDIT - should fail)")
tx_id_yx_fail = ledger.transfer(agent_y_id, agent_x_id, 1000, "CREDIT", "Large service")
if tx_id_yx_fail:
print(f"Transfer successful (unexpected). TX ID: {tx_id_yx_fail}")
else:
print("Transfer failed (expected).")
print("nBalances after failed Y -> X transfer attempt:")
print(f"Agent X: {ledger.get_agent_balance(agent_x_id)}")
print(f"Agent Y: {ledger.get_agent_balance(agent_y_id)}")
# Test deposit and withdrawal
print("nDepositing 200 to Agent X:")
ledger.deposit(agent_x_id, 200)
print(f"Agent X balance: {ledger.get_agent_balance(agent_x_id)}")
print("nWithdrawing 50 from Agent Y:")
ledger.withdraw(agent_y_id, 50)
print(f"Agent Y balance: {ledger.get_agent_balance(agent_y_id)}")
print("nTransactions for Agent X:")
for tx in ledger.get_agent_transactions(agent_x_id):
print(f" {tx['id'][:8]}... From: {tx['sender_id']} To: {tx['receiver_id']} Amount: {tx['amount']} Status: {tx['status']}")
LedgerService提供了transfer方法,该方法通过获取全局锁来模拟原子性操作,确保在一个转账过程中,账户余额不会被其他并发操作干扰。它还处理余额检查和事务状态更新,是Agent经济系统的核心。
3.4 Agent服务发现与请求处理 (gRPC)
Agent之间需要高效地发现服务并进行通信。gRPC是一个理想的选择,因为它支持强类型接口定义和高性能的二进制通信。
agent_service.proto (定义服务接口)
syntax = "proto3";
package agent_economy;
service AgentService {
rpc RequestService(ServiceRequest) returns (ServiceResponse);
rpc ProposePrice(PriceProposal) returns (PriceAcceptance);
rpc ExecuteService(ServiceExecutionRequest) returns (ServiceExecutionResponse);
rpc GetBalance(BalanceRequest) returns (BalanceResponse);
}
message ServiceRequest {
string requester_agent_id = 1;
string service_name = 2;
map<string, string> request_params = 3; // Parameters for the service
int32 max_price_willing_to_pay = 4;
string preferred_currency = 5;
}
message ServiceResponse {
string provider_agent_id = 1;
string service_name = 2;
bool success = 3;
string message = 4;
PriceProposal price_proposal = 5; // If agent wants to offer a price
}
message PriceProposal {
string proposer_agent_id = 1;
string service_name = 2;
int32 proposed_price = 3;
string currency = 4;
string proposal_id = 5; // Unique ID for this proposal
int64 valid_until = 6; // Unix timestamp
}
message PriceAcceptance {
string acceptor_agent_id = 1;
string proposal_id = 2;
bool accepted = 3;
string message = 4;
}
message ServiceExecutionRequest {
string requester_agent_id = 1;
string provider_agent_id = 2;
string service_name = 3;
map<string, string> request_params = 4;
int32 agreed_price = 5;
string currency = 6;
string transaction_id = 7; // ID from ledger for pre-authorization or actual payment
}
message ServiceExecutionResponse {
string provider_agent_id = 1;
bool success = 2;
string message = 3;
map<string, string> result_data = 4; // Actual results of the service
string final_transaction_id = 5; // Confirmed transaction ID
}
message BalanceRequest {
string agent_id = 1;
}
message BalanceResponse {
string agent_id = 1;
int32 balance = 2;
string currency = 3;
}
使用protoc编译.proto文件生成Python代码:
python -m grpc_tools.protoc -I. --python_out=. --grpc_python_out=. agent_service.proto
这将生成agent_service_pb2.py和agent_service_pb2_grpc.py。
agent_server.py (服务提供者Agent的gRPC实现)
import grpc
import time
import uuid
from concurrent import futures
from typing import Dict, Any
from agent_identity import AgentID, AgentInfo, ServiceDescriptor
from agent_registry import AgentRegistry
from ledger_service import LedgerService
import agent_service_pb2 as pb2
import agent_service_pb2_grpc as pb2_grpc
class AgentServicer(pb2_grpc.AgentServiceServicer):
"""
Implements the gRPC service methods for an Agent.
"""
def __init__(self, agent_id: AgentID, address: str, provided_services: Dict[str, ServiceDescriptor]):
self.agent_id = agent_id
self.address = address
self.provided_services = provided_services
self.registry = AgentRegistry()
self.ledger = LedgerService()
self.active_proposals: Dict[str, pb2.PriceProposal] = {} # Store active price proposals
# Register self with the registry
agent_info = AgentInfo(agent_id, address, list(provided_services.values()))
self.registry.register_agent(agent_info)
print(f"Agent {self.agent_id} ({self.address}) initialized and registered.")
# Ensure agent has an account
self.ledger.create_agent_account(self.agent_id, initial_balance=10000) # Give some initial credits
def RequestService(self, request: pb2.ServiceRequest, context) -> pb2.ServiceResponse:
"""
Handles incoming service requests. Responds with a price proposal.
"""
print(f"Agent {self.agent_id}: Received service request for {request.service_name} from {request.requester_agent_id}")
service = self.provided_services.get(request.service_name)
if not service:
return pb2.ServiceResponse(
provider_agent_id=self.agent_id.id,
service_name=request.service_name,
success=False,
message=f"Service '{request.service_name}' not offered by this agent."
)
# Basic pricing logic (can be dynamic based on load, demand, etc.)
proposed_price = self._calculate_price(service.price_model, request.request_params)
if proposed_price > request.max_price_willing_to_pay:
return pb2.ServiceResponse(
provider_agent_id=self.agent_id.id,
service_name=request.service_name,
success=False,
message=f"Proposed price {proposed_price} exceeds requester's max price {request.max_price_willing_to_pay}."
)
proposal_id = str(uuid.uuid4())
valid_until = int(time.time()) + 60 # Proposal valid for 60 seconds
price_proposal = pb2.PriceProposal(
proposer_agent_id=self.agent_id.id,
service_name=request.service_name,
proposed_price=proposed_price,
currency=request.preferred_currency,
proposal_id=proposal_id,
valid_until=valid_until
)
self.active_proposals[proposal_id] = price_proposal # Store proposal
return pb2.ServiceResponse(
provider_agent_id=self.agent_id.id,
service_name=request.service_name,
success=True,
message="Price proposal issued.",
price_proposal=price_proposal
)
def ProposePrice(self, request: pb2.PriceProposal, context) -> pb2.PriceAcceptance:
"""
This method is typically called by a service requester *after* receiving a proposal,
if the requester wants to counter-offer or accept.
However, in our simplified model, the initial RequestService includes a proposal.
This method could be used for more complex negotiation.
For now, let's assume direct acceptance or rejection.
"""
# This method is not directly used in the current flow where RequestService returns the proposal.
# It's kept for future negotiation extensions.
# For demonstration, we'll assume the requester directly accepts or rejects via ExecuteService.
print(f"Agent {self.agent_id}: Received price proposal from {request.proposer_agent_id} (not directly handled in this example).")
return pb2.PriceAcceptance(
acceptor_agent_id=self.agent_id.id,
proposal_id=request.proposal_id,
accepted=False,
message="This agent does not handle counter-proposals in this simplified example."
)
def ExecuteService(self, request: pb2.ServiceExecutionRequest, context) -> pb2.ServiceExecutionResponse:
"""
Executes the requested service after price agreement and payment.
"""
print(f"Agent {self.agent_id}: Received service execution request for {request.service_name} from {request.requester_agent_id} for {request.agreed_price} {request.currency}")
service = self.provided_services.get(request.service_name)
if not service:
return pb2.ServiceExecutionResponse(
provider_agent_id=self.agent_id.id,
success=False,
message=f"Service '{request.service_name}' not offered by this agent."
)
# 1. Verify payment (or pre-authorization) via LedgerService
# In a real system, you might have a payment channel or escrow.
# Here, we assume the requester has already initiated a transfer with a transaction_id.
# The provider verifies this transaction.
tx_id = request.transaction_id
if not tx_id:
return pb2.ServiceExecutionResponse(
provider_agent_id=self.agent_id.id,
success=False,
message="Missing transaction ID for payment verification."
)
tx_details = self.ledger.get_transaction_details(tx_id)
if not tx_details or tx_details['status'] != 'COMPLETED' or
tx_details['sender_id'] != request.requester_agent_id or
tx_details['receiver_id'] != self.agent_id.id or
tx_details['amount'] != request.agreed_price:
print(f"Agent {self.agent_id}: Payment verification failed for TX {tx_id}. Details: {tx_details}")
return pb2.ServiceExecutionResponse(
provider_agent_id=self.agent_id.id,
success=False,
message=f"Payment verification failed for transaction ID {tx_id}."
)
print(f"Agent {self.agent_id}: Payment verified (TX ID: {tx_id}). Proceeding with service execution.")
# 2. Execute the actual service (dummy implementation)
try:
result_data = self._execute_dummy_service(service.service_name, request.request_params)
return pb2.ServiceExecutionResponse(
provider_agent_id=self.agent_id.id,
success=True,
message="Service executed successfully.",
result_data=result_data,
final_transaction_id=tx_id
)
except Exception as e:
print(f"Agent {self.agent_id}: Error executing service {request.service_name}: {e}")
# In case of execution failure, a refund or dispute mechanism would be needed.
# For simplicity, we just mark failure here.
return pb2.ServiceExecutionResponse(
provider_agent_id=self.agent_id.id,
success=False,
message=f"Service execution failed: {str(e)}"
)
def GetBalance(self, request: pb2.BalanceRequest, context) -> pb2.BalanceResponse:
"""
Retrieves the agent's current balance from the ledger.
"""
balance = self.ledger.get_agent_balance(AgentID(request.agent_id))
return pb2.BalanceResponse(
agent_id=request.agent_id,
balance=balance if balance is not None else 0,
currency="CREDIT"
)
def _calculate_price(self, price_model: Dict[str, Any], request_params: Dict[str, str]) -> int:
"""
Calculates price based on the service's price model and request parameters.
"""
if price_model["type"] == "fixed":
return price_model["amount"]
elif price_model["type"] == "per_token":
# Example: LLM inference, price per token
prompt_tokens = len(request_params.get("prompt", "").split()) # Simplified token count
max_tokens = int(request_params.get("max_tokens", "100"))
total_tokens = prompt_tokens + max_tokens # Estimate max tokens
return int(total_tokens * price_model["cost_per_unit"])
elif price_model["type"] == "per_call":
return price_model["cost_per_unit"]
return 0 # Default
def _execute_dummy_service(self, service_name: str, params: Dict[str, str]) -> Dict[str, str]:
"""
Placeholder for actual service execution logic.
"""
print(f"Executing actual service: {service_name} with params: {params}")
if service_name == "LLM_Inference":
prompt = params.get("prompt", "default prompt")
max_tokens = int(params.get("max_tokens", "50"))
response = f"LLM responded to '{prompt[:30]}...' with {max_tokens} tokens of wisdom."
return {"response": response, "actual_tokens": str(max_tokens)}
elif service_name == "Image_Recognition":
image_url = params.get("image_url", "dummy_url")
detections = f"Detected objects in {image_url}: car, tree, person."
return {"detections": detections, "count": "3"}
elif service_name == "Weather_Data_API":
location = params.get("location", "London")
date = params.get("date", "today")
return {"temperature": "15.5", "humidity": "70%", "location": location, "date": date}
else:
return {"status": "success", "result": f"Executed unknown service {service_name}"}
def serve_agent(agent_id_str: str, host: str, port: int, service_descriptors: List[ServiceDescriptor]):
agent_id = AgentID(agent_id_str)
address = f"{host}:{port}"
provided_services = {s.service_name: s for s in service_descriptors}
server = grpc.server(futures.ThreadPoolExecutor(max_workers=10))
pb2_grpc.add_AgentServiceServicer_to_server(
AgentServicer(agent_id, address, provided_services), server)
server.add_insecure_port(address)
server.start()
print(f"Agent {agent_id} serving on {address}")
try:
while True:
time.sleep(86400) # One day in seconds
except KeyboardInterrupt:
server.stop(0)
print(f"Agent {agent_id} stopped.")
# Deregister agent upon shutdown
AgentRegistry().deregister_agent(agent_id)
if __name__ == "__main__":
# Example: A compute agent offering LLM inference
llm_service = ServiceDescriptor(
service_name="LLM_Inference",
description="Provides general purpose LLM inference (e.g., GPT-3.5 equivalent).",
input_schema={"prompt": "string", "max_tokens": "integer"},
output_schema={"response": "string"},
price_model={"type": "per_token", "cost_per_unit": 0.0001, "currency": "CREDIT"}
)
# Example: A data agent offering weather data
weather_service = ServiceDescriptor(
service_name="Weather_Data_API",
description="Provides real-time weather data for a given location.",
input_schema={"location": "string", "date": "string"},
output_schema={"temperature": "float", "humidity": "float"},
price_model={"type": "per_call", "cost_per_unit": 1, "currency": "CREDIT"}
)
# Start a server for Agent 1 (LLM and Weather)
import multiprocessing
p1 = multiprocessing.Process(target=serve_agent, args=("agent_llm_weather_001", "localhost", 50051, [llm_service, weather_service]))
p1.start()
# Start a server for Agent 2 (Image Recognition)
image_service = ServiceDescriptor(
service_name="Image_Recognition",
description="Detects objects in an image.",
input_schema={"image_url": "string"},
output_schema={"detections": "array"},
price_model={"type": "fixed", "amount": 5, "currency": "CREDIT"}
)
p2 = multiprocessing.Process(target=serve_agent, args=("agent_image_002", "localhost", 50052, [image_service]))
p2.start()
try:
p1.join()
p2.join()
except KeyboardInterrupt:
p1.terminate()
p2.terminate()
print("Servers terminated.")
AgentServicer实现了我们定义的gRPC接口。当收到RequestService请求时,它会根据内部逻辑(此处简化为_calculate_price)计算价格并返回一个PriceProposal。当收到ExecuteService请求时,它会首先验证支付(通过transaction_id向LedgerService查询),然后执行实际的服务逻辑,并返回结果。
agent_client.py (服务请求者Agent的gRPC实现)
import grpc
import time
from typing import Dict, Any, List, Optional
import random
from agent_identity import AgentID, AgentInfo, ServiceDescriptor
from agent_registry import AgentRegistry
from ledger_service import LedgerService
import agent_service_pb2 as pb2
import agent_service_pb2_grpc as pb2_grpc
class AgentClient:
"""
Represents an Agent that can request services from other agents.
"""
def __init__(self, client_agent_id: AgentID):
self.agent_id = client_agent_id
self.registry = AgentRegistry()
self.ledger = LedgerService()
self.ledger.create_agent_account(self.agent_id, initial_balance=5000) # Give some initial credits
print(f"Agent Client {self.agent_id} initialized. Balance: {self.ledger.get_agent_balance(self.agent_id)}")
def request_and_execute_service(self, service_name: str, params: Dict[str, str], max_price: int, currency: str = "CREDIT") -> Optional[Dict[str, str]]:
"""
Discovers, negotiates, and executes a service.
"""
print(f"n{self.agent_id}: Searching for '{service_name}' service...")
providers = self.registry.find_agents_by_service(service_name)
if not providers:
print(f"{self.agent_id}: No providers found for service '{service_name}'.")
return None
# Simple selection: pick the first available provider (can be extended with reputation, price, etc.)
provider_info = random.choice(providers) # Or implement a more sophisticated selection
provider_id = provider_info.agent_id
provider_address = provider_info.address
print(f"{self.agent_id}: Found provider {provider_id} at {provider_address}. Initiating request...")
try:
with grpc.insecure_channel(provider_address) as channel:
stub = pb2_grpc.AgentServiceStub(channel)
# Step 1: Request service and get price proposal
request = pb2.ServiceRequest(
requester_agent_id=self.agent_id.id,
service_name=service_name,
request_params=params,
max_price_willing_to_pay=max_price,
preferred_currency=currency
)
response: pb2.ServiceResponse = stub.RequestService(request)
if not response.success:
print(f"{self.agent_id}: Service request failed: {response.message}")
return None
price_proposal = response.price_proposal
if not price_proposal or price_proposal.proposed_price > max_price or price_proposal.valid_until < int(time.time()):
print(f"{self.agent_id}: Received unacceptable or expired price proposal. Proposed: {price_proposal.proposed_price}, Max: {max_price}")
return None
agreed_price = price_proposal.proposed_price
print(f"{self.agent_id}: Accepted price proposal: {agreed_price} {currency} from {provider_id}.")
# Step 2: Initiate payment via LedgerService
current_balance = self.ledger.get_agent_balance(self.agent_id)
if current_balance < agreed_price:
print(f"{self.agent_id}: Insufficient funds to pay {agreed_price}. Current balance: {current_balance}")
return None
print(f"{self.agent_id}: Initiating payment of {agreed_price} to {provider_id}...")
transaction_id = self.ledger.transfer(self.agent_id, provider_id, agreed_price, currency, f"Payment for {service_name}")
if not transaction_id:
print(f"{self.agent_id}: Payment failed or was rejected by ledger.")
return None
print(f"{self.agent_id}: Payment successful. Transaction ID: {transaction_id}. Current balance: {self.ledger.get_agent_balance(self.agent_id)}")
# Step 3: Execute service with payment confirmation
execution_request = pb2.ServiceExecutionRequest(
requester_agent_id=self.agent_id.id,
provider_agent_id=provider_id.id,
service_name=service_name,
request_params=params,
agreed_price=agreed_price,
currency=currency,
transaction_id=transaction_id
)
execution_response: pb2.ServiceExecutionResponse = stub.ExecuteService(execution_request)
if execution_response.success:
print(f"{self.agent_id}: Service '{service_name}' executed successfully by {provider_id}.")
print(f"Result: {execution_response.result_data}")
return dict(execution_response.result_data) # Convert map to dict
else:
print(f"{self.agent_id}: Service execution failed: {execution_response.message}")
# In a real system, would need to handle refunds/disputes here
return None
except grpc.RpcError as e:
print(f"{self.agent_id}: gRPC error communicating with {provider_id} at {provider_address}: {e}")
return None
except Exception as e:
print(f"{self.agent_id}: An unexpected error occurred: {e}")
return None
def get_my_balance(self) -> int:
return self.ledger.get_agent_balance(self.agent_id) or 0
if __name__ == "__main__":
# Ensure registry and ledger are clean or initialized
# For a fresh run, delete agent_registry.json and agent_ledger.db before starting servers and clients.
# Give some time for servers to start and register
print("Waiting for agents to register...")
time.sleep(5)
client_agent_id = AgentID("agent_requester_003")
client = AgentClient(client_agent_id)
# Example 1: Request LLM Inference
llm_params = {"prompt": "What is the capital of France?", "max_tokens": "30"}
client.request_and_execute_service("LLM_Inference", llm_params, max_price=10)
print(f"n{client_agent_id} current balance: {client.get_my_balance()}")
# Example 2: Request Image Recognition (should be provided by agent_image_002)
image_params = {"image_url": "https://example.com/sunset.jpg"}
client.request_and_execute_service("Image_Recognition", image_params, max_price=7)
print(f"n{client_agent_id} current balance: {client.get_my_balance()}")
# Example 3: Request Weather Data
weather_params = {"location": "New York", "date": "2023-10-27"}
client.request_and_execute_service("Weather_Data_API", weather_params, max_price=2)
print(f"n{client_agent_id} current balance: {client.get_my_balance()}")
# Example 4: Request non-existent service
non_existent_params = {"query": "something"}
client.request_and_execute_service("NonExistentService", non_existent_params, max_price=5)
print(f"n{client_agent_id} current balance: {client.get_my_balance()}")
# Example 5: Request with insufficient funds (assume initial balance is 5000)
# If the previous requests consumed too much, this might fail as expected.
print(f"nAttempting high-cost LLM request with current balance {client.get_my_balance()}")
llm_high_cost_params = {"prompt": "Write a 500-word essay on quantum entanglement and its philosophical implications.", "max_tokens": "1000"}
client.request_and_execute_service("LLM_Inference", llm_high_cost_params, max_price=100) # Max price 100 might be too low for 1000 tokens * 0.0001 = 0.1, but if it was 10000 tokens it would be 1.0. Let's make it more substantial.
# Recalculate based on agent_server pricing for 1000 tokens: 1000 * 0.0001 = 0.1 CREDIT.
# Let's make it 100000 tokens for a price of 10.
# Let's assume the LLM agent charges per token and max_tokens means total output.
# LLM pricing is 0.0001 per token. If prompt is 10 tokens, max_tokens is 1000. Total 1010 tokens * 0.0001 = 0.101.
# If max_price=10 and a service costs, say, 5000 (intentionally high for test):
# Let's target a service that costs more than the remaining balance for the example.
# To reliably demonstrate insufficient funds, we'd need to know the exact cost.
# Current max_price for LLM is 10.
# Let's make a request with a very high max_tokens that would exceed current balance or max_price.
# If LLM agent estimates 100000 tokens for "essay on quantum entanglement", price would be 100000 * 0.0001 = 10.
# If client has 4900 balance, and requests 10 credits, it should pass.
# Let's make a service request that would *actually* cost more than the current balance.
# Say, client has 4900, requests 4901.
# This example requires more precise state management for balance. For now, assume it might fail.
# To guarantee insufficient funds:
current_balance = client.get_my_balance()
if current_balance > 0:
print(f"nAttempting to exhaust funds with a large payment...")
# Request a service that will cost slightly more than current balance
# This is a bit tricky with dynamic pricing, will rely on the max_price to trigger rejection.
# Let's set max_price very high to ensure the provider's price is accepted,
# but the client's balance is lower.
excessive_price_request_params = {"prompt": "Generate a very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very very much for very very good.
# The actual calculation inside AgentServicer is:
# prompt_tokens = len(request_params.get("prompt", "").split())
# max_tokens = int(request_params.get("max_tokens", "100"))
# total_tokens = prompt_tokens + max_tokens
# return int(total_tokens * price_model["cost_per_unit"])
# Let's say prompt is 100 tokens, max_tokens is 50000 (for 500-word essay, rough estimate).
# Total tokens = 50100. Cost = 50100 * 0.0001 = 5.01 CREDITS.
# This is not enough to deplete the balance for test.
# Let's re-run with a guaranteed insufficient fund scenario.
# First, drain client's balance.
# Give the client agent a small initial balance, say 10 credits.
# Request a service that costs 5 credits.
# Request another service that costs 8 credits (should fail).
print("n--- Testing insufficient funds scenario ---")
client_id_low_funds = AgentID("agent_low_funds_004")
client_low_funds = AgentClient(client_id_low_funds)
# Give initial 100 credits to this agent
client_low_funds.ledger.create_agent_account(client_id_low_funds, initial_balance=100)
print(f"Low funds agent {client_id_low_funds} balance: {client_low_funds.get_my_balance()}")
# Request a service costing 50 credits
llm_small_request_params = {"prompt": "Summarize AI ethics.", "max_tokens": "500"}
# Cost for 500 tokens + ~10 prompt tokens = 510 * 0.0001 = 0.051 credits. This is too small.
# To make it 50 credits, we need 50 / 0.0001 = 500,000 tokens.
large_llm_params = {"prompt": "Generate a comprehensive report on the geopolitical implications of quantum computing, including economic, military, and social impacts across major global powers. The report should be highly detailed, spanning at least 500,000 tokens.", "max_tokens": "500000"}
print(f"Requesting LLM service costing ~50 credits (max_tokens=500000)...")
client_low_funds.request_and_execute_service("LLM_Inference", large_llm_params, max_price=50) # Assuming the provider proposes 50
print(f"Low funds agent {client_id_low_funds} balance after first request: {client_low_funds.get_my_balance()}")
# Now request another service costing 80 credits, should fail as balance is 50.
# Let's simulate a request that needs 80 credits. (80 / 0.0001 = 800,000 tokens)
even_larger_llm_params = {"prompt": "Write an epic saga of the rise and fall of galactic empires, detailing their technological advancements, societal structures, and the philosophical underpinnings of their conflicts, spanning millions of years and involving countless civilizations. This requires at least 800,000 tokens of output.", "max_tokens": "800000"}
print(f"Requesting LLM service costing ~80 credits (max_tokens=800000, should fail due to insufficient funds)...")
client_low_funds.request_and_execute_service("LLM_Inference", even_larger_llm_params, max_price=80) # Assuming provider proposes 80
print(f"Low funds agent {client_id_low_funds} balance after second request: {client_low_funds.get_my_balance()}")
# Allow time for cleanup if needed
time.sleep(2)
AgentClient首先通过AgentRegistry查找服务提供者,然后通过gRPC向选定的提供者发送RequestService。一旦收到PriceProposal并接受,它会通过LedgerService发起支付,然后发送ExecuteService请求。这个流程体现了发现、协商、支付、执行的完整循环。
3.5 价格与协商策略
上述代码中,价格计算是内嵌在AgentServicer的_calculate_price方法中。这只是一个基础示例。实际的Agent经济系统可以采用更复杂的策略:
- 静态定价: 服务有固定价格。
- 动态定价: 根据供需、提供者负载、时间、声誉等因素调整价格。
- 代码片段 (AgentServicer):
# In _calculate_price method: if price_model["type"] == "dynamic_load_aware": current_load = self._get_current_service_load(service_name) # e.g., number of concurrent tasks base_price = price_model["base_amount"] load_factor = 1 + (current_load / price_model["max_load"]) * price_model["load_premium"] return int(base_price * load_factor)
- 代码片段 (AgentServicer):
- 拍卖机制: 买方或卖方发起拍卖。
- 英式拍卖 (English Auction): 价格从低到高,买方竞价。
- 荷式拍卖 (Dutch Auction): 价格从高到低,直到有买方接受。
- 代码片段 (AgentClient for bidding):
# Instead of direct RequestService, an auction_manager would be involved # client_agent.auction_manager.bid_on_service(service_name, initial_bid, max_bid) # Provider agents would respond to bids.
- 协商协议: 代理之间通过多轮消息交换达成价格共识。这通常涉及复杂的Agent对话协议。
3.6 声誉系统
一个简单的声誉系统可以追踪Agent的服务质量和交易历史,影响其被选择的概率或议价能力。
reputation_manager.py
import threading
from typing import Dict, Optional
import json
import os
from agent_identity import AgentID
class ReputationManager:
"""
Manages the reputation scores for agents based on their transaction history and feedback.
A higher score indicates a more reliable agent.
"""
_instance = None
_lock = threading.Lock()
REPUTATION_FILE = "agent_reputation.json"
def __new__(cls):
if cls._instance is None:
with cls._lock:
if cls._instance is None:
cls._instance = super(ReputationManager, cls).__new__(cls)
cls._instance._init_manager()
return cls._instance
def _init_manager(self):
self.reputations: Dict[AgentID, float] = {} # AgentID -> Reputation Score
self.lock = threading.Lock()
self._load_reputations()
print(f"ReputationManager initialized with {len(self.reputations)} agents' reputations loaded.")
def _load_reputations(self):
if os.path.exists(self.REPUTATION_FILE):
try:
with open(self.REPUTATION_FILE, 'r') as f:
data = json.load(f)
for agent_id_str, score in data.items():
self.reputations[AgentID(agent_