Python高级技术之:`Python`的`multiprocessing`库:共享内存和`Manager`对象在进程间通信中的应用。

各位程序猿/媛们,晚上好!我是今晚的分享嘉宾,很高兴能跟大家一起聊聊Python多进程里那些有点意思的东西。今天的主题是:multiprocessing库的共享内存和Manager对象,以及它们在进程间通信中的应用。准备好了吗?咱们这就开始!

一、进程间通信(IPC)是个啥?为啥要用它?

想象一下,你开了好几个窗口,每个窗口运行着一个独立的程序。这些程序各自忙着自己的事情,互不干扰。但有时候,它们需要交流,比如一个窗口算出来的数据,另一个窗口要拿来展示。这种程序之间的交流,就叫做进程间通信(Inter-Process Communication,简称IPC)。

为什么要用IPC呢?很简单,因为每个进程都有自己的内存空间,它们之间默认是隔离的。如果你想让它们共享数据,就必须通过某种方式打破这种隔离。

二、multiprocessing库的共享内存:小试牛刀

multiprocessing库提供了一些工具,可以让我们在进程之间共享内存。最常用的就是ValueArray

  • Value:共享单个值

    Value允许我们在多个进程之间共享一个简单的数据类型,比如整数、浮点数等。

    from multiprocessing import Process, Value
    import time
    
    def increase(number, num_increases):
        for _ in range(num_increases):
            time.sleep(0.01) # 模拟一些耗时操作,避免进程太快结束
            number.value += 1
    
    if __name__ == '__main__':
        shared_number = Value('i', 0)  # 'i'表示整数类型,初始值为0
        num_processes = 3
        num_increases = 100
    
        processes = []
        for _ in range(num_processes):
            p = Process(target=increase, args=(shared_number, num_increases))
            processes.append(p)
            p.start()
    
        for p in processes:
            p.join()
    
        print(f"最终的共享数值:{shared_number.value}") # 期望结果是 300

    在这个例子中,我们创建了一个Value对象shared_number,类型是整数,初始值为0。然后,我们启动了3个进程,每个进程都将shared_number的值增加100次。最后,我们打印出shared_number的值。理论上,结果应该是300。

    注意事项:

    • Value的第一个参数是类型代码,常用的有:

      • 'b': signed char
      • 'B': unsigned char
      • 'h': signed short
      • 'H': unsigned short
      • 'i': signed int
      • 'I': unsigned int
      • 'l': signed long
      • 'L': unsigned long
      • 'f': float
      • 'd': double
    • 由于多个进程同时修改共享变量可能会导致竞争条件(Race Condition),因此通常需要配合锁(Lock)来保证数据的一致性。后面会讲到锁。

  • Array:共享数组

    Array允许我们在多个进程之间共享一个数组。

    from multiprocessing import Process, Array
    import time
    
    def modify_array(shared_array, index, value):
        time.sleep(0.01) # 模拟一些耗时操作
        shared_array[index] = value
    
    if __name__ == '__main__':
        shared_array = Array('i', [0, 0, 0, 0, 0])  # 创建一个包含5个整数的数组,初始值都是0
        num_processes = 5
    
        processes = []
        for i in range(num_processes):
            p = Process(target=modify_array, args=(shared_array, i, i * 10))
            processes.append(p)
            p.start()
    
        for p in processes:
            p.join()
    
        print(f"最终的共享数组:{list(shared_array)}") # 期望结果是 [0, 10, 20, 30, 40]

    在这个例子中,我们创建了一个Array对象shared_array,类型是整数数组,包含5个元素,初始值都是0。然后,我们启动了5个进程,每个进程都将数组的对应位置修改为自己的索引乘以10。最后,我们打印出shared_array的值。理论上,结果应该是[0, 10, 20, 30, 40]

    注意事项:

    • Array的第一个参数也是类型代码,和Value一样。
    • Array的第二个参数是数组的大小或初始值。如果只提供大小,则数组会被初始化为0。

三、锁(Lock):保护共享资源

正如前面提到的,多个进程同时修改共享变量可能会导致竞争条件。为了避免这种情况,我们需要使用锁。锁可以保证在同一时刻只有一个进程可以访问共享资源。

from multiprocessing import Process, Value, Lock
import time

def increase_with_lock(number, num_increases, lock):
    for _ in range(num_increases):
        time.sleep(0.01)
        with lock:  # 获取锁,执行完代码块后自动释放锁
            number.value += 1

