Python高级技术之:`Python`的`queue`模块:在多线程和多进程间进行安全通信。

咳咳,各位观众老爷,老衲掐指一算,今日宜讲“Python的queue模块:在多线程和多进程间进行安全通信”。 准备好瓜子花生小板凳,咱们这就开唠!

开场白:线程和进程那点事儿

话说江湖上,并行计算这玩意儿,那是相当的吃香。你想啊,同样的时间,别人吭哧吭哧跑一个程序,你这边八个核一起上,那效率,简直就是火箭发射!

要实现并行计算,咱们得先搞清楚两个概念:线程和进程。

  • 进程 (Process): 进程就像一个独立的王国,有自己的领土(内存空间),自己的军队(系统资源)。进程之间要想交流,那得通过复杂的边境贸易(进程间通信,IPC)。
  • 线程 (Thread): 线程就像王国里的不同部门,共享同一片领土(进程的内存空间),协同工作。线程之间的交流方便多了,直接内部开会就行。

但是!问题来了。线程虽然交流方便,但是也容易出事儿。你想啊,如果两个部门同时修改一个文件,那肯定乱套。这就是著名的“线程安全问题”。

进程呢?虽然安全,但是交流成本高啊!这就好比两个国家谈生意,那得签字画押,层层审批,效率不高。

所以,我们需要一种既安全又高效的通信方式。 铛铛铛铛! queue模块闪亮登场!

queue模块:消息队列小能手

queue模块,顾名思义,就是队列。它提供了一种线程和进程安全的队列数据结构,可以用来在多个线程或进程之间传递消息。

你可以把queue想象成一个邮局。线程或进程可以把消息(数据)放到邮局(队列)里,然后其他的线程或进程就可以从邮局里取出消息。

queue模块主要提供了以下几种队列:

  • queue.Queue: 先进先出 (FIFO) 队列。就像排队买票,先来的先买。
  • queue.LifoQueue: 后进先出 (LIFO) 队列。 就像堆盘子,后放的先拿。
  • queue.PriorityQueue: 优先级队列。 就像医院急诊,病情重的先看。

queue.Queue:最常用的队列

咱们先从最常用的queue.Queue开始说起。

基本用法

import queue
import threading
import time

# 创建一个队列
q = queue.Queue()

# 往队列里放东西
q.put(1)
q.put('hello')
q.put([1, 2, 3])

# 从队列里取东西
item1 = q.get()
item2 = q.get()
item3 = q.get()

print(item1, item2, item3)  # 输出:1 hello [1, 2, 3]

这段代码很简单,就是往队列里放了几个不同类型的数据,然后又取出来。

阻塞式操作

queue模块的put()get()方法默认是阻塞式的。 啥意思呢?

  • put(): 如果队列满了,put()方法会一直阻塞,直到队列有空位。
  • get(): 如果队列空了,get()方法会一直阻塞,直到队列有新的数据。

这种阻塞式的操作,非常适合多线程和多进程的同步。

import queue
import threading
import time

q = queue.Queue(maxsize=3)  # 创建一个最大容量为3的队列

def producer():
    for i in range(5):
        print(f"Producer: Putting {i} into queue...")
        q.put(i)
        print(f"Producer: Put {i} into queue.")
        time.sleep(1)  # 模拟生产过程

def consumer():
    while True:
        print("Consumer: Waiting for item from queue...")
        item = q.get()
        print(f"Consumer: Got {item} from queue.")
        q.task_done() #告知queue,已经完成了该任务
        time.sleep(2)  # 模拟消费过程

# 创建生产者和消费者线程
producer_thread = threading.Thread(target=producer)
consumer_thread = threading.Thread(target=consumer)

# 启动线程
producer_thread.start()
consumer_thread.start()

# 等待生产者完成
producer_thread.join()

# 等待所有任务完成,然后再退出程序,否则可能程序会提前退出
q.join()
print("All tasks are done!")

