好的,各位观众,欢迎来到今天的“Python asyncio
任务管理:取消、超时与异常处理”讲座!今天咱们不搞虚的,直接上干货,用最通俗的语言,最实在的代码,把 asyncio
任务管理的几个重要方面给它扒个精光。
开场白:asyncio
任务,你的“打工人”
在 asyncio
的世界里,任务就像你的“打工人”,你给它们安排工作(协程),它们兢兢业业地执行。但“打工人”也可能摸鱼(卡死),也可能犯错(抛出异常)。作为“老板”,你得学会管理它们,及时取消摸鱼的,处理犯错的,才能保证整个项目的稳定运行。
第一部分:取消任务——“炒鱿鱼”的艺术
取消任务,说白了就是“炒鱿鱼”,让那些不再需要的任务提前结束。asyncio
提供了 Task.cancel()
方法来实现这个功能。
-
Task.cancel()
的基本用法import asyncio async def my_task(): try: print("Task started") await asyncio.sleep(5) # 模拟耗时操作 print("Task finished") except asyncio.CancelledError: print("Task cancelled") async def main(): task = asyncio.create_task(my_task()) await asyncio.sleep(1) # 等待 1 秒 print("Cancelling task...") task.cancel() # 取消任务 try: await task # 等待任务结束 except asyncio.CancelledError: print("CancelledError caught in main") if __name__ == "__main__": asyncio.run(main())
这段代码模拟了一个耗时 5 秒的任务。在任务开始 1 秒后,我们调用
task.cancel()
取消了它。注意,Task.cancel()
只是给任务发送一个取消的信号,任务本身需要配合,捕获asyncio.CancelledError
异常才能真正停止执行。重点:
Task.cancel()
不会立即停止任务,而是抛出一个asyncio.CancelledError
异常。- 协程需要捕获
asyncio.CancelledError
才能响应取消操作。 - 如果协程没有捕获
asyncio.CancelledError
,取消操作会一直传播到调用栈的顶层。
-
取消任务的进阶技巧
-
检查任务是否已取消:
Task.cancelled()
在执行一些重要操作之前,可以先检查任务是否已经被取消,避免浪费资源。
async def my_task(): try: print("Task started") for i in range(10): if asyncio.current_task().cancelled(): # 检查任务是否已取消 print("Task is cancelled, exiting loop") break await asyncio.sleep(0.5) print(f"Iteration {i}") print("Task finished") except asyncio.CancelledError: print("Task cancelled")
-
确保任务最终完成:
asyncio.shield()
有时候,你希望某个任务即使被取消,也要执行到最后,比如清理资源。
asyncio.shield()
可以防止任务被外部取消,保证任务的完整性。async def cleanup(): print("Cleaning up resources...") await asyncio.sleep(1) # 模拟清理操作 print("Cleanup finished") async def my_task(): try: print("Task started") await asyncio.shield(cleanup()) # 防止 cleanup() 被取消 print("Task finished") except asyncio.CancelledError: print("Task cancelled") async def main(): task = asyncio.create_task(my_task()) await asyncio.sleep(1) print("Cancelling task...") task.cancel() try: await task except asyncio.CancelledError: print("CancelledError caught in main") if __name__ == "__main__": asyncio.run(main())
在这个例子中,即使
my_task()
被取消,cleanup()
也会完整执行。
-
第二部分:超时控制——“Deadline” 的重要性
给任务设置一个“Deadline”,超过时间就自动取消,这是避免任务卡死的有效手段。asyncio
提供了 asyncio.wait_for()
和 asyncio.timeout()
来实现超时控制。
-
asyncio.wait_for()
的基本用法import asyncio async def my_task(): print("Task started") await asyncio.sleep(5) # 模拟耗时操作 print("Task finished") return "Task completed successfully" async def main(): try: result = await asyncio.wait_for(my_task(), timeout=2) # 设置超时时间为 2 秒 print(f"Task result: {result}") except asyncio.TimeoutError: print("Task timed out") if __name__ == "__main__": asyncio.run(main())
这段代码设置了
my_task()
的超时时间为 2 秒。如果my_task()
在 2 秒内没有完成,就会抛出asyncio.TimeoutError
异常。重点:
asyncio.wait_for()
会在超时后抛出asyncio.TimeoutError
异常。- 超时后,任务会被自动取消。
-
asyncio.timeout()
的基本用法asyncio.timeout()
是 Python 3.11 引入的新特性,它提供了一种更优雅的超时控制方式,使用上下文管理器。import asyncio async def my_task(): print("Task started") await asyncio.sleep(5) # 模拟耗时操作 print("Task finished") return "Task completed successfully" async def main(): try: async with asyncio.timeout(2): # 设置超时时间为 2 秒 result = await my_task() print(f"Task result: {result}") except TimeoutError: print("Task timed out") if __name__ == "__main__": asyncio.run(main())
asyncio.timeout()
的效果和asyncio.wait_for()
类似,但代码更简洁,更易读。 -
超时控制的进阶技巧
-
动态调整超时时间
根据任务的实际情况,动态调整超时时间可以提高程序的灵活性。
async def my_task(expected_duration): print("Task started") await asyncio.sleep(expected_duration) print("Task finished") return "Task completed successfully" async def main(): try: # 假设预期时间是 expected_duration expected_duration = 3 result = await asyncio.wait_for(my_task(expected_duration), timeout=expected_duration * 1.5) # 超时时间是预期时间的1.5倍 print(f"Task result: {result}") except asyncio.TimeoutError: print("Task timed out") if __name__ == "__main__": asyncio.run(main())
-
超时后重试
如果任务因为超时而失败,可以尝试重新执行,但要注意避免无限重试。
async def my_task(): print("Task started") await asyncio.sleep(5) # 模拟耗时操作 print("Task finished") return "Task completed successfully" async def main(): max_retries = 3 for i in range(max_retries): try: result = await asyncio.wait_for(my_task(), timeout=2) print(f"Task result: {result}") break # 成功完成,退出循环 except asyncio.TimeoutError: print(f"Task timed out, retrying ({i+1}/{max_retries})") else: print("Task failed after multiple retries") if __name__ == "__main__": asyncio.run(main())
-
第三部分:异常处理——“擦屁股”的艺术
任务在执行过程中可能会抛出各种异常,作为“老板”,你得学会处理这些异常,避免程序崩溃。
-
try...except
的基本用法import asyncio async def my_task(): try: print("Task started") # 模拟一个可能抛出异常的操作 result = 1 / 0 # 除以 0,会抛出 ZeroDivisionError print("Task finished") return result except ZeroDivisionError: print("Caught ZeroDivisionError") return None async def main(): task = asyncio.create_task(my_task()) result = await task print(f"Task result: {result}") if __name__ == "__main__": asyncio.run(main())
这段代码在
my_task()
中使用了try...except
块,捕获了ZeroDivisionError
异常。重点:
try...except
块可以捕获任务中抛出的异常。- 在
except
块中,可以进行异常处理,比如记录日志、重试操作等。 - 如果任务中没有捕获异常,异常会传播到调用栈的顶层。
-
处理未捕获的异常:
asyncio.get_event_loop().set_exception_handler()
有时候,你可能无法预知任务会抛出什么异常,或者你希望统一处理所有未捕获的异常。
asyncio
提供了set_exception_handler()
方法,可以设置一个全局的异常处理函数。import asyncio def exception_handler(loop, context): print(f"Exception caught: {context['message']}") exception = context.get("exception") if exception: print(f"Exception type: {type(exception)}") print(f"Exception value: {exception}") async def my_task(): print("Task started") # 模拟一个可能抛出异常的操作 result = 1 / 0 # 除以 0,会抛出 ZeroDivisionError print("Task finished") return result async def main(): loop = asyncio.get_event_loop() loop.set_exception_handler(exception_handler) # 设置全局异常处理函数 task = asyncio.create_task(my_task()) await task if __name__ == "__main__": asyncio.run(main())
这段代码设置了一个全局的异常处理函数
exception_handler()
,当my_task()
抛出ZeroDivisionError
异常时,exception_handler()
会被调用。重点:
set_exception_handler()
可以设置一个全局的异常处理函数。- 异常处理函数接收两个参数:
loop
和context
。 context
包含异常的信息,比如异常类型、异常值等。
-
异常处理的进阶技巧
-
自定义异常类型
为了更好地组织和管理异常,可以自定义异常类型。
class MyCustomError(Exception): pass async def my_task(): try: print("Task started") # 模拟一个抛出自定义异常的操作 raise MyCustomError("Something went wrong") print("Task finished") except MyCustomError as e: print(f"Caught MyCustomError: {e}") async def main(): task = asyncio.create_task(my_task()) await task if __name__ == "__main__": asyncio.run(main())
-
使用
asyncio.gather()
处理多个任务的异常asyncio.gather()
可以同时运行多个任务,并返回所有任务的结果。如果其中一个任务抛出异常,asyncio.gather()
会立即取消所有其他任务,并抛出第一个抛出的异常。import asyncio async def task1(): await asyncio.sleep(1) return "Task 1 completed" async def task2(): await asyncio.sleep(0.5) raise ValueError("Task 2 failed") async def task3(): await asyncio.sleep(2) return "Task 3 completed" async def main(): try: results = await asyncio.gather(task1(), task2(), task3()) print(f"Results: {results}") except ValueError as e: print(f"Caught ValueError: {e}") if __name__ == "__main__": asyncio.run(main())
在这个例子中,
task2()
抛出了ValueError
异常,asyncio.gather()
会立即取消task1()
和task3()
,并抛出ValueError
异常。
-
总结:任务管理的“三板斧”
今天咱们学习了 asyncio
任务管理的“三板斧”:
- 取消任务: 使用
Task.cancel()
取消不再需要的任务。 - 超时控制: 使用
asyncio.wait_for()
或asyncio.timeout()
给任务设置“Deadline”。 - 异常处理: 使用
try...except
块捕获任务中抛出的异常,使用set_exception_handler()
设置全局异常处理函数。
掌握了这“三板斧”,你就能更好地管理 asyncio
任务,保证程序的稳定性和可靠性。
最后,送给大家一份小礼物:asyncio
任务管理常见问题及解决方案
问题 | 解决方案 |
---|---|
任务无法取消 | 确保协程捕获了 asyncio.CancelledError 异常,并在捕获到异常后立即停止执行。 |
超时后任务继续执行 | 确保在捕获到 asyncio.TimeoutError 异常后,立即停止执行。 |
无法捕获任务中抛出的异常 | 检查 try...except 块是否覆盖了所有可能抛出异常的代码,或者使用 set_exception_handler() 设置全局异常处理函数。 |
asyncio.gather() 抛出异常后,其他任务没有被取消 |
asyncio.gather() 默认会取消所有其他任务。如果你希望即使其中一个任务抛出异常,其他任务仍然继续执行,可以使用 asyncio.gather(*tasks, return_exceptions=True) 。 |
如何在取消任务前执行清理操作 | 使用 asyncio.shield() 保护清理操作,或者在 finally 块中执行清理操作。 |
如何在任务中安全地执行阻塞操作 | 将阻塞操作放在单独的线程或进程中执行,然后使用 asyncio.to_thread() 或 asyncio.run_in_executor() 将结果返回给 asyncio 任务。 |
希望这份小礼物能帮助大家更好地理解和使用 asyncio
任务管理。
今天的讲座就到这里,谢谢大家!如果大家有什么问题,欢迎在评论区留言,我会尽力解答。咱们下次再见!