Java中的等待/通知机制优化:条件队列(Condition)与虚假唤醒问题

Java 中的等待/通知机制优化:条件队列(Condition)与虚假唤醒问题

大家好,今天我们来深入探讨 Java 并发编程中一个非常重要的概念:等待/通知机制,以及如何利用 Condition 接口来优化它,并解决可能遇到的虚假唤醒问题。

1. 等待/通知机制:Object 类的 wait(), notify(), notifyAll()

在多线程编程中,经常会遇到这样的场景:一个线程需要等待某个条件满足才能继续执行,而另一个线程负责改变这个条件。传统的做法是使用 Object 类的 wait(), notify(), 和 notifyAll() 方法来实现线程间的通信与同步。

  • wait() 方法: 使当前线程进入等待状态,并释放对象的锁。线程会一直等待,直到被其他线程调用该对象的 notify()notifyAll() 方法唤醒。wait() 方法必须在同步代码块或同步方法中调用,否则会抛出 IllegalMonitorStateException
  • notify() 方法: 唤醒在此对象监视器上等待的单个线程。如果有多个线程在等待,JVM 会选择一个线程唤醒,具体选择哪个线程由 JVM 决定。
  • notifyAll() 方法: 唤醒在此对象监视器上等待的所有线程。被唤醒的线程会重新竞争对象的锁,只有一个线程能获得锁并继续执行。

示例:生产者-消费者模型 (使用 Objectwait()/notifyAll() 方法)

import java.util.LinkedList;
import java.util.Queue;

public class ProducerConsumer {

    private static final int CAPACITY = 10;
    private final Queue<Integer> buffer = new LinkedList<>();

    public synchronized void produce(int value) throws InterruptedException {
        while (buffer.size() == CAPACITY) {
            System.out.println("Producer waiting: Buffer is full.");
            wait(); // Buffer is full, producer waits
        }
        buffer.offer(value);
        System.out.println("Produced: " + value);
        notifyAll(); // Notify consumers that a new item is available
    }

    public synchronized int consume() throws InterruptedException {
        while (buffer.isEmpty()) {
            System.out.println("Consumer waiting: Buffer is empty.");
            wait(); // Buffer is empty, consumer waits
        }
        int value = buffer.poll();
        System.out.println("Consumed: " + value);
        notifyAll(); // Notify producers that space is available
        return value;
    }

