Java AQS ConditionObject: 精确唤醒的艺术
大家好,今天我们深入探讨Java并发编程中一个非常重要的组件:AQS(AbstractQueuedSynchronizer)的ConditionObject,以及它如何利用等待队列来实现线程的精确唤醒。AQS是构建许多同步器的基石,而ConditionObject则为我们在同步器内部实现更精细的线程控制提供了强大的工具。
1. AQS与ConditionObject的关联
AQS本质上是一个同步器框架,它维护一个同步状态(state)和一个FIFO的同步队列(CLH队列)。但AQS本身并不直接提供线程的等待/唤醒机制。这正是ConditionObject发挥作用的地方。
ConditionObject是AQS的一个内部类,它与AQS实例紧密关联,并且维护着一个独立的等待队列。每个ConditionObject实例都代表一个条件,当某个线程需要等待某个特定条件满足时,它就可以进入该ConditionObject的等待队列。
简而言之:AQS负责管理同步状态和同步队列,ConditionObject负责管理等待队列,并提供与条件相关的等待和唤醒操作。
2. ConditionObject的核心方法
ConditionObject提供了几个核心方法,用于线程的等待和唤醒:
await(): 使当前线程进入等待队列,直到被signal或signalAll唤醒,或者被中断。awaitUninterruptibly(): 与await()类似,但不会响应中断。awaitNanos(long nanosTimeout): 使当前线程进入等待队列,等待指定的时间,直到被signal或signalAll唤醒,或者超时。awaitUntil(Date deadline): 使当前线程进入等待队列,等待直到指定的时间点,直到被signal或signalAll唤醒,或者到达时间点。signal(): 唤醒等待队列中最先进入的一个线程。signalAll(): 唤醒等待队列中的所有线程。
3. ConditionObject的工作原理:等待
当一个线程调用condition.await()时,会发生以下步骤:
- 释放锁: 线程首先必须释放与ConditionObject关联的AQS同步器上的锁。 这一步至关重要,否则其他线程将无法进入临界区,条件也永远无法满足。
- 构建节点: 创建一个新的节点(Node),该节点包含当前线程的信息,并将其添加到ConditionObject的等待队列的末尾。 注意,这个等待队列与AQS的同步队列是完全独立的。
- 阻塞线程: 线程进入阻塞状态,等待被唤醒。 这个阻塞操作是由AQS的
LockSupport.park(this)方法实现的。 - 进入同步队列: 当线程被唤醒后(通过signal或signalAll),它会从ConditionObject的等待队列转移到AQS的同步队列,重新竞争锁。
4. ConditionObject的工作原理:唤醒
当一个线程调用condition.signal()时,会发生以下步骤:
- 检查等待队列: 检查ConditionObject的等待队列是否为空。如果为空,则什么也不做。
- 移除节点: 从等待队列的头部移除一个节点,该节点代表一个正在等待的线程。
- 转移到同步队列: 将该节点转移到AQS的同步队列中。这一步是通过
transferAfterCancelledWait(Node node)方法实现的。 本质上,就是将等待队列中的节点加入到AQS同步队列的末尾。 - 唤醒线程: 唤醒与该节点关联的线程,使其可以重新竞争锁。这一步是通过
LockSupport.unpark(thread)方法实现的。
signalAll()的操作类似,但它会遍历整个等待队列,将所有节点都转移到AQS的同步队列,并唤醒所有线程。
5. 代码示例:生产者-消费者问题
为了更直观地理解ConditionObject的使用,我们以经典的生产者-消费者问题为例。
import java.util.LinkedList;
import java.util.Queue;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
class Buffer {
private final Queue<Integer> queue;
private final int capacity;
private final Lock lock = new ReentrantLock();
private final Condition notFull = lock.newCondition();
private final Condition notEmpty = lock.newCondition();
public Buffer(int capacity) {
this.queue = new LinkedList<>();
this.capacity = capacity;
}
public void produce(int item) throws InterruptedException {
lock.lock();
try {
while (queue.size() == capacity) {
System.out.println("Buffer is full, producer is waiting...");
notFull.await(); // 阻塞生产者线程,直到缓冲区有空位
}
queue.offer(item);
System.out.println("Produced: " + item);
notEmpty.signal(); // 唤醒一个消费者线程
} finally {
lock.unlock();
}
}
public int consume() throws InterruptedException {
lock.lock();
try {
while (queue.isEmpty()) {
System.out.println("Buffer is empty, consumer is waiting...");
notEmpty.await(); // 阻塞消费者线程,直到缓冲区有数据
}
int item = queue.poll();
System.out.println("Consumed: " + item);
notFull.signal(); // 唤醒一个生产者线程
return item;
} finally {
lock.unlock();
}
}
}
public class ProducerConsumer {
public static void main(String[] args) {
Buffer buffer = new Buffer(5);
Thread producer = new Thread(() -> {
try {
for (int i = 0; i < 10; i++) {
buffer.produce(i);
Thread.sleep((long) (Math.random() * 100)); // 模拟生产时间
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
});
Thread consumer = new Thread(() -> {
try {
for (int i = 0; i < 10; i++) {
buffer.consume();
Thread.sleep((long) (Math.random() * 150)); // 模拟消费时间
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
});
producer.start();
consumer.start();
}
}
在这个例子中:
Buffer类维护一个固定大小的队列,作为生产者和消费者共享的缓冲区。ReentrantLock用于保护对队列的并发访问。notFullConditionObject用于生产者线程,当缓冲区满时,生产者线程会调用notFull.await()进入等待队列。notEmptyConditionObject用于消费者线程,当缓冲区为空时,消费者线程会调用notEmpty.await()进入等待队列。- 生产者生产数据后,调用
notEmpty.signal()唤醒一个消费者线程。 - 消费者消费数据后,调用
notFull.signal()唤醒一个生产者线程。
6. 精确唤醒的优势
使用ConditionObject进行精确唤醒,相比于使用Object.wait()/Object.notify()/Object.notifyAll(),有以下优势:
- 避免不必要的唤醒: 使用
Object.notifyAll()会唤醒所有等待的线程,即使某些线程的条件并不满足,这会导致不必要的竞争和上下文切换。ConditionObject的signal()方法只唤醒等待特定条件的线程,避免了这种浪费。 - 更好的可读性和可维护性: 使用多个ConditionObject可以清晰地表达不同的等待条件,使代码更易于理解和维护。
- 更高的性能: 由于避免了不必要的唤醒,通常可以获得更高的性能。
7. ConditionObject与公平性
ConditionObject的等待队列的公平性取决于AQS同步器的公平性。
- 公平锁: 如果AQS同步器是公平锁,那么ConditionObject的唤醒也是公平的,即等待队列中最先进入的线程会被优先唤醒。
- 非公平锁: 如果AQS同步器是非公平锁,那么ConditionObject的唤醒也是非公平的,即使等待队列中有线程在等待,新到达的线程也可能先获得锁。
8. ConditionObject的注意事项
- 必须持有锁: 调用
await()、signal()和signalAll()方法之前,必须持有与ConditionObject关联的AQS同步器上的锁。 否则会抛出IllegalMonitorStateException。 - 循环检查条件: 从
await()方法返回后,应该在一个循环中重新检查条件是否满足。 这是因为线程可能被虚假唤醒(spurious wakeup),或者在等待期间条件可能已经被其他线程改变。 - 处理中断:
await()方法会响应中断,因此需要适当地处理InterruptedException。 - 避免死锁: 在使用多个锁和多个ConditionObject时,要特别注意避免死锁的发生。
9. await的深入理解: transferAfterCancelledWait
在await方法返回后,如果线程是被中断唤醒的,或者在等待期间被取消,那么会调用transferAfterCancelledWait方法。这个方法的作用是将当前节点的状态设置为取消,并尝试将其从Condition队列移动到AQS的同步队列中。
final void transferAfterCancelledWait(Node node) {
/*
* If this node was cancelled after being enqueued, allow to
* cancel followers. Also clear pointers to minimise garbage
* retention.
*/
if (compareAndSetWaitStatus(node, Node.CONDITION, 0)) {
enq(node); // 加入AQS同步队列
return;
}
/*
* If we did not set waitStatus to 0, then it must be that
* signal/signalAll race with the cancellation of this node.
* In this case, signal/signalAll are responsible for
* continuing the rest of the transfer.
*/
while (!isOnSyncQueue(node))
Thread.yield();
}
private final boolean isOnSyncQueue(Node node) {
if (node.waitStatus == Node.CONDITION || node.prev == null)
return false;
if (node.next == null)
return false;
return true;
}
这段代码的核心在于:
- CAS更新状态: 首先尝试使用CAS操作将节点的状态从
CONDITION修改为0。如果成功,说明当前节点还没有被signal或signalAll处理,那么就将其加入到AQS的同步队列中。 - 处理竞争: 如果CAS操作失败,说明
signal或signalAll可能正在处理这个节点。 在这种情况下,signal或signalAll会负责完成后续的转移操作。isOnSyncQueue方法用于判断节点是否已经进入同步队列。
10. 使用表格总结ConditionObject的关键点
| 特性 | 描述 |
|---|---|
| 关联对象 | AQS同步器实例 |
| 主要功能 | 实现线程的等待和唤醒,基于特定条件 |
| 核心方法 | await(), awaitUninterruptibly(), awaitNanos(), awaitUntil(), signal(), signalAll() |
| 等待队列 | 维护一个独立的等待队列,与AQS同步队列分离 |
| 唤醒机制 | signal()唤醒等待队列中的第一个线程,signalAll()唤醒所有线程 |
| 适用场景 | 需要精细控制线程等待和唤醒的场景,例如生产者-消费者问题,读写锁等 |
| 公平性 | 取决于关联的AQS同步器的公平性 |
| 异常处理 | 需要处理InterruptedException,并循环检查条件 |
| 注意事项 | 必须持有锁才能调用相关方法,避免死锁 |
线程控制的精髓
ConditionObject是AQS框架中一个至关重要的组成部分,它允许我们构建更加灵活和高效的并发程序。理解ConditionObject的工作原理,掌握其核心方法的使用,能够帮助我们更好地控制线程的等待和唤醒,从而解决各种复杂的并发问题。掌握精确唤醒的艺术,能够写出更优雅,更高性能的并发代码。