Python高级技术之:`Python`的`lock`、`Rlock`、`Condition`和`Semaphore`:多线程同步的原语。

各位观众老爷,大家好!我是你们的老朋友,今天咱们来聊聊Python多线程里的“四大金刚”—— LockRLockConditionSemaphore,保证让你们听得懂,用得上,还能笑出声!

第一部分:线程同步,为啥要这么麻烦?

话说,多线程编程就像一群熊孩子在厨房里做饭。每个人都想用锅,都想切菜,一不小心就可能把厨房搞得一团糟,甚至引发“火灾”(数据损坏)。

线程同步,就是给这群熊孩子立规矩,保证他们能有序地使用资源,避免混乱。如果没有这些规矩,你可能会遇到:

  • 数据竞争 (Race Condition): 多个线程同时修改同一个数据,结果谁也说不准,就像抢红包,你明明手速快,结果永远抢不到,气不气?
  • 死锁 (Deadlock): 几个线程互相等待对方释放资源,谁也不肯让步,结果大家都卡死了,就像俩人同时进一个门,谁也不让谁,最后谁也进不去。

所以,线程同步非常重要!

第二部分:主角登场!LockRLockConditionSemaphore

接下来,咱们逐一介绍这“四大金刚”。

1. Lock (互斥锁):

Lock 是最简单粗暴的家伙,就像一个“厕所门上的锁”。 谁先抢到锁,谁就拥有了使用资源的权利,用完之后必须释放锁,其他人才能使用。

  • 功能: 保证同一时刻只有一个线程可以访问被保护的资源。
  • 使用场景: 保护共享数据,避免数据竞争。
import threading
import time

# 全局变量,多个线程共享
counter = 0
# 创建一个锁
lock = threading.Lock()

def increment():
    global counter
    for _ in range(100000):
        # 获取锁
        lock.acquire()
        try:
            counter += 1
        finally:
            # 释放锁,必须放在finally里,确保一定会被执行
            lock.release()

def decrement():
    global counter
    for _ in range(100000):
        lock.acquire()
        try:
            counter -= 1
        finally:
            lock.release()

# 创建两个线程
t1 = threading.Thread(target=increment)
t2 = threading.Thread(target=decrement)

# 启动线程
t1.start()
t2.start()

# 等待线程结束
t1.join()
t2.join()

print(f"最终结果: {counter}") # 结果应该接近0,但由于线程切换,可能不是精确的0

代码解释:

  • lock = threading.Lock() 创建一个锁对象。
  • lock.acquire() 尝试获取锁,如果锁已经被其他线程占用,则阻塞,直到锁被释放。
  • lock.release() 释放锁,让其他线程可以获取。
  • finally 语句块保证无论 try 语句块中发生什么,锁都会被释放,避免死锁。

注意事项:

  • 必须成对使用 acquire()release(),否则可能会导致死锁。
  • Lock 不可重入,同一个线程不能连续 acquire() 多次,否则也会死锁。

2. RLock (可重入锁):

RLockLock 的升级版,它允许同一个线程多次 acquire() 同一个锁,而不会死锁。 就像一个“自家厕所的锁”,你进了第一次,还可以在里面继续锁门,不用担心把自己锁死。

  • 功能: 允许同一个线程多次获取锁,只有当该线程释放了所有 acquire() 的锁,其他线程才能获取。
  • 使用场景: 递归函数中使用锁,避免死锁。
import threading

# 创建一个可重入锁
rlock = threading.RLock()

def recursive_function(n):
    rlock.acquire()
    try:
        if n > 0:
            print(f"线程 {threading.current_thread().name} 正在执行: {n}")
            recursive_function(n - 1)
        else:
            print(f"线程 {threading.current_thread().name} 到达底部")
    finally:
        rlock.release()

# 创建一个线程
t = threading.Thread(target=recursive_function, args=(3,))

# 启动线程
t.start()

# 等待线程结束
t.join()

代码解释:

  • rlock = threading.RLock() 创建一个可重入锁对象。
  • recursive_function() 是一个递归函数,每次递归都会 acquire() 锁。
  • 由于 RLock 的可重入性,同一个线程可以多次 acquire() 锁而不会死锁。

注意事项:

  • RLock 内部维护了一个计数器,记录 acquire() 的次数。
  • 必须 release() 相同次数才能真正释放锁。

3. Condition (条件变量):

Condition 是一个更高级的同步原语,它允许线程在满足特定条件时才继续执行。 就像一个“闹钟”,线程可以等待某个条件成立,然后被唤醒。

  • 功能: 允许线程等待特定条件,并在条件满足时被唤醒。
  • 使用场景: 生产者-消费者模型,线程池等。
import threading
import time

# 创建一个条件变量
condition = threading.Condition()
# 共享资源
resource = []

def producer():
    global resource
    for i in range(5):
        condition.acquire() # 需要先获取锁
        resource.append(i)
        print(f"生产者生产了: {i}")
        condition.notify() # 唤醒一个等待的线程
        condition.release() # 释放锁
        time.sleep(1)

def consumer():
    global resource
    for _ in range(5):
        condition.acquire()
        while not resource: # 循环判断条件,防止虚假唤醒
            print("消费者等待...")
            condition.wait() # 释放锁并等待
        item = resource.pop(0)
        print(f"消费者消费了: {item}")
        condition.release()
        time.sleep(2)

# 创建生产者和消费者线程
t1 = threading.Thread(target=producer)
t2 = threading.Thread(target=consumer)

# 启动线程
t1.start()
t2.start()

