Python高级技术之:`Python`的`multiprocessing`与`asyncio`的协同:混合使用以处理复杂任务。

各位观众老爷,晚上好!今天咱们来聊聊Python世界里两个重量级选手:multiprocessingasyncio。这俩家伙单拎出来都能独当一面,但如果能巧妙地把它们捏合在一起,嘿嘿,那威力可就不是1+1=2那么简单了,而是原子弹级别的。

咱们今天要讲的就是:Python的multiprocessingasyncio的协同:混合使用以处理复杂任务。我会用通俗易懂的语言,加上一些实战代码,让大家明白这俩货是怎么配合搞事情的。

第一部分:multiprocessing – 人多力量大,分工干活效率高

首先,咱们来回顾一下multiprocessing是个什么东西。简单来说,它就是Python里实现并行计算的模块。你电脑不是有多核CPU吗?multiprocessing可以让你的程序充分利用这些CPU核心,让多个任务同时执行,从而提高程序的运行速度。

想象一下,你要搬100块砖头。如果你一个人搬,可能要累个半死,搬个一天。但如果你找10个兄弟一起搬,每个人搬10块,那效率是不是蹭蹭往上涨?multiprocessing就相当于你找了多个兄弟,每个兄弟负责一部分任务。

import multiprocessing
import time

def task(name):
    print(f"Process {name} started")
    time.sleep(2)  # 模拟耗时操作
    print(f"Process {name} finished")

if __name__ == "__main__":
    processes = []
    for i in range(3):
        p = multiprocessing.Process(target=task, args=(i,))
        processes.append(p)
        p.start()

    for p in processes:
        p.join()  # 等待所有进程结束

    print("All processes finished")

这段代码创建了3个进程,每个进程都执行task函数。task函数模拟了一个耗时操作,比如下载文件、处理数据等等。通过multiprocessing.Process创建进程,p.start()启动进程,p.join()等待进程结束。

运行这段代码,你会发现3个进程几乎是同时开始,同时结束的。这就是并行计算的魅力!

优点:

  • 充分利用多核CPU: 真正的并行执行,速度快。
  • 隔离性好: 每个进程都有独立的内存空间,避免互相干扰。
  • 适合CPU密集型任务: 比如图像处理、科学计算等。

缺点:

  • 进程间通信开销大: 进程间通信需要使用队列、管道等机制,开销比较大。
  • 资源占用高: 每个进程都需要分配独立的内存空间,资源占用比较高。
  • 上下文切换开销: 进程切换也需要时间。

第二部分:asyncio – 我不是线程,我只是时间管理大师

接下来,咱们再来看看asyncioasyncio是Python里实现并发编程的模块。它基于事件循环,可以在单个线程里实现多个任务的并发执行。

asyncio并不是真正的并行,而是一种协程(coroutine)。协程是一种用户级的线程,它比线程更轻量级,切换开销也更小。

你可以把asyncio想象成一个时间管理大师。它会在不同的任务之间快速切换,充分利用CPU的空闲时间,从而提高程序的运行效率。

import asyncio
import time

async def task(name):
    print(f"Coroutine {name} started")
    await asyncio.sleep(2)  # 模拟耗时操作
    print(f"Coroutine {name} finished")

async def main():
    tasks = []
    for i in range(3):
        tasks.append(asyncio.create_task(task(i)))

    await asyncio.gather(*tasks)  # 等待所有协程结束

if __name__ == "__main__":
    asyncio.run(main())

这段代码创建了3个协程,每个协程都执行task函数。task函数同样模拟了一个耗时操作。asyncio.create_task创建协程,asyncio.gather等待所有协程结束。

注意,asyncio里必须使用asyncawait关键字。async用于定义协程函数,await用于等待一个协程完成。

运行这段代码,你会发现3个协程也是几乎同时开始,同时结束的。虽然它们是在同一个线程里执行的,但由于asyncio的时间管理能力,它们仍然可以并发执行。

优点:

  • 轻量级: 协程比线程更轻量级,切换开销小。
  • 资源占用低: 协程不需要分配独立的内存空间,资源占用低。
  • 适合IO密集型任务: 比如网络请求、数据库操作等。

缺点:

  • 不能真正并行: 只能在单个线程里并发执行,不能充分利用多核CPU。
  • 需要使用asyncawait关键字: 代码编写比较繁琐。
  • 容易出现阻塞: 如果一个协程阻塞了,整个事件循环都会被阻塞。

第三部分:multiprocessing + asyncio – 王者组合,天下无敌

现在,重点来了!如果把multiprocessingasyncio结合起来,会发生什么?

答案是:既能充分利用多核CPU,又能高效地处理IO密集型任务!

我们可以使用multiprocessing创建多个进程,每个进程里都运行一个asyncio事件循环。这样,每个进程都可以并发地执行多个IO密集型任务,同时多个进程又可以并行地执行,从而达到最佳的性能。

import multiprocessing
import asyncio
import time

async def task(name):
    print(f"Coroutine {name} started in process {multiprocessing.current_process().name}")
    await asyncio.sleep(2)
    print(f"Coroutine {name} finished in process {multiprocessing.current_process().name}")

