Java 中的等待/通知机制优化:条件队列(Condition)与虚假唤醒问题
大家好,今天我们来深入探讨 Java 并发编程中一个非常重要的概念:等待/通知机制,以及如何利用 Condition
接口来优化它,并解决可能遇到的虚假唤醒问题。
1. 等待/通知机制:Object
类的 wait()
, notify()
, notifyAll()
在多线程编程中,经常会遇到这样的场景:一个线程需要等待某个条件满足才能继续执行,而另一个线程负责改变这个条件。传统的做法是使用 Object
类的 wait()
, notify()
, 和 notifyAll()
方法来实现线程间的通信与同步。
wait()
方法: 使当前线程进入等待状态,并释放对象的锁。线程会一直等待,直到被其他线程调用该对象的notify()
或notifyAll()
方法唤醒。wait()
方法必须在同步代码块或同步方法中调用,否则会抛出IllegalMonitorStateException
。notify()
方法: 唤醒在此对象监视器上等待的单个线程。如果有多个线程在等待,JVM 会选择一个线程唤醒,具体选择哪个线程由 JVM 决定。notifyAll()
方法: 唤醒在此对象监视器上等待的所有线程。被唤醒的线程会重新竞争对象的锁,只有一个线程能获得锁并继续执行。
示例:生产者-消费者模型 (使用 Object
的 wait()
/notifyAll()
方法)
import java.util.LinkedList;
import java.util.Queue;
public class ProducerConsumer {
private static final int CAPACITY = 10;
private final Queue<Integer> buffer = new LinkedList<>();
public synchronized void produce(int value) throws InterruptedException {
while (buffer.size() == CAPACITY) {
System.out.println("Producer waiting: Buffer is full.");
wait(); // Buffer is full, producer waits
}
buffer.offer(value);
System.out.println("Produced: " + value);
notifyAll(); // Notify consumers that a new item is available
}
public synchronized int consume() throws InterruptedException {
while (buffer.isEmpty()) {
System.out.println("Consumer waiting: Buffer is empty.");
wait(); // Buffer is empty, consumer waits
}
int value = buffer.poll();
System.out.println("Consumed: " + value);
notifyAll(); // Notify producers that space is available
return value;
}
public static void main(String[] args) {
ProducerConsumer pc = new ProducerConsumer();
Thread producerThread = new Thread(() -> {
try {
for (int i = 0; i < 20; i++) {
pc.produce(i);
Thread.sleep(100); // Simulate some work
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
});
Thread consumerThread = new Thread(() -> {
try {
for (int i = 0; i < 20; i++) {
pc.consume();
Thread.sleep(200); // Simulate some work
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
});
producerThread.start();
consumerThread.start();
try {
producerThread.join();
consumerThread.join();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
System.out.println("Producer and Consumer threads finished.");
}
}
在这个例子中,produce()
方法在缓冲区满时调用 wait()
,consume()
方法在缓冲区空时调用 wait()
。当生产者生产了一个新元素或消费者消费了一个元素后,都会调用 notifyAll()
来唤醒所有等待的线程。
2. Condition
接口:更细粒度的控制
虽然 Object
的 wait()
/notify()
/notifyAll()
方法可以实现基本的等待/通知机制,但它存在一些局限性:
- 只能关联一个条件: 所有等待的线程都在同一个等待队列中,无法区分等待的不同条件。
- 必须使用
notifyAll()
: 通常需要唤醒所有等待的线程,即使只有部分线程满足条件,这会导致不必要的线程竞争和上下文切换,效率较低。
Condition
接口是 java.util.concurrent.locks
包中提供的一个更高级的同步工具,它允许我们将等待的线程放入不同的等待队列中,从而实现更细粒度的控制。
Condition
接口的主要方法:
await()
: 使当前线程进入等待状态,并释放与此Condition
相关的锁。类似于Object.wait()
,但与特定的Condition
关联。signal()
: 唤醒在此Condition
上等待的单个线程。类似于Object.notify()
,但只唤醒等待特定Condition
的线程。signalAll()
: 唤醒在此Condition
上等待的所有线程。类似于Object.notifyAll()
,但只唤醒等待特定Condition
的线程。
使用 Condition
的步骤:
- 获取一个
Lock
对象。 通常使用ReentrantLock
。 - 通过
Lock
对象的newCondition()
方法创建一个或多个Condition
对象。 - 在
Lock
对象的保护下,使用Condition
对象的await()
,signal()
, 和signalAll()
方法进行线程间的通信和同步。
示例:生产者-消费者模型 (使用 Condition
)
import java.util.LinkedList;
import java.util.Queue;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
public class ProducerConsumerWithCondition {
private static final int CAPACITY = 10;
private final Queue<Integer> buffer = new LinkedList<>();
private final Lock lock = new ReentrantLock();
private final Condition notFull = lock.newCondition();
private final Condition notEmpty = lock.newCondition();
public void produce(int value) throws InterruptedException {
lock.lock();
try {
while (buffer.size() == CAPACITY) {
System.out.println("Producer waiting: Buffer is full.");
notFull.await(); // Buffer is full, producer waits on notFull condition
}
buffer.offer(value);
System.out.println("Produced: " + value);
notEmpty.signal(); // Signal a consumer that the buffer is not empty
} finally {
lock.unlock();
}
}
public int consume() throws InterruptedException {
lock.lock();
try {
while (buffer.isEmpty()) {
System.out.println("Consumer waiting: Buffer is empty.");
notEmpty.await(); // Buffer is empty, consumer waits on notEmpty condition
}
int value = buffer.poll();
System.out.println("Consumed: " + value);
notFull.signal(); // Signal a producer that the buffer is not full
return value;
} finally {
lock.unlock();
}
}
public static void main(String[] args) {
ProducerConsumerWithCondition pc = new ProducerConsumerWithCondition();
Thread producerThread = new Thread(() -> {
try {
for (int i = 0; i < 20; i++) {
pc.produce(i);
Thread.sleep(100); // Simulate some work
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
});
Thread consumerThread = new Thread(() -> {
try {
for (int i = 0; i < 20; i++) {
pc.consume();
Thread.sleep(200); // Simulate some work
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
});
producerThread.start();
consumerThread.start();
try {
producerThread.join();
consumerThread.join();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
System.out.println("Producer and Consumer threads finished.");
}
}
在这个例子中,我们使用了 ReentrantLock
作为锁,并创建了两个 Condition
对象:notFull
和 notEmpty
。生产者在缓冲区满时等待 notFull
条件,消费者在缓冲区空时等待 notEmpty
条件。生产者生产了一个新元素后,只唤醒等待 notEmpty
条件的消费者,消费者消费了一个元素后,只唤醒等待 notFull
条件的生产者。这样可以避免不必要的线程唤醒和竞争,提高效率。
Object
的 wait()
/notify()
/notifyAll()
和 Condition
的区别:
特性 | Object.wait() /notify() /notifyAll() |
Condition.await() /signal() /signalAll() |
---|---|---|
关联的锁 | 内置锁 (synchronized) | 显式锁 (如 ReentrantLock ) |
等待队列 | 单个等待队列 | 多个等待队列 (每个 Condition 一个队列) |
唤醒策略 | 通常使用 notifyAll() |
可以使用 signal() 或 signalAll() |
灵活性 | 较低 | 较高 |
适用场景 | 简单的同步场景 | 需要更细粒度控制的复杂同步场景 |
3. 虚假唤醒 (Spurious Wakeups)
虚假唤醒是指线程在没有被 notify()
或 notifyAll()
显式唤醒的情况下,从 wait()
方法中返回。这种情况在 Java 规范中是被允许的,因为某些底层平台可能会发生这种情况。
为什么会发生虚假唤醒?
虚假唤醒通常与操作系统的调度有关。例如,操作系统可能会因为某些内部原因提前唤醒一个等待中的线程。
如何处理虚假唤醒?
为了确保程序的正确性,必须在 wait()
方法返回后,重新检查等待的条件是否真的满足。正确的做法是将 wait()
方法放在一个循环中,并在循环中检查条件。
示例:处理虚假唤醒 (使用 Object
的 wait()
/notifyAll()
方法)
public class WaitExample {
private boolean conditionMet = false;
public synchronized void waitForCondition() throws InterruptedException {
while (!conditionMet) { // 循环检查条件
wait();
}
System.out.println("Condition met!");
}
public synchronized void setCondition() {
conditionMet = true;
notifyAll();
}
public static void main(String[] args) throws InterruptedException {
WaitExample example = new WaitExample();
Thread waitingThread = new Thread(() -> {
try {
example.waitForCondition();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
});
Thread settingThread = new Thread(() -> {
try {
Thread.sleep(2000); // Simulate some work
example.setCondition();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
});
waitingThread.start();
settingThread.start();
waitingThread.join();
settingThread.join();
}
}
在这个例子中,waitForCondition()
方法使用 while
循环来检查 conditionMet
是否为 true
。即使线程被虚假唤醒,它也会重新检查条件,直到条件真正满足才继续执行。
示例:处理虚假唤醒 (使用 Condition
)
Condition
接口也需要同样的处理方式来避免虚假唤醒。
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
public class ConditionWaitExample {
private final Lock lock = new ReentrantLock();
private final Condition condition = lock.newCondition();
private boolean conditionMet = false;
public void waitForCondition() throws InterruptedException {
lock.lock();
try {
while (!conditionMet) { // 循环检查条件
condition.await();
}
System.out.println("Condition met!");
} finally {
lock.unlock();
}
}
public void setCondition() {
lock.lock();
try {
conditionMet = true;
condition.signalAll();
} finally {
lock.unlock();
}
}
public static void main(String[] args) throws InterruptedException {
ConditionWaitExample example = new ConditionWaitExample();
Thread waitingThread = new Thread(() -> {
try {
example.waitForCondition();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
});
Thread settingThread = new Thread(() -> {
try {
Thread.sleep(2000); // Simulate some work
example.setCondition();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
});
waitingThread.start();
settingThread.start();
waitingThread.join();
settingThread.join();
}
}
总结:Condition
是更优的选择,但别忘了处理虚假唤醒
今天我们学习了 Java 中等待/通知机制的两种实现方式:使用 Object
类的 wait()
/notify()
/notifyAll()
方法和使用 Condition
接口。Condition
接口提供了更细粒度的控制,可以避免不必要的线程唤醒和竞争,提高效率。同时,我们也了解了虚假唤醒的概念以及如何通过循环检查条件来处理它。在实际开发中,建议优先使用 Condition
接口,并始终注意处理虚假唤醒问题,以确保程序的正确性和性能。