Java AQS 的锁排队机制:CLH 队列中 Node 节点的等待状态与条件等待
大家好,今天我们来深入探讨 Java 并发编程中一个至关重要的概念:AbstractQueuedSynchronizer(AQS)框架中的锁排队机制,特别是 CLH 队列中 Node 节点的等待状态以及条件等待的实现。AQS 是构建 Java 并发工具的基础,理解其内部机制对于编写高效、可靠的并发程序至关重要。
AQS 核心概念回顾
AQS 的核心思想是维护一个同步状态 (state) 和一个 FIFO 的等待队列 (CLH 队列)。同步状态表示锁的可用性,而等待队列则管理等待获取锁的线程。线程试图获取锁时,如果同步状态不可用,则会将当前线程封装成一个 Node 节点加入到 CLH 队列中,并进入阻塞状态。当锁被释放时,AQS 会唤醒等待队列中的一个或多个线程,使其尝试获取锁。
CLH 队列:线程排队的基石
CLH 队列是一种隐式的双向链表,用于管理等待获取锁的线程。虽然被称为队列,但它并没有显式的队列结构,而是通过 Node 节点的 prev 和 next 指针来维护线程的排队顺序。
每个等待锁的线程都会被封装成一个 Node 对象,包含以下关键属性:
thread: 代表正在等待锁的线程。waitStatus: 表示节点的等待状态,这是我们今天要重点讨论的内容。prev: 指向队列中的前驱节点。next: 指向队列中的后继节点。
Node 节点的等待状态 (waitStatus)
waitStatus 是一个重要的属性,它反映了节点在 CLH 队列中的状态,直接影响着 AQS 如何处理等待线程。waitStatus 可以取以下几个值:
| 值 | 含义 |
|---|---|
0 |
初始状态,表示节点正在等待获取锁。 |
SIGNAL |
表示当前节点的后继节点需要被唤醒。当一个节点释放锁后,会尝试将后继节点的 waitStatus 设置为 SIGNAL,以便后继节点能够被唤醒。 |
CANCELLED |
表示节点对应的线程被取消。通常发生在线程在等待过程中被中断或者超时。 |
CONDITION |
表示节点正在等待条件变量。 |
PROPAGATE |
表示释放操作需要传播给其他节点。 这通常用于共享模式(例如 Semaphore,CountDownLatch),当一个节点释放锁时,不仅需要唤醒它的后继节点,还需要确保后续节点也能够获得锁。 |
深入解析 waitStatus 的作用
-
0(初始状态): 当一个线程首次尝试获取锁失败,并且被加入到 CLH 队列时,其对应的 Node 节点的waitStatus初始化为0。这表示该节点处于一个“未激活”的等待状态,还没有收到任何唤醒信号。 -
SIGNAL:SIGNAL状态是最重要的状态之一。当一个节点成为队列中的头节点的后继节点,并且头节点释放锁时,头节点会尝试将后继节点的waitStatus设置为SIGNAL。这表示后继节点(即当前节点)应该被唤醒,尝试获取锁。- 设置
SIGNAL的时机: 主要发生在unparkSuccessor()方法中。该方法在释放锁时被调用,用于唤醒后继节点。 - 作用: 确保当锁可用时,等待队列中的线程能够被及时唤醒。
// AQS 中 unparkSuccessor() 方法的部分代码 private void unparkSuccessor(Node node) { /* * If status is negative (i.e., possibly needing signal) try * to clear in anticipation of signalling. It is OK if this * fails or if status is changed by waiting thread. */ int ws = node.waitStatus; if (ws < 0) compareAndSetWaitStatus(node, ws, 0); /* * Thread to unpark is held in successor, which is normally * just the next node. But if cancelled or apparently null, * traverse backwards from tail to find the actual * non-cancelled successor. */ Node s = node.next; if (s == null || s.waitStatus > 0) { s = null; for (Node t = tail; t != null && t != node; t = t.prev) if (t.waitStatus <= 0) s = t; } if (s != null) LockSupport.unpark(s.thread); } - 设置
-
CANCELLED:CANCELLED状态表示与该节点关联的线程已经被取消。这通常发生在以下情况:- 线程在等待锁的过程中被中断 (
Thread.interrupt())。 - 线程在等待锁的过程中超时。
当一个节点的状态变为
CANCELLED时,AQS 会将其从队列中移除,避免不必要的唤醒操作。- 处理
CANCELLED节点: AQS 会在尝试获取锁之前检查当前节点是否被取消。如果被取消,则会尝试将其从队列中移除,并重新尝试获取锁。
// AQS 中 acquireQueued() 方法的部分代码,用于处理 CANCELLED 节点 final boolean acquireQueued(final Node node, int arg) { boolean failed = true; try { boolean interrupted = false; for (;;) { final Node p = node.predecessor(); if (p == head && tryAcquire(arg)) { setHead(node); p.next = null; // help GC failed = false; return interrupted; } if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true; } } finally { if (failed) cancelAcquire(node); // 如果获取失败,则取消获取 } } private void cancelAcquire(Node node) { // Ignore if node doesn't exist if (node == null) return; node.thread = null; // Skip cancelled predecessors Node pred = node.prev; while (pred.waitStatus > 0) node.prev = pred = pred.prev; // predNext is the apparent node to unsplice. CASes below will // fail if not, so lost races are handled gracefully Node predNext = pred.next; // Can use unconditional write instead of CAS if predCorrectlyRefersTo(pred, node) // i.e., pred is still valid node.waitStatus = Node.CANCELLED; // If we are the tail, remove ourselves. if (node == tail && compareAndSetTail(node, pred)) { compareAndSetNext(pred, predNext, null); } else { // If successor needs signal, wake up to dispose of us int ws; if (pred != head && ((ws = pred.waitStatus) == Node.SIGNAL || (ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) && (pred.thread != null)) { Node next = node.next; if (next != null && next.waitStatus <= 0) compareAndSetNext(pred, predNext, next); } else { unparkSuccessor(node); } } } - 线程在等待锁的过程中被中断 (
-
CONDITION:CONDITION状态用于支持条件变量。当一个线程调用Condition.await()方法时,该线程会被封装成一个 Node 节点,并将其waitStatus设置为CONDITION,然后加入到与该 Condition 对象关联的等待队列中。- 与条件变量配合使用:
CONDITION状态的节点不会直接参与到锁的竞争中,它们只有在收到Condition.signal()或Condition.signalAll()信号时才会被移动到 CLH 队列,参与锁的竞争。
- 与条件变量配合使用:
-
PROPAGATE:PROPAGATE状态用于共享模式,例如Semaphore和CountDownLatch。当一个节点释放锁时,如果其waitStatus为PROPAGATE,则表示释放操作需要传播给其他节点,以确保后续节点也能够获得锁。- 共享锁的传播: 在共享模式下,多个线程可以同时持有锁。当一个线程释放锁时,需要确保所有等待的线程都能获得锁,因此需要将释放操作传播给所有等待的线程。
条件等待 (Condition Waiting)
AQS 的条件等待机制允许线程在获取锁之后,因为某些条件不满足而释放锁并进入等待状态,直到其他线程发出信号通知该条件已经满足。
-
ConditionObject: AQS 提供了一个内部类ConditionObject来实现条件等待。每个ConditionObject维护一个独立的等待队列,用于存储等待特定条件的线程。 -
await()方法: 当一个线程调用Condition.await()方法时,会执行以下操作:- 释放当前持有的锁。
- 将当前线程封装成一个 Node 节点,并将其
waitStatus设置为CONDITION,然后加入到与该 Condition 对象关联的等待队列中。 - 线程进入阻塞状态,等待被唤醒。
-
signal()和signalAll()方法: 当一个线程调用Condition.signal()或Condition.signalAll()方法时,会执行以下操作:- 从与该 Condition 对象关联的等待队列中移除一个或所有节点。
- 将这些节点加入到 CLH 队列中,并将它们的
waitStatus设置为0,以便它们能够参与锁的竞争。
条件等待的代码示例
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
public class ConditionExample {
private final Lock lock = new ReentrantLock();
private final Condition condition = lock.newCondition();
private boolean isReady = false;
public void awaitSignal() throws InterruptedException {
lock.lock();
try {
while (!isReady) {
System.out.println(Thread.currentThread().getName() + " 等待信号...");
condition.await();
}
System.out.println(Thread.currentThread().getName() + " 收到信号,继续执行...");
} finally {
lock.unlock();
}
}
public void signalAll() {
lock.lock();
try {
isReady = true;
System.out.println(Thread.currentThread().getName() + " 发送信号...");
condition.signalAll();
} finally {
lock.unlock();
}
}
public static void main(String[] args) throws InterruptedException {
ConditionExample example = new ConditionExample();
Thread t1 = new Thread(() -> {
try {
example.awaitSignal();
} catch (InterruptedException e) {
e.printStackTrace();
}
}, "Thread-1");
Thread t2 = new Thread(() -> {
try {
example.awaitSignal();
} catch (InterruptedException e) {
e.printStackTrace();
}
}, "Thread-2");
Thread t3 = new Thread(() -> {
example.signalAll();
}, "Thread-3");
t1.start();
t2.start();
Thread.sleep(1000); // 确保 t1 和 t2 进入等待状态
t3.start();
}
}
在这个例子中,Thread-1 和 Thread-2 会调用 awaitSignal() 方法,因为 isReady 初始值为 false,所以它们会进入等待状态。Thread-3 会调用 signalAll() 方法,将 isReady 设置为 true,并发送信号唤醒所有等待的线程。
AQS 流程图
虽然不能在这里插入图片,但可以简单描述AQS的流程:
- 线程尝试获取锁 (
tryAcquire)。 - 如果获取成功,直接返回。
- 如果获取失败,将当前线程封装成 Node,加入 CLH 队列。
- Node 节点进入等待状态,
waitStatus为 0。 - 当锁被释放 (
tryRelease) 时,头节点尝试唤醒后继节点 (如果waitStatus为SIGNAL)。 - 被唤醒的节点重新尝试获取锁。
- 如果线程在等待过程中被中断,
waitStatus会被设置为CANCELLED,并从队列中移除。 - 条件等待通过
ConditionObject实现,线程在条件不满足时释放锁并进入条件队列,等待信号。
总结 Node 状态与 AQS 的关系
Node 节点的 waitStatus 是 AQS 实现锁排队和条件等待机制的关键。通过维护 waitStatus,AQS 能够有效地管理等待线程的状态,并根据不同的状态采取相应的行动,例如唤醒线程、取消线程或将其加入到条件队列中。理解 waitStatus 的含义和作用,对于深入理解 AQS 的工作原理至关重要。
清晰理解 AQS 的重要性
通过今天的学习,我们深入了解了 AQS 框架中 CLH 队列的 Node 节点的等待状态以及条件等待的实现。掌握这些概念对于理解 Java 并发工具的底层机制,编写高效、可靠的并发程序至关重要。AQS 是 Java 并发编程的基石,理解它能让我们更好地应对复杂的并发场景。