`Python`的`并行`计算:`concurrent.futures`的`ThreadPoolExecutor`和`ProcessPoolExecutor`。

Python 并行计算:concurrent.futuresThreadPoolExecutorProcessPoolExecutor

各位朋友,大家好!今天我们来深入探讨 Python 中的并行计算,重点聚焦于 concurrent.futures 模块中的 ThreadPoolExecutorProcessPoolExecutor 这两个强大的工具。它们为我们提供了相对简洁的方式,利用多线程和多进程来加速程序的执行,特别是在处理 CPU 密集型和 I/O 密集型任务时。

1. 并行计算的基础概念

在深入具体实现之前,我们先回顾几个并行计算的基本概念:

  • 并发(Concurrency): 指的是程序在一段时间内能够处理多个任务。多个任务可以看起来像是同时运行,但实际上可能是在不同的时间片内交替执行。
  • 并行(Parallelism): 指的是程序在同一时刻能够真正地执行多个任务。这需要多个处理单元(例如,多个 CPU 核心)的支持。
  • 线程(Thread): 是操作系统能够进行运算调度的最小单位。一个进程可以包含多个线程,它们共享进程的资源(例如,内存空间)。
  • 进程(Process): 是操作系统进行资源分配的基本单位。每个进程都有自己独立的内存空间。
  • CPU 密集型任务(CPU-bound): 指的是任务的执行主要依赖于 CPU 的计算能力。例如,大量的数值计算、图像处理等。
  • I/O 密集型任务(I/O-bound): 指的是任务的执行主要依赖于 I/O 操作(例如,读写文件、网络请求)。例如,下载文件、读取数据库等。

理解这些概念对于选择合适的并行计算方法至关重要。

2. concurrent.futures 模块概述

concurrent.futures 模块是 Python 3.2 引入的标准库,它提供了一个高层次的接口,用于异步执行可调用对象。它主要包含两个类:

  • ThreadPoolExecutor: 使用线程池来异步执行可调用对象。适用于 I/O 密集型任务。
  • ProcessPoolExecutor: 使用进程池来异步执行可调用对象。适用于 CPU 密集型任务。

这两个类都实现了相同的接口,使得我们可以在不同的场景下方便地切换使用,而无需修改大量的代码。

3. ThreadPoolExecutor:利用线程池加速 I/O 密集型任务

ThreadPoolExecutor 通过创建和管理一个线程池来并发执行任务。线程池可以有效地复用线程,避免频繁创建和销毁线程的开销。由于 Python 的 GIL (Global Interpreter Lock) 的存在,同一时刻只能有一个线程执行 Python 字节码。因此,ThreadPoolExecutor 对于 CPU 密集型任务的加速效果并不明显,甚至可能因为线程切换的开销而降低性能。但对于 I/O 密集型任务,线程可以在等待 I/O 操作完成时释放 GIL,允许其他线程执行,从而提高并发性。

3.1 基本用法

下面是一个简单的 ThreadPoolExecutor 的使用示例:

import concurrent.futures
import time

def task(n):
    print(f"Task {n} started")
    time.sleep(1)  # 模拟 I/O 操作
    print(f"Task {n} finished")
    return n * n

if __name__ == "__main__":
    with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
        futures = [executor.submit(task, i) for i in range(10)]

        for future in concurrent.futures.as_completed(futures):
            print(f"Result: {future.result()}")

在这个例子中,我们创建了一个 ThreadPoolExecutor,设置了最大线程数为 5。然后,我们提交了 10 个 task 函数,每个函数模拟一个耗时 1 秒的 I/O 操作。executor.submit() 方法返回一个 Future 对象,代表异步执行的结果。concurrent.futures.as_completed() 函数返回一个迭代器,按照任务完成的顺序产生 Future 对象。我们可以通过 future.result() 方法获取任务的结果。

3.2 submit() 方法和 Future 对象

