各位程序猿/媛们,晚上好!我是今晚的分享嘉宾,很高兴能跟大家一起聊聊Python多进程里那些有点意思的东西。今天的主题是:multiprocessing
库的共享内存和Manager
对象,以及它们在进程间通信中的应用。准备好了吗?咱们这就开始!
一、进程间通信(IPC)是个啥?为啥要用它?
想象一下,你开了好几个窗口,每个窗口运行着一个独立的程序。这些程序各自忙着自己的事情,互不干扰。但有时候,它们需要交流,比如一个窗口算出来的数据,另一个窗口要拿来展示。这种程序之间的交流,就叫做进程间通信(Inter-Process Communication,简称IPC)。
为什么要用IPC呢?很简单,因为每个进程都有自己的内存空间,它们之间默认是隔离的。如果你想让它们共享数据,就必须通过某种方式打破这种隔离。
二、multiprocessing
库的共享内存:小试牛刀
multiprocessing
库提供了一些工具,可以让我们在进程之间共享内存。最常用的就是Value
和Array
。
-
Value
:共享单个值Value
允许我们在多个进程之间共享一个简单的数据类型,比如整数、浮点数等。from multiprocessing import Process, Value import time def increase(number, num_increases): for _ in range(num_increases): time.sleep(0.01) # 模拟一些耗时操作,避免进程太快结束 number.value += 1 if __name__ == '__main__': shared_number = Value('i', 0) # 'i'表示整数类型,初始值为0 num_processes = 3 num_increases = 100 processes = [] for _ in range(num_processes): p = Process(target=increase, args=(shared_number, num_increases)) processes.append(p) p.start() for p in processes: p.join() print(f"最终的共享数值:{shared_number.value}") # 期望结果是 300
在这个例子中,我们创建了一个
Value
对象shared_number
,类型是整数,初始值为0。然后,我们启动了3个进程,每个进程都将shared_number
的值增加100次。最后,我们打印出shared_number
的值。理论上,结果应该是300。注意事项:
-
Value
的第一个参数是类型代码,常用的有:'b'
: signed char'B'
: unsigned char'h'
: signed short'H'
: unsigned short'i'
: signed int'I'
: unsigned int'l'
: signed long'L'
: unsigned long'f'
: float'd'
: double
-
由于多个进程同时修改共享变量可能会导致竞争条件(Race Condition),因此通常需要配合锁(Lock)来保证数据的一致性。后面会讲到锁。
-
-
Array
:共享数组Array
允许我们在多个进程之间共享一个数组。from multiprocessing import Process, Array import time def modify_array(shared_array, index, value): time.sleep(0.01) # 模拟一些耗时操作 shared_array[index] = value if __name__ == '__main__': shared_array = Array('i', [0, 0, 0, 0, 0]) # 创建一个包含5个整数的数组,初始值都是0 num_processes = 5 processes = [] for i in range(num_processes): p = Process(target=modify_array, args=(shared_array, i, i * 10)) processes.append(p) p.start() for p in processes: p.join() print(f"最终的共享数组:{list(shared_array)}") # 期望结果是 [0, 10, 20, 30, 40]
在这个例子中,我们创建了一个
Array
对象shared_array
,类型是整数数组,包含5个元素,初始值都是0。然后,我们启动了5个进程,每个进程都将数组的对应位置修改为自己的索引乘以10。最后,我们打印出shared_array
的值。理论上,结果应该是[0, 10, 20, 30, 40]
。注意事项:
Array
的第一个参数也是类型代码,和Value
一样。Array
的第二个参数是数组的大小或初始值。如果只提供大小,则数组会被初始化为0。
三、锁(Lock):保护共享资源
正如前面提到的,多个进程同时修改共享变量可能会导致竞争条件。为了避免这种情况,我们需要使用锁。锁可以保证在同一时刻只有一个进程可以访问共享资源。
from multiprocessing import Process, Value, Lock
import time
def increase_with_lock(number, num_increases, lock):
for _ in range(num_increases):
time.sleep(0.01)
with lock: # 获取锁,执行完代码块后自动释放锁
number.value += 1
if __name__ == '__main__':
shared_number = Value('i', 0)
lock = Lock() # 创建一个锁对象
num_processes = 3
num_increases = 100
processes = []
for _ in range(num_processes):
p = Process(target=increase_with_lock, args=(shared_number, num_increases, lock))
processes.append(p)
p.start()
for p in processes:
p.join()
print(f"使用锁之后的最终共享数值:{shared_number.value}") # 期望结果是 300
在这个例子中,我们创建了一个Lock
对象lock
。在increase_with_lock
函数中,我们使用with lock:
语句来获取锁。这意味着只有当一个进程获取到锁之后,才能执行number.value += 1
这行代码。当with
语句块执行完毕后,锁会自动释放,让其他进程有机会获取锁。
四、Manager
对象:更强大的共享工具
Value
和Array
虽然简单易用,但它们只能共享简单的数据类型。如果我们需要共享更复杂的数据结构,比如列表、字典、自定义对象等,就需要使用Manager
对象。
Manager
对象提供了一种创建可以在多个进程之间共享的Python对象的方式。它实际上启动了一个服务器进程,并将共享对象存储在这个服务器进程中。其他进程可以通过代理对象来访问和修改这些共享对象。
from multiprocessing import Process, Manager
import time
def modify_list(shared_list, process_id):
time.sleep(0.01)
shared_list.append(process_id) # 每个进程将自己的ID添加到列表中
if __name__ == '__main__':
with Manager() as manager: # 使用 Manager 创建共享对象
shared_list = manager.list() # 创建一个共享列表
num_processes = 5
processes = []
for i in range(num_processes):
p = Process(target=modify_list, args=(shared_list, i))
processes.append(p)
p.start()
for p in processes:
p.join()
print(f"使用 Manager 共享的列表:{shared_list}") # 期望结果是 [0, 1, 2, 3, 4] (顺序可能不同)
在这个例子中,我们使用Manager()
创建了一个管理器对象,然后使用manager.list()
创建了一个共享列表shared_list
。每个进程都将自己的ID添加到这个列表中。最后,我们打印出shared_list
的值。
Manager
对象可以创建的共享对象类型:
对象类型 | 说明 |
---|---|
list() |
创建一个共享的列表 |
dict() |
创建一个共享的字典 |
Namespace() |
创建一个简单的共享对象,可以设置任意属性 |
Lock() |
创建一个共享的锁对象 |
RLock() |
创建一个可重入的锁对象 |
Semaphore() |
创建一个共享的信号量对象 |
BoundedSemaphore() |
创建一个有界信号量对象 |
Condition() |
创建一个共享的条件变量对象 |
Event() |
创建一个共享的事件对象 |
Queue() |
创建一个共享的队列对象 |
JoinableQueue() |
创建一个可连接的队列对象 |
Pool() |
创建一个进程池,方便管理多个进程,注意使用 manager.Pool() 创建 |
五、Manager.Namespace()
:共享任意属性
Manager.Namespace()
提供了一种创建具有任意属性的共享对象的方式。这对于共享一些配置信息或者状态非常有用。
from multiprocessing import Process, Manager
import time
def update_namespace(namespace, process_id):
time.sleep(0.01)
namespace.process_id = process_id # 每个进程设置自己的ID
namespace.counter += 1 # 增加计数器
if __name__ == '__main__':
with Manager() as manager:
namespace = manager.Namespace()
namespace.process_id = -1 # 初始值
namespace.counter = 0 # 初始值
num_processes = 5
processes = []
for i in range(num_processes):
p = Process(target=update_namespace, args=(namespace, i))
processes.append(p)
p.start()
for p in processes:
p.join()
print(f"Namespace 的 process_id: {namespace.process_id}") # 最后一个进程的ID
print(f"Namespace 的 counter: {namespace.counter}") # 期望结果是 5
在这个例子中,我们创建了一个Namespace
对象namespace
,并设置了两个属性:process_id
和counter
。每个进程都将process_id
设置为自己的ID,并将counter
增加1。最后,我们打印出namespace
的这两个属性的值。
六、共享队列(Queue
):进程间消息传递
Manager.Queue()
提供了一种在进程之间传递消息的方式。一个进程可以将消息放入队列,另一个进程可以从队列中取出消息。
from multiprocessing import Process, Manager
import time
def producer(queue):
for i in range(5):
time.sleep(0.01)
queue.put(f"Message {i} from Producer") # 生产者放入消息
print(f"Producer sent: Message {i} from Producer")
def consumer(queue):
for _ in range(5):
message = queue.get() # 消费者取出消息
print(f"Consumer received: {message}")
if __name__ == '__main__':
with Manager() as manager:
queue = manager.Queue() # 创建共享队列
p1 = Process(target=producer, args=(queue,))
p2 = Process(target=consumer, args=(queue,))
p1.start()
p2.start()
p1.join()
p2.join()
print("Done!")
在这个例子中,我们创建了一个共享队列queue
。producer
进程将消息放入队列,consumer
进程从队列中取出消息。
七、JoinableQueue()
:让消费者知道生产者已经完成
JoinableQueue()
是Queue()
的一个变种,它提供了一种让消费者知道生产者已经完成的方式。生产者在完成所有任务后,需要调用task_done()
方法来告诉队列任务已经完成。消费者在调用join()
方法后,会阻塞,直到队列中的所有任务都完成。
from multiprocessing import Process, Manager
import time
def producer_joinable(queue):
for i in range(5):
time.sleep(0.01)
queue.put(f"Message {i} from Producer")
print(f"Producer sent: Message {i} from Producer")
for _ in range(5): # 必须调用 task_done() 与 put() 的次数相同
queue.task_done() # 告诉队列任务已经完成
def consumer_joinable(queue):
for _ in range(5):
message = queue.get()
print(f"Consumer received: {message}")
time.sleep(0.01)
queue.task_done() # 标记任务完成
queue.join() # 阻塞直到所有任务完成
if __name__ == '__main__':
with Manager() as manager:
queue = manager.JoinableQueue() # 创建 JoinableQueue
p1 = Process(target=producer_joinable, args=(queue,))
p2 = Process(target=consumer_joinable, args=(queue,))
p1.start()
p2.start()
p1.join()
#p2.join() #消费者自己join
print("Done!")
注意,在生产者中,每个put()
对应一个task_done()
。在消费者中,每次处理完一个任务后,也需要调用task_done()
。最后,消费者调用join()
方法,等待所有任务完成。
八、Pool()
with Manager()
:共享状态的进程池
虽然通常 multiprocessing.Pool()
不需要 Manager()
,因为其返回值可以直接获取,但如果需要在进程池中的各个进程间共享状态,那么结合 Manager()
使用 Pool()
就很有意义。
from multiprocessing import Process, Manager, Pool
import time
def worker_pool(index, shared_list):
time.sleep(0.01)
shared_list.append(f"Worker {index} added this")
return f"Result from worker {index}"
if __name__ == '__main__':
with Manager() as manager:
shared_list = manager.list()
with Pool(processes=4) as pool:
results = pool.starmap(worker_pool, [(i, shared_list) for i in range(5)])
print("Shared List:", shared_list)
print("Results from pool:", results)
在这个例子中,我们使用 manager.list()
创建了一个共享列表,然后使用 pool.starmap()
将任务分配给进程池中的进程。每个进程都将自己的消息添加到共享列表中。
九、总结和注意事项
multiprocessing
库的共享内存和Manager
对象为我们提供了强大的进程间通信工具。我们可以使用Value
和Array
共享简单的数据类型,使用Manager
对象共享复杂的数据结构,使用Lock
来保护共享资源,使用Queue
进行消息传递。
一些重要的注意事项:
- 性能: 使用
Manager
对象进行共享数据通常比使用Value
和Array
慢,因为Manager
对象需要通过网络进行通信。因此,如果只需要共享简单的数据类型,并且对性能要求较高,建议使用Value
和Array
。 - 死锁: 在使用锁时,要避免死锁的发生。死锁是指两个或多个进程互相等待对方释放资源,导致所有进程都无法继续执行的情况。
- 数据一致性: 在多个进程同时修改共享数据时,要保证数据的一致性。可以使用锁、信号量等同步机制来避免竞争条件。
- 异常处理: 在多进程程序中,异常处理非常重要。如果一个进程发生异常,可能会导致整个程序崩溃。因此,要在每个进程中都进行适当的异常处理。
总的来说,multiprocessing
库为我们提供了很多强大的工具,可以让我们编写高效的并行程序。但是,在使用这些工具时,也要注意一些潜在的问题,并采取相应的措施来避免这些问题。 希望今天的分享对大家有所帮助! 谢谢大家!