各位观众老爷,大家好!我是你们的老朋友,今天咱们来聊聊Python多线程里的“四大金刚”—— Lock
、RLock
、Condition
和 Semaphore
,保证让你们听得懂,用得上,还能笑出声!
第一部分:线程同步,为啥要这么麻烦?
话说,多线程编程就像一群熊孩子在厨房里做饭。每个人都想用锅,都想切菜,一不小心就可能把厨房搞得一团糟,甚至引发“火灾”(数据损坏)。
线程同步,就是给这群熊孩子立规矩,保证他们能有序地使用资源,避免混乱。如果没有这些规矩,你可能会遇到:
- 数据竞争 (Race Condition): 多个线程同时修改同一个数据,结果谁也说不准,就像抢红包,你明明手速快,结果永远抢不到,气不气?
- 死锁 (Deadlock): 几个线程互相等待对方释放资源,谁也不肯让步,结果大家都卡死了,就像俩人同时进一个门,谁也不让谁,最后谁也进不去。
所以,线程同步非常重要!
第二部分:主角登场!Lock
、RLock
、Condition
、Semaphore
接下来,咱们逐一介绍这“四大金刚”。
1. Lock
(互斥锁):
Lock
是最简单粗暴的家伙,就像一个“厕所门上的锁”。 谁先抢到锁,谁就拥有了使用资源的权利,用完之后必须释放锁,其他人才能使用。
- 功能: 保证同一时刻只有一个线程可以访问被保护的资源。
- 使用场景: 保护共享数据,避免数据竞争。
import threading
import time
# 全局变量,多个线程共享
counter = 0
# 创建一个锁
lock = threading.Lock()
def increment():
global counter
for _ in range(100000):
# 获取锁
lock.acquire()
try:
counter += 1
finally:
# 释放锁,必须放在finally里,确保一定会被执行
lock.release()
def decrement():
global counter
for _ in range(100000):
lock.acquire()
try:
counter -= 1
finally:
lock.release()
# 创建两个线程
t1 = threading.Thread(target=increment)
t2 = threading.Thread(target=decrement)
# 启动线程
t1.start()
t2.start()
# 等待线程结束
t1.join()
t2.join()
print(f"最终结果: {counter}") # 结果应该接近0,但由于线程切换,可能不是精确的0
代码解释:
lock = threading.Lock()
创建一个锁对象。lock.acquire()
尝试获取锁,如果锁已经被其他线程占用,则阻塞,直到锁被释放。lock.release()
释放锁,让其他线程可以获取。finally
语句块保证无论try
语句块中发生什么,锁都会被释放,避免死锁。
注意事项:
- 必须成对使用
acquire()
和release()
,否则可能会导致死锁。 Lock
不可重入,同一个线程不能连续acquire()
多次,否则也会死锁。
2. RLock
(可重入锁):
RLock
是 Lock
的升级版,它允许同一个线程多次 acquire()
同一个锁,而不会死锁。 就像一个“自家厕所的锁”,你进了第一次,还可以在里面继续锁门,不用担心把自己锁死。
- 功能: 允许同一个线程多次获取锁,只有当该线程释放了所有
acquire()
的锁,其他线程才能获取。 - 使用场景: 递归函数中使用锁,避免死锁。
import threading
# 创建一个可重入锁
rlock = threading.RLock()
def recursive_function(n):
rlock.acquire()
try:
if n > 0:
print(f"线程 {threading.current_thread().name} 正在执行: {n}")
recursive_function(n - 1)
else:
print(f"线程 {threading.current_thread().name} 到达底部")
finally:
rlock.release()
# 创建一个线程
t = threading.Thread(target=recursive_function, args=(3,))
# 启动线程
t.start()
# 等待线程结束
t.join()
代码解释:
rlock = threading.RLock()
创建一个可重入锁对象。recursive_function()
是一个递归函数,每次递归都会acquire()
锁。- 由于
RLock
的可重入性,同一个线程可以多次acquire()
锁而不会死锁。
注意事项:
RLock
内部维护了一个计数器,记录acquire()
的次数。- 必须
release()
相同次数才能真正释放锁。
3. Condition
(条件变量):
Condition
是一个更高级的同步原语,它允许线程在满足特定条件时才继续执行。 就像一个“闹钟”,线程可以等待某个条件成立,然后被唤醒。
- 功能: 允许线程等待特定条件,并在条件满足时被唤醒。
- 使用场景: 生产者-消费者模型,线程池等。
import threading
import time
# 创建一个条件变量
condition = threading.Condition()
# 共享资源
resource = []
def producer():
global resource
for i in range(5):
condition.acquire() # 需要先获取锁
resource.append(i)
print(f"生产者生产了: {i}")
condition.notify() # 唤醒一个等待的线程
condition.release() # 释放锁
time.sleep(1)
def consumer():
global resource
for _ in range(5):
condition.acquire()
while not resource: # 循环判断条件,防止虚假唤醒
print("消费者等待...")
condition.wait() # 释放锁并等待
item = resource.pop(0)
print(f"消费者消费了: {item}")
condition.release()
time.sleep(2)
# 创建生产者和消费者线程
t1 = threading.Thread(target=producer)
t2 = threading.Thread(target=consumer)
# 启动线程
t1.start()
t2.start()
# 等待线程结束
t1.join()
t2.join()
代码解释:
condition = threading.Condition()
创建一个条件变量对象。condition.acquire()
获取锁,Condition
必须先获取锁才能使用。condition.wait()
释放锁并等待,直到其他线程调用notify()
或notify_all()
唤醒它。condition.notify()
唤醒一个等待的线程。condition.notify_all()
唤醒所有等待的线程。- 重要:
wait()
必须在while
循环中调用,防止虚假唤醒 (spurious wakeup)。 虚假唤醒是指线程被意外唤醒,但条件仍然不满足。
注意事项:
Condition
必须与锁一起使用,通常是Lock
或RLock
。wait()
会自动释放锁,并在被唤醒后重新获取锁。notify()
和notify_all()
只是唤醒线程,并不保证线程立即执行,还需要竞争锁。
4. Semaphore
(信号量):
Semaphore
用于控制对共享资源的并发访问数量。 就像一个“停车场的停车位”,只有当有空位时,车辆才能进入。
- 功能: 限制对共享资源的并发访问数量。
- 使用场景: 连接池,限制同时下载文件的数量等。
import threading
import time
# 创建一个信号量,最多允许3个线程同时访问
semaphore = threading.Semaphore(3)
def worker(i):
semaphore.acquire()
try:
print(f"线程 {i} 正在工作...")
time.sleep(2)
print(f"线程 {i} 完成工作...")
finally:
semaphore.release()
# 创建多个线程
threads = []
for i in range(5):
t = threading.Thread(target=worker, args=(i,))
threads.append(t)
t.start()
# 等待线程结束
for t in threads:
t.join()
代码解释:
semaphore = threading.Semaphore(3)
创建一个信号量对象,初始值为 3。semaphore.acquire()
尝试获取信号量,如果信号量的值大于 0,则减 1 并继续执行;否则,阻塞,直到信号量的值大于 0。semaphore.release()
释放信号量,将信号量的值加 1。
注意事项:
- 信号量的值可以大于 1,表示允许多个线程同时访问共享资源。
acquire()
和release()
的次数必须匹配,否则可能会导致资源泄漏或死锁。
第三部分:四大金刚的对比与选择
为了方便大家理解,我把这四个同步原语的特点总结成一个表格:
特性 | Lock |
RLock |
Condition |
Semaphore |
---|---|---|---|---|
基本功能 | 互斥访问 | 可重入互斥 | 条件等待与通知 | 限制并发访问数量 |
是否可重入 | 否 | 是 | 否 | 否 |
使用场景 | 简单资源保护 | 递归函数 | 生产者-消费者,线程池 | 连接池,限流 |
必须与锁一起使用 | 否 | 否 | 是 | 否 |
如何选择?
- 简单互斥: 选择
Lock
。 - 递归函数中的互斥: 选择
RLock
。 - 需要等待特定条件: 选择
Condition
。 - 限制并发访问数量: 选择
Semaphore
。
第四部分:实战演练,更上一层楼!
理论讲完了,咱们来点实际的,用 Condition
实现一个简单的线程池。
import threading
import queue
import time
class ThreadPool:
def __init__(self, num_threads):
self.num_threads = num_threads
self.task_queue = queue.Queue()
self.condition = threading.Condition()
self.threads = []
self.is_shutdown = False
for _ in range(num_threads):
thread = threading.Thread(target=self._worker)
self.threads.append(thread)
thread.start()
def _worker(self):
while True:
self.condition.acquire()
while self.task_queue.empty() and not self.is_shutdown:
self.condition.wait()
if self.is_shutdown:
self.condition.release()
break
task = self.task_queue.get()
self.condition.release()
try:
task()
except Exception as e:
print(f"任务执行出错: {e}")
def submit(self, task):
self.condition.acquire()
self.task_queue.put(task)
self.condition.notify()
self.condition.release()
def shutdown(self):
self.is_shutdown = True
self.condition.acquire()
self.condition.notify_all()
self.condition.release()
for thread in self.threads:
thread.join()
# 示例用法
def my_task(i):
print(f"线程 {threading.current_thread().name} 正在执行任务 {i}")
time.sleep(1)
print(f"线程 {threading.current_thread().name} 完成任务 {i}")
if __name__ == '__main__':
pool = ThreadPool(3)
for i in range(10):
pool.submit(lambda i=i: my_task(i))
pool.shutdown()
print("线程池已关闭")
代码解释:
ThreadPool
类实现了一个简单的线程池。task_queue
用于存储待执行的任务。condition
用于线程间的同步。_worker()
方法是线程池中的工作线程,它不断从task_queue
中获取任务并执行。submit()
方法用于向线程池提交任务。shutdown()
方法用于关闭线程池。
第五部分:总结与注意事项
今天咱们学习了 Python 多线程中的四大金刚:Lock
、RLock
、Condition
和 Semaphore
。 掌握它们,可以有效地解决多线程编程中的各种同步问题。
最后,再强调几点注意事项:
- 避免死锁: 仔细设计锁的获取和释放顺序,避免循环等待。
- 避免资源泄漏: 确保锁一定会被释放,可以使用
try...finally
语句块。 - 理解虚假唤醒: 在使用
Condition
时,必须在while
循环中判断条件。 - 小心过度同步: 过度使用锁会降低程序的并发性。
好了,今天的讲座就到这里。 希望大家能够掌握这些多线程同步原语,写出高效、稳定的多线程程序! 谢谢大家!