好的,我们开始吧。
AQS(AbstractQueuedSynchronizer)源码:Node节点的CLH队列排队机制
大家好!今天我们深入探讨Java并发编程中至关重要的基石——AbstractQueuedSynchronizer (AQS) 的核心机制:Node节点的CLH队列排队机制。AQS是构建锁和其他同步组件的关键抽象,理解其内部原理对于编写高效、可靠的并发程序至关重要。本次讲座将从CLH队列的理论基础入手,结合AQS源码,详细剖析Node节点在AQS中的作用,以及排队、唤醒等关键操作的实现。
1. CLH队列:理论基础
CLH队列(Craig, Landin, and Hagersten queue)是一种基于链表的自旋锁队列,用于解决多线程并发访问共享资源时的排队问题。它具有以下关键特性:
- FIFO(First-In, First-Out): 线程按照请求锁的顺序排队,先请求的线程先获得锁,保证公平性。
- 链表结构: 线程封装成节点(Node),通过前驱节点(predecessor)和后继节点(successor)连接成一个链表。
- 自旋等待: 线程在等待锁时,不会阻塞,而是不断循环检查前驱节点的状态,直到前驱节点释放锁。
- 隐式队列: CLH队列的实际存在是通过节点之间的链接关系来体现的,无需显式的队列数据结构。
CLH队列的优点在于:
- 公平性: 保证线程按照请求顺序获得锁。
- 性能: 自旋等待减少了线程切换的开销,提高了并发性能。
- 可扩展性: 易于扩展到大规模并发环境。
2. AQS中的Node节点
在AQS中,Node节点是CLH队列的基本组成单元,每个试图获取同步状态(例如锁)的线程都会被封装成一个Node节点,并加入到CLH队列中。Node节点包含以下关键属性:
| 属性 | 类型 | 描述 |
|---|---|---|
waitStatus |
volatile int |
节点的状态,用于指示节点的等待状态。 |
prev |
volatile Node |
指向前驱节点。 |
next |
volatile Node |
指向后继节点。 |
thread |
Thread |
与该节点关联的线程。 |
nextWaiter |
Node |
用于条件队列,这里我们暂不涉及。 |
waitStatus 属性是Node节点的核心,它表示节点当前的等待状态,可以取以下值:
0:初始状态,表示节点正在等待获取锁。CANCELLED:值为 1,表示节点已经被取消,不再参与锁竞争。SIGNAL:值为 -1,表示当前节点的后继节点需要被唤醒。当一个节点释放锁时,它会尝试唤醒它的后继节点。CONDITION:值为 -2,表示节点正在等待条件变量,与条件队列相关,这里我们暂不讨论。PROPAGATE:值为 -3,表示释放锁的操作需要传播给其他节点,通常用于共享模式。
3. AQS源码分析:入队操作
当一个线程尝试获取同步状态失败时,AQS会将该线程封装成一个Node节点,并加入到CLH队列的尾部。入队操作主要涉及以下几个步骤:
- 创建Node节点: 使用当前线程创建一个新的Node节点,
node = new Node(Thread.currentThread(), mode),其中mode可以是EXCLUSIVE(独占模式)或SHARED(共享模式)。 - CAS设置尾节点: 使用CAS(Compare and Swap)操作尝试将新节点设置为队列的尾节点。
- 处理并发情况: 如果CAS操作失败,说明有其他线程也在尝试入队,需要进入循环重试。
下面是AQS中enq(Node node)方法的源码,该方法负责将节点加入到CLH队列的尾部:
private Node enq(final Node node) {
for (;;) {
Node t = tail;
if (t == null) { // Must initialize
if (compareAndSetHead(null, new Node()))
tail = head;
} else {
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
代码解释:
for (;;):一个无限循环,直到成功将节点加入队列为止。Node t = tail;:获取当前队列的尾节点。if (t == null):如果尾节点为空,说明队列为空,需要初始化。compareAndSetHead(null, new Node()):使用CAS操作将一个空的Node节点设置为头节点。tail = head;:将尾节点指向头节点。node.prev = t;:将新节点的前驱节点设置为当前尾节点。compareAndSetTail(t, node):使用CAS操作将新节点设置为尾节点。t.next = node;:将原尾节点的后继节点设置为新节点。return t;:返回原尾节点。
4. AQS源码分析:acquireQueued方法
acquireQueued(final Node node, int arg)方法是AQS的核心方法之一,它负责让线程在CLH队列中排队等待,直到获取到同步状态。该方法的主要逻辑如下:
- 自旋等待: 线程进入自旋等待状态,不断检查是否可以获取同步状态。
- 检查前驱节点: 只有当前驱节点是头节点时,线程才有资格尝试获取同步状态。
- 尝试获取同步状态: 调用
tryAcquire(arg)方法尝试获取同步状态,如果成功,则将当前节点设置为头节点,并返回。 - 设置waitStatus: 如果获取同步状态失败,则根据情况设置节点的
waitStatus属性。 - 阻塞等待: 如果需要阻塞等待,则调用
LockSupport.park(this)方法阻塞当前线程。 - 中断处理: 如果线程被中断,则取消节点,并退出等待。
下面是acquireQueued(final Node node, int arg)方法的源码:
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
代码解释:
failed = true;:设置失败标志,如果在方法执行过程中出现异常,则需要取消节点。boolean interrupted = false;:记录线程是否被中断。for (;;):一个无限循环,直到成功获取同步状态或被取消为止。final Node p = node.predecessor();:获取当前节点的前驱节点。if (p == head && tryAcquire(arg)):如果前驱节点是头节点,并且成功获取同步状态。setHead(node);:将当前节点设置为头节点。p.next = null;:将原头节点的后继节点设置为null,帮助GC。failed = false;:设置失败标志为false,表示成功获取同步状态。return interrupted;:返回线程是否被中断。if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()):如果需要阻塞等待,并且线程被中断。interrupted = true;:设置线程被中断标志。if (failed):如果方法执行过程中出现异常。cancelAcquire(node);:取消节点。
shouldParkAfterFailedAcquire(Node pred, Node node)方法用于判断是否需要阻塞等待:
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
int ws = pred.waitStatus;
if (ws == Node.SIGNAL)
/*
* This node has already set status asking a release
* to signal it, so it can safely park.
*/
return true;
if (ws > 0) {
/*
* Predecessor was cancelled. Skip over predecessors and
* indicate retry.
*/
do {
node.prev = pred = pred.prev;
} while (pred != null && pred.waitStatus > 0);
pred.next = node;
} else {
/*
* waitStatus must be 0 or PROPAGATE. Indicate that we
* need a signal, but don't park yet. Caller will need to
* retry to make sure it doesn't miss a signal.
*/
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
代码解释:
int ws = pred.waitStatus;:获取前驱节点的waitStatus属性。if (ws == Node.SIGNAL):如果前驱节点的waitStatus为SIGNAL,表示可以安全地阻塞等待。if (ws > 0):如果前驱节点的waitStatus大于0,表示前驱节点已经被取消,需要跳过该节点。compareAndSetWaitStatus(pred, ws, Node.SIGNAL);:使用CAS操作将前驱节点的waitStatus设置为SIGNAL,表示需要被唤醒。
parkAndCheckInterrupt()方法用于阻塞当前线程,并检查是否被中断:
private final boolean parkAndCheckInterrupt() {
LockSupport.park(this);
return Thread.interrupted();
}
代码解释:
LockSupport.park(this);:阻塞当前线程。Thread.interrupted();:检查线程是否被中断,并清除中断标志。
5. AQS源码分析:释放同步状态
当线程释放同步状态时,AQS需要唤醒CLH队列中的后继节点,使其有机会获取同步状态。释放操作主要涉及以下几个步骤:
- 尝试释放同步状态: 调用
tryRelease(arg)方法尝试释放同步状态,如果成功,则继续执行。 - 唤醒后继节点: 如果头节点的
waitStatus为SIGNAL,则唤醒后继节点。
下面是AQS中release(int arg)方法的源码:
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
代码解释:
if (tryRelease(arg)):调用tryRelease(arg)方法尝试释放同步状态,如果成功,则继续执行。Node h = head;:获取头节点。if (h != null && h.waitStatus != 0):如果头节点不为空,并且waitStatus不为0,表示需要唤醒后继节点。unparkSuccessor(h);:唤醒后继节点。return true;:返回true,表示成功释放同步状态。
unparkSuccessor(Node node)方法用于唤醒后继节点:
private void unparkSuccessor(Node node) {
/*
* If status is negative (i.e., possibly needing signal) try
* to clear in anticipation of signaling. It is OK if this
* fails or if status is changed by waiting thread.
*/
int ws = node.waitStatus;
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);
/*
* Thread to unpark is held in successor, which is normally
* just the next node. But if cancelled or apparently null,
* traverse backwards from tail to find the actual
* non-cancelled successor.
*/
Node s = node.next;
if (s == null || s.waitStatus > 0) {
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
if (s != null)
LockSupport.unpark(s.thread);
}
代码解释:
int ws = node.waitStatus;:获取节点的waitStatus属性。if (ws < 0):如果waitStatus小于0,表示需要清空waitStatus。compareAndSetWaitStatus(node, ws, 0);:使用CAS操作将waitStatus设置为0。Node s = node.next;:获取后继节点。if (s == null || s.waitStatus > 0):如果后继节点为空,或者waitStatus大于0,表示后继节点已经被取消,需要从尾节点开始向前遍历,找到第一个未被取消的节点。LockSupport.unpark(s.thread);:唤醒后继节点对应的线程。
6. 独占模式与共享模式
AQS支持两种同步模式:独占模式和共享模式。
- 独占模式: 只有一个线程可以获取同步状态,例如ReentrantLock。
- 共享模式: 多个线程可以同时获取同步状态,例如CountDownLatch。
在独占模式下,tryAcquire(arg)方法尝试独占式地获取同步状态,如果获取失败,则将当前线程加入到CLH队列中,并阻塞等待。在共享模式下,tryAcquireShared(arg)方法尝试共享式地获取同步状态,如果获取失败,则同样将当前线程加入到CLH队列中,但不需要阻塞等待,而是继续尝试获取同步状态,直到获取成功或被取消。
7. 取消等待
当线程在等待同步状态时,可能会被中断或超时,此时需要取消等待。取消等待的操作主要包括以下几个步骤:
- 设置waitStatus为CANCELLED: 将节点的
waitStatus属性设置为CANCELLED。 - 从队列中移除节点: 将节点从CLH队列中移除。
- 唤醒后继节点: 如果节点有后继节点,则唤醒后继节点。
AQS中cancelAcquire(Node node) 方法用于取消获取:
private void cancelAcquire(Node node) {
// Ignore if node doesn't exist
if (node == null)
return;
node.thread = null;
// Skip cancelled predecessors
Node pred = node.prev;
while (pred.waitStatus > 0)
node.prev = pred = pred.prev;
// predNext is the apparent node to unsplice. CASes below will
// fail if not, in which case, we lost race vs another cancel
// or signal, so no further action is necessary.
Node predNext = pred.next;
// Can use unconditional write instead of CAS if predCorrectlyRefersTo
// node.firstWaiter, but can't ensure this will always be true.
if (pred != node.prev)
return;
node.waitStatus = Node.CANCELLED;
// If we are the tail, remove ourselves.
if (node == tail && compareAndSetTail(node, pred)) {
compareAndSetNext(pred, predNext, null);
} else {
// If successor needs signal, try to clear node's signal
// bit, else unpark successor.
int ws;
if (pred != head &&
((ws = pred.waitStatus) == Node.SIGNAL ||
(ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) &&
(pred.thread != null)) {
Node next = node.next;
if (next != null && next.waitStatus <= 0)
unparkSuccessor(next);
} else {
signalNext(node);
}
}
}
8. AQS的优势与局限
AQS作为Java并发编程的基础框架,具有以下优势:
- 灵活性: 可以用于构建各种同步组件,例如锁、信号量、CountDownLatch等。
- 可扩展性: 易于扩展到大规模并发环境。
- 公平性: 可以保证线程按照请求顺序获得同步状态。
AQS也存在一些局限性:
- 复杂性: 源码较为复杂,理解起来有一定难度。
- 性能开销: 自旋等待和CAS操作会带来一定的性能开销。
- 死锁风险: 如果使用不当,可能会导致死锁。
Node节点排队机制:AQS并发同步的基石
通过本次讲座,我们深入了解了AQS中Node节点的CLH队列排队机制。Node节点作为AQS并发同步的核心,在线程排队、唤醒和取消等方面发挥着至关重要的作用。掌握Node节点排队机制的原理,有助于我们更好地理解AQS,并编写高效、可靠的并发程序。