咳咳,各位观众老爷,老衲掐指一算,今日宜讲“Python的queue模块:在多线程和多进程间进行安全通信”。 准备好瓜子花生小板凳,咱们这就开唠!
开场白:线程和进程那点事儿
话说江湖上,并行计算这玩意儿,那是相当的吃香。你想啊,同样的时间,别人吭哧吭哧跑一个程序,你这边八个核一起上,那效率,简直就是火箭发射!
要实现并行计算,咱们得先搞清楚两个概念:线程和进程。
- 进程 (Process): 进程就像一个独立的王国,有自己的领土(内存空间),自己的军队(系统资源)。进程之间要想交流,那得通过复杂的边境贸易(进程间通信,IPC)。
- 线程 (Thread): 线程就像王国里的不同部门,共享同一片领土(进程的内存空间),协同工作。线程之间的交流方便多了,直接内部开会就行。
但是!问题来了。线程虽然交流方便,但是也容易出事儿。你想啊,如果两个部门同时修改一个文件,那肯定乱套。这就是著名的“线程安全问题”。
进程呢?虽然安全,但是交流成本高啊!这就好比两个国家谈生意,那得签字画押,层层审批,效率不高。
所以,我们需要一种既安全又高效的通信方式。 铛铛铛铛! queue
模块闪亮登场!
queue
模块:消息队列小能手
queue
模块,顾名思义,就是队列。它提供了一种线程和进程安全的队列数据结构,可以用来在多个线程或进程之间传递消息。
你可以把queue
想象成一个邮局。线程或进程可以把消息(数据)放到邮局(队列)里,然后其他的线程或进程就可以从邮局里取出消息。
queue
模块主要提供了以下几种队列:
queue.Queue
: 先进先出 (FIFO) 队列。就像排队买票,先来的先买。queue.LifoQueue
: 后进先出 (LIFO) 队列。 就像堆盘子,后放的先拿。queue.PriorityQueue
: 优先级队列。 就像医院急诊,病情重的先看。
queue.Queue
:最常用的队列
咱们先从最常用的queue.Queue
开始说起。
基本用法
import queue
import threading
import time
# 创建一个队列
q = queue.Queue()
# 往队列里放东西
q.put(1)
q.put('hello')
q.put([1, 2, 3])
# 从队列里取东西
item1 = q.get()
item2 = q.get()
item3 = q.get()
print(item1, item2, item3) # 输出:1 hello [1, 2, 3]
这段代码很简单,就是往队列里放了几个不同类型的数据,然后又取出来。
阻塞式操作
queue
模块的put()
和get()
方法默认是阻塞式的。 啥意思呢?
put()
: 如果队列满了,put()
方法会一直阻塞,直到队列有空位。get()
: 如果队列空了,get()
方法会一直阻塞,直到队列有新的数据。
这种阻塞式的操作,非常适合多线程和多进程的同步。
import queue
import threading
import time
q = queue.Queue(maxsize=3) # 创建一个最大容量为3的队列
def producer():
for i in range(5):
print(f"Producer: Putting {i} into queue...")
q.put(i)
print(f"Producer: Put {i} into queue.")
time.sleep(1) # 模拟生产过程
def consumer():
while True:
print("Consumer: Waiting for item from queue...")
item = q.get()
print(f"Consumer: Got {item} from queue.")
q.task_done() #告知queue,已经完成了该任务
time.sleep(2) # 模拟消费过程
# 创建生产者和消费者线程
producer_thread = threading.Thread(target=producer)
consumer_thread = threading.Thread(target=consumer)
# 启动线程
producer_thread.start()
consumer_thread.start()
# 等待生产者完成
producer_thread.join()
# 等待所有任务完成,然后再退出程序,否则可能程序会提前退出
q.join()
print("All tasks are done!")
在这个例子中,producer()
函数往队列里放数据,consumer()
函数从队列里取数据。 由于队列的容量是3,所以当队列满了之后,producer()
函数会被阻塞,直到consumer()
函数取走数据。 q.task_done()
和q.join()
是配合使用的,用于确保所有任务都完成后程序再退出。
非阻塞式操作
如果你不想让put()
和get()
方法阻塞,可以设置block
参数为False
。
import queue
q = queue.Queue(maxsize=3)
try:
q.put(4, block=False) # 如果队列满了,会抛出queue.Full异常
except queue.Full:
print("Queue is full!")
try:
item = q.get(block=False) # 如果队列空了,会抛出queue.Empty异常
except queue.Empty:
print("Queue is empty!")
这种非阻塞式操作,适用于需要快速响应的场景。
超时设置
你还可以给put()
和get()
方法设置超时时间。 如果在指定时间内,队列仍然满或空,就会抛出异常。
import queue
import time
q = queue.Queue(maxsize=1)
try:
q.put(1, timeout=2) # 尝试在2秒内放入数据
q.put(2, timeout=2) # 尝试在2秒内放入数据,如果队列满了,会抛出queue.Full异常
except queue.Full:
print("Queue is full after 2 seconds!")
try:
item = q.get(timeout=2) # 尝试在2秒内取出数据
item = q.get(timeout=2) # 尝试在2秒内取出数据,如果队列空了,会抛出queue.Empty异常
except queue.Empty:
print("Queue is empty after 2 seconds!")
实用案例:多线程爬虫
咱们来一个实际点的例子:多线程爬虫。
import queue
import threading
import requests
import time
# 待爬取的URL队列
url_queue = queue.Queue()
# 已爬取的URL集合
visited_urls = set()
# 爬虫线程数
NUM_THREADS = 5
# 初始URL
start_url = "https://www.example.com"
# 爬取函数
def crawl(url):
try:
response = requests.get(url, timeout=5)
response.raise_for_status() # 检查HTTP状态码
print(f"Crawled: {url}")
return response.text
except requests.exceptions.RequestException as e:
print(f"Error crawling {url}: {e}")
return None
# 解析函数 (简单示例,只提取链接)
def parse(html):
if html:
# 这里应该用更强大的HTML解析库,如BeautifulSoup
# 这里仅仅是简单示例
links = []
# 假设链接都在<a>标签里,href属性包含链接
start = 0
while True:
start = html.find('<a href="', start)
if start == -1:
break
start += len('<a href="')
end = html.find('"', start)
if end == -1:
break
link = html[start:end]
# 处理相对链接
if not link.startswith('http'):
from urllib.parse import urljoin
link = urljoin(start_url, link)
links.append(link)
start = end + 1
return links
return []
# 爬虫线程
def worker():
while True:
try:
url = url_queue.get(timeout=5) # 从队列中获取URL,设置超时时间
if url not in visited_urls:
visited_urls.add(url)
html = crawl(url)
links = parse(html)
for link in links:
url_queue.put(link) #将提取到的链接放入队列中
url_queue.task_done() # 标记该任务完成
except queue.Empty:
break # 队列为空,线程退出
except Exception as e:
print(f"An error occurred: {e}")
url_queue.task_done() #确保即使出现错误,任务也会被标记为完成
# 主函数
def main():
# 将初始URL放入队列
url_queue.put(start_url)
# 创建并启动爬虫线程
threads = []
for _ in range(NUM_THREADS):
t = threading.Thread(target=worker)
threads.append(t)
t.start()
# 等待所有任务完成
url_queue.join()
# 等待所有线程完成
for t in threads:
t.join()
print("Crawling finished!")
if __name__ == "__main__":
start_time = time.time()
main()
end_time = time.time()
print(f"Total time taken: {end_time - start_time:.2f} seconds")
这个例子里,我们创建了一个url_queue
队列,用于存放待爬取的URL。 然后创建了多个爬虫线程,每个线程都从队列里取出URL进行爬取,并把爬取到的新URL放入队列。
这个例子演示了queue.Queue
在多线程环境下的应用。由于queue.Queue
是线程安全的,所以多个线程可以同时往队列里放数据和取数据,而不用担心数据竞争的问题。
queue.LifoQueue
和queue.PriorityQueue
queue.LifoQueue
和queue.PriorityQueue
的用法和queue.Queue
类似,只是队列的顺序不同。
queue.LifoQueue
: 后进先出。queue.PriorityQueue
: 优先级队列。 放入队列的元素必须是可比较的。 通常,我们会放入一个元组,第一个元素是优先级(数字越小,优先级越高),第二个元素是数据。
import queue
# LifoQueue
lifo_queue = queue.LifoQueue()
lifo_queue.put(1)
lifo_queue.put(2)
lifo_queue.put(3)
print(lifo_queue.get()) # 输出:3
print(lifo_queue.get()) # 输出:2
print(lifo_queue.get()) # 输出:1
# PriorityQueue
priority_queue = queue.PriorityQueue()
priority_queue.put((3, 'low'))
priority_queue.put((1, 'high'))
priority_queue.put((2, 'medium'))
print(priority_queue.get()) # 输出:(1, 'high')
print(priority_queue.get()) # 输出:(2, 'medium')
print(priority_queue.get()) # 输出:(3, 'low')
多进程通信:multiprocessing.Queue
上面的queue
模块主要用于多线程。 如果要在多进程之间通信,需要使用multiprocessing.Queue
。
multiprocessing.Queue
的用法和queue.Queue
基本一样,但是它可以在多个进程之间共享数据。
import multiprocessing
import time
def worker(q):
while True:
item = q.get()
if item is None: # 哨兵值,用于结束进程
break
print(f"Process {multiprocessing.current_process().name}: Got {item}")
time.sleep(1)
if __name__ == '__main__':
q = multiprocessing.Queue()
# 创建多个进程
processes = []
for i in range(3):
p = multiprocessing.Process(target=worker, args=(q,), name=f"Worker-{i}")
processes.append(p)
p.start()
# 往队列里放数据
for i in range(10):
q.put(i)
time.sleep(0.5)
# 发送哨兵值,结束进程
for _ in range(3):
q.put(None)
# 等待所有进程结束
for p in processes:
p.join()
print("All processes finished!")
在这个例子中,我们创建了一个multiprocessing.Queue
队列,然后在多个进程中共享这个队列。 主进程往队列里放数据,子进程从队列里取数据。 注意,我们需要使用“哨兵值”(在这个例子里是None
)来告诉子进程结束。 这是因为multiprocessing.Queue
的get()
方法会一直阻塞,直到队列有新的数据。 如果没有哨兵值,子进程会一直阻塞下去。
注意事项
multiprocessing.Queue
在Windows系统上只能传递可序列化的对象。 这意味着,你不能传递函数、类实例等复杂对象。- 使用
multiprocessing.Queue
进行进程间通信时,需要注意死锁问题。 避免循环等待的情况。
总结
queue
模块是Python中一个非常强大的工具,可以用来在多线程和多进程之间进行安全通信。 掌握queue
模块的用法,可以让你编写出更加高效、稳定的并发程序。
特性 | queue.Queue |
multiprocessing.Queue |
---|---|---|
适用场景 | 多线程 | 多进程 |
安全性 | 线程安全 | 进程安全 |
数据传递 | 共享内存 | 序列化/反序列化 |
跨平台兼容性 | 良好 | Windows有限制 |
对象传递限制 | 无 | 仅可序列化对象 |
好了,今天的讲座就到这里。 希望各位观众老爷有所收获。 如果觉得有用,记得点赞收藏转发哦! 咱们下期再见!