各位同仁,各位技术爱好者,欢迎来到本次关于“深入异步节点执行:利用 Python asyncio 实现图结构中高并发外部工具调用”的讲座。
在当今高度互联的软件系统中,我们经常面临这样的挑战:需要协调执行大量独立或依赖的计算任务。这些任务可能涉及调用外部API、执行耗时的批处理脚本、访问数据库或与其他微服务通信。当这些操作是同步且阻塞的,它们会严重限制系统的吞吐量和响应速度。想象一个复杂的业务流程,它由多个步骤构成,这些步骤之间存在明确的先后关系,形成一个有向无环图(DAG)。如果每个步骤都耗时且需要等待,那么整个流程将变得异常缓慢。
今天,我们将深入探讨如何利用 Python 强大的 asyncio 库,结合图结构的思想,构建一个高效、高并发的执行引擎,专门用于管理和调度这些外部工具调用。我们的目标是,即使面对成千上万个需要执行的节点,也能以近乎同时的方式,充分利用系统资源,同时优雅地处理依赖、错误和并发限制。
1. 异步编程的基石:asyncio 概览
在深入图结构之前,我们必须先巩固对 asyncio 核心概念的理解。asyncio 是 Python 用于编写并发代码的库,它使用协程(coroutines)作为其主要构建块。
1.1 async 和 await:协程的声明与暂停
async 关键字用于定义一个协程函数。当这样的函数被调用时,它不会立即执行其代码,而是返回一个协程对象。
import asyncio
import time
import random
async def greet(name, delay):
print(f"[{time.time():.2f}] {name}: 准备问候...")
await asyncio.sleep(delay) # 模拟I/O操作,非阻塞
print(f"[{time.time():.2f}] {name}: 问候完成,耗时 {delay:.2f} 秒。")
return f"Hello, {name}!"
async def main_simple():
print(f"[{time.time():.2f}] 主程序开始。")
# 直接调用协程函数不会执行它,而是返回一个协程对象
coro1 = greet("Alice", 2)
coro2 = greet("Bob", 1)
# await 关键字用于等待一个协程的完成
# 这里会顺序执行,因为 await 会阻塞当前协程直到被等待的协程完成
result_alice = await coro1
result_bob = await coro2
print(f"[{time.time():.2f}] {result_alice}")
print(f"[{time.time():.2f}] {result_bob}")
print(f"[{time.time():.2f}] 主程序结束。")
# 运行主协程
# asyncio.run(main_simple())
运行 main_simple() 你会发现,即使 Bob 的延迟更短,它也必须等待 Alice 完成。这并不是我们想要的并发。
1.2 asyncio.Task 和 asyncio.gather:实现并发
为了实现真正的并发,我们需要将协程包装成 Task。Task 是 asyncio 调度器(事件循环)的单位。一旦一个协程被包装成 Task,它就会被提交到事件循环中运行,而不会阻塞当前协程。
asyncio.gather 是一个非常实用的工具,它允许我们同时运行多个协程或 Task,并等待它们全部完成,收集它们的结果。
async def main_concurrent():
print(f"[{time.time():.2f}] 主程序开始。")
# 创建Task,这些Task会立即被调度到事件循环中运行
task1 = asyncio.create_task(greet("Alice", 2))
task2 = asyncio.create_task(greet("Bob", 1))
task3 = asyncio.create_task(greet("Charlie", 3))
# asyncio.gather 等待所有Task完成
# 结果的顺序与传入Task的顺序一致
results = await asyncio.gather(task1, task2, task3)
print(f"[{time.time():.2f}] 所有问候完成。")
for res in results:
print(f"[{time.time():.2f}] {res}")
print(f"[{time.time():.2f}] 主程序结束。")
# asyncio.run(main_concurrent())
通过 asyncio.gather,Alice、Bob 和 Charlie 的问候会并发进行,总耗时将由最长的任务(Charlie 的3秒)决定。
1.3 事件循环(Event Loop)
asyncio 的核心是事件循环。它负责:
- 注册协程和任务。
- 监听I/O事件(例如网络套接字的数据就绪)。
- 在协程等待I/O时,切换到其他“就绪”的协程执行。
- 当I/O完成时,唤醒等待的协程。
asyncio.run() 是运行事件循环的便捷方式,它负责创建、运行和关闭事件循环。
1.4 run_in_executor:异步桥接同步阻塞代码
这是实现“高并发外部工具调用”的关键。Python 的 asyncio 完美地处理I/O密集型任务,因为它在等待I/O时可以切换到其他协程。然而,如果你的“外部工具调用”是同步阻塞的(例如,调用一个长时间运行的同步HTTP库、执行一个CPU密集型计算、或者调用一个同步的数据库驱动),直接在 async 函数中 await 这样的同步代码会阻塞整个事件循环,从而失去并发优势。
为了解决这个问题,asyncio 提供了 loop.run_in_executor() 方法。它允许你在一个单独的线程池(ThreadPoolExecutor)或进程池(ProcessPoolExecutor)中运行同步阻塞函数,而不会阻塞主事件循环。
ThreadPoolExecutor: 适合I/O密集型但仍是阻塞的同步函数,例如调用requests库或time.sleep。由于 Python 的 GIL (Global Interpreter Lock),它不适合CPU密集型任务。ProcessPoolExecutor: 适合CPU密集型任务,因为它在不同的进程中运行,绕过了 GIL 的限制。
import concurrent.futures
# 模拟一个同步的、阻塞的外部工具调用
def sync_blocking_external_tool(tool_id, duration):
print(f"[{time.time():.2f}] 外部工具 {tool_id}: 开始执行(同步阻塞)。")
time.sleep(duration) # 使用 time.sleep 模拟真正的阻塞
print(f"[{time.time():.2f}] 外部工具 {tool_id}: 执行完成(同步阻塞),耗时 {duration:.2f} 秒。")
return f"Tool {tool_id} result after {duration:.2f}s"
async def main_with_executor():
print(f"[{time.time():.2f}] 主程序开始。")
loop = asyncio.get_running_loop() # 获取当前事件循环
# 创建一个线程池执行器。默认的ThreadPoolExecutor线程数量通常足够
# 也可以显式指定 max_workers
# executor = concurrent.futures.ThreadPoolExecutor(max_workers=5)
tasks = []
for i in range(5):
# 将同步阻塞函数提交给线程池执行,并等待其完成
task = loop.run_in_executor(
None, # 使用默认的ThreadPoolExecutor
sync_blocking_external_tool,
f"Tool-{i+1}", random.uniform(1, 3) # 传递参数给同步函数
)
tasks.append(task)
results = await asyncio.gather(*tasks)
print(f"[{time.time():.2f}] 所有外部工具调用完成。")
for res in results:
print(f"[{time.time():.2f}] {res}")
print(f"[{time.time():.2f}] 主程序结束。")
# asyncio.run(main_with_executor())
运行 main_with_executor() 你会发现,尽管 sync_blocking_external_tool 内部使用了 time.sleep(阻塞),但由于 run_in_executor 将其放在了单独的线程中执行,主事件循环并没有被阻塞,多个工具调用能够并发进行。这是实现高并发外部工具调用的关键技术。
1.5 asyncio.Semaphore:并发控制
在处理“极高并发”时,我们常常需要限制同时运行的任务数量,以避免过度消耗资源(例如,避免同时打开太多文件句柄、避免对外部API发起过多的请求导致其过载、或者避免耗尽数据库连接池)。asyncio.Semaphore (信号量) 是一个非常有用的同步原语,它可以限制同时访问某个资源的协程数量。
信号量有一个内部计数器,当协程需要访问受保护的资源时,它会尝试 acquire() 信号量。如果计数器大于零,它会递减计数器并允许协程继续。如果计数器为零,协程将暂停,直到其他协程 release() 信号量。
async def limited_task(task_id, duration, semaphore):
async with semaphore: # 尝试获取信号量,相当于 await semaphore.acquire()
print(f"[{time.time():.2f}] 任务 {task_id}: 获取到信号量,开始执行。")
await asyncio.sleep(duration) # 模拟I/O操作
print(f"[{time.time():.2f}] 任务 {task_id}: 执行完成,释放信号量。")
return f"Task {task_id} finished."
async def main_with_semaphore():
print(f"[{time.time():.2f}] 主程序开始,限制并发数为 3。")
# 创建一个信号量,允许最多3个协程同时执行受保护的代码块
sem = asyncio.Semaphore(3)
tasks = []
for i in range(10): # 尝试运行10个任务
tasks.append(limited_task(f"T-{i+1}", random.uniform(1, 4), sem))
results = await asyncio.gather(*tasks)
print(f"[{time.time():.2f}] 所有任务完成。")
for res in results:
print(f"[{time.time():.2f}] {res}")
print(f"[{time.time():.2f}] 主程序结束。")
# asyncio.run(main_with_semaphore())
运行 main_with_semaphore() 你会发现,虽然有10个任务,但每次最多只有3个任务在同时运行。这对于控制对外部服务的请求速率非常有效。
2. 定义问题:图结构中的节点执行
现在,我们将上述 asyncio 的能力应用到更复杂的场景:一个由相互依赖的节点组成的图。
2.1 什么是图结构节点执行?
想象一个数据处理管道或工作流:
- 节点 (Node):图中的每个顶点代表一个独立的计算单元或外部工具调用。例如,“下载数据”、“解析JSON”、“调用机器学习模型API”、“写入数据库”等。
- 边 (Edge):图中的边代表节点之间的依赖关系。如果节点 B 依赖于节点 A,那么在 A 完成并成功之前,B 不能开始执行。
- 有向无环图 (DAG):对于工作流而言,通常是 DAG,这意味着图中不能有循环依赖(否则任务永远无法完成)。
目标:给定一个 DAG 和每个节点对应的执行函数,我们希望以最高效率执行所有节点,同时尊重它们的依赖关系,并限制并发度以保护系统资源。
2.2 节点状态模型
为了管理节点的执行,我们需要跟踪每个节点的状态。一个简单的状态模型可以包括:
| 状态 | 描述 |
|---|---|
PENDING |
节点尚未开始执行,可能正在等待依赖项。 |
READY |
节点所有依赖项已完成,可以开始执行。 |
RUNNING |
节点正在执行中。 |
COMPLETED |
节点已成功执行完毕。 |
FAILED |
节点执行失败。 |
SKIPPED |
节点被跳过(例如,因上游依赖失败)。 |
3. 架构设计:AsyncGraphExecutor
我们将构建一个 AsyncGraphExecutor 类来协调整个执行过程。
3.1 核心组件
Node类: 封装每个节点的逻辑和状态。- 图表示: 使用邻接列表(字典)来存储节点及其依赖关系。
AsyncGraphExecutor: 主调度器,负责:- 初始化图和节点执行函数。
- 管理节点状态。
- 检查依赖关系。
- 创建和调度
asyncio.Task。 - 处理并发限制 (
asyncio.Semaphore)。 - 捕获和报告错误。
3.2 Node 类的设计
每个 Node 实例将包含以下属性:
id: 节点的唯一标识符。func: 节点要执行的同步或异步函数。args/kwargs: 传递给func的参数。dependencies: 一个列表,包含此节点依赖的其他节点的id。status: 当前执行状态。result: 节点执行成功后的结果。error: 节点执行失败时的错误信息。task: 关联的asyncio.Task实例。start_time/end_time: 记录执行时间。
from enum import Enum, auto
import time
import uuid
class NodeStatus(Enum):
PENDING = auto() # 待处理,可能正在等待依赖项
READY = auto() # 依赖项已满足,可以执行
RUNNING = auto() # 正在执行
COMPLETED = auto() # 执行成功
FAILED = auto() # 执行失败
SKIPPED = auto() # 因依赖失败而被跳过
class GraphNode:
def __init__(self, node_id, func, dependencies=None, args=None, kwargs=None):
self.id = node_id
self.func = func
self.dependencies = dependencies if dependencies is not None else []
self.args = args if args is not None else []
self.kwargs = kwargs if kwargs is not None else {}
self.status = NodeStatus.PENDING
self.result = None
self.error = None
self.task = None # Will hold the asyncio.Task for this node
self.start_time = None
self.end_time = None
def __repr__(self):
return f"Node(id='{self.id}', status={self.status.name}, deps={self.dependencies})"
def mark_ready(self):
if self.status == NodeStatus.PENDING:
self.status = NodeStatus.READY
def mark_running(self):
self.status = NodeStatus.RUNNING
self.start_time = time.time()
def mark_completed(self, result):
self.status = NodeStatus.COMPLETED
self.result = result
self.end_time = time.time()
def mark_failed(self, error):
self.status = NodeStatus.FAILED
self.error = error
self.end_time = time.time()
def mark_skipped(self, reason="Upstream dependency failed"):
self.status = NodeStatus.SKIPPED
self.error = reason
self.end_time = time.time()
3.3 AsyncGraphExecutor 的设计
AsyncGraphExecutor 将协调所有 GraphNode 实例的执行。
import asyncio
import concurrent.futures
import traceback
class AsyncGraphExecutor:
def __init__(self, nodes_data, max_concurrent_nodes=None, max_concurrent_external_calls=None):
"""
初始化图执行器。
:param nodes_data: 节点配置列表,每个元素是一个字典,包含 id, func, dependencies, args, kwargs。
:param max_concurrent_nodes: 限制同时运行的节点总数(不包括等待依赖的节点)。
:param max_concurrent_external_calls: 限制同时运行的外部工具调用(通过 run_in_executor)。
"""
self.nodes = {} # {node_id: GraphNode instance}
self.loop = asyncio.get_event_loop() # 获取当前事件循环
# 用于 run_in_executor 的线程池
self.executor = concurrent.futures.ThreadPoolExecutor(
max_workers=max_concurrent_external_calls if max_concurrent_external_calls else None
)
# 用于限制总节点并发数的信号量
self.node_semaphore = asyncio.Semaphore(
max_concurrent_nodes if max_concurrent_nodes else float('inf')
)
self._initialize_nodes(nodes_data)
def _initialize_nodes(self, nodes_data):
"""根据输入数据创建 GraphNode 实例。"""
for data in nodes_data:
node = GraphNode(
node_id=data['id'],
func=data['func'],
dependencies=data.get('dependencies', []),
args=data.get('args', []),
kwargs=data.get('kwargs', {})
)
self.nodes[node.id] = node
# 初始检查:所有依赖项必须是图中存在的节点
self._validate_dependencies()
def _validate_dependencies(self):
"""验证所有节点依赖项是否存在于图中。"""
for node_id, node in self.nodes.items():
for dep_id in node.dependencies:
if dep_id not in self.nodes:
raise ValueError(f"节点 '{node_id}' 依赖于不存在的节点 '{dep_id}'。")
async def _execute_single_node(self, node: GraphNode):
"""
执行单个节点的协程函数。
它会等待所有依赖项完成,然后执行节点自身的 func。
"""
async with self.node_semaphore: # 限制总节点并发数
# 1. 等待所有依赖项完成
if node.dependencies:
await asyncio.gather(
*[self.nodes[dep_id].task for dep_id in node.dependencies]
)
# 2. 检查依赖项状态
for dep_id in node.dependencies:
dep_node = self.nodes[dep_id]
if dep_node.status in {NodeStatus.FAILED, NodeStatus.SKIPPED}:
node.mark_skipped(f"上游依赖 '{dep_id}' 失败或被跳过。")
print(f"[{time.time():.2f}] 节点 '{node.id}' 被跳过,原因: {node.error}")
return # 节点跳过,不再执行 func
node.mark_running()
print(f"[{time.time():.2f}] 节点 '{node.id}': 开始执行。")
result = None
try:
# 3. 执行节点函数
# 判断 func 是异步还是同步
if asyncio.iscoroutinefunction(node.func):
# 如果是异步函数,直接 await
result = await node.func(*node.args, **node.kwargs)
else:
# 如果是同步函数,通过 executor 运行,避免阻塞事件循环
result = await self.loop.run_in_executor(
self.executor,
node.func,
*node.args,
**node.kwargs
)
node.mark_completed(result)
print(f"[{time.time():.2f}] 节点 '{node.id}': 执行成功。")
except Exception as e:
error_message = f"节点 '{node.id}' 执行失败: {e}n{traceback.format_exc()}"
node.mark_failed(error_message)
print(f"[{time.time():.2f}] 节点 '{node.id}': 执行失败。错误: {e}")
return result # 返回结果,虽然通常不会直接使用这个返回值,而是通过node.result访问
async def execute_graph(self):
"""
执行整个图。
创建所有节点的任务,并用 asyncio.gather 等待它们完成。
"""
print(f"[{time.time():.2f}] 图执行器: 开始执行所有节点。")
# 为每个节点创建并启动一个异步任务
# 每个任务内部会处理自己的依赖等待逻辑
tasks = []
for node_id, node in self.nodes.items():
node.task = asyncio.create_task(self._execute_single_node(node))
tasks.append(node.task)
# 等待所有任务完成
await asyncio.gather(*tasks, return_exceptions=True) # return_exceptions=True 确保即使有任务失败,gather 也能等待所有任务完成
print(f"[{time.time():.2f}] 图执行器: 所有节点执行完毕。")
self.executor.shutdown(wait=True) # 关闭线程池
return self.nodes
4. 外部工具调用仿真与集成
为了演示,我们创建一些模拟的外部工具调用函数。这些函数将模拟真实世界中可能出现的各种情况:同步阻塞、异步非阻塞、成功、失败等。
4.1 模拟外部工具函数
async def async_api_call(name, delay):
"""模拟一个异步的、非阻塞的API调用。"""
print(f"[{time.time():.2f}] 异步API '{name}': 开始调用,预计耗时 {delay:.2f}s。")
await asyncio.sleep(delay)
if random.random() < 0.1: # 10% 失败率
raise ValueError(f"异步API '{name}' 调用失败!")
print(f"[{time.time():.2f}] 异步API '{name}': 调用成功。")
return f"Async result for {name} after {delay:.2f}s"
def sync_db_query(query_id, duration):
"""模拟一个同步的、阻塞的数据库查询。"""
print(f"[{time.time():.2f}] 同步DB查询 '{query_id}': 开始执行,预计耗时 {duration:.2f}s。")
time.sleep(duration) # 模拟阻塞
if random.random() < 0.05: # 5% 失败率
raise ConnectionError(f"同步DB查询 '{query_id}' 数据库连接失败!")
print(f"[{time.time():.2f}] 同步DB查询 '{query_id}': 执行成功。")
return f"DB result for {query_id} after {duration:.2f}s"
def cpu_intensive_task(task_id, iterations):
"""模拟一个同步的、CPU密集型任务。"""
print(f"[{time.time():.2f}] CPU任务 '{task_id}': 开始执行,迭代 {iterations} 次。")
result = 0
for _ in range(iterations):
result += sum(i for i in range(1000)) # 模拟计算
print(f"[{time.time():.2f}] CPU任务 '{task_id}': 执行成功。")
return f"CPU result for {task_id} with value {result}"
def combine_results(result_a, result_b, result_c):
"""一个依赖于其他节点结果的普通函数。"""
print(f"[{time.time():.2f}] 组合器: 收到结果 A='{result_a}', B='{result_b}', C='{result_c}'")
combined = f"Combined: ({result_a}) + ({result_b}) + ({result_c})"
print(f"[{time.time():.2f}] 组合器: 完成。")
return combined
async def final_report(combined_data):
"""最终的异步报告生成。"""
print(f"[{time.time():.2f}] 最终报告: 正在处理 '{combined_data}'...")
await asyncio.sleep(0.5)
print(f"[{time.time():.2f}] 最终报告: 完成。")
return f"Final Report Generated: {combined_data[:50]}..."
4.2 构建一个示例图并执行
现在,我们定义一个复杂的图结构,并使用 AsyncGraphExecutor 来执行它。
async def run_example_graph():
print("n" + "="*80)
print("开始运行示例图")
print("="*80 + "n")
# 定义节点数据
# 注意:args 和 kwargs 中的值可以是静态的,也可以是动态的(例如,从上游节点结果中获取)
# 在这个实现中,我们通过手动传递上游节点的 result 来模拟动态参数
nodes_config = [
# 初始节点,无依赖
{'id': 'DownloadData', 'func': async_api_call, 'args': ['DataDownloader', 1.5]},
{'id': 'PrepareConfig', 'func': sync_db_query, 'args': ['ConfigDB', 1.0]},
{'id': 'InitCache', 'func': async_api_call, 'args': ['CacheInit', 0.8]},
# 依赖 DownloadData
{'id': 'ParseData', 'func': cpu_intensive_task, 'dependencies': ['DownloadData'], 'args': ['DataParser', 500000]},
{'id': 'ValidateSchema', 'func': async_api_call, 'dependencies': ['DownloadData'], 'args': ['SchemaValidator', 0.7]},
# 依赖 PrepareConfig
{'id': 'LoadModelParams', 'func': sync_db_query, 'dependencies': ['PrepareConfig'], 'args': ['ModelParamsDB', 2.0]},
# 依赖 InitCache
{'id': 'PrecomputeFeatures', 'func': cpu_intensive_task, 'dependencies': ['InitCache'], 'args': ['FeaturePrecompute', 700000]},
# 依赖 ParseData 和 ValidateSchema
{'id': 'TransformData', 'func': sync_db_query, 'dependencies': ['ParseData', 'ValidateSchema'], 'args': ['DataTransformerDB', 1.2]},
# 依赖 LoadModelParams 和 PrecomputeFeatures
{'id': 'RunPrediction', 'func': async_api_call, 'dependencies': ['LoadModelParams', 'PrecomputeFeatures'], 'args': ['PredictionAPI', 3.0]},
# 依赖 TransformData, RunPrediction
{'id': 'AggregateResults', 'func': combine_results, 'dependencies': ['TransformData', 'RunPrediction'], 'args': ['PlaceholderA', 'PlaceholderB', 'PlaceholderC']}, # 参数会在实际执行时替换
# 最终节点,依赖 AggregateResults
{'id': 'GenerateReport', 'func': final_report, 'dependencies': ['AggregateResults'], 'args': ['PlaceholderCombinedData']} # 参数会在实际执行时替换
]
# 为了演示传递动态参数,我们需要稍微修改 AsyncGraphExecutor 的执行逻辑
# 实际项目中,通常会有一个更完善的上下文或参数解析机制
# 这里我们模拟一下,让 combine_results 和 final_report 能够拿到上游结果
# 创建一个更灵活的节点执行函数,用于演示参数传递
async def dynamic_node_func(node: GraphNode, graph_nodes_map: dict):
"""
一个包装器,用于在执行前解析节点的动态参数。
实际中,这部分逻辑会更复杂,可能通过 JINJA 模板或专门的表达式语言实现。
"""
resolved_args = []
resolved_kwargs = {}
# 简单示例:如果参数是占位符,尝试从依赖节点的结果中获取
# 实际应用中,你需要一个明确的规则来映射上游结果到下游参数
if node.id == 'AggregateResults':
result_a = graph_nodes_map['TransformData'].result
result_b = graph_nodes_map['RunPrediction'].result
result_c = graph_nodes_map['InitCache'].result # 示例:AggregateResults 也可能需要 InitCache 的结果
resolved_args = [result_a, result_b, result_c]
elif node.id == 'GenerateReport':
combined_data = graph_nodes_map['AggregateResults'].result
resolved_args = [combined_data]
else:
resolved_args = node.args
resolved_kwargs = node.kwargs
# 判断 func 是异步还是同步
if asyncio.iscoroutinefunction(node.func):
return await node.func(*resolved_args, **resolved_kwargs)
else:
return await asyncio.get_running_loop().run_in_executor(
graph_executor.executor, # 使用执行器的线程池
node.func,
*resolved_args,
**resolved_kwargs
)
# 将 nodes_config 中的 func 替换为这个动态解析的函数
# 这样每个节点在执行时都会调用 dynamic_node_func,由它来决定如何调用原始 func
# 并传递正确的参数。这需要修改 _execute_single_node 来接受一个 context_func
# 或者直接在 _execute_single_node 内部实现参数解析。
# 为了简化,我们直接修改 _execute_single_node 内部的执行逻辑。
# 修改 _execute_single_node 来支持简单的依赖结果传递
# 在实际执行之前,需要先将原始的 _execute_single_node 替换掉
# 这里演示一个临时的修改方法,更正式的应该修改类定义
# -------------------------------------------------------------
# 临时修改 _execute_single_node 以支持依赖结果传递
# 这是一个简化处理,实际中应该设计更健壮的参数传递机制
original_execute_single_node = AsyncGraphExecutor._execute_single_node
async def _modified_execute_single_node(self, node: GraphNode):
async with self.node_semaphore:
if node.dependencies:
await asyncio.gather(*[self.nodes[dep_id].task for dep_id in node.dependencies])
for dep_id in node.dependencies:
dep_node = self.nodes[dep_id]
if dep_node.status in {NodeStatus.FAILED, NodeStatus.SKIPPED}:
node.mark_skipped(f"上游依赖 '{dep_id}' 失败或被跳过。")
print(f"[{time.time():.2f}] 节点 '{node.id}' 被跳过,原因: {node.error}")
return
node.mark_running()
print(f"[{time.time():.2f}] 节点 '{node.id}': 开始执行。")
result = None
try:
# 动态解析参数
current_args = list(node.args)
current_kwargs = dict(node.kwargs)
# 这是一个简单的占位符替换逻辑,可以根据需求扩展
# 比如,如果一个参数是字符串 'dep_id.result',则替换为对应依赖的结果
if node.id == 'AggregateResults':
current_args = [
self.nodes['TransformData'].result,
self.nodes['RunPrediction'].result,
self.nodes['InitCache'].result # 额外添加一个来自 InitCache 的结果作为示例
]
elif node.id == 'GenerateReport':
current_args = [self.nodes['AggregateResults'].result]
# 执行节点函数
if asyncio.iscoroutinefunction(node.func):
result = await node.func(*current_args, **current_kwargs)
else:
result = await self.loop.run_in_executor(
self.executor,
node.func,
*current_args,
**current_kwargs
)
node.mark_completed(result)
print(f"[{time.time():.2f}] 节点 '{node.id}': 执行成功。")
except Exception as e:
error_message = f"节点 '{node.id}' 执行失败: {e}n{traceback.format_exc()}"
node.mark_failed(error_message)
print(f"[{time.time():.2f}] 节点 '{node.id}': 执行失败。错误: {e}")
return result
AsyncGraphExecutor._execute_single_node = _modified_execute_single_node
# -------------------------------------------------------------
# 初始化执行器,限制并发度
# 假设我们最多同时运行5个节点,同时最多有3个外部阻塞调用
graph_executor = AsyncGraphExecutor(
nodes_config,
max_concurrent_nodes=5,
max_concurrent_external_calls=3
)
start_time = time.time()
final_nodes_state = await graph_executor.execute_graph()
end_time = time.time()
print(f"n" + "="*80)
print(f"图执行完毕,总耗时: {end_time - start_time:.2f} 秒")
print("节点执行状态报告:")
print("="*80)
for node_id, node in final_nodes_state.items():
duration = (node.end_time - node.start_time) if node.start_time and node.end_time else 'N/A'
print(f" Node ID: {node.id}")
print(f" Status: {node.status.name}")
print(f" Result: {str(node.result)[:100]}..." if node.result else "N/A")
if node.error:
print(f" Error: {str(node.error).splitlines()[0]}...")
print(f" Duration: {duration:.2f}s" if duration != 'N/A' else "N/A")
print("-" * 20)
# 恢复原始方法,避免影响其他测试(如果存在)
AsyncGraphExecutor._execute_single_node = original_execute_single_node
# 运行主函数
# asyncio.run(run_example_graph())
这段代码为我们构建了一个强大的异步图执行引擎:
- 依赖管理:每个节点在启动前都会等待其所有依赖节点完成。
- 高并发:
asyncio.gather结合create_task使得多个独立分支和非阻塞任务并行运行。 - 阻塞处理:
loop.run_in_executor将同步阻塞的外部工具调用移到单独的线程池中,避免阻塞事件循环。 - 并发控制:
asyncio.Semaphore限制了同时运行的节点总数以及同时进行的外部阻塞调用数量,保护了资源。 - 错误处理:捕获节点执行中的异常,并标记节点为失败。依赖失败的节点会被自动跳过。
- 状态跟踪:详细记录了每个节点的执行状态、结果和错误。
5. 性能与扩展性考量
5.1 ThreadPoolExecutor vs. ProcessPoolExecutor
在 run_in_executor 中,我们默认使用了 ThreadPoolExecutor。
ThreadPoolExecutor: 适用于 I/O 密集型任务,如网络请求、磁盘读写。由于 GIL 限制,它无法充分利用多核 CPU 进行纯计算。ProcessPoolExecutor: 适用于 CPU 密集型任务,如复杂的数据处理、科学计算。每个进程有独立的 GIL,因此可以并行利用多核。但进程创建和通信的开销比线程大。
如果你的外部工具调用主要是 CPU 密集型的,你应该考虑切换到 ProcessPoolExecutor。这可以通过在 AsyncGraphExecutor 初始化时,将 self.executor 设置为 concurrent.futures.ProcessPoolExecutor(...) 来实现。
5.2 资源限制的细粒度控制
当前 AsyncGraphExecutor 使用了两个信号量:
node_semaphore: 限制同时处于 RUNNING 状态的节点总数。这有助于控制任务调度器的负载。max_concurrent_external_calls(通过ThreadPoolExecutor的max_workers参数实现): 限制同时进行的同步阻塞外部调用数量。这对于保护外部服务或数据库连接池至关重要。
你可以根据具体需求调整这些参数。例如,如果某个外部 API 有严格的速率限制,可以为该 API 创建一个专用的 asyncio.Semaphore,并将其作为参数传递给调用该 API 的节点函数。
5.3 错误处理与重试机制
当前的错误处理机制只是简单地捕获异常并标记节点失败。在生产环境中,你可能需要更健壮的策略:
- 重试 (Retries):对于瞬时错误(例如网络抖动),可以实现指数退避(exponential backoff)的重试机制。这可以通过在
_execute_single_node内部添加一个循环和asyncio.sleep来实现,或者使用专门的重试库。 - 错误传播:当一个节点失败时,其所有下游依赖节点都将被跳过。你可以选择是否允许部分失败,或者是否需要将错误信息聚合到最终报告中。
- 死信队列 (Dead Letter Queue):将失败的任务信息发送到专门的队列,供后续分析或手动干预。
5.4 超时机制
长时间运行的任务可能会导致系统资源被长时间占用。asyncio.wait_for() 可以用来为协程设置超时。
# 示例:在 _execute_single_node 中添加超时
try:
result = await asyncio.wait_for(
# 根据 func 类型选择 await 或 run_in_executor
node.func(*current_args, **current_kwargs) if asyncio.iscoroutinefunction(node.func) else
self.loop.run_in_executor(self.executor, node.func, *current_args, **current_kwargs),
timeout=node.timeout # 假设 Node 类有一个 timeout 属性
)
node.mark_completed(result)
except asyncio.TimeoutError:
node.mark_failed(f"节点 '{node.id}' 执行超时。")
print(f"[{time.time():.2f}] 节点 '{node.id}': 执行超时。")
except Exception as e:
# ... 现有错误处理
5.5 监控与日志
在高度并发的系统中,良好的日志记录和监控至关重要。
- 详细日志:记录每个节点的开始、结束、状态变化、耗时、结果摘要和完整错误信息。
- 结构化日志:使用
json或其他结构化格式记录日志,便于日志分析工具处理。 - 度量指标:使用 Prometheus、Datadog 等工具收集每个节点的执行时间、成功/失败率、并发度等指标。
5.6 动态图与任务取消
当前的实现是针对静态 DAG 的。如果图结构在运行时动态变化(例如,根据上一个节点的结果决定下一步执行哪些节点),则需要更复杂的调度器,可能涉及 asyncio.Queue 来管理待处理的节点。
此外,asyncio.Task 提供了 cancel() 方法,可以用于取消正在运行的任务。在某些场景下,例如用户取消了整个工作流,这会非常有用。
6. 总结与展望
我们深入探讨了如何利用 Python asyncio 及其核心组件(async/await, Task, gather, run_in_executor, Semaphore)来构建一个高效的异步图执行引擎。这个引擎能够优雅地处理节点间的依赖关系,实现极高并发的外部工具调用,同时通过并发控制机制保护系统资源。
通过将同步阻塞操作卸载到独立的线程池,我们确保了事件循环的非阻塞性,从而最大化了系统的吞吐量。这种模式对于构建数据管道、工作流自动化、微服务编排等需要协调大量独立或依赖任务的场景具有极高的价值。
未来,可以进一步增强这个执行器,使其支持动态图结构、更复杂的重试策略、更细粒度的资源管理,甚至可以考虑将其扩展为分布式系统。但其核心思想——利用异步编程的优势,并巧妙地桥接同步阻塞世界——将始终是我们构建高性能、可扩展系统的基石。