使用Condition对象实现复杂的线程等待/通知模式:比wait/notify更精细的控制

好的,我们开始今天的讲座,主题是利用 Condition 对象实现更精细的线程等待/通知模式,这相比传统的 wait/notify 提供了更强大的控制能力。

1. 传统 wait/notify 的局限性

在多线程编程中,wait/notify (或 wait/notifyAll) 是实现线程间同步和通信的经典机制。 它的基本原理是:一个线程可以调用对象的 wait() 方法进入等待状态,释放对象锁;另一个线程可以通过调用同一个对象的 notify()notifyAll() 方法来唤醒一个或所有等待的线程。

然而,wait/notify 存在一些固有的局限性,尤其是在复杂的并发场景下:

  • 盲目唤醒 (Spurious Wakeups): 线程可能在没有接收到任何通知的情况下被唤醒。 这是由于 JVM 实现细节或其他系统事件引起的。 虽然规范建议在 wait() 返回后检查等待条件,但这增加了代码的复杂性。

  • 单一条件: wait/notify 只能与一个锁关联,这意味着所有等待线程都在等待 同一个 条件。 如果线程需要等待不同的条件,则必须使用额外的逻辑来区分,这很容易出错。

  • 缺乏公平性:notify() 唤醒的线程是随机的。 无法保证等待时间最长的线程会被优先唤醒,这可能导致某些线程一直处于饥饿状态。

  • 容易出错: 正确使用 wait/notify 需要非常小心,特别是要保证在修改共享变量之前持有锁,并在等待之前和之后检查条件。 稍有不慎就可能导致死锁或活锁。

2. Condition 对象的优势

java.util.concurrent.locks.Condition 接口提供了一种更强大、更灵活的方式来实现线程等待/通知。 Condition 对象与 Lock 接口关联,允许我们创建多个等待队列,每个队列对应于不同的等待条件。

Condition 接口的主要方法包括:

方法 描述
await() 使当前线程进入等待状态,释放与 Condition 关联的 Lock。 必须在 Lock.lock()Lock.unlock() 之间调用。
awaitUninterruptibly() await() 类似,但不会响应中断。
awaitNanos(long time) 使当前线程进入等待状态,最多等待指定的时间(纳秒)。
await(long time, TimeUnit unit) 使当前线程进入等待状态,最多等待指定的时间。
awaitUntil(Date deadline) 使当前线程进入等待状态,直到指定的截止时间。
signal() 唤醒一个等待在 Condition 上的线程。
signalAll() 唤醒所有等待在 Condition 上的线程。

wait/notify 相比,Condition 对象具有以下优势:

  • 多个等待队列: 可以为不同的等待条件创建不同的 Condition 对象,从而避免了盲目唤醒和复杂的条件判断。

  • 更强的控制: Condition 提供了更多的方法来控制线程的等待和唤醒,例如可以指定等待时间、响应中断等。

  • 与 Lock 配合: Condition 必须与 Lock 接口一起使用,这强制我们显式地获取和释放锁,提高了代码的可读性和可维护性。

3. Condition 对象的使用示例:生产者-消费者模型

让我们通过一个经典的生产者-消费者模型来演示如何使用 Condition 对象。 假设我们有一个缓冲区,生产者线程向缓冲区添加数据,消费者线程从缓冲区取出数据。 当缓冲区满时,生产者线程需要等待;当缓冲区为空时,消费者线程需要等待。

import java.util.LinkedList;
import java.util.Queue;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

public class ProducerConsumer {

    private final Lock lock = new ReentrantLock();
    private final Condition notFull  = lock.newCondition();
    private final Condition notEmpty = lock.newCondition();

    private final Queue<Integer> queue = new LinkedList<>();
    private final int capacity;

    public ProducerConsumer(int capacity) {
        this.capacity = capacity;
    }

    public void produce(int data) throws InterruptedException {
        lock.lock();
        try {
            while (queue.size() == capacity) {
                System.out.println("Producer is waiting... Queue is full.");
                notFull.await(); // 等待缓冲区不满
            }
            queue.offer(data);
            System.out.println("Produced: " + data);
            notEmpty.signal(); // 唤醒等待的消费者
        } finally {
            lock.unlock();
        }
    }