# 等待线程结束
t1.join()
t2.join()

代码解释:

  • condition = threading.Condition() 创建一个条件变量对象。
  • condition.acquire() 获取锁,Condition 必须先获取锁才能使用。
  • condition.wait() 释放锁并等待,直到其他线程调用 notify()notify_all() 唤醒它。
  • condition.notify() 唤醒一个等待的线程。
  • condition.notify_all() 唤醒所有等待的线程。
  • 重要: wait() 必须在 while 循环中调用,防止虚假唤醒 (spurious wakeup)。 虚假唤醒是指线程被意外唤醒,但条件仍然不满足。

注意事项:

  • Condition 必须与锁一起使用,通常是 LockRLock
  • wait() 会自动释放锁,并在被唤醒后重新获取锁。
  • notify()notify_all() 只是唤醒线程,并不保证线程立即执行,还需要竞争锁。

4. Semaphore (信号量):

Semaphore 用于控制对共享资源的并发访问数量。 就像一个“停车场的停车位”,只有当有空位时,车辆才能进入。

  • 功能: 限制对共享资源的并发访问数量。
  • 使用场景: 连接池,限制同时下载文件的数量等。
import threading
import time

# 创建一个信号量,最多允许3个线程同时访问
semaphore = threading.Semaphore(3)

def worker(i):
    semaphore.acquire()
    try:
        print(f"线程 {i} 正在工作...")
        time.sleep(2)
        print(f"线程 {i} 完成工作...")
    finally:
        semaphore.release()

# 创建多个线程
threads = []
for i in range(5):
    t = threading.Thread(target=worker, args=(i,))
    threads.append(t)
    t.start()

# 等待线程结束
for t in threads:
    t.join()

代码解释:

  • semaphore = threading.Semaphore(3) 创建一个信号量对象,初始值为 3。
  • semaphore.acquire() 尝试获取信号量,如果信号量的值大于 0,则减 1 并继续执行;否则,阻塞,直到信号量的值大于 0。
  • semaphore.release() 释放信号量,将信号量的值加 1。

注意事项:

  • 信号量的值可以大于 1,表示允许多个线程同时访问共享资源。
  • acquire()release() 的次数必须匹配,否则可能会导致资源泄漏或死锁。

第三部分:四大金刚的对比与选择

为了方便大家理解,我把这四个同步原语的特点总结成一个表格:

特性 Lock RLock Condition Semaphore
基本功能 互斥访问 可重入互斥 条件等待与通知 限制并发访问数量
是否可重入
使用场景 简单资源保护 递归函数 生产者-消费者,线程池 连接池,限流
必须与锁一起使用

如何选择?

  • 简单互斥: 选择 Lock
  • 递归函数中的互斥: 选择 RLock
  • 需要等待特定条件: 选择 Condition
  • 限制并发访问数量: 选择 Semaphore

第四部分:实战演练,更上一层楼!

理论讲完了,咱们来点实际的,用 Condition 实现一个简单的线程池。

import threading
import queue
import time

class ThreadPool:
    def __init__(self, num_threads):
        self.num_threads = num_threads
        self.task_queue = queue.Queue()
        self.condition = threading.Condition()
        self.threads = []
        self.is_shutdown = False

        for _ in range(num_threads):
            thread = threading.Thread(target=self._worker)
            self.threads.append(thread)
            thread.start()

    def _worker(self):
        while True:
            self.condition.acquire()
            while self.task_queue.empty() and not self.is_shutdown:
                self.condition.wait()
            if self.is_shutdown:
                self.condition.release()
                break
            task = self.task_queue.get()
            self.condition.release()
            try:
                task()
            except Exception as e:
                print(f"任务执行出错: {e}")

    def submit(self, task):
        self.condition.acquire()
        self.task_queue.put(task)
        self.condition.notify()
        self.condition.release()

    def shutdown(self):
        self.is_shutdown = True
        self.condition.acquire()
        self.condition.notify_all()
        self.condition.release()
        for thread in self.threads:
            thread.join()

# 示例用法
def my_task(i):
    print(f"线程 {threading.current_thread().name} 正在执行任务 {i}")
    time.sleep(1)
    print(f"线程 {threading.current_thread().name} 完成任务 {i}")

if __name__ == '__main__':
    pool = ThreadPool(3)
    for i in range(10):
        pool.submit(lambda i=i: my_task(i))
    pool.shutdown()
    print("线程池已关闭")

代码解释:

  • ThreadPool 类实现了一个简单的线程池。
  • task_queue 用于存储待执行的任务。
  • condition 用于线程间的同步。
  • _worker() 方法是线程池中的工作线程,它不断从 task_queue 中获取任务并执行。
  • submit() 方法用于向线程池提交任务。
  • shutdown() 方法用于关闭线程池。

第五部分:总结与注意事项

今天咱们学习了 Python 多线程中的四大金刚:LockRLockConditionSemaphore。 掌握它们,可以有效地解决多线程编程中的各种同步问题。

最后,再强调几点注意事项:

  • 避免死锁: 仔细设计锁的获取和释放顺序,避免循环等待。
  • 避免资源泄漏: 确保锁一定会被释放,可以使用 try...finally 语句块。
  • 理解虚假唤醒: 在使用 Condition 时,必须在 while 循环中判断条件。
  • 小心过度同步: 过度使用锁会降低程序的并发性。

好了,今天的讲座就到这里。 希望大家能够掌握这些多线程同步原语,写出高效、稳定的多线程程序! 谢谢大家!

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注