executor.submit(func, *args, **kwargs) 方法用于提交一个可调用对象 func 到线程池中执行。它返回一个 Future 对象,该对象代表异步执行的结果。Future 对象提供了一系列方法来检查任务的状态和获取结果:

  • future.done(): 如果任务已经完成或者被取消,则返回 True
  • future.cancelled(): 如果任务已经被取消,则返回 True
  • future.cancel(): 尝试取消任务。如果任务正在运行或者已经完成,则取消失败。
  • future.result(timeout=None): 返回任务的结果。如果任务尚未完成,则阻塞等待,直到任务完成或者超时。如果任务抛出异常,则该异常会被重新抛出。
  • future.exception(timeout=None): 返回任务抛出的异常。如果任务尚未完成,则阻塞等待,直到任务完成或者超时。如果任务成功完成,则返回 None
  • future.add_done_callback(fn): 添加一个回调函数 fn,当任务完成时会被调用。

3.3 map() 方法

除了 submit() 方法,ThreadPoolExecutor 还提供了 map() 方法,用于将一个函数应用于一个可迭代对象中的每个元素,并返回一个迭代器,产生每个元素对应的结果。

import concurrent.futures
import time

def square(n):
    time.sleep(0.5)
    return n * n

if __name__ == "__main__":
    numbers = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
    with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
        results = executor.map(square, numbers)
        for result in results:
            print(result)

executor.map() 方法会将 square 函数应用于 numbers 列表中的每个元素,并返回一个迭代器,产生每个元素的平方值。map() 方法会阻塞,直到所有任务都完成。

3.4 异常处理

在使用 ThreadPoolExecutor 时,我们需要注意异常处理。如果在任务中抛出了异常,future.result() 方法会将该异常重新抛出。因此,我们需要在调用 future.result() 方法时捕获异常。

import concurrent.futures
import time

def task(n):
    if n == 5:
        raise ValueError("Invalid input")
    time.sleep(0.5)
    return n * n

if __name__ == "__main__":
    with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
        futures = [executor.submit(task, i) for i in range(10)]

        for future in futures:
            try:
                result = future.result()
                print(f"Result: {result}")
            except ValueError as e:
                print(f"Error: {e}")

在这个例子中,如果 task 函数的参数 n 等于 5,则会抛出一个 ValueError 异常。我们在调用 future.result() 方法时捕获了该异常,并打印了错误信息。

4. ProcessPoolExecutor:利用进程池加速 CPU 密集型任务

ProcessPoolExecutor 通过创建和管理一个进程池来并发执行任务。每个进程都有自己独立的内存空间,因此可以绕过 GIL 的限制,真正地实现并行计算。ProcessPoolExecutor 适用于 CPU 密集型任务,可以充分利用多核 CPU 的性能。

4.1 基本用法

ProcessPoolExecutor 的使用方法与 ThreadPoolExecutor 非常相似。

import concurrent.futures
import time

def cpu_bound_task(n):
    print(f"Task {n} started")
    result = 0
    for i in range(10000000):
        result += i
    print(f"Task {n} finished")
    return result + n

if __name__ == "__main__":
    with concurrent.futures.ProcessPoolExecutor(max_workers=4) as executor:
        futures = [executor.submit(cpu_bound_task, i) for i in range(8)]

        for future in concurrent.futures.as_completed(futures):
            print(f"Result: {future.result()}")

在这个例子中,我们创建了一个 ProcessPoolExecutor,设置了最大进程数为 4。然后,我们提交了 8 个 cpu_bound_task 函数,每个函数模拟一个耗时的 CPU 密集型计算。

4.2 注意事项

在使用 ProcessPoolExecutor 时,需要注意以下几点:

  • 序列化: 传递给 ProcessPoolExecutor 的函数和参数必须是可序列化的。这是因为进程之间需要通过进程间通信(IPC)来传递数据,而 IPC 通常需要将数据序列化为字节流。
  • 全局变量: 尽量避免在进程之间共享全局变量。由于每个进程都有自己独立的内存空间,对全局变量的修改不会影响其他进程。如果需要在进程之间共享数据,可以使用 multiprocessing 模块提供的共享内存机制。
  • 主模块的保护: 在使用 ProcessPoolExecutor 时,需要将代码放在 if __name__ == "__main__": 块中。这是因为在 Windows 平台上,子进程会重新导入主模块,如果没有这个保护,可能会导致无限递归。

