Python高级技术之:`Python`的`multiprocessing`模块:进程间通信的实现方式:`Pipe`和`Queue`。

各位观众老爷们,大家好!今天咱们来聊聊Python多进程里头,进程间通信那点儿事儿。别害怕,听起来高大上,其实就是让不同进程之间能互相“唠嗑”,传递点儿信息。这年头,单打独斗不行,得团队合作,进程也一样!咱们主要讲Pipe(管道)和Queue(队列)这俩哥们儿。

开场白:为什么需要进程间通信?

想想,为啥我们需要让进程之间能说话?一个进程算不完的事情,拆给多个进程一起算,算完了总得汇总一下吧?就像古代打仗,侦察兵侦察完敌情,总得跟将军汇报吧?没汇报,将军瞎指挥,那不完犊子了么!

多进程可以充分利用多核CPU的优势,提高程序的运行效率。但是,每个进程都有自己独立的内存空间,数据不能直接共享。所以,进程间通信(IPC,Inter-Process Communication)就成了必不可少的环节。

第一幕:Pipe(管道)—— 简单的单向交流

Pipe,顾名思义,就是一根管子。这管子是单向的,一头进,另一头出。形象一点儿说,就像你家厨房的下水道,脏水从水槽流进去,哗啦啦地从另一头流走了。

1. Pipe的基本用法

multiprocessing.Pipe()会返回一对连接对象,分别代表管道的两端。通常,一个进程用一个端点发送数据,另一个进程用另一个端点接收数据。

from multiprocessing import Process, Pipe

def worker(conn):
    conn.send("Hello from worker!")
    conn.close()

if __name__ == '__main__':
    parent_conn, child_conn = Pipe()  # 创建管道
    p = Process(target=worker, args=(child_conn,))  # 创建子进程
    p.start()
    print("Parent received:", parent_conn.recv())  # 接收数据
    p.join()

这段代码里,Pipe()返回了parent_connchild_conn两个连接对象,分别是管道的父端和子端。父进程用parent_conn接收数据,子进程用child_conn发送数据。

2. Pipe的特性

  • 单向性: 数据只能从一个端点流向另一个端点。
  • 双端性: 每个Pipe()创建了两个端点,可以实现两个进程之间的通信。
  • 阻塞性: recv()方法在没有数据可接收时会阻塞,直到有数据到达。

3. Pipe的适用场景

Pipe适用于简单的、单向的数据传输场景。比如,一个进程产生数据,另一个进程消费数据,就像生产者-消费者模型。

4. Pipe的进阶用法:双向通信

虽然Pipe本质上是单向的,但我们可以创建两个Pipe来实现双向通信。

from multiprocessing import Process, Pipe

def worker(conn1, conn2):
    conn1.send("Hello from worker!")
    print("Worker received:", conn2.recv())
    conn1.close()
    conn2.close()

if __name__ == '__main__':
    parent_conn, child_conn = Pipe()
    child_conn2, parent_conn2 = Pipe()
    p = Process(target=worker, args=(child_conn, parent_conn))
    p.start()
    print("Parent received:", parent_conn2.recv())
    parent_conn2.send("Hello from parent!")
    p.join()

这里,我们创建了两个Pipeparent_connchild_conn,以及child_conn2parent_conn2。子进程用child_conn发送数据给父进程,用parent_conn接收数据;父进程用parent_conn2发送数据给子进程,用child_conn2接收数据。这样就实现了双向通信。

第二幕:Queue(队列)—— 更强大的数据传递

Queue,也就是队列,就像你去银行排队取钱。先来的先办理,后来的排在后面。multiprocessing.Queue提供了一个线程和进程安全的队列,可以用于多个进程之间的数据传递。

1. Queue的基本用法

from multiprocessing import Process, Queue

def worker(q):
    q.put("Hello from worker!")

if __name__ == '__main__':
    q = Queue()  # 创建队列
    p = Process(target=worker, args=(q,))
    p.start()
    print("Parent received:", q.get())  # 从队列中获取数据
    p.join()

这段代码里,我们创建了一个Queue对象q。子进程用q.put()方法将数据放入队列,父进程用q.get()方法从队列中获取数据。

2. Queue的特性

  • 线程和进程安全: 多个进程可以同时访问同一个Queue对象,而不会发生数据竞争。
  • 先进先出(FIFO): 队列中的数据按照放入的顺序取出。
  • 阻塞性: get()方法在队列为空时会阻塞,直到有数据放入;put()方法在队列满时会阻塞,直到有空间可用。

3. Queue的适用场景

