Condition 对象源码解析:实现比 wait/notify 更精细的线程等待与唤醒
大家好,今天我们来深入探讨 Java 并发编程中一个非常重要的工具:Condition 对象。我们都知道 Object 类提供了 wait()、notify() 和 notifyAll() 方法来实现线程间的同步和通信。然而,Condition 对象在某些场景下能够提供更精细的控制,实现更灵活的线程等待和唤醒机制。本次讲座将从以下几个方面展开:
wait/notify的局限性Condition接口概览AbstractQueuedSynchronizer (AQS)基础ConditionObject源码剖析Condition的使用场景和最佳实践- 对比与总结
1. wait/notify 的局限性
wait() 和 notify() 方法是 Java 中最基础的线程同步机制。它们允许线程在特定条件不满足时进入等待状态,并在其他线程满足条件时被唤醒。然而,这种机制存在一些局限性:
- 无差别唤醒:
notify()方法会随机唤醒一个等待的线程,而notifyAll()会唤醒所有等待的线程。如果多个线程等待不同的条件,这种无差别唤醒会导致不必要的线程竞争和上下文切换,降低性能。 - 条件判断的缺失: 当线程被唤醒时,它需要重新检查条件是否满足。这是因为
notify()仅仅是通知线程条件可能已经改变,但并不能保证条件一定满足。 - 难以实现优先级调度:
wait/notify无法对等待线程进行优先级排序,所有线程都处于同等地位。
为了克服这些局限性,Java 提供了 Condition 接口,它与 Lock 接口配合使用,可以实现更细粒度的线程同步和通信。
2. Condition 接口概览
Condition 接口是 Java 并发包 java.util.concurrent.locks 中的一个接口,它提供了一种比 wait/notify 更灵活的线程等待和唤醒机制。Condition 接口的主要方法包括:
| 方法 | 描述 |
|---|---|
await() |
使当前线程进入等待状态,直到被 signal() 或 signalAll() 唤醒,或者被中断。与 Object.wait() 类似,但必须在关联的 Lock 的保护范围内调用。 |
await(long time, TimeUnit unit) |
使当前线程进入等待状态,直到被 signal() 或 signalAll() 唤醒,或者被中断,或者超时。与 Object.wait(long timeout) 类似。 |
awaitNanos(long nanosTimeout) |
使当前线程进入等待状态,直到被 signal() 或 signalAll() 唤醒,或者被中断,或者超时。超时时间以纳秒为单位。 |
awaitUninterruptibly() |
使当前线程进入等待状态,直到被 signal() 或 signalAll() 唤醒。与 await() 不同,此方法不会响应中断。 |
awaitUntil(Date deadline) |
使当前线程进入等待状态,直到被 signal() 或 signalAll() 唤醒,或者被中断,或者到达指定的时间。 |
signal() |
唤醒一个等待在 Condition 上的线程。与 Object.notify() 类似,但必须在关联的 Lock 的保护范围内调用。 |
signalAll() |
唤醒所有等待在 Condition 上的线程。与 Object.notifyAll() 类似,但必须在关联的 Lock 的保护范围内调用。 |
与 wait/notify 相比,Condition 的优势在于:
- 与
Lock接口配合使用:Condition对象必须从Lock对象中获得,这保证了线程在等待和唤醒时对共享资源的互斥访问。 - 可以创建多个
Condition对象: 一个Lock对象可以关联多个Condition对象,每个Condition对象可以对应不同的等待条件,从而实现更精细的线程控制。
3. AbstractQueuedSynchronizer (AQS) 基础
在深入了解 ConditionObject 的源码之前,我们需要先了解 AbstractQueuedSynchronizer (AQS)。AQS 是 Java 并发包中的一个核心类,它提供了一个通用的框架,用于构建锁和相关的同步器。ReentrantLock 和 ReentrantReadWriteLock 等常用的锁都是基于 AQS 实现的。
AQS 的核心思想是使用一个 volatile int state 变量来表示同步状态,并使用一个 FIFO 队列来管理等待线程。AQS 定义了一系列方法,用于获取和释放同步状态,以及管理等待队列。这些方法通常由子类来实现,以适应不同的同步需求。
ConditionObject 是 AQS 的一个内部类,它实现了 Condition 接口。ConditionObject 利用 AQS 的同步队列来实现线程的等待和唤醒。
4. ConditionObject 源码剖析
ConditionObject 是 AQS 的一个内部类,它实现了 Condition 接口。下面我们来分析 ConditionObject 的主要源码:
4.1 ConditionObject 的结构
public class ConditionObject implements Condition, java.io.Serializable {
/** First node of condition queue. */
private transient Node firstWaiter;
/** Last node of condition queue. */
private transient Node lastWaiter;
// ...
}
ConditionObject 维护了一个等待队列,用于存储等待在 Condition 上的线程。firstWaiter 和 lastWaiter 分别指向队列的头节点和尾节点。这个队列与AQS的同步队列不同。
4.2 await() 方法
await() 方法用于使当前线程进入等待状态。其主要步骤如下:
- 释放锁: 首先,
await()方法会原子地释放与Condition关联的Lock。这是为了让其他线程有机会获取锁,并改变等待条件。 - 将当前线程添加到等待队列: 创建一个新的
Node对象,并将当前线程包装在其中,然后将该节点添加到Condition的等待队列中。 - 阻塞线程: 使用
LockSupport.park()方法阻塞当前线程,直到被signal()或signalAll()唤醒,或者被中断。 - 重新获取锁: 当线程被唤醒时,它需要重新获取与
Condition关联的Lock。 - 处理中断: 如果线程在等待期间被中断,
await()方法会抛出InterruptedException异常。
public final void await() throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
Node node = addConditionWaiter(); // 将当前线程添加到等待队列
int savedState = fullyRelease(node); // 释放锁
int interruptMode = 0;
while (!isOnSyncQueue(node)) { // 检查是否在同步队列
LockSupport.park(this); // 阻塞线程
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
}
if (acquireQueued(node, savedState) && interruptMode != THROW)
interruptMode = REINTERRUPT;
if (node.nextWaiter != null) // clean up if cancelled
unlinkCancelledWaiters();
if (interruptMode != 0)
reportInterruptAfterWait(interruptMode);
}
4.3 signal() 方法
signal() 方法用于唤醒一个等待在 Condition 上的线程。其主要步骤如下:
- 从等待队列中移除头节点: 从
Condition的等待队列中移除头节点,该节点对应于等待时间最长的线程。 - 将节点添加到同步队列: 将移除的节点添加到
AQS的同步队列中,使其有机会重新获取锁。 - 唤醒线程: 使用
LockSupport.unpark()方法唤醒与该节点关联的线程。
public final void signal() {
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
Node first = firstWaiter;
if (first != null)
doSignal(first);
}
private void doSignal(Node first) {
do {
if (firstWaiter == first)
firstWaiter = first.nextWaiter;
first.nextWaiter = null;
if (!transferForSignal(first)) // 将节点转移到同步队列
signalNext(first);
} while (firstWaiter != null && firstWaiter.status > 0);
}
final boolean transferForSignal(Node node) {
/*
* If cannot change waiter status, the waiter is cancelled.
* Propagate to unparkSuccessor so that getExclusiveQueuedFor(node)
* returns true.
*/
if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
return false;
/*
* Splice onto queue and try to set waitStatus of predecessor to
* indicate that thread is (probably) waiting. If even that fails
* return to signalNext only propagating the signal that it is safe
* to retry.
*/
Node p = enq(node); // 将节点添加到同步队列
int ws = p.waitStatus;
if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
LockSupport.unpark(node.thread); // 唤醒线程
return true;
}
4.4 signalAll() 方法
signalAll() 方法用于唤醒所有等待在 Condition 上的线程。其主要步骤如下:
- 遍历等待队列: 遍历
Condition的等待队列,将所有节点依次移除并添加到AQS的同步队列中。 - 唤醒所有线程: 使用
LockSupport.unpark()方法唤醒与所有节点关联的线程。
public final void signalAll() {
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
Node first = firstWaiter;
if (first != null)
doSignalAll(first);
}
private void doSignalAll(Node first) {
lastWaiter = firstWaiter = null;
do {
Node next = first.nextWaiter;
first.nextWaiter = null;
if (!transferForSignal(first))
signalNext(first);
first = next;
} while (first != null);
}
4.5 关键代码逻辑解释
addConditionWaiter(): 这个方法负责创建一个新的Node(类型为Node.CONDITION),并将当前线程包装进去,然后添加到Condition的等待队列的末尾。如果等待队列为空,则新节点同时是头节点和尾节点。如果队列不为空,则将新节点链接到当前的尾节点之后。fullyRelease(Node node): 这个方法负责完全释放锁,并返回锁释放之前的state值。Condition的await操作需要先释放锁,才能让其他线程有机会获取锁,从而改变等待的条件。isOnSyncQueue(Node node): 这个方法检查给定的节点是否已经存在于AQS的同步队列中。当一个线程被signal后,它对应的节点会被转移到同步队列中,等待重新获取锁。acquireQueued(Node node, int savedState): 这个方法负责让节点在同步队列中排队,并尝试获取锁。如果获取成功,则返回。如果在等待过程中被中断,则会设置中断标志。transferForSignal(Node node): 这个方法是signal操作的核心。它尝试将等待队列中的节点转移到同步队列中。首先,它会尝试将节点的waitStatus从CONDITION修改为0。如果修改失败,说明节点已经被取消了。如果修改成功,则将节点加入到同步队列中。然后,检查前驱节点的状态,如果前驱节点的状态大于0或者修改前驱节点的状态为SIGNAL失败,则直接unpark该节点对应的线程。unlinkCancelledWaiters(): 这个方法用于清理等待队列中被取消的节点。
4.6 ConditionObject的等待队列和AQS的同步队列
ConditionObject维护一个等待队列(也称为条件队列),而AQS维护一个同步队列(也称为CLH队列)。这两个队列是不同的,用于不同的目的。
| 特性 | ConditionObject 的等待队列 | AQS 的同步队列 |
|---|---|---|
| 目的 | 存储等待特定条件的线程。线程在这个队列中等待,直到条件满足。 | 存储等待获取锁的线程。线程在这个队列中按照FIFO的顺序等待获取锁。 |
| 节点类型 | 节点类型为Node.CONDITION。 |
节点类型为Node.EXCLUSIVE(独占模式)或Node.SHARED(共享模式)。 |
| 状态 | 节点的状态通常为Node.CONDITION,表示线程正在等待。 |
节点的状态可以是SIGNAL(表示后继节点需要被唤醒)、CANCELLED(表示节点被取消)、0(初始状态)、PROPAGATE(表示需要传播唤醒信号)等。 |
| 队列属性 | 是一个简单的链表,只包含firstWaiter和lastWaiter指针。 |
是一个CLH队列,每个节点都有指向前驱节点和后继节点的指针。 |
| 线程转移 | 当条件满足时,线程会从等待队列转移到同步队列。这个过程通过transferForSignal方法实现。 |
线程在同步队列中等待获取锁,一旦锁被释放,队列中的线程会按照FIFO的顺序尝试获取锁。 |
| 唤醒机制 | 通过signal或signalAll方法唤醒等待队列中的线程,然后将它们转移到同步队列。 |
通过unparkSuccessor方法唤醒同步队列中的后继节点对应的线程。 |
| 应用场景 | 用于实现更精细的线程等待和唤醒机制,允许线程在不同的条件下等待。例如,生产者-消费者模型中,生产者等待队列用于存储等待生产的线程,消费者等待队列用于存储等待消费的线程。 | 用于实现锁的公平性和同步,保证线程按照一定的顺序获取锁,避免饥饿现象。 |
总结来说,ConditionObject的等待队列用于管理等待特定条件的线程,而AQS的同步队列用于管理等待获取锁的线程。Condition机制允许线程在等待队列和同步队列之间转移,从而实现更灵活的线程同步和通信。
5. Condition 的使用场景和最佳实践
Condition 对象在以下场景中特别有用:
- 生产者-消费者问题: 使用多个
Condition对象可以分别管理等待生产的线程和等待消费的线程,避免不必要的线程竞争。 - 有界缓冲区: 当缓冲区满时,生产者线程可以等待;当缓冲区空时,消费者线程可以等待。
- 多条件同步: 当线程需要等待多个条件满足时,可以使用多个
Condition对象分别管理这些条件。
最佳实践:
- 始终在
Lock的保护范围内使用Condition对象。 - 在
await()方法返回后,始终重新检查等待条件。 - 使用
signalAll()方法可以避免死锁,但可能会降低性能。 - 避免长时间持有锁,以免影响其他线程的执行。
下面是一个使用 Condition 对象实现生产者-消费者问题的示例:
import java.util.LinkedList;
import java.util.Queue;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
public class ProducerConsumer {
private final Lock lock = new ReentrantLock();
private final Condition notFull = lock.newCondition();
private final Condition notEmpty = lock.newCondition();
private final Queue<Integer> queue = new LinkedList<>();
private final int capacity;
public ProducerConsumer(int capacity) {
this.capacity = capacity;
}
public void produce(int item) throws InterruptedException {
lock.lock();
try {
while (queue.size() == capacity) {
notFull.await(); // 缓冲区已满,等待消费者消费
}
queue.offer(item);
System.out.println("Produced: " + item);
notEmpty.signal(); // 通知消费者可以消费了
} finally {
lock.unlock();
}
}
public int consume() throws InterruptedException {
lock.lock();
try {
while (queue.isEmpty()) {
notEmpty.await(); // 缓冲区为空,等待生产者生产
}
int item = queue.poll();
System.out.println("Consumed: " + item);
notFull.signal(); // 通知生产者可以生产了
return item;
} finally {
lock.unlock();
}
}
public static void main(String[] args) throws InterruptedException {
ProducerConsumer pc = new ProducerConsumer(5);
Thread producer = new Thread(() -> {
try {
for (int i = 0; i < 10; i++) {
pc.produce(i);
Thread.sleep((long) (Math.random() * 100));
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
});
Thread consumer = new Thread(() -> {
try {
for (int i = 0; i < 10; i++) {
pc.consume();
Thread.sleep((long) (Math.random() * 100));
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
});
producer.start();
consumer.start();
producer.join();
consumer.join();
}
}
6. 总结
Condition 对象是 Java 并发编程中一个强大的工具,它提供了一种比 wait/notify 更灵活的线程等待和唤醒机制。Condition 对象与 Lock 接口配合使用,可以实现更细粒度的线程同步和通信。理解 Condition 对象的源码和使用场景,可以帮助我们编写更高效、更可靠的并发程序。
通过对源码的分析,我们知道了:
ConditionObject通过维护一个等待队列来实现线程的等待和唤醒。Condition必须配合Lock使用,实现互斥。signal()和signalAll()方法用于唤醒等待的线程,await()方法用于使线程进入等待状态。