Python `asyncio` 信号处理:异步程序中的优雅退出

好的,各位观众老爷,欢迎来到“Python asyncio 信号处理:异步程序中的优雅退出”专场!今天咱们不搞虚的,直接上干货,聊聊如何在风骚的异步程序里,优雅地挥手告别。

开场白:信号是啥?为啥要优雅退出?

想象一下,你的异步程序正在服务器上欢快地跑着,处理着成千上万的请求。突然,服务器管理员一个手抖,执行了 kill -9。你的程序瞬间暴毙,数据丢失,用户体验直线下降……这画面太美,我不敢看。

这就是为什么我们需要优雅退出。优雅退出,简单来说,就是在程序收到终止信号(比如 SIGINTSIGTERM)时,不是立刻断电,而是:

  1. 停止接受新的任务。
  2. 完成当前正在执行的任务。
  3. 清理资源(关闭文件、数据库连接等)。
  4. 然后,再体面地退出。

这样,就能最大程度地减少数据丢失和错误,给用户一个交代。

信号是个啥?

信号是操作系统用来通知进程发生了某些事件的机制。常见的信号有:

  • SIGINT (2): 通常由 Ctrl+C 产生,表示用户想中断程序。
  • SIGTERM (15): 终止信号,通常由 kill 命令发送,表示程序应该终止。
  • SIGKILL (9): 强制终止信号,程序无法捕获和处理,直接被操作系统杀死。这就是我们最不想看到的“暴毙”场景。

为啥 asyncio 里的信号处理比较特殊?

asyncio 程序是基于事件循环的。所有的任务都在事件循环里排队执行。如果我们直接用传统的信号处理方式,可能会遇到以下问题:

  • 阻塞事件循环: 信号处理函数可能会执行耗时操作,阻塞事件循环,导致其他任务无法执行。
  • 数据竞争: 信号处理函数可能会修改共享资源,而其他任务也在同时访问这些资源,导致数据竞争和错误。

所以,我们需要用 asyncio 自己的方式来处理信号,确保事件循环不会被阻塞,并且避免数据竞争。

正文:asyncio 信号处理的正确姿势

asyncio 提供了 loop.add_signal_handler() 方法来注册信号处理函数。这个方法允许我们将一个协程函数注册为信号处理函数。这样,当收到信号时,事件循环会调度这个协程函数来执行。

示例 1:最简单的信号处理

import asyncio
import signal
import sys

async def handle_sigint(loop):
    print("收到 SIGINT 信号,准备退出...")
    # 在这里可以做一些清理工作,比如关闭文件、数据库连接等
    await asyncio.sleep(1)  # 模拟清理工作
    loop.stop()  # 停止事件循环

async def main():
    loop = asyncio.get_running_loop()

    # 注册 SIGINT 信号处理函数
    loop.add_signal_handler(signal.SIGINT, lambda: asyncio.create_task(handle_sigint(loop)))

    print("程序正在运行...")
    try:
        while True:
            await asyncio.sleep(0.1)
    except asyncio.CancelledError:
        print("任务被取消")

if __name__ == "__main__":
    asyncio.run(main())

这段代码做了什么:

  1. 定义了一个 handle_sigint 协程函数,用于处理 SIGINT 信号。
  2. main 函数中,获取当前的事件循环。
  3. 使用 loop.add_signal_handler() 注册 SIGINT 信号处理函数。注意,这里我们用 lambda 创建了一个匿名函数,它会创建一个新的task来执行handle_sigint。这是非常重要的一步,保证了信号处理函数不会阻塞事件循环。
  4. 程序进入一个无限循环,模拟程序的运行。
  5. 当收到 SIGINT 信号时,handle_sigint 协程函数会被调度执行,打印一条消息,模拟清理工作,然后停止事件循环。

运行这个程序,然后在终端按下 Ctrl+C,你会看到程序优雅地退出了。

示例 2:处理多个信号

import asyncio
import signal

async def handle_signal(signal_num, loop):
    print(f"收到信号 {signal_num},准备退出...")
    # 在这里可以做一些清理工作
    await asyncio.sleep(1)
    loop.stop()