if __name__ == '__main__':
    shared_number = Value('i', 0)
    lock = Lock()  # 创建一个锁对象
    num_processes = 3
    num_increases = 100

    processes = []
    for _ in range(num_processes):
        p = Process(target=increase_with_lock, args=(shared_number, num_increases, lock))
        processes.append(p)
        p.start()

    for p in processes:
        p.join()

    print(f"使用锁之后的最终共享数值:{shared_number.value}") # 期望结果是 300

在这个例子中,我们创建了一个Lock对象lock。在increase_with_lock函数中,我们使用with lock:语句来获取锁。这意味着只有当一个进程获取到锁之后,才能执行number.value += 1这行代码。当with语句块执行完毕后,锁会自动释放,让其他进程有机会获取锁。

四、Manager对象:更强大的共享工具

ValueArray虽然简单易用,但它们只能共享简单的数据类型。如果我们需要共享更复杂的数据结构,比如列表、字典、自定义对象等,就需要使用Manager对象。

Manager对象提供了一种创建可以在多个进程之间共享的Python对象的方式。它实际上启动了一个服务器进程,并将共享对象存储在这个服务器进程中。其他进程可以通过代理对象来访问和修改这些共享对象。

from multiprocessing import Process, Manager
import time

def modify_list(shared_list, process_id):
    time.sleep(0.01)
    shared_list.append(process_id)  # 每个进程将自己的ID添加到列表中

if __name__ == '__main__':
    with Manager() as manager: # 使用 Manager 创建共享对象
        shared_list = manager.list()  # 创建一个共享列表
        num_processes = 5

        processes = []
        for i in range(num_processes):
            p = Process(target=modify_list, args=(shared_list, i))
            processes.append(p)
            p.start()

        for p in processes:
            p.join()

        print(f"使用 Manager 共享的列表:{shared_list}") # 期望结果是 [0, 1, 2, 3, 4] (顺序可能不同)

在这个例子中,我们使用Manager()创建了一个管理器对象,然后使用manager.list()创建了一个共享列表shared_list。每个进程都将自己的ID添加到这个列表中。最后,我们打印出shared_list的值。

Manager对象可以创建的共享对象类型:

对象类型 说明
list() 创建一个共享的列表
dict() 创建一个共享的字典
Namespace() 创建一个简单的共享对象,可以设置任意属性
Lock() 创建一个共享的锁对象
RLock() 创建一个可重入的锁对象
Semaphore() 创建一个共享的信号量对象
BoundedSemaphore() 创建一个有界信号量对象
Condition() 创建一个共享的条件变量对象
Event() 创建一个共享的事件对象
Queue() 创建一个共享的队列对象
JoinableQueue() 创建一个可连接的队列对象
Pool() 创建一个进程池,方便管理多个进程,注意使用 manager.Pool()创建

五、Manager.Namespace():共享任意属性

Manager.Namespace()提供了一种创建具有任意属性的共享对象的方式。这对于共享一些配置信息或者状态非常有用。

from multiprocessing import Process, Manager
import time

def update_namespace(namespace, process_id):
    time.sleep(0.01)
    namespace.process_id = process_id  # 每个进程设置自己的ID
    namespace.counter += 1 # 增加计数器

if __name__ == '__main__':
    with Manager() as manager:
        namespace = manager.Namespace()
        namespace.process_id = -1  # 初始值
        namespace.counter = 0 # 初始值

        num_processes = 5

        processes = []
        for i in range(num_processes):
            p = Process(target=update_namespace, args=(namespace, i))
            processes.append(p)
            p.start()

        for p in processes:
            p.join()

        print(f"Namespace 的 process_id: {namespace.process_id}") # 最后一个进程的ID
        print(f"Namespace 的 counter: {namespace.counter}") # 期望结果是 5

在这个例子中,我们创建了一个Namespace对象namespace,并设置了两个属性:process_idcounter。每个进程都将process_id设置为自己的ID,并将counter增加1。最后,我们打印出namespace的这两个属性的值。

六、共享队列(Queue):进程间消息传递

Manager.Queue()提供了一种在进程之间传递消息的方式。一个进程可以将消息放入队列,另一个进程可以从队列中取出消息。

from multiprocessing import Process, Manager
import time

def producer(queue):
    for i in range(5):
        time.sleep(0.01)
        queue.put(f"Message {i} from Producer") # 生产者放入消息
        print(f"Producer sent: Message {i} from Producer")