    public int consume() throws InterruptedException {
        lock.lock();
        try {
            while (queue.isEmpty()) {
                System.out.println("Consumer is waiting... Queue is empty.");
                notEmpty.await(); // 等待缓冲区不空
            }
            int data = queue.poll();
            System.out.println("Consumed: " + data);
            notFull.signal(); // 唤醒等待的生产者
            return data;
        } finally {
            lock.unlock();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        ProducerConsumer pc = new ProducerConsumer(5);

        Thread producer1 = new Thread(() -> {
            try {
                for (int i = 0; i < 10; i++) {
                    pc.produce(i);
                    Thread.sleep(100); // 模拟生产时间
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });

        Thread producer2 = new Thread(() -> {
            try {
                for (int i = 100; i < 110; i++) {
                    pc.produce(i);
                    Thread.sleep(150); // 模拟生产时间
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });

        Thread consumer1 = new Thread(() -> {
            try {
                for (int i = 0; i < 10; i++) {
                    pc.consume();
                    Thread.sleep(200); // 模拟消费时间
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });

        Thread consumer2 = new Thread(() -> {
            try {
                for (int i = 0; i < 10; i++) {
                    pc.consume();
                    Thread.sleep(250); // 模拟消费时间
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });

        producer1.start();
        producer2.start();
        consumer1.start();
        consumer2.start();

        producer1.join();
        producer2.join();
        consumer1.join();
        consumer2.join();

        System.out.println("Finished.");

    }
}

在这个例子中:

  • lock 是一个 ReentrantLock 对象,用于保护共享资源(队列)。

  • notFullnotEmpty 是两个 Condition 对象,分别表示缓冲区不满和缓冲区不空这两个条件。

  • produce() 方法在添加数据之前,会检查缓冲区是否已满。 如果已满,则调用 notFull.await() 进入等待状态,释放锁。 当消费者线程从缓冲区取出数据后,会调用 notFull.signal() 唤醒一个等待的生产者线程。

  • consume() 方法在取出数据之前,会检查缓冲区是否为空。 如果为空,则调用 notEmpty.await() 进入等待状态,释放锁。 当生产者线程向缓冲区添加数据后,会调用 notEmpty.signal() 唤醒一个等待的消费者线程。

通过使用 Condition 对象,我们可以将生产者和消费者线程分别置于不同的等待队列中,从而避免了盲目唤醒,并提高了程序的效率。

4. Condition 对象的高级用法:公平锁和优先级

Condition 对象可以与公平锁一起使用,以确保线程按照它们请求锁的顺序获得锁。 这可以避免某些线程一直处于饥饿状态。

import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

public class FairLockExample {

    private final ReentrantLock fairLock = new ReentrantLock(true); // 使用公平锁
    private final Condition condition = fairLock.newCondition();

    public void accessResource(String threadName) throws InterruptedException {
        fairLock.lock();
        try {
            System.out.println(threadName + " is waiting for the condition...");
            condition.await();
            System.out.println(threadName + " is accessing the resource.");
            // 模拟资源访问
            Thread.sleep(100);
        } finally {
            System.out.println(threadName + " is releasing the lock.");
            fairLock.unlock();
        }
    }

    public void signalWaitingThread() {
        fairLock.lock();
        try {
            System.out.println("Signaling a waiting thread...");
            condition.signal();
        } finally {
            fairLock.unlock();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        FairLockExample example = new FairLockExample();

        Thread thread1 = new Thread(() -> {
            try {
                example.accessResource("Thread-1");
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });

        Thread thread2 = new Thread(() -> {
            try {
                example.accessResource("Thread-2");
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });

        thread1.start();
        Thread.sleep(50); // 确保 Thread-1 先进入等待状态
        thread2.start();

        Thread.sleep(1000); // 等待一段时间
        example.signalWaitingThread(); // 唤醒 Thread-1
        Thread.sleep(1000);
        example.signalWaitingThread(); //唤醒 Thread-2

        thread1.join();
        thread2.join();

        System.out.println("Finished.");
    }
}

在这个例子中,ReentrantLock 被创建为公平锁 (new ReentrantLock(true))。 这意味着线程将按照它们请求锁的顺序获得锁。 当调用 condition.signal() 时,等待时间最长的线程(即 Thread-1)将被唤醒。

虽然 Condition 本身没有内置的优先级队列,但我们可以通过自定义逻辑来实现类似的功能。 例如,我们可以使用一个 PriorityQueue 来存储等待线程,并根据线程的优先级来决定唤醒哪个线程。 这需要更复杂的代码实现,但可以提供更细粒度的控制。

5. Condition 对象的注意事项

在使用 Condition 对象时,需要注意以下几点:

  • 必须与 Lock 关联: Condition 对象必须与 Lock 接口一起使用。 await()signal()signalAll() 方法必须在 Lock.lock()Lock.unlock() 之间调用。

  • 等待条件必须检查:await() 返回后,必须再次检查等待条件。 这是为了防止盲目唤醒。

  • 避免死锁: 确保以正确的顺序获取和释放锁,以避免死锁。

  • 异常处理:try-finally 块中释放锁,以确保即使发生异常,锁也能被正确释放。

  • 选择合适的唤醒策略: signal() 唤醒一个等待线程,而 signalAll() 唤醒所有等待线程。 根据具体情况选择合适的唤醒策略,以提高程序的效率。 通常,如果只有一个线程可以满足条件,则使用 signal();如果有多个线程可以满足条件,则使用 signalAll()

6. Condition 的一些替代方案

虽然 Condition 对象提供了比 wait/notify 更精细的控制,但在某些情况下,可能存在更合适的替代方案,例如:

  • BlockingQueue: 对于生产者-消费者模型,BlockingQueue 提供了更简洁的 API 和更好的性能。 BlockingQueue 内部使用了锁和条件变量,但将这些细节隐藏起来,使代码更易于理解和维护。

  • CountDownLatch: 用于等待一组线程完成操作。

  • CyclicBarrier: 用于同步一组线程,使它们在达到某个屏障点之前互相等待。

  • Semaphore: 用于控制对有限资源的访问。

选择哪种并发工具取决于具体的应用场景和需求。 Condition 对象适用于需要高度定制化的等待/通知模式的场景。

7.总结来说
Condition对象相较于传统的wait/notify提供了更精细的线程控制能力,允许创建多个等待队列,与Lock接口配合使用,并能与公平锁结合使用,实现更公平的线程调度。在复杂的并发场景下,Condition对象是强大的同步工具。

今天的讲座到此结束。希望大家通过今天的学习,能够更好地理解和使用 Condition 对象,从而编写出更健壮、更高效的并发程序。

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注