好的,各位朋友们,欢迎来到今天的“Python multiprocessing
共享内存与进程同步:高性能通信”讲座现场!我是你们的老朋友,bug 终结者,代码美容师,今天咱们就来聊聊 Python 多进程里那些让人又爱又恨的共享内存和进程同步。
开场白:多进程,好东西,但不好驾驭
话说,Python 因为 GIL (Global Interpreter Lock) 的存在,多线程并不能真正利用多核 CPU 的优势。于是乎,multiprocessing
模块应运而生,它允许我们创建多个独立的进程,每个进程都有自己的 Python 解释器和内存空间,真正实现并行计算。
但是,问题也来了。进程之间是独立的,数据交换成了大难题。就像两个国家,语言不通,文化各异,想合作搞事情,那可得费一番周折。这时,共享内存和进程同步就成了我们的“外交官”和“翻译机”,帮助我们搞定多进程之间的通信和协作。
第一幕:共享内存,数据直通车
共享内存,顾名思义,就是一块大家都能访问的内存区域。就像一个公共黑板,每个进程都可以往上面写字和擦字。这样,进程之间就可以直接读写数据,避免了传统 IPC (Inter-Process Communication) 的数据复制开销,速度嗖嗖的。
multiprocessing
模块提供了 Value
和 Array
这两个类,用来创建共享内存对象。
-
Value
:共享单个值这玩意儿就像一个简单的变量,可以存储整数、浮点数、字符等等。
from multiprocessing import Process, Value import time def increment(counter): for _ in range(100000): with counter.get_lock(): # 确保原子操作 counter.value += 1 if __name__ == '__main__': counter = Value('i', 0) # 'i' 表示整数类型,初始值为 0 processes = [] for _ in range(4): # 4个进程同时修改 p = Process(target=increment, args=(counter,)) processes.append(p) p.start() for p in processes: p.join() print(f"最终计数器值: {counter.value}") # 理论上应该是 400000
这里要注意,
Value
对象有一个get_lock()
方法,可以获取一个锁。这是为了防止多个进程同时修改同一个值,导致数据竞争。这个锁就像一个红绿灯,确保只有一个进程能进入临界区修改数据。 -
Array
:共享数组Array
就像一个列表,可以存储多个相同类型的值。from multiprocessing import Process, Array def update_array(arr): for i in range(len(arr)): with arr.get_lock(): arr[i] += 1 if __name__ == '__main__': shared_array = Array('i', [0, 0, 0, 0, 0]) # 'i' 表示整数类型,初始值为 [0, 0, 0, 0, 0] processes = [] for _ in range(3): p = Process(target=update_array, args=(shared_array,)) processes.append(p) p.start() for p in processes: p.join() print(f"最终数组值: {shared_array[:]}")
同样,
Array
对象也提供了get_lock()
方法,用于保护数组中的元素,防止并发修改。
第二幕:进程同步,排队上厕所
共享内存虽然快,但如果多个进程同时修改同一块内存,就会出现数据竞争,导致程序出错。就像一群人抢着上厕所,不排队肯定会乱套。所以,我们需要进程同步机制,让进程们有秩序地访问共享资源。
multiprocessing
模块提供了多种进程同步工具,包括:
-
Lock
:互斥锁Lock
是最基本的同步工具,它可以保证在任何时刻只有一个进程可以访问临界区。就像厕所的门锁,进去一个人,其他人就只能等着。from multiprocessing import Process, Lock, Value def worker(lock, counter): for _ in range(100000): with lock: # 获取锁,自动释放 counter.value += 1 if __name__ == '__main__': lock = Lock() counter = Value('i', 0) processes = [] for _ in range(2): p = Process(target=worker, args=(lock, counter)) processes.append(p) p.start() for p in processes: p.join() print(f"计数器最终值: {counter.value}")
with lock:
语句会自动获取锁并在代码块执行完毕后释放锁,避免了手动加锁解锁的麻烦。 -
RLock
:可重入锁RLock
允许同一个进程多次获取同一个锁,而不会造成死锁。就像一个VIP厕所,一个人可以多次进去,只要他还没出来,其他人还是得等着。from multiprocessing import Process, RLock def recursive_function(lock, depth): with lock: if depth > 0: print(f"Process {depth} acquiring lock") recursive_function(lock, depth - 1) print(f"Process {depth} releasing lock") if __name__ == '__main__': lock = RLock() p = Process(target=recursive_function, args=(lock, 3)) p.start() p.join()
使用
Lock
的话,在递归函数中再次获取锁就会导致死锁,而RLock
则可以避免这个问题。 -
Semaphore
:信号量Semaphore
维护一个计数器,允许指定数量的进程同时访问临界区。就像有多个隔间的厕所,可以同时容纳几个人。from multiprocessing import Process, Semaphore import time def worker(semaphore, worker_id): with semaphore: print(f"Worker {worker_id} entering critical section") time.sleep(2) # 模拟耗时操作 print(f"Worker {worker_id} leaving critical section") if __name__ == '__main__': semaphore = Semaphore(2) # 允许最多 2 个进程同时访问 processes = [] for i in range(5): p = Process(target=worker, args=(semaphore, i)) processes.append(p) p.start() for p in processes: p.join()
在这个例子中,最多只有两个
worker
进程可以同时进入临界区。 -
Event
:事件Event
用于进程间的信号通知。一个进程可以设置一个事件,其他进程可以等待这个事件发生。就像一个通知,告诉大家“开饭啦!”from multiprocessing import Process, Event import time def waiter(event): print("Waiter: Waiting for event...") event.wait() # 等待事件发生 print("Waiter: Event occurred!") def signaler(event): time.sleep(3) # 模拟准备工作 print("Signaler: Setting event...") event.set() # 设置事件 if __name__ == '__main__': event = Event() waiter_process = Process(target=waiter, args=(event,)) signaler_process = Process(target=signaler, args=(event,)) waiter_process.start() signaler_process.start() waiter_process.join() signaler_process.join()
waiter
进程会一直等待event
被signaler
进程设置。 -
Condition
:条件变量Condition
允许进程在满足特定条件时才继续执行。它通常与Lock
一起使用。就像一个食堂阿姨,只有饭菜准备好了才会通知大家开饭。from multiprocessing import Process, Condition import time import random def consumer(condition, items): with condition: while not items: print("Consumer: Waiting for items...") condition.wait() # 等待条件满足 item = items.pop() print(f"Consumer: Consumed item {item}") def producer(condition, items): for i in range(5): time.sleep(random.random()) # 模拟生产时间 with condition: items.append(i) print(f"Producer: Produced item {i}") condition.notify() # 通知消费者 #condition.notify_all() #通知所有等待的消费者 if __name__ == '__main__': condition = Condition() items = [] consumer_process = Process(target=consumer, args=(condition, items)) producer_process = Process(target=producer, args=(condition, items)) consumer_process.start() producer_process.start() consumer_process.join() producer_process.join()
consumer
进程会等待items
列表不为空时才继续执行,producer
进程会生产物品并通知consumer
进程。notify()
方法只会唤醒一个等待的进程,而notify_all()
方法会唤醒所有等待的进程。 -
Barrier
:屏障Barrier
允许一组进程互相等待,直到所有进程都到达屏障点,然后一起继续执行。就像赛跑,所有选手都准备好了才能一起起跑。from multiprocessing import Process, Barrier import time import random def worker(barrier, worker_id): print(f"Worker {worker_id}: Arrived at barrier") time.sleep(random.random()) barrier.wait() # 等待所有进程到达屏障 print(f"Worker {worker_id}: Passed the barrier") if __name__ == '__main__': barrier = Barrier(3) # 需要 3 个进程到达屏障 processes = [] for i in range(3): p = Process(target=worker, args=(barrier, i)) processes.append(p) p.start() for p in processes: p.join()
所有
worker
进程都会等待其他进程到达屏障后才继续执行。
第三幕:实战演练,数据分析加速
光说不练假把式,咱们来个实际例子,看看如何用共享内存和进程同步来加速数据分析。
假设我们有一个很大的数据集,需要计算每个数据的平方和。
import time
import random
def process_data(data):
"""模拟数据处理"""
time.sleep(random.random() * 0.1) # 模拟处理时间
return data * data
def sequential_processing(data):
start_time = time.time()
results = [process_data(item) for item in data]
end_time = time.time()
print(f"Sequential processing time: {end_time - start_time:.4f} seconds")
return results
def parallel_processing_shared_memory(data, num_processes):
from multiprocessing import Process, Array, Lock
start_time = time.time()
# 将数据分成几份,给不同的进程处理
chunk_size = len(data) // num_processes
chunks = [data[i:i + chunk_size] for i in range(0, len(data), chunk_size)]
# 创建共享内存数组存储结果
shared_results = Array('i', [0] * len(data))
# 创建锁
lock = Lock()
def worker(chunk, start_index):
for i, item in enumerate(chunk):
result = process_data(item)
with lock: #保证线程安全
shared_results[start_index + i] = result
processes = []
for i, chunk in enumerate(chunks):
start_index = i * chunk_size
p = Process(target=worker, args=(chunk, start_index))
processes.append(p)
p.start()
for p in processes:
p.join()
end_time = time.time()
print(f"Parallel processing time (shared memory): {end_time - start_time:.4f} seconds")
return list(shared_results)
if __name__ == "__main__":
data = list(range(1000))
num_processes = 4
# 串行处理
sequential_results = sequential_processing(data)
# 并行处理 (共享内存)
parallel_results = parallel_processing_shared_memory(data, num_processes)
在这个例子中,我们使用 Array
创建了一个共享内存数组来存储计算结果。每个进程负责处理一部分数据,并将结果写入共享内存。使用 Lock
确保对共享内存的写入是线程安全的。
总结:选择合适的工具,事半功倍
共享内存和进程同步是 multiprocessing
模块中非常重要的概念。掌握它们可以帮助我们编写出高效、稳定的多进程程序。
工具 | 适用场景 | 优点 | 缺点 |
---|---|---|---|
Value |
共享单个值 | 简单易用,适用于小规模数据共享 | 需要手动加锁,容易出错 |
Array |
共享数组 | 适用于大规模数据共享,速度快 | 需要手动加锁,容易出错 |
Lock |
保护临界区,防止数据竞争 | 简单易用,适用于保护小规模临界区 | 可能导致死锁,性能较低 |
RLock |
允许同一个进程多次获取锁 | 避免递归调用中的死锁 | 性能比 Lock 更低 |
Semaphore |
限制同时访问临界区的进程数量 | 可以控制并发度,避免资源耗尽 | 容易出错,需要小心使用 |
Event |
进程间信号通知 | 简单易用,适用于简单的同步场景 | 功能有限,不适用于复杂的同步场景 |
Condition |
进程在满足特定条件时才继续执行 | 可以实现复杂的同步逻辑 | 容易出错,需要小心使用 |
Barrier |
一组进程互相等待,直到所有进程都到达屏障点,然后一起继续执行 | 适用于需要同步所有进程的场景 | 如果某个进程崩溃,会导致其他进程永远等待 |
最后,记住,选择合适的工具,才能事半功倍!
好了,今天的讲座就到这里。希望大家能有所收获,早日成为多进程编程高手!谢谢大家!