好的,各位观众老爷,欢迎来到“Python asyncio
信号处理:异步程序中的优雅退出”专场!今天咱们不搞虚的,直接上干货,聊聊如何在风骚的异步程序里,优雅地挥手告别。
开场白:信号是啥?为啥要优雅退出?
想象一下,你的异步程序正在服务器上欢快地跑着,处理着成千上万的请求。突然,服务器管理员一个手抖,执行了 kill -9
。你的程序瞬间暴毙,数据丢失,用户体验直线下降……这画面太美,我不敢看。
这就是为什么我们需要优雅退出。优雅退出,简单来说,就是在程序收到终止信号(比如 SIGINT
,SIGTERM
)时,不是立刻断电,而是:
- 停止接受新的任务。
- 完成当前正在执行的任务。
- 清理资源(关闭文件、数据库连接等)。
- 然后,再体面地退出。
这样,就能最大程度地减少数据丢失和错误,给用户一个交代。
信号是个啥?
信号是操作系统用来通知进程发生了某些事件的机制。常见的信号有:
SIGINT
(2): 通常由 Ctrl+C 产生,表示用户想中断程序。SIGTERM
(15): 终止信号,通常由kill
命令发送,表示程序应该终止。SIGKILL
(9): 强制终止信号,程序无法捕获和处理,直接被操作系统杀死。这就是我们最不想看到的“暴毙”场景。
为啥 asyncio
里的信号处理比较特殊?
asyncio
程序是基于事件循环的。所有的任务都在事件循环里排队执行。如果我们直接用传统的信号处理方式,可能会遇到以下问题:
- 阻塞事件循环: 信号处理函数可能会执行耗时操作,阻塞事件循环,导致其他任务无法执行。
- 数据竞争: 信号处理函数可能会修改共享资源,而其他任务也在同时访问这些资源,导致数据竞争和错误。
所以,我们需要用 asyncio
自己的方式来处理信号,确保事件循环不会被阻塞,并且避免数据竞争。
正文:asyncio
信号处理的正确姿势
asyncio
提供了 loop.add_signal_handler()
方法来注册信号处理函数。这个方法允许我们将一个协程函数注册为信号处理函数。这样,当收到信号时,事件循环会调度这个协程函数来执行。
示例 1:最简单的信号处理
import asyncio
import signal
import sys
async def handle_sigint(loop):
print("收到 SIGINT 信号,准备退出...")
# 在这里可以做一些清理工作,比如关闭文件、数据库连接等
await asyncio.sleep(1) # 模拟清理工作
loop.stop() # 停止事件循环
async def main():
loop = asyncio.get_running_loop()
# 注册 SIGINT 信号处理函数
loop.add_signal_handler(signal.SIGINT, lambda: asyncio.create_task(handle_sigint(loop)))
print("程序正在运行...")
try:
while True:
await asyncio.sleep(0.1)
except asyncio.CancelledError:
print("任务被取消")
if __name__ == "__main__":
asyncio.run(main())
这段代码做了什么:
- 定义了一个
handle_sigint
协程函数,用于处理SIGINT
信号。 - 在
main
函数中,获取当前的事件循环。 - 使用
loop.add_signal_handler()
注册SIGINT
信号处理函数。注意,这里我们用lambda
创建了一个匿名函数,它会创建一个新的task来执行handle_sigint
。这是非常重要的一步,保证了信号处理函数不会阻塞事件循环。 - 程序进入一个无限循环,模拟程序的运行。
- 当收到
SIGINT
信号时,handle_sigint
协程函数会被调度执行,打印一条消息,模拟清理工作,然后停止事件循环。
运行这个程序,然后在终端按下 Ctrl+C,你会看到程序优雅地退出了。
示例 2:处理多个信号
import asyncio
import signal
async def handle_signal(signal_num, loop):
print(f"收到信号 {signal_num},准备退出...")
# 在这里可以做一些清理工作
await asyncio.sleep(1)
loop.stop()
async def main():
loop = asyncio.get_running_loop()
# 注册多个信号处理函数
for sig in (signal.SIGINT, signal.SIGTERM):
loop.add_signal_handler(sig, lambda sig=sig: asyncio.create_task(handle_signal(sig, loop)))
print("程序正在运行...")
try:
while True:
await asyncio.sleep(0.1)
except asyncio.CancelledError:
print("任务被取消")
if __name__ == "__main__":
asyncio.run(main())
这个例子演示了如何处理多个信号。我们循环注册了 SIGINT
和 SIGTERM
信号的处理函数。当收到任何一个信号时,程序都会优雅地退出。
示例 3:取消正在执行的任务
如果我们的程序中有一些长时间运行的任务,我们希望在收到信号时,能够取消这些任务,而不是等到它们完成。可以这样做:
import asyncio
import signal
async def long_running_task():
print("长时间运行的任务开始...")
try:
for i in range(10):
print(f"任务运行中... {i}")
await asyncio.sleep(1)
except asyncio.CancelledError:
print("长时间运行的任务被取消")
finally:
print("长时间运行的任务清理工作...")
async def handle_sigint(loop, task):
print("收到 SIGINT 信号,准备退出...")
print("取消长时间运行的任务...")
task.cancel() # 取消任务
await asyncio.sleep(1) # 等待任务取消
loop.stop()
async def main():
loop = asyncio.get_running_loop()
task = asyncio.create_task(long_running_task())
# 注册 SIGINT 信号处理函数
loop.add_signal_handler(signal.SIGINT, lambda: asyncio.create_task(handle_sigint(loop, task)))
print("程序正在运行...")
try:
await task
except asyncio.CancelledError:
print("主任务被取消")
if __name__ == "__main__":
asyncio.run(main())
这个例子中,我们创建了一个长时间运行的任务 long_running_task
。当收到 SIGINT
信号时,我们首先取消这个任务,然后等待它完成清理工作,最后停止事件循环。
进阶:优雅退出中的注意事项
-
避免阻塞事件循环: 信号处理函数必须是非阻塞的。永远不要在信号处理函数中执行耗时操作。如果需要执行耗时操作,请将它们放到一个单独的协程函数中,并使用
asyncio.create_task()
来调度它。 -
处理
CancelledError
异常: 当一个任务被取消时,会抛出CancelledError
异常。我们需要在任务中捕获这个异常,并执行清理工作。 -
使用
asyncio.gather()
等待多个任务完成: 如果你的程序中有多个任务需要等待完成,可以使用asyncio.gather()
来并发地等待它们。import asyncio import signal async def task1(): print("任务 1 开始...") await asyncio.sleep(2) print("任务 1 完成") async def task2(): print("任务 2 开始...") await asyncio.sleep(3) print("任务 2 完成") async def handle_sigint(loop, tasks): print("收到 SIGINT 信号,准备退出...") print("取消所有任务...") for task in tasks: task.cancel() await asyncio.gather(*tasks, return_exceptions=True) # 等待所有任务完成或被取消 loop.stop() async def main(): loop = asyncio.get_running_loop() tasks = [asyncio.create_task(task1()), asyncio.create_task(task2())] # 注册 SIGINT 信号处理函数 loop.add_signal_handler(signal.SIGINT, lambda: asyncio.create_task(handle_sigint(loop, tasks))) print("程序正在运行...") try: await asyncio.gather(*tasks) except asyncio.CancelledError: print("主任务被取消") if __name__ == "__main__": asyncio.run(main())
-
使用
asyncio.shield()
保护任务: 有时候,我们希望某些任务即使在收到信号时也不被取消。可以使用asyncio.shield()
来保护这些任务。import asyncio import signal async def protected_task(): print("受保护的任务开始...") await asyncio.sleep(5) # 模拟一个需要长时间运行的任务 print("受保护的任务完成") async def handle_sigint(loop, task): print("收到 SIGINT 信号,准备退出...") print("尝试取消受保护的任务...") task.cancel() # 尝试取消任务 try: await task # 等待任务完成或被取消 except asyncio.CancelledError: print("受保护的任务被取消") except asyncio.exceptions.CancelledError: print("受保护的任务被取消 - asyncio.exceptions.CancelledError") loop.stop() async def main(): loop = asyncio.get_running_loop() # 使用 asyncio.shield() 保护任务 protected_task_instance = asyncio.create_task(protected_task()) shielded_task = asyncio.shield(protected_task_instance) # 注册 SIGINT 信号处理函数 loop.add_signal_handler(signal.SIGINT, lambda: asyncio.create_task(handle_sigint(loop, shielded_task))) print("程序正在运行...") try: await shielded_task except asyncio.CancelledError: print("主任务被取消") if __name__ == "__main__": asyncio.run(main())
在上面的例子中,即使我们尝试取消
shielded_task
,它也会继续运行,直到完成。asyncio.shield()
就像一个盾牌,保护任务不被取消。 需要注意的是,protected_task_instance
是可以被取消的,如果直接取消它,也会生效。 -
清理资源: 在退出程序之前,一定要清理所有资源,比如关闭文件、数据库连接等。这可以避免资源泄露和数据损坏。
-
日志记录: 在信号处理函数中,记录重要的事件,比如收到信号的时间、清理资源的状态等。这可以帮助我们诊断问题。
最佳实践:一个完整的优雅退出示例
import asyncio
import signal
import logging
# 配置日志
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)
class MyApp:
def __init__(self):
self.db_connection = None
self.running = True
async def connect_db(self):
logger.info("Connecting to database...")
await asyncio.sleep(1) # 模拟连接数据库
self.db_connection = "Connected"
logger.info("Connected to database")
async def close_db(self):
if self.db_connection:
logger.info("Closing database connection...")
await asyncio.sleep(1) # 模拟关闭数据库连接
self.db_connection = None
logger.info("Database connection closed")
async def worker(self):
logger.info("Worker started")
while self.running:
logger.info("Doing some work...")
await asyncio.sleep(2)
logger.info("Worker stopped")
async def handle_shutdown(self, loop):
logger.info("Received shutdown signal, initiating graceful shutdown...")
self.running = False # Stop the worker loop
# Gather and cancel all tasks
tasks = asyncio.all_tasks(loop)
for task in tasks:
task.cancel()
results = await asyncio.gather(*tasks, return_exceptions=True)
# Log any exceptions from cancelled tasks
for result in results:
if isinstance(result, asyncio.CancelledError):
logger.info("Task cancelled successfully.")
elif isinstance(result, Exception):
logger.error(f"Task raised an exception: {result}")
await self.close_db() # Close the database connection
logger.info("Graceful shutdown complete.")
loop.stop()
async def run(self):
loop = asyncio.get_running_loop()
# Connect to the database
await self.connect_db()
# Create a worker task
worker_task = asyncio.create_task(self.worker())
# Register signal handlers
for sig in (signal.SIGINT, signal.SIGTERM):
loop.add_signal_handler(sig, lambda sig=sig: asyncio.create_task(self.handle_shutdown(loop)))
try:
await worker_task # Await the worker task
except asyncio.CancelledError:
logger.info("Main task cancelled.")
finally:
if self.db_connection:
await self.close_db() # Ensure database connection is closed even if cancelled
logger.info("Application exiting.")
if __name__ == "__main__":
app = MyApp()
asyncio.run(app.run())
这个例子更加完善,包括了:
- 日志记录
- 数据库连接和关闭
- 一个工作协程
- 信号处理函数,用于停止工作协程、关闭数据库连接、并停止事件循环
总结:优雅退出的重要性
优雅退出是编写健壮的 asyncio
程序的关键。它可以避免数据丢失、资源泄露和用户体验下降。通过正确地处理信号,我们可以让我们的程序在各种情况下都能够体面地退出。
希望今天的讲解对大家有所帮助!下次再见!