各位观众老爷,晚上好!今天咱们来聊聊Python世界里两个重量级选手:multiprocessing
和asyncio
。这俩家伙单拎出来都能独当一面,但如果能巧妙地把它们捏合在一起,嘿嘿,那威力可就不是1+1=2那么简单了,而是原子弹级别的。
咱们今天要讲的就是:Python的multiprocessing
与asyncio
的协同:混合使用以处理复杂任务。我会用通俗易懂的语言,加上一些实战代码,让大家明白这俩货是怎么配合搞事情的。
第一部分:multiprocessing
– 人多力量大,分工干活效率高
首先,咱们来回顾一下multiprocessing
是个什么东西。简单来说,它就是Python里实现并行计算的模块。你电脑不是有多核CPU吗?multiprocessing
可以让你的程序充分利用这些CPU核心,让多个任务同时执行,从而提高程序的运行速度。
想象一下,你要搬100块砖头。如果你一个人搬,可能要累个半死,搬个一天。但如果你找10个兄弟一起搬,每个人搬10块,那效率是不是蹭蹭往上涨?multiprocessing
就相当于你找了多个兄弟,每个兄弟负责一部分任务。
import multiprocessing
import time
def task(name):
print(f"Process {name} started")
time.sleep(2) # 模拟耗时操作
print(f"Process {name} finished")
if __name__ == "__main__":
processes = []
for i in range(3):
p = multiprocessing.Process(target=task, args=(i,))
processes.append(p)
p.start()
for p in processes:
p.join() # 等待所有进程结束
print("All processes finished")
这段代码创建了3个进程,每个进程都执行task
函数。task
函数模拟了一个耗时操作,比如下载文件、处理数据等等。通过multiprocessing.Process
创建进程,p.start()
启动进程,p.join()
等待进程结束。
运行这段代码,你会发现3个进程几乎是同时开始,同时结束的。这就是并行计算的魅力!
优点:
- 充分利用多核CPU: 真正的并行执行,速度快。
- 隔离性好: 每个进程都有独立的内存空间,避免互相干扰。
- 适合CPU密集型任务: 比如图像处理、科学计算等。
缺点:
- 进程间通信开销大: 进程间通信需要使用队列、管道等机制,开销比较大。
- 资源占用高: 每个进程都需要分配独立的内存空间,资源占用比较高。
- 上下文切换开销: 进程切换也需要时间。
第二部分:asyncio
– 我不是线程,我只是时间管理大师
接下来,咱们再来看看asyncio
。asyncio
是Python里实现并发编程的模块。它基于事件循环,可以在单个线程里实现多个任务的并发执行。
asyncio
并不是真正的并行,而是一种协程(coroutine)。协程是一种用户级的线程,它比线程更轻量级,切换开销也更小。
你可以把asyncio
想象成一个时间管理大师。它会在不同的任务之间快速切换,充分利用CPU的空闲时间,从而提高程序的运行效率。
import asyncio
import time
async def task(name):
print(f"Coroutine {name} started")
await asyncio.sleep(2) # 模拟耗时操作
print(f"Coroutine {name} finished")
async def main():
tasks = []
for i in range(3):
tasks.append(asyncio.create_task(task(i)))
await asyncio.gather(*tasks) # 等待所有协程结束
if __name__ == "__main__":
asyncio.run(main())
这段代码创建了3个协程,每个协程都执行task
函数。task
函数同样模拟了一个耗时操作。asyncio.create_task
创建协程,asyncio.gather
等待所有协程结束。
注意,asyncio
里必须使用async
和await
关键字。async
用于定义协程函数,await
用于等待一个协程完成。
运行这段代码,你会发现3个协程也是几乎同时开始,同时结束的。虽然它们是在同一个线程里执行的,但由于asyncio
的时间管理能力,它们仍然可以并发执行。
优点:
- 轻量级: 协程比线程更轻量级,切换开销小。
- 资源占用低: 协程不需要分配独立的内存空间,资源占用低。
- 适合IO密集型任务: 比如网络请求、数据库操作等。
缺点:
- 不能真正并行: 只能在单个线程里并发执行,不能充分利用多核CPU。
- 需要使用
async
和await
关键字: 代码编写比较繁琐。 - 容易出现阻塞: 如果一个协程阻塞了,整个事件循环都会被阻塞。
第三部分:multiprocessing
+ asyncio
– 王者组合,天下无敌
现在,重点来了!如果把multiprocessing
和asyncio
结合起来,会发生什么?
答案是:既能充分利用多核CPU,又能高效地处理IO密集型任务!
我们可以使用multiprocessing
创建多个进程,每个进程里都运行一个asyncio
事件循环。这样,每个进程都可以并发地执行多个IO密集型任务,同时多个进程又可以并行地执行,从而达到最佳的性能。
import multiprocessing
import asyncio
import time
async def task(name):
print(f"Coroutine {name} started in process {multiprocessing.current_process().name}")
await asyncio.sleep(2)
print(f"Coroutine {name} finished in process {multiprocessing.current_process().name}")
async def main():
tasks = []
for i in range(3):
tasks.append(asyncio.create_task(task(i)))
await asyncio.gather(*tasks)
def run_event_loop():
asyncio.run(main())
if __name__ == "__main__":
processes = []
for i in range(2): # 创建2个进程
p = multiprocessing.Process(target=run_event_loop, name=f"Process-{i}")
processes.append(p)
p.start()
for p in processes:
p.join()
print("All processes finished")
这段代码创建了2个进程,每个进程里都运行一个asyncio
事件循环。每个事件循环里都创建了3个协程。
运行这段代码,你会发现:
- 2个进程并行执行。
- 每个进程里的3个协程并发执行。
这就是multiprocessing
和asyncio
的完美结合!
具体实现步骤:
- 定义协程函数: 使用
async
和await
关键字定义协程函数。 - 创建事件循环: 使用
asyncio.get_event_loop()
创建一个事件循环。 - 运行事件循环: 使用
loop.run_until_complete()
运行事件循环。 - 创建进程: 使用
multiprocessing.Process
创建进程,并将运行事件循环的函数作为target
参数传递给它。 - 启动进程: 使用
p.start()
启动进程。 - 等待进程结束: 使用
p.join()
等待进程结束。
一些更高级的技巧和注意事项:
-
进程间通信: 如果需要在不同的进程之间传递数据,可以使用
multiprocessing.Queue
或者multiprocessing.Pipe
。import multiprocessing import asyncio async def task(name, queue): print(f"Coroutine {name} started in process {multiprocessing.current_process().name}") await asyncio.sleep(1) queue.put(f"Result from {name}") # 将结果放入队列 print(f"Coroutine {name} finished in process {multiprocessing.current_process().name}") async def main(queue): tasks = [] for i in range(2): tasks.append(asyncio.create_task(task(i, queue))) await asyncio.gather(*tasks) def run_event_loop(queue): asyncio.run(main(queue)) if __name__ == "__main__": queue = multiprocessing.Queue() # 创建一个队列 processes = [] for i in range(2): p = multiprocessing.Process(target=run_event_loop, args=(queue,), name=f"Process-{i}") processes.append(p) p.start() for p in processes: p.join() while not queue.empty(): result = queue.get() print(f"Received: {result}") print("All processes finished")
-
线程池: 如果IO操作不是特别耗时,可以使用线程池来代替
asyncio
。线程池可以减少线程切换的开销。import multiprocessing import concurrent.futures import time def task(name): print(f"Task {name} started in process {multiprocessing.current_process().name}") time.sleep(1) # 模拟IO操作 print(f"Task {name} finished in process {multiprocessing.current_process().name}") return f"Result from {name}" def run_tasks(tasks): with concurrent.futures.ThreadPoolExecutor(max_workers=4) as executor: # 使用线程池 futures = [executor.submit(task, t) for t in tasks] results = [future.result() for future in concurrent.futures.as_completed(futures)] return results def run_process(tasks): results = run_tasks(tasks) print(f"Process {multiprocessing.current_process().name} results: {results}") if __name__ == "__main__": tasks = [f"Task-{i}" for i in range(6)] # 6个任务 processes = [] tasks_per_process = 3 for i in range(2): process_tasks = tasks[i*tasks_per_process:(i+1)*tasks_per_process] p = multiprocessing.Process(target=run_process, args=(process_tasks,), name=f"Process-{i}") processes.append(p) p.start() for p in processes: p.join() print("All processes finished")
-
避免共享状态: 尽量避免在不同的进程之间共享状态。如果必须共享状态,可以使用
multiprocessing.Value
或者multiprocessing.Array
。 -
异常处理: 需要注意在进程中处理异常,避免进程崩溃。
适用场景:
- 需要处理大量IO密集型任务,同时又需要利用多核CPU的场景。 例如:
- 网络爬虫:同时爬取多个网站,每个网站又需要并发地发送多个请求。
- 数据处理:同时处理多个文件,每个文件又需要并发地进行数据清洗和转换。
- 服务器:同时处理多个客户端的请求,每个请求又需要并发地进行数据库查询和业务逻辑处理。
总结:
multiprocessing
和asyncio
都是Python里强大的并发编程工具。multiprocessing
可以利用多核CPU实现真正的并行计算,而asyncio
可以在单个线程里实现多个任务的并发执行。
把它们结合起来,可以充分发挥各自的优势,从而解决更复杂的问题。
希望今天的讲座能对大家有所帮助。 记住,编程就像烹饪,掌握了食材(各种技术),就可以做出美味佳肴(高效程序)。 祝大家编程愉快!