在这个例子中,producer()函数往队列里放数据,consumer()函数从队列里取数据。 由于队列的容量是3,所以当队列满了之后,producer()函数会被阻塞,直到consumer()函数取走数据。 q.task_done()q.join()是配合使用的,用于确保所有任务都完成后程序再退出。

非阻塞式操作

如果你不想让put()get()方法阻塞,可以设置block参数为False

import queue

q = queue.Queue(maxsize=3)

try:
    q.put(4, block=False)  # 如果队列满了,会抛出queue.Full异常
except queue.Full:
    print("Queue is full!")

try:
    item = q.get(block=False)  # 如果队列空了,会抛出queue.Empty异常
except queue.Empty:
    print("Queue is empty!")

这种非阻塞式操作,适用于需要快速响应的场景。

超时设置

你还可以给put()get()方法设置超时时间。 如果在指定时间内,队列仍然满或空,就会抛出异常。

import queue
import time

q = queue.Queue(maxsize=1)

try:
    q.put(1, timeout=2)  # 尝试在2秒内放入数据
    q.put(2, timeout=2)  # 尝试在2秒内放入数据,如果队列满了,会抛出queue.Full异常
except queue.Full:
    print("Queue is full after 2 seconds!")

try:
    item = q.get(timeout=2)  # 尝试在2秒内取出数据
    item = q.get(timeout=2)  # 尝试在2秒内取出数据,如果队列空了,会抛出queue.Empty异常
except queue.Empty:
    print("Queue is empty after 2 seconds!")

实用案例:多线程爬虫

咱们来一个实际点的例子:多线程爬虫。

import queue
import threading
import requests
import time

# 待爬取的URL队列
url_queue = queue.Queue()

# 已爬取的URL集合
visited_urls = set()

# 爬虫线程数
NUM_THREADS = 5

# 初始URL
start_url = "https://www.example.com"

# 爬取函数
def crawl(url):
    try:
        response = requests.get(url, timeout=5)
        response.raise_for_status()  # 检查HTTP状态码
        print(f"Crawled: {url}")
        return response.text
    except requests.exceptions.RequestException as e:
        print(f"Error crawling {url}: {e}")
        return None

# 解析函数 (简单示例,只提取链接)
def parse(html):
    if html:
        # 这里应该用更强大的HTML解析库,如BeautifulSoup
        # 这里仅仅是简单示例
        links = []
        # 假设链接都在<a>标签里,href属性包含链接
        start = 0
        while True:
            start = html.find('<a href="', start)
            if start == -1:
                break
            start += len('<a href="')
            end = html.find('"', start)
            if end == -1:
                break
            link = html[start:end]

            # 处理相对链接
            if not link.startswith('http'):
                from urllib.parse import urljoin
                link = urljoin(start_url, link)
            links.append(link)
            start = end + 1
        return links
    return []

# 爬虫线程
def worker():
    while True:
        try:
            url = url_queue.get(timeout=5)  # 从队列中获取URL,设置超时时间
            if url not in visited_urls:
                visited_urls.add(url)
                html = crawl(url)
                links = parse(html)
                for link in links:
                    url_queue.put(link) #将提取到的链接放入队列中
            url_queue.task_done() # 标记该任务完成
        except queue.Empty:
            break  # 队列为空,线程退出
        except Exception as e:
            print(f"An error occurred: {e}")
            url_queue.task_done() #确保即使出现错误,任务也会被标记为完成

# 主函数
def main():
    # 将初始URL放入队列
    url_queue.put(start_url)

    # 创建并启动爬虫线程
    threads = []
    for _ in range(NUM_THREADS):
        t = threading.Thread(target=worker)
        threads.append(t)
        t.start()

    # 等待所有任务完成
    url_queue.join()

    # 等待所有线程完成
    for t in threads:
        t.join()

    print("Crawling finished!")