5. 如何选择 ThreadPoolExecutorProcessPoolExecutor

选择 ThreadPoolExecutor 还是 ProcessPoolExecutor,主要取决于任务的类型:

任务类型 推荐使用的 Executor 原因
I/O 密集型 ThreadPoolExecutor 线程可以在等待 I/O 操作完成时释放 GIL,允许其他线程执行,从而提高并发性。线程创建和切换的开销相对较小。
CPU 密集型 ProcessPoolExecutor 每个进程都有自己独立的内存空间,可以绕过 GIL 的限制,真正地实现并行计算,充分利用多核 CPU 的性能。虽然进程创建和切换的开销较大,但对于 CPU 密集型任务来说,并行计算带来的收益远大于开销。

一般来说,如果任务主要涉及 I/O 操作(例如,网络请求、文件读写),则应该使用 ThreadPoolExecutor。如果任务主要涉及 CPU 计算(例如,数值计算、图像处理),则应该使用 ProcessPoolExecutor

6. 性能测试

为了更直观地了解 ThreadPoolExecutorProcessPoolExecutor 的性能差异,我们可以进行一些简单的性能测试。

import concurrent.futures
import time

def io_bound_task(n):
    time.sleep(0.5)
    return n * n

def cpu_bound_task(n):
    result = 0
    for i in range(10000000):
        result += i
    return result + n

def test_executor(executor_class, task, task_name, num_tasks, max_workers):
    start_time = time.time()
    with executor_class(max_workers=max_workers) as executor:
        futures = [executor.submit(task, i) for i in range(num_tasks)]
        for future in concurrent.futures.as_completed(futures):
            future.result()
    end_time = time.time()
    print(f"{task_name} with {executor_class.__name__}: {end_time - start_time:.2f} seconds")

if __name__ == "__main__":
    num_tasks = 20
    max_workers = 4

    test_executor(concurrent.futures.ThreadPoolExecutor, io_bound_task, "I/O-bound Task", num_tasks, max_workers)
    test_executor(concurrent.futures.ProcessPoolExecutor, io_bound_task, "I/O-bound Task", num_tasks, max_workers)
    test_executor(concurrent.futures.ThreadPoolExecutor, cpu_bound_task, "CPU-bound Task", num_tasks, max_workers)
    test_executor(concurrent.futures.ProcessPoolExecutor, cpu_bound_task, "CPU-bound Task", num_tasks, max_workers)

运行这段代码,我们可以得到类似下面的结果(结果可能因机器配置而异):

I/O-bound Task with ThreadPoolExecutor: 2.64 seconds
I/O-bound Task with ProcessPoolExecutor: 5.58 seconds
CPU-bound Task with ThreadPoolExecutor: 18.21 seconds
CPU-bound Task with ProcessPoolExecutor: 7.52 seconds

从结果可以看出,ThreadPoolExecutor 在处理 I/O 密集型任务时性能更好,而 ProcessPoolExecutor 在处理 CPU 密集型任务时性能更好。

7. 更高级的用法

concurrent.futures 模块还提供了一些更高级的用法,例如:

  • 自定义 Executor: 我们可以通过继承 Executor 类来实现自定义的 Executor。
  • 控制并发数量: 我们可以使用 Semaphore 等同步原语来控制并发数量,防止资源过度消耗。
  • 异步编程: concurrent.futures 模块可以与 asyncio 模块结合使用,实现更复杂的异步编程模型。

由于时间关系,这里就不展开讲解了,感兴趣的朋友可以自行查阅相关资料。

8. 结语:选择合适的工具,优化你的 Python 代码

今天我们深入探讨了 Python 中 concurrent.futures 模块的 ThreadPoolExecutorProcessPoolExecutor。 它们是强大的并行计算工具,能够有效地加速程序的执行。 选择合适的 Executor,能够充分利用系统资源,优化 Python 代码的性能。 记住,I/O 密集型任务通常选择 ThreadPoolExecutor,CPU 密集型任务通常选择 ProcessPoolExecutor

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注