async def main():
    loop = asyncio.get_running_loop()

    # 注册多个信号处理函数
    for sig in (signal.SIGINT, signal.SIGTERM):
        loop.add_signal_handler(sig, lambda sig=sig: asyncio.create_task(handle_signal(sig, loop)))

    print("程序正在运行...")
    try:
        while True:
            await asyncio.sleep(0.1)
    except asyncio.CancelledError:
        print("任务被取消")

if __name__ == "__main__":
    asyncio.run(main())

这个例子演示了如何处理多个信号。我们循环注册了 SIGINTSIGTERM 信号的处理函数。当收到任何一个信号时,程序都会优雅地退出。

示例 3:取消正在执行的任务

如果我们的程序中有一些长时间运行的任务,我们希望在收到信号时,能够取消这些任务,而不是等到它们完成。可以这样做:

import asyncio
import signal

async def long_running_task():
    print("长时间运行的任务开始...")
    try:
        for i in range(10):
            print(f"任务运行中... {i}")
            await asyncio.sleep(1)
    except asyncio.CancelledError:
        print("长时间运行的任务被取消")
    finally:
        print("长时间运行的任务清理工作...")

async def handle_sigint(loop, task):
    print("收到 SIGINT 信号,准备退出...")
    print("取消长时间运行的任务...")
    task.cancel()  # 取消任务
    await asyncio.sleep(1)  # 等待任务取消
    loop.stop()

async def main():
    loop = asyncio.get_running_loop()

    task = asyncio.create_task(long_running_task())

    # 注册 SIGINT 信号处理函数
    loop.add_signal_handler(signal.SIGINT, lambda: asyncio.create_task(handle_sigint(loop, task)))

    print("程序正在运行...")
    try:
        await task
    except asyncio.CancelledError:
        print("主任务被取消")

if __name__ == "__main__":
    asyncio.run(main())

这个例子中,我们创建了一个长时间运行的任务 long_running_task。当收到 SIGINT 信号时,我们首先取消这个任务,然后等待它完成清理工作,最后停止事件循环。

进阶:优雅退出中的注意事项

  1. 避免阻塞事件循环: 信号处理函数必须是非阻塞的。永远不要在信号处理函数中执行耗时操作。如果需要执行耗时操作,请将它们放到一个单独的协程函数中,并使用 asyncio.create_task() 来调度它。

  2. 处理 CancelledError 异常: 当一个任务被取消时,会抛出 CancelledError 异常。我们需要在任务中捕获这个异常,并执行清理工作。

  3. 使用 asyncio.gather() 等待多个任务完成: 如果你的程序中有多个任务需要等待完成,可以使用 asyncio.gather() 来并发地等待它们。

    import asyncio
    import signal
    
    async def task1():
       print("任务 1 开始...")
       await asyncio.sleep(2)
       print("任务 1 完成")
    
    async def task2():
       print("任务 2 开始...")
       await asyncio.sleep(3)
       print("任务 2 完成")
    
    async def handle_sigint(loop, tasks):
       print("收到 SIGINT 信号,准备退出...")
       print("取消所有任务...")
       for task in tasks:
           task.cancel()
       await asyncio.gather(*tasks, return_exceptions=True)  # 等待所有任务完成或被取消
       loop.stop()
    
    async def main():
       loop = asyncio.get_running_loop()
    
       tasks = [asyncio.create_task(task1()), asyncio.create_task(task2())]
    
       # 注册 SIGINT 信号处理函数
       loop.add_signal_handler(signal.SIGINT, lambda: asyncio.create_task(handle_sigint(loop, tasks)))
    
       print("程序正在运行...")
       try:
           await asyncio.gather(*tasks)
       except asyncio.CancelledError:
           print("主任务被取消")
    
    if __name__ == "__main__":
       asyncio.run(main())
  4. 使用 asyncio.shield() 保护任务: 有时候,我们希望某些任务即使在收到信号时也不被取消。可以使用 asyncio.shield() 来保护这些任务。

    import asyncio
    import signal
    
    async def protected_task():
       print("受保护的任务开始...")
       await asyncio.sleep(5)  # 模拟一个需要长时间运行的任务
       print("受保护的任务完成")
    
    async def handle_sigint(loop, task):
       print("收到 SIGINT 信号,准备退出...")
       print("尝试取消受保护的任务...")
       task.cancel()  # 尝试取消任务
       try:
           await task  # 等待任务完成或被取消
       except asyncio.CancelledError:
           print("受保护的任务被取消")
       except asyncio.exceptions.CancelledError:
           print("受保护的任务被取消 - asyncio.exceptions.CancelledError")
    
       loop.stop()
    
    async def main():
       loop = asyncio.get_running_loop()
    
       # 使用 asyncio.shield() 保护任务
       protected_task_instance = asyncio.create_task(protected_task())
       shielded_task = asyncio.shield(protected_task_instance)
    
       # 注册 SIGINT 信号处理函数
       loop.add_signal_handler(signal.SIGINT, lambda: asyncio.create_task(handle_sigint(loop, shielded_task)))
    
       print("程序正在运行...")
       try:
           await shielded_task
       except asyncio.CancelledError:
           print("主任务被取消")
    
    if __name__ == "__main__":
       asyncio.run(main())

    在上面的例子中,即使我们尝试取消 shielded_task,它也会继续运行,直到完成。asyncio.shield() 就像一个盾牌,保护任务不被取消。 需要注意的是,protected_task_instance是可以被取消的,如果直接取消它,也会生效。

  5. 清理资源: 在退出程序之前,一定要清理所有资源,比如关闭文件、数据库连接等。这可以避免资源泄露和数据损坏。

  6. 日志记录: 在信号处理函数中,记录重要的事件,比如收到信号的时间、清理资源的状态等。这可以帮助我们诊断问题。