Queue适用于需要多个进程之间进行数据共享的场景,比如:

  • 生产者-消费者模型: 多个生产者进程产生数据,多个消费者进程消费数据。
  • 任务分配: 一个主进程将任务放入队列,多个工作进程从队列中获取任务并执行。

4. Queue的进阶用法

  • 设置队列大小: 可以通过Queue(maxsize=N)来限制队列的大小,防止内存溢出。
  • 非阻塞模式: 可以使用get(block=False)put(block=False)来设置非阻塞模式。如果队列为空或满,会抛出EmptyFull异常。
  • 超时: 可以使用get(timeout=T)put(timeout=T)来设置超时时间。如果在指定的时间内无法获取或放入数据,会抛出EmptyFull异常。

5. Queue的坑:进程退出后的数据丢失

在使用Queue时,需要注意一个坑:如果一个进程在Queue中放入了数据,但在退出前没有调用q.close()q.join_thread(),那么这些数据可能会丢失。

from multiprocessing import Process, Queue
import time

def worker(q):
    for i in range(5):
        q.put(i)
        time.sleep(0.1)  # 模拟耗时操作
    print("Worker finished putting data.")
    # q.close()  # 注释掉这行和下一行,看看会发生什么
    # q.join_thread()

if __name__ == '__main__':
    q = Queue()
    p = Process(target=worker, args=(q,))
    p.start()
    time.sleep(1)  # 等待子进程放入一些数据
    print("Parent is getting data...")
    while not q.empty():
        print("Got:", q.get())
    p.join()
    print("Parent finished.")

运行这段代码,你会发现父进程可能只能获取到一部分数据,甚至一个数据都获取不到。这是因为子进程在退出时,Queue内部的缓冲区可能还没有将数据刷新到磁盘上。

解决这个问题的方法就是在子进程退出前调用q.close()q.join_thread()q.close()会关闭队列,防止继续放入数据;q.join_thread()会等待队列中的所有数据都被消费完毕。

第三幕:Pipe vs Queue: 选哪个?

PipeQueue都是进程间通信的常用工具,但它们各有优缺点。那么,在实际应用中,我们应该选择哪个呢?

特性 Pipe Queue
通信方式 单向或通过创建多个Pipe实现双向 双向
数据结构 无特定数据结构,直接传递字节流或对象 队列(FIFO)
线程安全 需要手动加锁保证线程安全 线程和进程安全
适用场景 简单的单向数据传输,或者需要手动控制数据流 复杂的数据共享,生产者-消费者模型,任务分配
易用性 相对简单,但双向通信较复杂 更加易用
性能 在简单场景下可能略优于Queue 在复杂场景下性能更好

总结一下:

  • 如果只需要简单的单向数据传输,并且对性能有较高要求,可以选择Pipe
  • 如果需要多个进程之间进行复杂的数据共享,或者需要实现生产者-消费者模型,可以选择Queue

实战演练:生产者-消费者模型

为了更好地理解Queue的应用,我们来实现一个简单的生产者-消费者模型。

from multiprocessing import Process, Queue
import time
import random

def producer(q):
    for i in range(5):
        data = random.randint(1, 100)
        print("Producer put:", data)
        q.put(data)
        time.sleep(random.random())  # 模拟生产数据的时间

def consumer(q):
    while True:
        data = q.get()
        if data is None:  # 生产者发送None表示结束
            break
        print("Consumer got:", data)
        time.sleep(random.random())  # 模拟消费数据的时间

if __name__ == '__main__':
    q = Queue()
    p = Process(target=producer, args=(q,))
    c = Process(target=consumer, args=(q,))
    p.start()
    c.start()
    p.join()
    q.put(None)  # 生产者发送None表示结束
    c.join()
    print("Finished.")

在这个例子中,生产者进程负责生产数据,并将数据放入队列;消费者进程负责从队列中获取数据并消费。生产者在生产完所有数据后,向队列中放入一个None,表示生产结束。消费者在接收到None后,退出循环。

总结与展望

今天咱们聊了Python多进程中进程间通信的两种重要方式:PipeQueuePipe简单直接,适合单向通信;Queue功能强大,适合复杂的数据共享。

除了PipeQueuemultiprocessing模块还提供了其他的进程间通信方式,比如ValueArrayManager等。这些工具各有特点,可以根据实际需求选择使用。

掌握了进程间通信,你就可以构建更加复杂和高效的多进程应用。希望今天的讲解对你有所帮助!下次有机会再跟大家分享更多Python高级技术。感谢各位的观看!

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注