if __name__ == "__main__":
    start_time = time.time()
    main()
    end_time = time.time()
    print(f"Total time taken: {end_time - start_time:.2f} seconds")

这个例子里,我们创建了一个url_queue队列,用于存放待爬取的URL。 然后创建了多个爬虫线程,每个线程都从队列里取出URL进行爬取,并把爬取到的新URL放入队列。

这个例子演示了queue.Queue在多线程环境下的应用。由于queue.Queue是线程安全的,所以多个线程可以同时往队列里放数据和取数据,而不用担心数据竞争的问题。

queue.LifoQueuequeue.PriorityQueue

queue.LifoQueuequeue.PriorityQueue的用法和queue.Queue类似,只是队列的顺序不同。

  • queue.LifoQueue: 后进先出。
  • queue.PriorityQueue: 优先级队列。 放入队列的元素必须是可比较的。 通常,我们会放入一个元组,第一个元素是优先级(数字越小,优先级越高),第二个元素是数据。
import queue

# LifoQueue
lifo_queue = queue.LifoQueue()
lifo_queue.put(1)
lifo_queue.put(2)
lifo_queue.put(3)

print(lifo_queue.get())  # 输出:3
print(lifo_queue.get())  # 输出:2
print(lifo_queue.get())  # 输出:1

# PriorityQueue
priority_queue = queue.PriorityQueue()
priority_queue.put((3, 'low'))
priority_queue.put((1, 'high'))
priority_queue.put((2, 'medium'))

print(priority_queue.get())  # 输出:(1, 'high')
print(priority_queue.get())  # 输出:(2, 'medium')
print(priority_queue.get())  # 输出:(3, 'low')

多进程通信:multiprocessing.Queue

上面的queue模块主要用于多线程。 如果要在多进程之间通信,需要使用multiprocessing.Queue

multiprocessing.Queue的用法和queue.Queue基本一样,但是它可以在多个进程之间共享数据。

import multiprocessing
import time

def worker(q):
    while True:
        item = q.get()
        if item is None:  # 哨兵值,用于结束进程
            break
        print(f"Process {multiprocessing.current_process().name}: Got {item}")
        time.sleep(1)

if __name__ == '__main__':
    q = multiprocessing.Queue()

    # 创建多个进程
    processes = []
    for i in range(3):
        p = multiprocessing.Process(target=worker, args=(q,), name=f"Worker-{i}")
        processes.append(p)
        p.start()

    # 往队列里放数据
    for i in range(10):
        q.put(i)
        time.sleep(0.5)

    # 发送哨兵值,结束进程
    for _ in range(3):
        q.put(None)

    # 等待所有进程结束
    for p in processes:
        p.join()

    print("All processes finished!")

在这个例子中,我们创建了一个multiprocessing.Queue队列,然后在多个进程中共享这个队列。 主进程往队列里放数据,子进程从队列里取数据。 注意,我们需要使用“哨兵值”(在这个例子里是None)来告诉子进程结束。 这是因为multiprocessing.Queueget()方法会一直阻塞,直到队列有新的数据。 如果没有哨兵值,子进程会一直阻塞下去。

注意事项

  • multiprocessing.Queue在Windows系统上只能传递可序列化的对象。 这意味着,你不能传递函数、类实例等复杂对象。
  • 使用multiprocessing.Queue进行进程间通信时,需要注意死锁问题。 避免循环等待的情况。

总结

queue模块是Python中一个非常强大的工具,可以用来在多线程和多进程之间进行安全通信。 掌握queue模块的用法,可以让你编写出更加高效、稳定的并发程序。

特性 queue.Queue multiprocessing.Queue
适用场景 多线程 多进程
安全性 线程安全 进程安全
数据传递 共享内存 序列化/反序列化
跨平台兼容性 良好 Windows有限制
对象传递限制 仅可序列化对象

好了,今天的讲座就到这里。 希望各位观众老爷有所收获。 如果觉得有用,记得点赞收藏转发哦! 咱们下期再见!

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注