async def main():
    tasks = []
    for i in range(3):
        tasks.append(asyncio.create_task(task(i)))

    await asyncio.gather(*tasks)

def run_event_loop():
    asyncio.run(main())

if __name__ == "__main__":
    processes = []
    for i in range(2):  # 创建2个进程
        p = multiprocessing.Process(target=run_event_loop, name=f"Process-{i}")
        processes.append(p)
        p.start()

    for p in processes:
        p.join()

    print("All processes finished")

这段代码创建了2个进程,每个进程里都运行一个asyncio事件循环。每个事件循环里都创建了3个协程。

运行这段代码,你会发现:

  • 2个进程并行执行。
  • 每个进程里的3个协程并发执行。

这就是multiprocessingasyncio的完美结合!

具体实现步骤:

  1. 定义协程函数: 使用asyncawait关键字定义协程函数。
  2. 创建事件循环: 使用asyncio.get_event_loop()创建一个事件循环。
  3. 运行事件循环: 使用loop.run_until_complete()运行事件循环。
  4. 创建进程: 使用multiprocessing.Process创建进程,并将运行事件循环的函数作为target参数传递给它。
  5. 启动进程: 使用p.start()启动进程。
  6. 等待进程结束: 使用p.join()等待进程结束。

一些更高级的技巧和注意事项:

  • 进程间通信: 如果需要在不同的进程之间传递数据,可以使用multiprocessing.Queue或者multiprocessing.Pipe

    import multiprocessing
    import asyncio
    
    async def task(name, queue):
        print(f"Coroutine {name} started in process {multiprocessing.current_process().name}")
        await asyncio.sleep(1)
        queue.put(f"Result from {name}")  # 将结果放入队列
        print(f"Coroutine {name} finished in process {multiprocessing.current_process().name}")
    
    async def main(queue):
        tasks = []
        for i in range(2):
            tasks.append(asyncio.create_task(task(i, queue)))
        await asyncio.gather(*tasks)
    
    def run_event_loop(queue):
        asyncio.run(main(queue))
    
    if __name__ == "__main__":
        queue = multiprocessing.Queue()  # 创建一个队列
        processes = []
        for i in range(2):
            p = multiprocessing.Process(target=run_event_loop, args=(queue,), name=f"Process-{i}")
            processes.append(p)
            p.start()
    
        for p in processes:
            p.join()
    
        while not queue.empty():
            result = queue.get()
            print(f"Received: {result}")
    
        print("All processes finished")
  • 线程池: 如果IO操作不是特别耗时,可以使用线程池来代替asyncio。线程池可以减少线程切换的开销。

    import multiprocessing
    import concurrent.futures
    import time
    
    def task(name):
        print(f"Task {name} started in process {multiprocessing.current_process().name}")
        time.sleep(1)  # 模拟IO操作
        print(f"Task {name} finished in process {multiprocessing.current_process().name}")
        return f"Result from {name}"
    
    def run_tasks(tasks):
        with concurrent.futures.ThreadPoolExecutor(max_workers=4) as executor:  # 使用线程池
            futures = [executor.submit(task, t) for t in tasks]
            results = [future.result() for future in concurrent.futures.as_completed(futures)]
        return results
    
    def run_process(tasks):
        results = run_tasks(tasks)
        print(f"Process {multiprocessing.current_process().name} results: {results}")
    
    if __name__ == "__main__":
        tasks = [f"Task-{i}" for i in range(6)]  # 6个任务
        processes = []
        tasks_per_process = 3
        for i in range(2):
            process_tasks = tasks[i*tasks_per_process:(i+1)*tasks_per_process]
            p = multiprocessing.Process(target=run_process, args=(process_tasks,), name=f"Process-{i}")
            processes.append(p)
            p.start()
    
        for p in processes:
            p.join()
    
        print("All processes finished")
  • 避免共享状态: 尽量避免在不同的进程之间共享状态。如果必须共享状态,可以使用multiprocessing.Value或者multiprocessing.Array

  • 异常处理: 需要注意在进程中处理异常,避免进程崩溃。

适用场景:

  • 需要处理大量IO密集型任务,同时又需要利用多核CPU的场景。 例如:
    • 网络爬虫:同时爬取多个网站,每个网站又需要并发地发送多个请求。
    • 数据处理:同时处理多个文件,每个文件又需要并发地进行数据清洗和转换。
    • 服务器:同时处理多个客户端的请求,每个请求又需要并发地进行数据库查询和业务逻辑处理。

总结:

multiprocessingasyncio都是Python里强大的并发编程工具。multiprocessing可以利用多核CPU实现真正的并行计算,而asyncio可以在单个线程里实现多个任务的并发执行。

把它们结合起来,可以充分发挥各自的优势,从而解决更复杂的问题。

希望今天的讲座能对大家有所帮助。 记住,编程就像烹饪,掌握了食材(各种技术),就可以做出美味佳肴(高效程序)。 祝大家编程愉快!

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注