Python `asyncio` 任务管理:取消、超时与异常处理

好的,各位观众,欢迎来到今天的“Python asyncio 任务管理:取消、超时与异常处理”讲座!今天咱们不搞虚的,直接上干货,用最通俗的语言,最实在的代码,把 asyncio 任务管理的几个重要方面给它扒个精光。

开场白:asyncio 任务,你的“打工人”

asyncio 的世界里,任务就像你的“打工人”,你给它们安排工作(协程),它们兢兢业业地执行。但“打工人”也可能摸鱼(卡死),也可能犯错(抛出异常)。作为“老板”,你得学会管理它们,及时取消摸鱼的,处理犯错的,才能保证整个项目的稳定运行。

第一部分:取消任务——“炒鱿鱼”的艺术

取消任务,说白了就是“炒鱿鱼”,让那些不再需要的任务提前结束。asyncio 提供了 Task.cancel() 方法来实现这个功能。

  • Task.cancel() 的基本用法

    import asyncio
    
    async def my_task():
        try:
            print("Task started")
            await asyncio.sleep(5)  # 模拟耗时操作
            print("Task finished")
        except asyncio.CancelledError:
            print("Task cancelled")
    
    async def main():
        task = asyncio.create_task(my_task())
        await asyncio.sleep(1)  # 等待 1 秒
        print("Cancelling task...")
        task.cancel()  # 取消任务
        try:
            await task  # 等待任务结束
        except asyncio.CancelledError:
            print("CancelledError caught in main")
    
    if __name__ == "__main__":
        asyncio.run(main())

    这段代码模拟了一个耗时 5 秒的任务。在任务开始 1 秒后,我们调用 task.cancel() 取消了它。注意,Task.cancel() 只是给任务发送一个取消的信号,任务本身需要配合,捕获 asyncio.CancelledError 异常才能真正停止执行。

    重点:

    • Task.cancel() 不会立即停止任务,而是抛出一个 asyncio.CancelledError 异常。
    • 协程需要捕获 asyncio.CancelledError 才能响应取消操作。
    • 如果协程没有捕获 asyncio.CancelledError,取消操作会一直传播到调用栈的顶层。
  • 取消任务的进阶技巧

    • 检查任务是否已取消:Task.cancelled()

      在执行一些重要操作之前,可以先检查任务是否已经被取消,避免浪费资源。

      async def my_task():
          try:
              print("Task started")
              for i in range(10):
                  if asyncio.current_task().cancelled():  # 检查任务是否已取消
                      print("Task is cancelled, exiting loop")
                      break
                  await asyncio.sleep(0.5)
                  print(f"Iteration {i}")
              print("Task finished")
          except asyncio.CancelledError:
              print("Task cancelled")
    • 确保任务最终完成:asyncio.shield()

      有时候,你希望某个任务即使被取消,也要执行到最后,比如清理资源。asyncio.shield() 可以防止任务被外部取消,保证任务的完整性。

      async def cleanup():
          print("Cleaning up resources...")
          await asyncio.sleep(1)  # 模拟清理操作
          print("Cleanup finished")
      
      async def my_task():
          try:
              print("Task started")
              await asyncio.shield(cleanup())  # 防止 cleanup() 被取消
              print("Task finished")
          except asyncio.CancelledError:
              print("Task cancelled")
      
      async def main():
          task = asyncio.create_task(my_task())
          await asyncio.sleep(1)
          print("Cancelling task...")
          task.cancel()
          try:
              await task
          except asyncio.CancelledError:
              print("CancelledError caught in main")
      
      if __name__ == "__main__":
          asyncio.run(main())

      在这个例子中,即使 my_task() 被取消,cleanup() 也会完整执行。

第二部分:超时控制——“Deadline” 的重要性

