Python 并行计算:concurrent.futures
的 ThreadPoolExecutor
和 ProcessPoolExecutor
各位朋友,大家好!今天我们来深入探讨 Python 中的并行计算,重点聚焦于 concurrent.futures
模块中的 ThreadPoolExecutor
和 ProcessPoolExecutor
这两个强大的工具。它们为我们提供了相对简洁的方式,利用多线程和多进程来加速程序的执行,特别是在处理 CPU 密集型和 I/O 密集型任务时。
1. 并行计算的基础概念
在深入具体实现之前,我们先回顾几个并行计算的基本概念:
- 并发(Concurrency): 指的是程序在一段时间内能够处理多个任务。多个任务可以看起来像是同时运行,但实际上可能是在不同的时间片内交替执行。
- 并行(Parallelism): 指的是程序在同一时刻能够真正地执行多个任务。这需要多个处理单元(例如,多个 CPU 核心)的支持。
- 线程(Thread): 是操作系统能够进行运算调度的最小单位。一个进程可以包含多个线程,它们共享进程的资源(例如,内存空间)。
- 进程(Process): 是操作系统进行资源分配的基本单位。每个进程都有自己独立的内存空间。
- CPU 密集型任务(CPU-bound): 指的是任务的执行主要依赖于 CPU 的计算能力。例如,大量的数值计算、图像处理等。
- I/O 密集型任务(I/O-bound): 指的是任务的执行主要依赖于 I/O 操作(例如,读写文件、网络请求)。例如,下载文件、读取数据库等。
理解这些概念对于选择合适的并行计算方法至关重要。
2. concurrent.futures
模块概述
concurrent.futures
模块是 Python 3.2 引入的标准库,它提供了一个高层次的接口,用于异步执行可调用对象。它主要包含两个类:
ThreadPoolExecutor
: 使用线程池来异步执行可调用对象。适用于 I/O 密集型任务。ProcessPoolExecutor
: 使用进程池来异步执行可调用对象。适用于 CPU 密集型任务。
这两个类都实现了相同的接口,使得我们可以在不同的场景下方便地切换使用,而无需修改大量的代码。
3. ThreadPoolExecutor
:利用线程池加速 I/O 密集型任务
ThreadPoolExecutor
通过创建和管理一个线程池来并发执行任务。线程池可以有效地复用线程,避免频繁创建和销毁线程的开销。由于 Python 的 GIL (Global Interpreter Lock) 的存在,同一时刻只能有一个线程执行 Python 字节码。因此,ThreadPoolExecutor
对于 CPU 密集型任务的加速效果并不明显,甚至可能因为线程切换的开销而降低性能。但对于 I/O 密集型任务,线程可以在等待 I/O 操作完成时释放 GIL,允许其他线程执行,从而提高并发性。
3.1 基本用法
下面是一个简单的 ThreadPoolExecutor
的使用示例:
import concurrent.futures
import time
def task(n):
print(f"Task {n} started")
time.sleep(1) # 模拟 I/O 操作
print(f"Task {n} finished")
return n * n
if __name__ == "__main__":
with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
futures = [executor.submit(task, i) for i in range(10)]
for future in concurrent.futures.as_completed(futures):
print(f"Result: {future.result()}")
在这个例子中,我们创建了一个 ThreadPoolExecutor
,设置了最大线程数为 5。然后,我们提交了 10 个 task
函数,每个函数模拟一个耗时 1 秒的 I/O 操作。executor.submit()
方法返回一个 Future
对象,代表异步执行的结果。concurrent.futures.as_completed()
函数返回一个迭代器,按照任务完成的顺序产生 Future
对象。我们可以通过 future.result()
方法获取任务的结果。
3.2 submit()
方法和 Future
对象
executor.submit(func, *args, **kwargs)
方法用于提交一个可调用对象 func
到线程池中执行。它返回一个 Future
对象,该对象代表异步执行的结果。Future
对象提供了一系列方法来检查任务的状态和获取结果:
future.done()
: 如果任务已经完成或者被取消,则返回True
。future.cancelled()
: 如果任务已经被取消,则返回True
。future.cancel()
: 尝试取消任务。如果任务正在运行或者已经完成,则取消失败。future.result(timeout=None)
: 返回任务的结果。如果任务尚未完成,则阻塞等待,直到任务完成或者超时。如果任务抛出异常,则该异常会被重新抛出。future.exception(timeout=None)
: 返回任务抛出的异常。如果任务尚未完成,则阻塞等待,直到任务完成或者超时。如果任务成功完成,则返回None
。future.add_done_callback(fn)
: 添加一个回调函数fn
,当任务完成时会被调用。
3.3 map()
方法
除了 submit()
方法,ThreadPoolExecutor
还提供了 map()
方法,用于将一个函数应用于一个可迭代对象中的每个元素,并返回一个迭代器,产生每个元素对应的结果。
import concurrent.futures
import time
def square(n):
time.sleep(0.5)
return n * n
if __name__ == "__main__":
numbers = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
results = executor.map(square, numbers)
for result in results:
print(result)
executor.map()
方法会将 square
函数应用于 numbers
列表中的每个元素,并返回一个迭代器,产生每个元素的平方值。map()
方法会阻塞,直到所有任务都完成。
3.4 异常处理
在使用 ThreadPoolExecutor
时,我们需要注意异常处理。如果在任务中抛出了异常,future.result()
方法会将该异常重新抛出。因此,我们需要在调用 future.result()
方法时捕获异常。
import concurrent.futures
import time
def task(n):
if n == 5:
raise ValueError("Invalid input")
time.sleep(0.5)
return n * n
if __name__ == "__main__":
with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
futures = [executor.submit(task, i) for i in range(10)]
for future in futures:
try:
result = future.result()
print(f"Result: {result}")
except ValueError as e:
print(f"Error: {e}")
在这个例子中,如果 task
函数的参数 n
等于 5,则会抛出一个 ValueError
异常。我们在调用 future.result()
方法时捕获了该异常,并打印了错误信息。
4. ProcessPoolExecutor
:利用进程池加速 CPU 密集型任务
ProcessPoolExecutor
通过创建和管理一个进程池来并发执行任务。每个进程都有自己独立的内存空间,因此可以绕过 GIL 的限制,真正地实现并行计算。ProcessPoolExecutor
适用于 CPU 密集型任务,可以充分利用多核 CPU 的性能。
4.1 基本用法
ProcessPoolExecutor
的使用方法与 ThreadPoolExecutor
非常相似。
import concurrent.futures
import time
def cpu_bound_task(n):
print(f"Task {n} started")
result = 0
for i in range(10000000):
result += i
print(f"Task {n} finished")
return result + n
if __name__ == "__main__":
with concurrent.futures.ProcessPoolExecutor(max_workers=4) as executor:
futures = [executor.submit(cpu_bound_task, i) for i in range(8)]
for future in concurrent.futures.as_completed(futures):
print(f"Result: {future.result()}")
在这个例子中,我们创建了一个 ProcessPoolExecutor
,设置了最大进程数为 4。然后,我们提交了 8 个 cpu_bound_task
函数,每个函数模拟一个耗时的 CPU 密集型计算。
4.2 注意事项
在使用 ProcessPoolExecutor
时,需要注意以下几点:
- 序列化: 传递给
ProcessPoolExecutor
的函数和参数必须是可序列化的。这是因为进程之间需要通过进程间通信(IPC)来传递数据,而 IPC 通常需要将数据序列化为字节流。 - 全局变量: 尽量避免在进程之间共享全局变量。由于每个进程都有自己独立的内存空间,对全局变量的修改不会影响其他进程。如果需要在进程之间共享数据,可以使用
multiprocessing
模块提供的共享内存机制。 - 主模块的保护: 在使用
ProcessPoolExecutor
时,需要将代码放在if __name__ == "__main__":
块中。这是因为在 Windows 平台上,子进程会重新导入主模块,如果没有这个保护,可能会导致无限递归。
5. 如何选择 ThreadPoolExecutor
和 ProcessPoolExecutor
选择 ThreadPoolExecutor
还是 ProcessPoolExecutor
,主要取决于任务的类型:
任务类型 | 推荐使用的 Executor | 原因 |
---|---|---|
I/O 密集型 | ThreadPoolExecutor |
线程可以在等待 I/O 操作完成时释放 GIL,允许其他线程执行,从而提高并发性。线程创建和切换的开销相对较小。 |
CPU 密集型 | ProcessPoolExecutor |
每个进程都有自己独立的内存空间,可以绕过 GIL 的限制,真正地实现并行计算,充分利用多核 CPU 的性能。虽然进程创建和切换的开销较大,但对于 CPU 密集型任务来说,并行计算带来的收益远大于开销。 |
一般来说,如果任务主要涉及 I/O 操作(例如,网络请求、文件读写),则应该使用 ThreadPoolExecutor
。如果任务主要涉及 CPU 计算(例如,数值计算、图像处理),则应该使用 ProcessPoolExecutor
。
6. 性能测试
为了更直观地了解 ThreadPoolExecutor
和 ProcessPoolExecutor
的性能差异,我们可以进行一些简单的性能测试。
import concurrent.futures
import time
def io_bound_task(n):
time.sleep(0.5)
return n * n
def cpu_bound_task(n):
result = 0
for i in range(10000000):
result += i
return result + n
def test_executor(executor_class, task, task_name, num_tasks, max_workers):
start_time = time.time()
with executor_class(max_workers=max_workers) as executor:
futures = [executor.submit(task, i) for i in range(num_tasks)]
for future in concurrent.futures.as_completed(futures):
future.result()
end_time = time.time()
print(f"{task_name} with {executor_class.__name__}: {end_time - start_time:.2f} seconds")
if __name__ == "__main__":
num_tasks = 20
max_workers = 4
test_executor(concurrent.futures.ThreadPoolExecutor, io_bound_task, "I/O-bound Task", num_tasks, max_workers)
test_executor(concurrent.futures.ProcessPoolExecutor, io_bound_task, "I/O-bound Task", num_tasks, max_workers)
test_executor(concurrent.futures.ThreadPoolExecutor, cpu_bound_task, "CPU-bound Task", num_tasks, max_workers)
test_executor(concurrent.futures.ProcessPoolExecutor, cpu_bound_task, "CPU-bound Task", num_tasks, max_workers)
运行这段代码,我们可以得到类似下面的结果(结果可能因机器配置而异):
I/O-bound Task with ThreadPoolExecutor: 2.64 seconds
I/O-bound Task with ProcessPoolExecutor: 5.58 seconds
CPU-bound Task with ThreadPoolExecutor: 18.21 seconds
CPU-bound Task with ProcessPoolExecutor: 7.52 seconds
从结果可以看出,ThreadPoolExecutor
在处理 I/O 密集型任务时性能更好,而 ProcessPoolExecutor
在处理 CPU 密集型任务时性能更好。
7. 更高级的用法
concurrent.futures
模块还提供了一些更高级的用法,例如:
- 自定义 Executor: 我们可以通过继承
Executor
类来实现自定义的 Executor。 - 控制并发数量: 我们可以使用
Semaphore
等同步原语来控制并发数量,防止资源过度消耗。 - 异步编程:
concurrent.futures
模块可以与asyncio
模块结合使用,实现更复杂的异步编程模型。
由于时间关系,这里就不展开讲解了,感兴趣的朋友可以自行查阅相关资料。
8. 结语:选择合适的工具,优化你的 Python 代码
今天我们深入探讨了 Python 中 concurrent.futures
模块的 ThreadPoolExecutor
和 ProcessPoolExecutor
。 它们是强大的并行计算工具,能够有效地加速程序的执行。 选择合适的 Executor,能够充分利用系统资源,优化 Python 代码的性能。 记住,I/O 密集型任务通常选择 ThreadPoolExecutor
,CPU 密集型任务通常选择 ProcessPoolExecutor
。