最佳实践:一个完整的优雅退出示例

import asyncio
import signal
import logging

# 配置日志
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)

class MyApp:
    def __init__(self):
        self.db_connection = None
        self.running = True

    async def connect_db(self):
        logger.info("Connecting to database...")
        await asyncio.sleep(1)  # 模拟连接数据库
        self.db_connection = "Connected"
        logger.info("Connected to database")

    async def close_db(self):
        if self.db_connection:
            logger.info("Closing database connection...")
            await asyncio.sleep(1)  # 模拟关闭数据库连接
            self.db_connection = None
            logger.info("Database connection closed")

    async def worker(self):
        logger.info("Worker started")
        while self.running:
            logger.info("Doing some work...")
            await asyncio.sleep(2)
        logger.info("Worker stopped")

    async def handle_shutdown(self, loop):
        logger.info("Received shutdown signal, initiating graceful shutdown...")
        self.running = False  # Stop the worker loop

        # Gather and cancel all tasks
        tasks = asyncio.all_tasks(loop)
        for task in tasks:
            task.cancel()
        results = await asyncio.gather(*tasks, return_exceptions=True)

        # Log any exceptions from cancelled tasks
        for result in results:
            if isinstance(result, asyncio.CancelledError):
                logger.info("Task cancelled successfully.")
            elif isinstance(result, Exception):
                logger.error(f"Task raised an exception: {result}")

        await self.close_db()  # Close the database connection
        logger.info("Graceful shutdown complete.")
        loop.stop()

    async def run(self):
        loop = asyncio.get_running_loop()

        # Connect to the database
        await self.connect_db()

        # Create a worker task
        worker_task = asyncio.create_task(self.worker())

        # Register signal handlers
        for sig in (signal.SIGINT, signal.SIGTERM):
            loop.add_signal_handler(sig, lambda sig=sig: asyncio.create_task(self.handle_shutdown(loop)))

        try:
            await worker_task  # Await the worker task
        except asyncio.CancelledError:
            logger.info("Main task cancelled.")
        finally:
            if self.db_connection:
                await self.close_db()  # Ensure database connection is closed even if cancelled
            logger.info("Application exiting.")

if __name__ == "__main__":
    app = MyApp()
    asyncio.run(app.run())

这个例子更加完善,包括了:

  • 日志记录
  • 数据库连接和关闭
  • 一个工作协程
  • 信号处理函数,用于停止工作协程、关闭数据库连接、并停止事件循环

总结:优雅退出的重要性

优雅退出是编写健壮的 asyncio 程序的关键。它可以避免数据丢失、资源泄露和用户体验下降。通过正确地处理信号,我们可以让我们的程序在各种情况下都能够体面地退出。

希望今天的讲解对大家有所帮助!下次再见!

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注