给任务设置一个“Deadline”,超过时间就自动取消,这是避免任务卡死的有效手段。asyncio 提供了 asyncio.wait_for()asyncio.timeout() 来实现超时控制。

  • asyncio.wait_for() 的基本用法

    import asyncio
    
    async def my_task():
        print("Task started")
        await asyncio.sleep(5)  # 模拟耗时操作
        print("Task finished")
        return "Task completed successfully"
    
    async def main():
        try:
            result = await asyncio.wait_for(my_task(), timeout=2)  # 设置超时时间为 2 秒
            print(f"Task result: {result}")
        except asyncio.TimeoutError:
            print("Task timed out")
    
    if __name__ == "__main__":
        asyncio.run(main())

    这段代码设置了 my_task() 的超时时间为 2 秒。如果 my_task() 在 2 秒内没有完成,就会抛出 asyncio.TimeoutError 异常。

    重点:

    • asyncio.wait_for() 会在超时后抛出 asyncio.TimeoutError 异常。
    • 超时后,任务会被自动取消。
  • asyncio.timeout() 的基本用法

    asyncio.timeout() 是 Python 3.11 引入的新特性,它提供了一种更优雅的超时控制方式,使用上下文管理器。

    import asyncio
    
    async def my_task():
        print("Task started")
        await asyncio.sleep(5)  # 模拟耗时操作
        print("Task finished")
        return "Task completed successfully"
    
    async def main():
        try:
            async with asyncio.timeout(2):  # 设置超时时间为 2 秒
                result = await my_task()
                print(f"Task result: {result}")
        except TimeoutError:
            print("Task timed out")
    
    if __name__ == "__main__":
        asyncio.run(main())

    asyncio.timeout() 的效果和 asyncio.wait_for() 类似,但代码更简洁,更易读。

  • 超时控制的进阶技巧

    • 动态调整超时时间

      根据任务的实际情况,动态调整超时时间可以提高程序的灵活性。

      async def my_task(expected_duration):
          print("Task started")
          await asyncio.sleep(expected_duration)
          print("Task finished")
          return "Task completed successfully"
      
      async def main():
          try:
              # 假设预期时间是 expected_duration
              expected_duration = 3
              result = await asyncio.wait_for(my_task(expected_duration), timeout=expected_duration * 1.5) # 超时时间是预期时间的1.5倍
              print(f"Task result: {result}")
          except asyncio.TimeoutError:
              print("Task timed out")
      
      if __name__ == "__main__":
          asyncio.run(main())
    • 超时后重试

      如果任务因为超时而失败,可以尝试重新执行,但要注意避免无限重试。

      async def my_task():
          print("Task started")
          await asyncio.sleep(5)  # 模拟耗时操作
          print("Task finished")
          return "Task completed successfully"
      
      async def main():
          max_retries = 3
          for i in range(max_retries):
              try:
                  result = await asyncio.wait_for(my_task(), timeout=2)
                  print(f"Task result: {result}")
                  break  # 成功完成,退出循环
              except asyncio.TimeoutError:
                  print(f"Task timed out, retrying ({i+1}/{max_retries})")
          else:
              print("Task failed after multiple retries")
      
      if __name__ == "__main__":
          asyncio.run(main())

第三部分:异常处理——“擦屁股”的艺术