    public static void main(String[] args) {
        ProducerConsumer pc = new ProducerConsumer();

        Thread producerThread = new Thread(() -> {
            try {
                for (int i = 0; i < 20; i++) {
                    pc.produce(i);
                    Thread.sleep(100); // Simulate some work
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });

        Thread consumerThread = new Thread(() -> {
            try {
                for (int i = 0; i < 20; i++) {
                    pc.consume();
                    Thread.sleep(200); // Simulate some work
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });

        producerThread.start();
        consumerThread.start();

        try {
            producerThread.join();
            consumerThread.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }

        System.out.println("Producer and Consumer threads finished.");
    }
}

在这个例子中,produce() 方法在缓冲区满时调用 wait()consume() 方法在缓冲区空时调用 wait()。当生产者生产了一个新元素或消费者消费了一个元素后,都会调用 notifyAll() 来唤醒所有等待的线程。

2. Condition 接口:更细粒度的控制

虽然 Objectwait()/notify()/notifyAll() 方法可以实现基本的等待/通知机制,但它存在一些局限性:

  • 只能关联一个条件: 所有等待的线程都在同一个等待队列中,无法区分等待的不同条件。
  • 必须使用 notifyAll() 通常需要唤醒所有等待的线程,即使只有部分线程满足条件,这会导致不必要的线程竞争和上下文切换,效率较低。

Condition 接口是 java.util.concurrent.locks 包中提供的一个更高级的同步工具,它允许我们将等待的线程放入不同的等待队列中,从而实现更细粒度的控制。

Condition 接口的主要方法:

  • await() 使当前线程进入等待状态,并释放与此 Condition 相关的锁。类似于 Object.wait(),但与特定的 Condition 关联。
  • signal() 唤醒在此 Condition 上等待的单个线程。类似于 Object.notify(),但只唤醒等待特定 Condition 的线程。
  • signalAll() 唤醒在此 Condition 上等待的所有线程。类似于 Object.notifyAll(),但只唤醒等待特定 Condition 的线程。

使用 Condition 的步骤:

  1. 获取一个 Lock 对象。 通常使用 ReentrantLock
  2. 通过 Lock 对象的 newCondition() 方法创建一个或多个 Condition 对象。
  3. Lock 对象的保护下,使用 Condition 对象的 await(), signal(), 和 signalAll() 方法进行线程间的通信和同步。

示例:生产者-消费者模型 (使用 Condition)

import java.util.LinkedList;
import java.util.Queue;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

public class ProducerConsumerWithCondition {

    private static final int CAPACITY = 10;
    private final Queue<Integer> buffer = new LinkedList<>();
    private final Lock lock = new ReentrantLock();
    private final Condition notFull = lock.newCondition();
    private final Condition notEmpty = lock.newCondition();

    public void produce(int value) throws InterruptedException {
        lock.lock();
        try {
            while (buffer.size() == CAPACITY) {
                System.out.println("Producer waiting: Buffer is full.");
                notFull.await(); // Buffer is full, producer waits on notFull condition
            }
            buffer.offer(value);
            System.out.println("Produced: " + value);
            notEmpty.signal(); // Signal a consumer that the buffer is not empty
        } finally {
            lock.unlock();
        }
    }

    public int consume() throws InterruptedException {
        lock.lock();
        try {
            while (buffer.isEmpty()) {
                System.out.println("Consumer waiting: Buffer is empty.");
                notEmpty.await(); // Buffer is empty, consumer waits on notEmpty condition
            }
            int value = buffer.poll();
            System.out.println("Consumed: " + value);
            notFull.signal(); // Signal a producer that the buffer is not full
            return value;
        } finally {
            lock.unlock();
        }
    }

    public static void main(String[] args) {
        ProducerConsumerWithCondition pc = new ProducerConsumerWithCondition();

        Thread producerThread = new Thread(() -> {
            try {
                for (int i = 0; i < 20; i++) {
                    pc.produce(i);
                    Thread.sleep(100); // Simulate some work
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });

        Thread consumerThread = new Thread(() -> {
            try {
                for (int i = 0; i < 20; i++) {
                    pc.consume();
                    Thread.sleep(200); // Simulate some work
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });

        producerThread.start();
        consumerThread.start();

        try {
            producerThread.join();
            consumerThread.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }

        System.out.println("Producer and Consumer threads finished.");
    }
}

在这个例子中,我们使用了 ReentrantLock 作为锁,并创建了两个 Condition 对象:notFullnotEmpty。生产者在缓冲区满时等待 notFull 条件,消费者在缓冲区空时等待 notEmpty 条件。生产者生产了一个新元素后,只唤醒等待 notEmpty 条件的消费者,消费者消费了一个元素后,只唤醒等待 notFull 条件的生产者。这样可以避免不必要的线程唤醒和竞争,提高效率。

Objectwait()/notify()/notifyAll()Condition 的区别:

特性 Object.wait()/notify()/notifyAll() Condition.await()/signal()/signalAll()
关联的锁 内置锁 (synchronized) 显式锁 (如 ReentrantLock)
等待队列 单个等待队列 多个等待队列 (每个 Condition 一个队列)
唤醒策略 通常使用 notifyAll() 可以使用 signal()signalAll()
灵活性 较低 较高
适用场景 简单的同步场景 需要更细粒度控制的复杂同步场景

3. 虚假唤醒 (Spurious Wakeups)

虚假唤醒是指线程在没有被 notify()notifyAll() 显式唤醒的情况下,从 wait() 方法中返回。这种情况在 Java 规范中是被允许的,因为某些底层平台可能会发生这种情况。

为什么会发生虚假唤醒?

虚假唤醒通常与操作系统的调度有关。例如,操作系统可能会因为某些内部原因提前唤醒一个等待中的线程。

如何处理虚假唤醒?

为了确保程序的正确性,必须在 wait() 方法返回后,重新检查等待的条件是否真的满足。正确的做法是将 wait() 方法放在一个循环中,并在循环中检查条件。

示例:处理虚假唤醒 (使用 Objectwait()/notifyAll() 方法)

public class WaitExample {

    private boolean conditionMet = false;

    public synchronized void waitForCondition() throws InterruptedException {
        while (!conditionMet) { // 循环检查条件
            wait();
        }
        System.out.println("Condition met!");
    }

    public synchronized void setCondition() {
        conditionMet = true;
        notifyAll();
    }

    public static void main(String[] args) throws InterruptedException {
        WaitExample example = new WaitExample();

        Thread waitingThread = new Thread(() -> {
            try {
                example.waitForCondition();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });

        Thread settingThread = new Thread(() -> {
            try {
                Thread.sleep(2000); // Simulate some work
                example.setCondition();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });

        waitingThread.start();
        settingThread.start();

        waitingThread.join();
        settingThread.join();
    }
}

在这个例子中,waitForCondition() 方法使用 while 循环来检查 conditionMet 是否为 true。即使线程被虚假唤醒,它也会重新检查条件,直到条件真正满足才继续执行。

示例:处理虚假唤醒 (使用 Condition)

Condition 接口也需要同样的处理方式来避免虚假唤醒。

import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

public class ConditionWaitExample {

    private final Lock lock = new ReentrantLock();
    private final Condition condition = lock.newCondition();
    private boolean conditionMet = false;

    public void waitForCondition() throws InterruptedException {
        lock.lock();
        try {
            while (!conditionMet) { // 循环检查条件
                condition.await();
            }
            System.out.println("Condition met!");
        } finally {
            lock.unlock();
        }
    }

    public void setCondition() {
        lock.lock();
        try {
            conditionMet = true;
            condition.signalAll();
        } finally {
            lock.unlock();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        ConditionWaitExample example = new ConditionWaitExample();

        Thread waitingThread = new Thread(() -> {
            try {
                example.waitForCondition();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });

        Thread settingThread = new Thread(() -> {
            try {
                Thread.sleep(2000); // Simulate some work
                example.setCondition();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });

        waitingThread.start();
        settingThread.start();

        waitingThread.join();
        settingThread.join();
    }
}

总结:Condition 是更优的选择,但别忘了处理虚假唤醒

今天我们学习了 Java 中等待/通知机制的两种实现方式:使用 Object 类的 wait()/notify()/notifyAll() 方法和使用 Condition 接口。Condition 接口提供了更细粒度的控制,可以避免不必要的线程唤醒和竞争,提高效率。同时,我们也了解了虚假唤醒的概念以及如何通过循环检查条件来处理它。在实际开发中,建议优先使用 Condition 接口,并始终注意处理虚假唤醒问题,以确保程序的正确性和性能。

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注