def consumer(queue):
    for _ in range(5):
        message = queue.get() # 消费者取出消息
        print(f"Consumer received: {message}")

if __name__ == '__main__':
    with Manager() as manager:
        queue = manager.Queue() # 创建共享队列

        p1 = Process(target=producer, args=(queue,))
        p2 = Process(target=consumer, args=(queue,))

        p1.start()
        p2.start()

        p1.join()
        p2.join()

    print("Done!")

在这个例子中,我们创建了一个共享队列queueproducer进程将消息放入队列,consumer进程从队列中取出消息。

七、JoinableQueue():让消费者知道生产者已经完成

JoinableQueue()Queue()的一个变种,它提供了一种让消费者知道生产者已经完成的方式。生产者在完成所有任务后,需要调用task_done()方法来告诉队列任务已经完成。消费者在调用join()方法后,会阻塞,直到队列中的所有任务都完成。

from multiprocessing import Process, Manager
import time

def producer_joinable(queue):
    for i in range(5):
        time.sleep(0.01)
        queue.put(f"Message {i} from Producer")
        print(f"Producer sent: Message {i} from Producer")
    for _ in range(5): # 必须调用 task_done() 与 put() 的次数相同
        queue.task_done() # 告诉队列任务已经完成

def consumer_joinable(queue):
    for _ in range(5):
        message = queue.get()
        print(f"Consumer received: {message}")
        time.sleep(0.01)
        queue.task_done() # 标记任务完成
    queue.join() # 阻塞直到所有任务完成

if __name__ == '__main__':
    with Manager() as manager:
        queue = manager.JoinableQueue() # 创建 JoinableQueue

        p1 = Process(target=producer_joinable, args=(queue,))
        p2 = Process(target=consumer_joinable, args=(queue,))

        p1.start()
        p2.start()

        p1.join()
        #p2.join() #消费者自己join

    print("Done!")

注意,在生产者中,每个put()对应一个task_done()。在消费者中,每次处理完一个任务后,也需要调用task_done()。最后,消费者调用join()方法,等待所有任务完成。

八、Pool() with Manager():共享状态的进程池

虽然通常 multiprocessing.Pool() 不需要 Manager(),因为其返回值可以直接获取,但如果需要在进程池中的各个进程间共享状态,那么结合 Manager() 使用 Pool() 就很有意义。

from multiprocessing import Process, Manager, Pool
import time

def worker_pool(index, shared_list):
    time.sleep(0.01)
    shared_list.append(f"Worker {index} added this")
    return f"Result from worker {index}"

if __name__ == '__main__':
    with Manager() as manager:
        shared_list = manager.list()

        with Pool(processes=4) as pool:
            results = pool.starmap(worker_pool, [(i, shared_list) for i in range(5)])

        print("Shared List:", shared_list)
        print("Results from pool:", results)

在这个例子中,我们使用 manager.list() 创建了一个共享列表,然后使用 pool.starmap() 将任务分配给进程池中的进程。每个进程都将自己的消息添加到共享列表中。

九、总结和注意事项

multiprocessing库的共享内存和Manager对象为我们提供了强大的进程间通信工具。我们可以使用ValueArray共享简单的数据类型,使用Manager对象共享复杂的数据结构,使用Lock来保护共享资源,使用Queue进行消息传递。

一些重要的注意事项:

  • 性能: 使用Manager对象进行共享数据通常比使用ValueArray慢,因为Manager对象需要通过网络进行通信。因此,如果只需要共享简单的数据类型,并且对性能要求较高,建议使用ValueArray
  • 死锁: 在使用锁时,要避免死锁的发生。死锁是指两个或多个进程互相等待对方释放资源,导致所有进程都无法继续执行的情况。
  • 数据一致性: 在多个进程同时修改共享数据时,要保证数据的一致性。可以使用锁、信号量等同步机制来避免竞争条件。
  • 异常处理: 在多进程程序中,异常处理非常重要。如果一个进程发生异常,可能会导致整个程序崩溃。因此,要在每个进程中都进行适当的异常处理。

总的来说,multiprocessing库为我们提供了很多强大的工具,可以让我们编写高效的并行程序。但是,在使用这些工具时,也要注意一些潜在的问题,并采取相应的措施来避免这些问题。 希望今天的分享对大家有所帮助! 谢谢大家!

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注