任务在执行过程中可能会抛出各种异常,作为“老板”,你得学会处理这些异常,避免程序崩溃。

  • try...except 的基本用法

    import asyncio
    
    async def my_task():
        try:
            print("Task started")
            # 模拟一个可能抛出异常的操作
            result = 1 / 0  # 除以 0,会抛出 ZeroDivisionError
            print("Task finished")
            return result
        except ZeroDivisionError:
            print("Caught ZeroDivisionError")
            return None
    
    async def main():
        task = asyncio.create_task(my_task())
        result = await task
        print(f"Task result: {result}")
    
    if __name__ == "__main__":
        asyncio.run(main())

    这段代码在 my_task() 中使用了 try...except 块,捕获了 ZeroDivisionError 异常。

    重点:

    • try...except 块可以捕获任务中抛出的异常。
    • except 块中,可以进行异常处理,比如记录日志、重试操作等。
    • 如果任务中没有捕获异常,异常会传播到调用栈的顶层。
  • 处理未捕获的异常:asyncio.get_event_loop().set_exception_handler()

    有时候,你可能无法预知任务会抛出什么异常,或者你希望统一处理所有未捕获的异常。asyncio 提供了 set_exception_handler() 方法,可以设置一个全局的异常处理函数。

    import asyncio
    
    def exception_handler(loop, context):
        print(f"Exception caught: {context['message']}")
        exception = context.get("exception")
        if exception:
            print(f"Exception type: {type(exception)}")
            print(f"Exception value: {exception}")
    
    async def my_task():
        print("Task started")
        # 模拟一个可能抛出异常的操作
        result = 1 / 0  # 除以 0,会抛出 ZeroDivisionError
        print("Task finished")
        return result
    
    async def main():
        loop = asyncio.get_event_loop()
        loop.set_exception_handler(exception_handler)  # 设置全局异常处理函数
        task = asyncio.create_task(my_task())
        await task
    
    if __name__ == "__main__":
        asyncio.run(main())

    这段代码设置了一个全局的异常处理函数 exception_handler(),当 my_task() 抛出 ZeroDivisionError 异常时,exception_handler() 会被调用。

    重点:

    • set_exception_handler() 可以设置一个全局的异常处理函数。
    • 异常处理函数接收两个参数:loopcontext
    • context 包含异常的信息,比如异常类型、异常值等。
  • 异常处理的进阶技巧

    • 自定义异常类型

      为了更好地组织和管理异常,可以自定义异常类型。

      class MyCustomError(Exception):
          pass
      
      async def my_task():
          try:
              print("Task started")
              # 模拟一个抛出自定义异常的操作
              raise MyCustomError("Something went wrong")
              print("Task finished")
          except MyCustomError as e:
              print(f"Caught MyCustomError: {e}")
      
      async def main():
          task = asyncio.create_task(my_task())
          await task
      
      if __name__ == "__main__":
          asyncio.run(main())
    • 使用 asyncio.gather() 处理多个任务的异常

      asyncio.gather() 可以同时运行多个任务,并返回所有任务的结果。如果其中一个任务抛出异常,asyncio.gather() 会立即取消所有其他任务,并抛出第一个抛出的异常。

      import asyncio
      
      async def task1():
          await asyncio.sleep(1)
          return "Task 1 completed"
      
      async def task2():
          await asyncio.sleep(0.5)
          raise ValueError("Task 2 failed")
      
      async def task3():
          await asyncio.sleep(2)
          return "Task 3 completed"
      
      async def main():
          try:
              results = await asyncio.gather(task1(), task2(), task3())
              print(f"Results: {results}")
          except ValueError as e:
              print(f"Caught ValueError: {e}")
      
      if __name__ == "__main__":
          asyncio.run(main())

      在这个例子中,task2() 抛出了 ValueError 异常,asyncio.gather() 会立即取消 task1()task3(),并抛出 ValueError 异常。

总结:任务管理的“三板斧”

今天咱们学习了 asyncio 任务管理的“三板斧”:

  1. 取消任务: 使用 Task.cancel() 取消不再需要的任务。
  2. 超时控制: 使用 asyncio.wait_for()asyncio.timeout() 给任务设置“Deadline”。
  3. 异常处理: 使用 try...except 块捕获任务中抛出的异常,使用 set_exception_handler() 设置全局异常处理函数。

掌握了这“三板斧”,你就能更好地管理 asyncio 任务,保证程序的稳定性和可靠性。

最后,送给大家一份小礼物:asyncio 任务管理常见问题及解决方案

问题 解决方案
任务无法取消 确保协程捕获了 asyncio.CancelledError 异常,并在捕获到异常后立即停止执行。
超时后任务继续执行 确保在捕获到 asyncio.TimeoutError 异常后,立即停止执行。
无法捕获任务中抛出的异常 检查 try...except 块是否覆盖了所有可能抛出异常的代码,或者使用 set_exception_handler() 设置全局异常处理函数。
asyncio.gather() 抛出异常后,其他任务没有被取消 asyncio.gather() 默认会取消所有其他任务。如果你希望即使其中一个任务抛出异常,其他任务仍然继续执行,可以使用 asyncio.gather(*tasks, return_exceptions=True)
如何在取消任务前执行清理操作 使用 asyncio.shield() 保护清理操作,或者在 finally 块中执行清理操作。
如何在任务中安全地执行阻塞操作 将阻塞操作放在单独的线程或进程中执行,然后使用 asyncio.to_thread()asyncio.run_in_executor() 将结果返回给 asyncio 任务。

希望这份小礼物能帮助大家更好地理解和使用 asyncio 任务管理。

今天的讲座就到这里,谢谢大家!如果大家有什么问题,欢迎在评论区留言,我会尽力解答。咱们下次再见!

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注