Hello everyone, and welcome. Today we gather to explore a core topic that grows ever more prominent in the era of distributed systems and artificial intelligence: Worker Abstraction. We will examine in depth how to build a standardized API so that any Agent, however complex or simple its internals, can join our team in a plug-and-play fashion and become part of the system. This is not merely a technical challenge; it is strategic groundwork for the highly modular, extensible, intelligent systems of the future.
1. Why Do We Need Worker Abstraction? From Chaos to Order
Imagine we are assembling a team of agents, each with different skills: some excel at data analysis, some at image recognition, some handle calls to external APIs, and some may be nothing more than simple scheduled-task runners. Without a unifying standard, every new Agent means a fresh round of integration work: understanding its unique interface, adapting to its data formats, handling its error semantics. This inevitably leads to:
- High integration cost: every added Agent requires bespoke development, consuming significant time and resources.
- Fragile systems: the lack of uniformity makes the system hard to maintain; a small change can trigger a chain reaction.
- Poor reusability: each Agent is an island whose capabilities are hard to share with other Agents or system components.
- Scalability bottlenecks: as the number of Agents grows, the complexity of managing and coordinating them grows exponentially.
It is like a ragtag army: each unit may fight bravely, but without a unified command structure and communication protocol, its overall strength is badly diminished.
The core goal of worker abstraction is precisely to solve these problems. It provides a common framework and a standardized API that abstracts and encapsulates the implementation details of Agents of every kind, which we call Workers. Through this layer, upper-level systems (task schedulers, workflow engines, or higher-level Agents) can interact with any worker in a uniform way, without caring about its technology stack, programming language, or implementation logic.
The benefits are clear:
- Plug-and-Play: a new worker only has to honor the agreed interface to join the team seamlessly.
- Modularity and decoupling: the system decomposes into independent, replaceable components, improving robustness and maintainability.
- High reusability: a worker's capabilities can be shared broadly, avoiding reinvented wheels.
- Excellent scalability: worker instances can be added or removed on demand for elastic scaling.
- Interoperability: collaboration becomes possible across technology stacks, teams, and even organizations.
In short, worker abstraction is a cornerstone for building the intelligent, resilient, scalable distributed systems of the future.
2. Core Concepts and Components of Worker Abstraction
Before diving into API design, we must be clear about the core concepts and basic components of a worker-abstraction architecture.
2.1 Core Concepts
- Agent: broadly, any autonomous entity that can perceive its environment, make decisions, and act. In our context, an Agent is a software module with specific capabilities and business logic.
- Worker: the concrete embodiment of an Agent at the worker-abstraction layer; an Agent instance that conforms to our standardized API. One Agent may be wrapped as one or more Workers.
- Task: the smallest unit of work a worker executes. A task typically carries a type (or name), a set of parameters, and optional metadata.
- Worker Manager / Orchestrator: the system's brain. It discovers, registers, monitors, and schedules workers, and assigns tasks. It executes no business logic itself; it coordinates workers toward higher-level goals.
- Worker Registry: stores metadata for every available worker, including its address, capability descriptions, and health status, for the Orchestrator to query and discover workers.
- Task Queue: buffers pending tasks, decoupling task submission from worker processing and improving throughput and elasticity.
2.2 Architecture Overview
A typical worker-abstraction architecture can be sketched as follows:
+---------------------+ +---------------------+ +---------------------+
| | | | | |
| Task Requester |----->| Orchestrator |----->| Worker Registry |
| (e.g., User UI, | | (Task Scheduler, | | (Service Discovery)|
| another Agent) | | Workflow Engine) |<-----| |
| | | | | |
+---------------------+ +----------^----------+ +---------------------+
|
| Task Assignment
| Status Updates
v
+---------------------+ +---------------------+ +---------------------+
| | | | | |
| Worker A |<---->| Task Queue |<---->| Worker B |
| (Implements | | (e.g., RabbitMQ, | | (Implements |
| Worker API) | | Kafka, RedisMQ) | | Worker API) |
| | | | | |
+---------------------+ +---------------------+ +---------------------+
^
| Worker C (Ad-hoc or dynamic worker)
|
+---------------------------------------
In this architecture:
- The Task Requester initiates task requests.
- The Orchestrator receives a request, matches the task type against worker capabilities, finds a suitable worker via the Worker Registry, and places the task on the Task Queue.
- Workers pull tasks from the Task Queue, execute them, and report results and status back to the Orchestrator (or over a dedicated channel).
- The Worker Registry handles worker registration and discovery.
3. Designing the Standardized API: Building the Contract
Now for the heart of the matter: designing the standardized API. This interface is the "contract" of the worker abstraction, and every worker must honor it.
When choosing a communication protocol and data format, we weigh performance, ease of use, cross-language support, and extensibility.
Common options:
- RESTful HTTP + JSON: simple, browser-native, mature ecosystem; but comparatively inefficient, weakly typed, and ill-suited to long-lived connections and bidirectional streaming.
- gRPC + Protocol Buffers: high performance, strongly typed, multi-language, bidirectional streaming, compact messages; but a steeper learning curve and a poor fit with HTTP/1.1 tooling.
- Message queues (e.g., RabbitMQ, Kafka) + JSON/Protocol Buffers: decoupled, asynchronous, high throughput and availability; but added system complexity and a poor fit for real-time interaction.
- WebSocket + JSON: long-lived connections with bidirectional real-time communication; but message reliability must be built yourself.
Given that we want high-performance, strongly typed, cross-language plug-and-play, with support for richer bidirectional patterns (such as live status updates during task execution), gRPC + Protocol Buffers is an excellent fit. It offers structured service definitions and efficient binary serialization, well suited to microservices and Agent systems.
We will define our Worker API using gRPC.
3.1 Worker API Definition (Protocol Buffers)
First we define worker.proto, which describes all the RPC methods and data structures of the worker service.
syntax = "proto3";

package worker_abstraction;

import "google/protobuf/timestamp.proto";
import "google/protobuf/empty.proto";

// ToolParameter describes one parameter that a tool accepts.
message ToolParameter {
  string name = 1;        // Parameter name
  string type = 2;        // Parameter type (e.g., "string", "int", "boolean", "object")
  string description = 3; // Parameter description
  bool required = 4;      // Whether the parameter is required
  // Further constraints could be added, e.g.:
  // repeated string enum_values = 5;
  // google.protobuf.Any default_value = 6;
}

// ToolCapability describes one tool or capability a worker offers.
message ToolCapability {
  string name = 1;                       // Unique tool name (e.g., "data_analysis_tool", "image_processor")
  string description = 2;                // Detailed description of the tool
  repeated ToolParameter parameters = 3; // Parameters the tool accepts
  // Metadata about resource consumption, concurrency limits, etc. could be added.
}

// WorkerInfo carries a worker's basic information.
message WorkerInfo {
  string worker_id = 1;                         // Globally unique worker ID
  string name = 2;                              // Friendly name (e.g., "Data Analysis Assistant", "Image Processing Service")
  string version = 3;                           // Worker version
  string address = 4;                           // Worker's RPC service address (host:port)
  repeated string tags = 5;                     // Tags for classification and filtering (e.g., "GPU", "Python", "ML")
  google.protobuf.Timestamp last_heartbeat = 6; // Time of the last heartbeat
  repeated ToolCapability capabilities = 7;     // Capabilities this worker offers
}

// TaskParameter is one parameter passed in when executing a task.
message TaskParameter {
  string key = 1;
  string value_json = 2; // Value encoded as a JSON string, so complex structures can be passed
}

// Task describes a unit of work for a worker to execute.
message Task {
  string task_id = 1;                       // Globally unique task ID
  string worker_id = 2;                     // Target worker's ID (filled in by the Orchestrator at scheduling time)
  string capability_name = 3;               // Name of the capability to invoke
  repeated TaskParameter parameters = 4;    // Task parameters
  int64 timeout_seconds = 5;                // Execution timeout in seconds
  google.protobuf.Timestamp created_at = 6; // Task creation time
  map<string, string> metadata = 7;         // Extra metadata, e.g., request origin
}

// TaskStatus enumerates the states of a task.
enum TaskStatus {
  TASK_STATUS_UNKNOWN = 0;
  TASK_STATUS_PENDING = 1;   // Task received, awaiting execution
  TASK_STATUS_RUNNING = 2;   // Task executing
  TASK_STATUS_COMPLETED = 3; // Task finished successfully
  TASK_STATUS_FAILED = 4;    // Task failed
  TASK_STATUS_CANCELED = 5;  // Task was canceled
}

// TaskResult carries the outcome of a task.
message TaskResult {
  string task_id = 1;
  TaskStatus status = 2;
  string output_json = 3;                     // Task output, encoded as a JSON string
  string error_message = 4;                   // Error message, when the task failed
  google.protobuf.Timestamp completed_at = 5; // Completion time
  map<string, string> metadata = 6;           // Extra metadata
}

// ------ RPC request and response messages ------

// RegisterWorkerRequest is sent by a worker to register with the registry.
message RegisterWorkerRequest {
  WorkerInfo worker_info = 1;
}
message RegisterWorkerResponse {
  bool success = 1;
  string message = 2;
}

// HeartbeatRequest is sent periodically by a worker to signal liveness.
message HeartbeatRequest {
  string worker_id = 1;
  int32 current_load = 2; // Current load (e.g., number of tasks in flight)
  // More detailed resource-usage fields could be added.
}
message HeartbeatResponse {
  bool success = 1;
  string message = 2;
  // Could carry Orchestrator instructions for the worker, e.g.:
  // repeated string commands = 3; // e.g., "STOP", "UPDATE_CONFIG"
}

// ExecuteTaskRequest is sent by the Orchestrator to assign a task to a worker.
message ExecuteTaskRequest {
  Task task = 1;
}
// ExecuteTaskResponse acknowledges receipt of a task.
message ExecuteTaskResponse {
  string task_id = 1;
  bool accepted = 2;  // Whether the worker accepted the task
  string message = 3; // Reason, if rejected
}

// GetTaskStatusRequest queries a task's status.
message GetTaskStatusRequest {
  string task_id = 1;
}
// StreamTaskStatusResponse delivers live task-status updates over a streaming RPC.
message StreamTaskStatusResponse {
  TaskResult result = 1;
}

// WorkerService is the interface every worker must implement.
service WorkerService {
  // ExecuteTask is a unary RPC by which the Orchestrator assigns a task to the worker.
  rpc ExecuteTask (ExecuteTaskRequest) returns (ExecuteTaskResponse);
  // StreamTaskStatus is a server-streaming RPC by which the Orchestrator subscribes to a task's status updates.
  rpc StreamTaskStatus (GetTaskStatusRequest) returns (stream StreamTaskStatusResponse);
  // GetWorkerInfo returns the worker's detailed information.
  rpc GetWorkerInfo (google.protobuf.Empty) returns (WorkerInfo);
}

// RegistryService is the registry's interface (called by workers and the Orchestrator).
service RegistryService {
  // RegisterWorker registers a worker.
  rpc RegisterWorker (RegisterWorkerRequest) returns (RegisterWorkerResponse);
  // WorkerHeartbeat receives a worker's heartbeat.
  rpc WorkerHeartbeat (HeartbeatRequest) returns (HeartbeatResponse);
  // GetAvailableWorkers streams the currently available workers to the Orchestrator.
  rpc GetAvailableWorkers (google.protobuf.Empty) returns (stream WorkerInfo);
  // GetWorkerById looks up a specific worker by ID.
  rpc GetWorkerById (GetWorkerByIdRequest) returns (WorkerInfo);
}

message GetWorkerByIdRequest {
  string worker_id = 1;
}
Notes on the API design above:
- ToolCapability: the core description of what a worker can do. A worker may offer one or more ToolCapability entries, each with a unique name, a description, and a parameters list. Each parameter declares its name, type, description, and whether it is required. This structure lets the Orchestrator understand a worker's functionality and match workers to task requirements.
- WorkerInfo: the worker's identity and metadata: a globally unique worker_id, name, version, network address, tags, and, most importantly, its capabilities. last_heartbeat tracks liveness.
- Task: everything needed to execute a task: task ID, target capability name, parameters, timeout, and so on. Parameters are key-value pairs whose value_json field can carry arbitrarily complex JSON structures, for flexibility.
- TaskStatus / TaskResult: the task's lifecycle states and final outcome; output_json likewise supports complex result data.
- WorkerService: the interface every worker itself must implement.
  - ExecuteTask: the Orchestrator calls this to assign a task; the worker's ExecuteTaskResponse says whether it accepted it.
  - StreamTaskStatus: a server-streaming RPC. The Orchestrator calls it and holds the connection so the worker can push status updates in real time (PENDING -> RUNNING -> COMPLETED/FAILED), which is far more efficient than frequent polling.
  - GetWorkerInfo: lets the Orchestrator fetch the worker's latest information at any time.
- RegistryService: the interface the registry implements.
  - RegisterWorker: a worker calls this at startup to register itself.
  - WorkerHeartbeat: workers send periodic heartbeats to stay marked alive, and can report their current load.
  - GetAvailableWorkers: the Orchestrator calls this for the list of available workers (it can be streaming, so new or departing workers are discovered live).
  - GetWorkerById: look up a specific worker by ID.
3.2 Key Interaction Flows
The main interactions are summarized in the table below:
| Participant | Action | Description | API method |
|---|---|---|---|
| Worker | Start and register | On startup, report its info and capabilities to the registry. | RegistryService.RegisterWorker |
| Worker | Periodic heartbeat | Periodically signal liveness and report load to the registry. | RegistryService.WorkerHeartbeat |
| Orchestrator | Discover workers | Fetch all available workers and their capabilities from the registry. | RegistryService.GetAvailableWorkers |
| Orchestrator | Assign task | Select a suitable worker based on task requirements and capabilities, and send the task. | WorkerService.ExecuteTask |
| Worker | Accept task | Receive the task request, validate it, and reply whether it is accepted. | WorkerService.ExecuteTask (response) |
| Worker | Execute and update | Execute the task asynchronously and push status changes to the Orchestrator over the streaming API. | WorkerService.StreamTaskStatus (streamed responses) |
| Orchestrator | Receive results | Obtain the task's final status and result by subscribing to the stream or querying. | WorkerService.StreamTaskStatus / GetTaskStatusRequest |
4. Implementing the Components, with Code Examples
Let us now look at concrete implementations, using Python with the grpcio library.
4.1 Worker Registry Example
The registry maintains an in-memory (or persistent) store of worker information.
# registry_server.py
import grpc
from concurrent import futures
import time
import uuid
from datetime import datetime, timezone

from google.protobuf.timestamp_pb2 import Timestamp

import worker_pb2
import worker_pb2_grpc


class WorkerRegistryServicer(worker_pb2_grpc.RegistryServiceServicer):
    def __init__(self):
        self.workers = {}              # worker_id -> WorkerInfo
        self.last_heartbeat_time = {}  # worker_id -> datetime

    def _to_timestamp_pb(self, dt):
        timestamp = Timestamp()
        if dt:
            timestamp.FromDatetime(dt.astimezone(timezone.utc))
        return timestamp

    def RegisterWorker(self, request, context):
        worker_info = request.worker_info
        if not worker_info.worker_id:
            # If the worker did not supply an ID, generate one.
            worker_info.worker_id = str(uuid.uuid4())
        worker_info.last_heartbeat.CopyFrom(self._to_timestamp_pb(datetime.now(timezone.utc)))
        self.workers[worker_info.worker_id] = worker_info
        self.last_heartbeat_time[worker_info.worker_id] = datetime.now(timezone.utc)
        print(f"Worker {worker_info.name} ({worker_info.worker_id}) registered at {worker_info.address}")
        return worker_pb2.RegisterWorkerResponse(success=True, message="Worker registered successfully.")

    def WorkerHeartbeat(self, request, context):
        worker_id = request.worker_id
        if worker_id not in self.workers:
            print(f"Heartbeat from unknown worker_id: {worker_id}")
            return worker_pb2.HeartbeatResponse(success=False, message="Unknown worker_id.")
        self.last_heartbeat_time[worker_id] = datetime.now(timezone.utc)
        self.workers[worker_id].last_heartbeat.CopyFrom(self._to_timestamp_pb(datetime.now(timezone.utc)))
        print(f"Worker {worker_id} heartbeat received. Load: {request.current_load}")
        return worker_pb2.HeartbeatResponse(success=True, message="Heartbeat acknowledged.")

    def GetAvailableWorkers(self, request, context):
        # Evict workers whose heartbeat is stale (none for more than 30 seconds)
        # while streaming the live ones back to the caller.
        for worker_id, info in list(self.workers.items()):  # list() avoids mutating the dict during iteration
            last_seen = self.last_heartbeat_time.get(worker_id, datetime.min.replace(tzinfo=timezone.utc))
            if (datetime.now(timezone.utc) - last_seen).total_seconds() > 30:
                print(f"Worker {worker_id} deemed stale, removing.")
                del self.workers[worker_id]
                del self.last_heartbeat_time[worker_id]
                continue
            yield info

    def GetWorkerById(self, request, context):
        worker_info = self.workers.get(request.worker_id)
        if worker_info:
            return worker_info
        context.set_code(grpc.StatusCode.NOT_FOUND)
        context.set_details(f"Worker with ID {request.worker_id} not found.")
        return worker_pb2.WorkerInfo()  # Empty WorkerInfo alongside the error status


def serve_registry():
    server = grpc.server(futures.ThreadPoolExecutor(max_workers=10))
    worker_pb2_grpc.add_RegistryServiceServicer_to_server(WorkerRegistryServicer(), server)
    port = '[::]:50051'
    server.add_insecure_port(port)
    server.start()
    print(f"Worker Registry server started on {port}")
    try:
        while True:
            time.sleep(86400)  # One day
    except KeyboardInterrupt:
        server.stop(0)


if __name__ == '__main__':
    serve_registry()
4.2 Worker Example
A worker implements the WorkerService interface and sends periodic heartbeats to the registry.
# example_worker.py
import grpc
from concurrent import futures
import time
import uuid
import json
import threading
from datetime import datetime, timezone

from google.protobuf.timestamp_pb2 import Timestamp

import worker_pb2
import worker_pb2_grpc

# In-memory store for task results (for simplicity)
TASK_RESULTS = {}


class ExampleWorkerServicer(worker_pb2_grpc.WorkerServiceServicer):
    def __init__(self, worker_id, worker_name, worker_address, capabilities):
        self.worker_id = worker_id
        self.worker_name = worker_name
        self.worker_address = worker_address
        self.capabilities = capabilities  # List of ToolCapability
        self.active_tasks = {}            # task_id -> Task

    def _to_timestamp_pb(self, dt):
        timestamp = Timestamp()
        if dt:
            timestamp.FromDatetime(dt.astimezone(timezone.utc))
        return timestamp

    def GetWorkerInfo(self, request, context):
        worker_info = worker_pb2.WorkerInfo(
            worker_id=self.worker_id,
            name=self.worker_name,
            version="1.0.0",
            address=self.worker_address,
            tags=["python", "example", "CPU"],
            capabilities=self.capabilities,
        )
        worker_info.last_heartbeat.CopyFrom(self._to_timestamp_pb(datetime.now(timezone.utc)))
        return worker_info

    def _execute_dummy_task(self, task):
        """Simulates task execution, recording progress in TASK_RESULTS."""
        task_id = task.task_id
        capability_name = task.capability_name
        params = {p.key: json.loads(p.value_json) for p in task.parameters}
        print(f"Worker {self.worker_id} received task {task_id} for capability {capability_name} with params: {params}")

        # Mark the task RUNNING.
        TASK_RESULTS[task_id] = worker_pb2.TaskResult(
            task_id=task_id, status=worker_pb2.TASK_STATUS_RUNNING,
        )
        time.sleep(1)  # Simulate some work

        try:
            # Simulate different capabilities.
            if capability_name == "dummy_processor":
                input_data = params.get("input", "default_input")
                processed_data = f"Processed: {input_data.upper()}"
                output = {"result": processed_data, "worker": self.worker_name}
                print(f"Task {task_id} processed: {processed_data}")
                TASK_RESULTS[task_id].output_json = json.dumps(output)
                TASK_RESULTS[task_id].status = worker_pb2.TASK_STATUS_COMPLETED
            elif capability_name == "error_simulator":
                raise ValueError("Simulated error during task execution.")
            else:
                raise NotImplementedError(f"Capability '{capability_name}' not supported.")
        except Exception as e:
            print(f"Task {task_id} failed: {e}")
            TASK_RESULTS[task_id].status = worker_pb2.TASK_STATUS_FAILED
            TASK_RESULTS[task_id].error_message = str(e)
            TASK_RESULTS[task_id].output_json = json.dumps({"error": str(e)})  # Error in output_json too
        TASK_RESULTS[task_id].completed_at.CopyFrom(self._to_timestamp_pb(datetime.now(timezone.utc)))
        self.active_tasks.pop(task_id, None)  # Task finished

    def ExecuteTask(self, request, context):
        task = request.task
        # Basic validation: is the capability supported?
        if not any(cap.name == task.capability_name for cap in self.capabilities):
            return worker_pb2.ExecuteTaskResponse(
                task_id=task.task_id, accepted=False,
                message=f"Capability '{task.capability_name}' not supported by this worker."
            )
        # Record the task and mark it PENDING.
        self.active_tasks[task.task_id] = task
        TASK_RESULTS[task.task_id] = worker_pb2.TaskResult(
            task_id=task.task_id, status=worker_pb2.TASK_STATUS_PENDING,
        )
        print(f"Worker {self.worker_id} accepted task {task.task_id}")
        return worker_pb2.ExecuteTaskResponse(task_id=task.task_id, accepted=True)

    def StreamTaskStatus(self, request, context):
        task_id = request.task_id
        last_reported = None  # Serialized snapshot of the last result we sent
        while context.is_active():
            current_result = TASK_RESULTS.get(task_id)
            if current_result is None:
                # Unknown task: never assigned to this worker, or already cleaned up.
                context.set_code(grpc.StatusCode.NOT_FOUND)
                context.set_details(f"Task {task_id} not found on this worker.")
                return
            # Lazily start execution the first time the status stream is opened.
            # A production worker would spawn the executor from ExecuteTask itself;
            # here the stream drives execution to keep the example small. The
            # executor only writes to TASK_RESULTS: the synchronous gRPC API has no
            # way to push responses from another thread, so this loop polls and yields.
            if (current_result.status == worker_pb2.TASK_STATUS_PENDING
                    and task_id in self.active_tasks
                    and not any(t.name == f"task_executor_{task_id}" for t in threading.enumerate())):
                task_thread = threading.Thread(
                    target=self._execute_dummy_task,
                    args=(self.active_tasks[task_id],),
                    name=f"task_executor_{task_id}",
                    daemon=True,
                )
                task_thread.start()
            # Compare serialized snapshots: the executor mutates the stored
            # TaskResult in place, so object identity cannot detect changes.
            snapshot = current_result.SerializeToString()
            if snapshot != last_reported:
                yield worker_pb2.StreamTaskStatusResponse(result=current_result)
                last_reported = snapshot
                if current_result.status in (worker_pb2.TASK_STATUS_COMPLETED,
                                             worker_pb2.TASK_STATUS_FAILED,
                                             worker_pb2.TASK_STATUS_CANCELED):
                    print(f"Stream for task {task_id} finished. Final status: {current_result.status}")
                    return
            time.sleep(0.5)  # Poll for updates


def run_worker(registry_address, worker_port, worker_name="ExampleWorker"):
    worker_id = str(uuid.uuid4())
    worker_address = f"localhost:{worker_port}"

    # Define capabilities.
    dummy_processor_cap = worker_pb2.ToolCapability(
        name="dummy_processor",
        description="A simple processor that converts input text to uppercase.",
        parameters=[
            worker_pb2.ToolParameter(name="input", type="string", description="The text to process.", required=True),
            worker_pb2.ToolParameter(name="language", type="string", description="Optional language hint.", required=False),
        ],
    )
    error_simulator_cap = worker_pb2.ToolCapability(
        name="error_simulator",
        description="A tool that always fails, for testing error handling.",
        parameters=[],
    )
    capabilities = [dummy_processor_cap, error_simulator_cap]

    # Start the WorkerService.
    worker_server = grpc.server(futures.ThreadPoolExecutor(max_workers=10))
    worker_servicer = ExampleWorkerServicer(worker_id, worker_name, worker_address, capabilities)
    worker_pb2_grpc.add_WorkerServiceServicer_to_server(worker_servicer, worker_server)
    worker_server.add_insecure_port(worker_address)
    worker_server.start()
    print(f"Worker service '{worker_name}' started on {worker_address}")

    # Register with the RegistryService.
    registry_channel = grpc.insecure_channel(registry_address)
    registry_stub = worker_pb2_grpc.RegistryServiceStub(registry_channel)

    def register_and_heartbeat():
        while True:
            try:
                # Register.
                worker_info_to_register = worker_pb2.WorkerInfo(
                    worker_id=worker_id,
                    name=worker_name,
                    version="1.0.0",
                    address=worker_address,
                    tags=["python", "example", "CPU"],
                    capabilities=capabilities,
                )
                worker_info_to_register.last_heartbeat.CopyFrom(
                    worker_servicer._to_timestamp_pb(datetime.now(timezone.utc)))
                response = registry_stub.RegisterWorker(
                    worker_pb2.RegisterWorkerRequest(worker_info=worker_info_to_register))
                print(f"Registry registration response: {response.message} (Success: {response.success})")
                # Heartbeat loop.
                while True:
                    registry_stub.WorkerHeartbeat(
                        worker_pb2.HeartbeatRequest(worker_id=worker_id,
                                                    current_load=len(worker_servicer.active_tasks)))
                    time.sleep(5)  # Send a heartbeat every 5 seconds
            except grpc.RpcError as e:
                print(f"Registry connection error: {e}. Retrying in 10 seconds...")
                time.sleep(10)
            except Exception as e:
                print(f"Unexpected error in registration/heartbeat: {e}. Retrying in 10 seconds...")
                time.sleep(10)

    heartbeat_thread = threading.Thread(target=register_and_heartbeat)
    heartbeat_thread.daemon = True  # Let the main program exit even if this thread is running
    heartbeat_thread.start()

    try:
        while True:
            time.sleep(86400)  # One day
    except KeyboardInterrupt:
        worker_server.stop(0)
        print(f"Worker '{worker_name}' stopped.")


if __name__ == '__main__':
    # Usage: python example_worker.py 50052 ExampleWorker1
    #        python example_worker.py 50053 ExampleWorker2
    import sys
    registry_addr = "localhost:50051"  # Default registry address
    worker_port = 50052
    worker_name = "ExampleWorker"
    if len(sys.argv) > 1:
        worker_port = int(sys.argv[1])
    if len(sys.argv) > 2:
        worker_name = sys.argv[2]
    run_worker(registry_addr, worker_port, worker_name)
4.3 Orchestrator Example
The orchestrator acts as a client of both the registry and the workers.
# orchestrator_client.py
import grpc
import time
import uuid
import json
from datetime import datetime, timezone

from google.protobuf.timestamp_pb2 import Timestamp
from google.protobuf.empty_pb2 import Empty

import worker_pb2
import worker_pb2_grpc


class Orchestrator:
    def __init__(self, registry_address):
        self.registry_channel = grpc.insecure_channel(registry_address)
        self.registry_stub = worker_pb2_grpc.RegistryServiceStub(self.registry_channel)
        self.available_workers = {}  # worker_id -> WorkerInfo
        self.worker_stubs = {}       # worker_id -> WorkerServiceStub
        self.task_assignments = {}   # task_id -> worker_id

    def _to_timestamp_pb(self, dt):
        timestamp = Timestamp()
        if dt:
            timestamp.FromDatetime(dt.astimezone(timezone.utc))
        return timestamp

    def refresh_workers(self):
        print("Refreshing available workers...")
        try:
            seen_ids = set()
            # One streaming call both discovers new workers and tells us which
            # previously known workers have disappeared.
            for worker_info in self.registry_stub.GetAvailableWorkers(Empty()):
                seen_ids.add(worker_info.worker_id)
                if worker_info.worker_id not in self.available_workers:
                    print(f"Discovered new worker: {worker_info.name} ({worker_info.worker_id}) at {worker_info.address}")
                    # Create a stub for this worker.
                    worker_channel = grpc.insecure_channel(worker_info.address)
                    self.worker_stubs[worker_info.worker_id] = worker_pb2_grpc.WorkerServiceStub(worker_channel)
                self.available_workers[worker_info.worker_id] = worker_info
            # Drop workers the registry no longer reports.
            for worker_id in list(self.available_workers.keys()):
                if worker_id not in seen_ids:
                    print(f"Worker {self.available_workers[worker_id].name} ({worker_id}) went offline.")
                    del self.available_workers[worker_id]
                    # Channels are garbage-collected with their stubs; an explicit
                    # channel.close() would be cleaner in production.
                    self.worker_stubs.pop(worker_id, None)
        except grpc.RpcError as e:
            print(f"Error refreshing workers from registry: {e}")

        if not self.available_workers:
            print("No workers currently available.")
        else:
            print(f"Currently available workers: {len(self.available_workers)}")
            for worker_id, info in self.available_workers.items():
                print(f"  - {info.name} ({info.worker_id}) supports: {[c.name for c in info.capabilities]}")

    def find_worker_for_capability(self, capability_name):
        # A real system would add load balancing, resource matching, and caching;
        # here we simply return the first worker advertising the capability.
        for worker_id, worker_info in self.available_workers.items():
            if any(cap.name == capability_name for cap in worker_info.capabilities):
                return worker_info
        return None

    def execute_task(self, capability_name, task_params, timeout_seconds=60):
        self.refresh_workers()  # Make sure the worker list is current
        worker_info = self.find_worker_for_capability(capability_name)
        if not worker_info:
            print(f"No worker found for capability: {capability_name}")
            return None, "No worker available."
        worker_stub = self.worker_stubs.get(worker_info.worker_id)
        if not worker_stub:
            print(f"No gRPC stub for worker {worker_info.worker_id}. This should not happen after a refresh.")
            return None, "Worker stub not found."

        task_id = str(uuid.uuid4())
        task_parameters_pb = [
            worker_pb2.TaskParameter(key=k, value_json=json.dumps(v))
            for k, v in task_params.items()
        ]
        task_pb = worker_pb2.Task(
            task_id=task_id,
            worker_id=worker_info.worker_id,
            capability_name=capability_name,
            parameters=task_parameters_pb,
            timeout_seconds=timeout_seconds,
            created_at=self._to_timestamp_pb(datetime.now(timezone.utc)),
        )
        try:
            print(f"Assigning task {task_id} to worker {worker_info.name} ({worker_info.worker_id})...")
            response = worker_stub.ExecuteTask(worker_pb2.ExecuteTaskRequest(task=task_pb))
            if response.accepted:
                # Remember which worker got the task, so its status can be streamed later.
                self.task_assignments[task_id] = worker_info.worker_id
                print(f"Task {task_id} accepted by worker. Waiting for status updates...")
                return task_id, None
            print(f"Task {task_id} rejected by worker: {response.message}")
            return None, response.message
        except grpc.RpcError as e:
            print(f"Error assigning task to worker {worker_info.worker_id}: {e}")
            return None, str(e)

    def stream_task_status(self, task_id):
        # The task_id -> worker_id mapping recorded at assignment time tells us
        # exactly which worker to stream from.
        worker_id = self.task_assignments.get(task_id)
        target_worker_stub = self.worker_stubs.get(worker_id) if worker_id else None
        if not target_worker_stub:
            print(f"No worker known to be handling task {task_id}.")
            return
        try:
            for response in target_worker_stub.StreamTaskStatus(
                    worker_pb2.GetTaskStatusRequest(task_id=task_id)):
                result = response.result
                print(f"Task {result.task_id} status: {worker_pb2.TaskStatus.Name(result.status)}")
                if result.output_json:
                    print(f"  Output: {json.loads(result.output_json)}")
                if result.error_message:
                    print(f"  Error: {result.error_message}")
                if result.status in (worker_pb2.TASK_STATUS_COMPLETED,
                                     worker_pb2.TASK_STATUS_FAILED,
                                     worker_pb2.TASK_STATUS_CANCELED):
                    print(f"Task {task_id} streaming finished.")
                    break
        except grpc.RpcError as e:
            print(f"Error streaming task status for {task_id}: {e}")


def main():
    registry_addr = "localhost:50051"
    orchestrator = Orchestrator(registry_addr)
    print("--- Orchestrator starting ---")
    orchestrator.refresh_workers()
    time.sleep(2)  # Give workers time to register

    # Example 1: execute a successful task.
    print("\n--- Executing a successful task (dummy_processor) ---")
    task_params_1 = {"input": "hello world"}
    task_id_1, error_1 = orchestrator.execute_task("dummy_processor", task_params_1)
    if task_id_1:
        orchestrator.stream_task_status(task_id_1)
    else:
        print(f"Task 1 failed to start: {error_1}")
    time.sleep(2)  # Give some buffer

    # Example 2: execute a failing task (error_simulator).
    print("\n--- Executing a failing task (error_simulator) ---")
    task_params_2 = {}
    task_id_2, error_2 = orchestrator.execute_task("error_simulator", task_params_2)
    if task_id_2:
        orchestrator.stream_task_status(task_id_2)
    else:
        print(f"Task 2 failed to start: {error_2}")
    time.sleep(2)

    # Example 3: execute a task with an unsupported capability.
    print("\n--- Executing a task with an unsupported capability ---")
    task_params_3 = {"data": [1, 2, 3]}
    task_id_3, error_3 = orchestrator.execute_task("unsupported_tool", task_params_3)
    if task_id_3:
        orchestrator.stream_task_status(task_id_3)
    else:
        print(f"Task 3 failed to start as expected: {error_3}")

    print("\n--- Orchestrator finished ---")


if __name__ == '__main__':
    # Run registry_server.py first, then one or more example_worker.py instances:
    # 1. python registry_server.py
    # 2. python example_worker.py 50052 WorkerA
    # 3. python example_worker.py 50053 WorkerB
    # 4. python orchestrator_client.py
    main()
4.4 Running the Code
1. Generate the gRPC code.
   First make sure grpcio-tools is installed:
   pip install grpcio grpcio-tools
   Then, in the directory containing worker.proto, run:
   python -m grpc_tools.protoc -I. --python_out=. --grpc_python_out=. worker.proto
   This generates worker_pb2.py and worker_pb2_grpc.py.
2. Start the registry:
   python registry_server.py
   It listens on localhost:50051.
3. Start one or more worker instances, for example:
   python example_worker.py 50052 WorkerA
   python example_worker.py 50053 WorkerB
   These start on localhost:50052 and localhost:50053 and register with the registry.
4. Start the orchestrator client:
   python orchestrator_client.py
   The orchestrator discovers the workers and assigns tasks.
With these steps you will see the full flow: the registry receiving registrations and heartbeats, workers executing tasks and updating status, and the orchestrator scheduling tasks and receiving live status updates.
5. Advanced Topics and Considerations
Building a robust, high-performance, scalable worker-abstraction system raises many further questions.
5.1 Richer Capability and Tool Descriptions
ToolCapability parameters currently carry only name, type, description, and required. Real deployments often need richer metadata:
- Data-type constraints: beyond primitive types, one may want enums, regular expressions, numeric ranges, or full schema definitions (e.g., JSON Schema).
- Resource requirements: executing a capability may demand specific resources such as GPU memory, CPU cores, or particular software versions. ToolCapability can grow fields describing these so the Orchestrator can schedule more intelligently.
- Concurrency limits: a worker may only be able to run a fixed number of tasks of a given type at once.
- External dependencies: some capabilities rely on external services or APIs; recording this is valuable for troubleshooting and deployment.
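The richer constraints listed above can be enforced with a small validator. A minimal sketch, assuming constraints are carried as plain dicts in a JSON-Schema-like shape; the `enum`, `minimum`, `maximum`, and `pattern` fields are hypothetical extensions of `ToolParameter`, not part of the worker.proto defined earlier:

```python
# Illustrative validator for richer, JSON-Schema-like parameter constraints.
# The constraint keys used here are hypothetical ToolParameter extensions.
import re

def validate_parameter(value, constraint):
    """Return a list of violation messages (an empty list means valid)."""
    errors = []
    expected = constraint.get("type")
    if expected == "int" and isinstance(value, bool):
        # bool is a subclass of int in Python; reject it explicitly.
        errors.append("expected type int, got bool")
        return errors
    type_map = {"string": str, "int": int, "boolean": bool, "object": dict}
    if expected in type_map and not isinstance(value, type_map[expected]):
        errors.append(f"expected type {expected}, got {type(value).__name__}")
        return errors  # Remaining checks assume the declared type.
    if "enum" in constraint and value not in constraint["enum"]:
        errors.append(f"value {value!r} not in allowed set {constraint['enum']}")
    if "minimum" in constraint and value < constraint["minimum"]:
        errors.append(f"value {value} below minimum {constraint['minimum']}")
    if "maximum" in constraint and value > constraint["maximum"]:
        errors.append(f"value {value} above maximum {constraint['maximum']}")
    if "pattern" in constraint and not re.fullmatch(constraint["pattern"], value):
        errors.append(f"value {value!r} does not match pattern {constraint['pattern']!r}")
    return errors

# Example constraints for a hypothetical "resize_image" capability.
width_constraint = {"type": "int", "minimum": 1, "maximum": 4096}
mode_constraint = {"type": "string", "enum": ["fit", "fill", "crop"]}
```

In practice one would more likely embed a real JSON Schema per parameter and use an off-the-shelf validator, but the principle is the same: the Orchestrator can reject malformed tasks before they ever reach a worker.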
5.2 Resource Management and Scheduling
The Orchestrator's scheduling logic is far more involved than find_worker_for_capability. It must consider:
- Load balancing: spreading tasks evenly across workers with the same capability, so no single worker is overloaded.
- Resource affinity: tasks may need a particular kind of worker (e.g., one with a GPU); the scheduler matches against worker tags and resource descriptions.
- Priority scheduling: urgent tasks should run before routine ones.
- Queue management: combined with the task queue, backpressure keeps the system from being overwhelmed.
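A selection policy covering the first two points can be sketched in a few lines. This is illustrative only: the dicts stand in for WorkerInfo plus the load figure that HeartbeatRequest.current_load already reports, and the field names mirror the proto but the structure is an assumption.

```python
# Sketch: filter workers by capability and required tags, then pick the
# least-loaded candidate. A real scheduler would also weigh priorities,
# resource requirements, and data locality.
def select_worker(workers, capability_name, required_tags=()):
    candidates = [
        w for w in workers
        if capability_name in w["capabilities"]
        and set(required_tags).issubset(w["tags"])
    ]
    if not candidates:
        return None
    return min(candidates, key=lambda w: w["current_load"])

workers = [
    {"worker_id": "a", "capabilities": ["ocr"], "tags": ["CPU"], "current_load": 3},
    {"worker_id": "b", "capabilities": ["ocr"], "tags": ["GPU"], "current_load": 5},
    {"worker_id": "c", "capabilities": ["ocr"], "tags": ["GPU"], "current_load": 1},
]
```

For example, `select_worker(workers, "ocr", ["GPU"])` skips worker "a" on the tag filter and prefers "c" over "b" on load.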
5.3 Security
Security is paramount in a distributed system:
- Authentication: only legitimate workers and Orchestrators may communicate and register, enforced via mTLS (mutual TLS) or token-based schemes.
- Authorization: workers execute only the tasks they are authorized for, and the Orchestrator accesses only the worker information it is entitled to.
- Encryption: all traffic should run over TLS/SSL.
- Isolation: workers should be isolated from one another, so that one worker's failure or misbehavior cannot harm other workers or the system as a whole.
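As a minimal sketch of the token-based option, assuming a bearer token carried under the `authorization` metadata key (the key name, scheme, and token store are conventions chosen for this example; production systems would prefer mTLS or signed tokens):

```python
# Sketch: checking a bearer token from gRPC invocation metadata.
# VALID_TOKENS would come from a secret store, not a literal.
VALID_TOKENS = {"secret-worker-token"}

def authenticate(invocation_metadata):
    """Check the (key, value) metadata pairs a gRPC servicer gets from
    context.invocation_metadata(); True when a valid bearer token is present."""
    meta = dict(invocation_metadata)
    auth = meta.get("authorization", "")
    return auth.startswith("Bearer ") and auth[len("Bearer "):] in VALID_TOKENS

# Inside a servicer method one would then write, e.g.:
#   if not authenticate(context.invocation_metadata()):
#       context.abort(grpc.StatusCode.UNAUTHENTICATED, "invalid token")
```

The same check is usually centralized in a gRPC server interceptor rather than repeated in every method.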
5.4 Fault Tolerance and Resilience
The system should stay healthy in the face of failures:
- Retries: when a task fails, the Orchestrator can reassign it to another worker.
- Dead letter queue: tasks that keep failing after repeated retries go to a dead letter queue for human review or later handling.
- Circuit breaker: when a worker fails persistently, the Orchestrator should temporarily stop sending it tasks, preventing cascading failure.
- Graceful degradation: with some workers unavailable, the system should still provide partial service.
- Idempotency: tasks should be designed to be idempotent where possible, so that re-execution produces no extra side effects; this is what makes retries safe.
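The retry and circuit-breaker ideas combine into a small sketch. The thresholds, the injectable clock, and the broad exception handling are illustrative assumptions, not a prescribed design; in the gRPC client this would catch grpc.RpcError specifically.

```python
# Sketch: per-worker circuit breaker plus bounded retry.
import time

class CircuitBreaker:
    def __init__(self, failure_threshold=3, reset_timeout=30.0, clock=time.monotonic):
        self.failure_threshold = failure_threshold
        self.reset_timeout = reset_timeout
        self.clock = clock          # Injectable for testing.
        self.failures = 0
        self.opened_at = None       # None means the circuit is closed.

    def allow(self):
        if self.opened_at is None:
            return True
        # Half-open after the reset timeout: allow one trial call.
        return self.clock() - self.opened_at >= self.reset_timeout

    def record_success(self):
        self.failures = 0
        self.opened_at = None

    def record_failure(self):
        self.failures += 1
        if self.failures >= self.failure_threshold:
            self.opened_at = self.clock()

def execute_with_retry(call, breaker, max_attempts=3):
    """Try `call` up to max_attempts times, honoring the breaker."""
    last_error = None
    for _ in range(max_attempts):
        if not breaker.allow():
            raise RuntimeError("circuit open: worker temporarily excluded")
        try:
            result = call()
            breaker.record_success()
            return result
        except Exception as e:
            last_error = e
            breaker.record_failure()
    raise last_error
```

An Orchestrator would keep one CircuitBreaker per worker_id and route around open circuits when selecting a worker, which is exactly the "stop sending tasks to a persistently failing worker" behavior described above.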
5.5 Observability
- Logging: a unified log format and a centralized log system make troubleshooting far easier.
- Metrics: collect worker health, task throughput, latency, and error rates, monitored and alerted on with tools like Prometheus and Grafana.
- Distributed tracing: with OpenTracing/OpenTelemetry, follow a task end to end as it moves between workers, to understand system bottlenecks.
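A minimal sketch of the per-worker metrics mentioned above (throughput, error rate, mean latency), kept in plain attributes; in practice these would be exported through the Prometheus client library rather than hand-rolled:

```python
# Sketch: in-process task metrics for one worker.
class WorkerMetrics:
    def __init__(self):
        self.completed = 0
        self.failed = 0
        self.total_latency = 0.0

    def record(self, latency_seconds, success):
        """Call once per finished task."""
        if success:
            self.completed += 1
        else:
            self.failed += 1
        self.total_latency += latency_seconds

    @property
    def error_rate(self):
        total = self.completed + self.failed
        return self.failed / total if total else 0.0

    @property
    def mean_latency(self):
        total = self.completed + self.failed
        return self.total_latency / total if total else 0.0
```

The Orchestrator can feed these numbers back into scheduling (prefer low-latency, low-error workers) and into alerting thresholds.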
5.6 Versioning
As worker capabilities evolve, their API interfaces may change too.
- API versioning: support multiple API versions so old and new workers can coexist, via URL paths (REST), headers (REST/gRPC), or Protocol Buffers package or service names (gRPC).
- Compatibility policy: prefer backward compatibility and avoid breaking changes.
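Assuming WorkerInfo.version follows semantic versioning (an assumption; the proto does not mandate any format), a backward-compatibility check reduces to comparing the major and minor components:

```python
# Sketch: semantic-versioning compatibility between the API version an
# Orchestrator requires and the version a worker reports in WorkerInfo.version.
def is_compatible(worker_version, required_version):
    """Same major version, and the worker's minor version is at least
    the required one (patch level is ignored)."""
    w_major, w_minor, _ = (int(x) for x in worker_version.split("."))
    r_major, r_minor, _ = (int(x) for x in required_version.split("."))
    return w_major == r_major and w_minor >= r_minor
```

The registry or the Orchestrator can apply this filter during discovery, so that workers speaking an incompatible contract are simply never scheduled.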
5.7 Stateful vs. Stateless Workers
- Stateless workers: each execution is independent of prior state, which simplifies scaling and failure recovery.
- Stateful workers: execution depends on earlier state, which complicates horizontal scaling; state must be persisted, replicated, and synchronized. The usual advice is to design workers stateless wherever possible and push state into external services (databases, caches).
5.8 Human-in-the-Loop
Some tasks are too complex for an Agent to finish fully autonomously and need human involvement. The worker abstraction can be extended so that the Orchestrator assigns tasks to "human workers", or so that an Agent can pause a partially completed task and wait for human approval or additional input.
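One way such a pause point could look, as a sketch: the ApprovalGate class and the WAITING_FOR_HUMAN state are hypothetical extensions (the worker.proto above defines no such status), shown only to illustrate where human review would slot in.

```python
# Sketch: a gate that parks a task until a human reviewer decides.
# "WAITING_FOR_HUMAN" is a hypothetical extra task state.
class ApprovalGate:
    def __init__(self):
        self.pending = {}  # task_id -> intermediate result awaiting review

    def request_approval(self, task_id, intermediate_result):
        """Park the task; its status would be reported as a
        WAITING_FOR_HUMAN-style state in the meantime."""
        self.pending[task_id] = intermediate_result
        return "WAITING_FOR_HUMAN"

    def resolve(self, task_id, approved, reviewer_note=""):
        """Called from a review UI; says how the task should proceed."""
        intermediate = self.pending.pop(task_id)
        if approved:
            return {"action": "resume", "input": intermediate, "note": reviewer_note}
        return {"action": "cancel", "input": intermediate, "note": reviewer_note}

gate = ApprovalGate()
```

The Orchestrator would treat the gate like any other worker capability, so human review composes with automated steps in the same workflow.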
6. Practical Applications
Worker-abstraction frameworks apply broadly:
- AI Agent systems: large AI assistants, intelligent customer service, content-generation platforms. Different Agents handle natural-language understanding, knowledge retrieval, image generation, or code writing, while the Orchestrator coordinates them through complex dialogues or creative tasks.
- Data pipelines (ETL/MLOps): data cleaning, transformation, feature engineering, model training, and inference can each be wrapped as workers. For example, one worker pulls data from Kafka, another preprocesses it, and a third runs model predictions.
- DevOps automation: automated deployment, testing, monitoring, and alerting. For example, one worker runs Terraform scripts, another drives a Jenkins pipeline, and a third pushes monitoring data to Slack.
- RPA (Robotic Process Automation): each worker embodies an automated routine that mimics human operations, such as filling forms, processing email attachments, or generating reports.
- Distributed task queues: as the underlying execution engine for asynchronous workloads of all kinds.
7. Challenges and Outlook
For all the convenience and efficiency it brings, worker abstraction still faces challenges, with ample room to grow:
- Semantic interoperability: syntactic agreement on APIs is not enough; the deeper challenge is agreeing on meaning. How do we ensure different workers interpret "input data" and "task result" the same way? This calls for stronger ontologies and semantic descriptions, perhaps even shared domain-specific languages (DSLs).
- Dynamic capability discovery and adaptation: ideally, workers adjust their capability descriptions dynamically, and the Orchestrator discovers and composes workers at runtime based on task context.
- Trust and verification: in an open Agent ecosystem, how do we verify a worker's authenticity, safety, and performance? Trust mechanisms become essential, especially for cross-organization collaboration.
- Self-organizing Agent teams: the ultimate goal is Agents that self-organize and self-heal, discovering each other, collaborating, and even spawning new workers to meet goals as the environment and workload change.
Conclusion
Worker abstraction gives us a powerful paradigm for building highly modular, scalable, intelligent distributed systems. By standardizing the interface, we turn heterogeneous Agents of every kind into plug-and-play workers, dramatically reducing integration complexity while improving development velocity and system resilience. This is not just an advance in technical architecture; it is a key that opens the door to intelligent collaboration and truly elastic, autonomous systems.
工作者抽象为我们提供了一种强大的范式,用于构建高度模块化、可扩展和智能的分布式系统。通过标准化接口,我们能够将各种异构的Agent转化为即插即用的工作者,极大地降低了系统集成的复杂性,提升了开发效率和系统的韧性。这不仅是技术架构上的进步,更是开启未来智能协作、构建真正弹性自治系统大门的钥匙。