JAVA AQS底层原理深度解构:独占锁与共享锁的完整实现机制
大家好,今天我们来深入探讨Java并发编程中一个非常重要的组件——AbstractQueuedSynchronizer,也就是大家常说的AQS。AQS是构建锁和同步器的基础框架,理解AQS的底层原理对于编写高效、可靠的并发程序至关重要。我们将从AQS的基本概念出发,逐步剖析其核心机制,并重点分析独占锁和共享锁的实现原理,最后通过具体的代码示例来加深理解。
1. AQS:并发的基石
AQS,抽象队列同步器,是一个抽象类,它提供了一个基于FIFO队列的阻塞锁和相关的同步器框架。AQS的核心思想是使用一个int类型的state变量来表示同步状态,并通过内置的FIFO队列来管理竞争资源的线程。AQS本身并不直接实现任何同步机制,而是定义了一套模板方法,子类通过继承AQS并实现这些模板方法来实现特定的同步语义。
1.1 AQS的核心属性
AQS主要包含以下几个核心属性:
state(int): 同步状态,用于表示锁的持有状态或其他同步条件。它是一个volatile变量,保证了可见性。head(Node): FIFO队列的头节点,指向等待队列的第一个节点。tail(Node): FIFO队列的尾节点,指向等待队列的最后一个节点。Node(内部类): 表示等待队列中的一个节点,包含线程引用、等待状态等信息。
1.2 AQS的核心方法
AQS提供了一系列核心方法,子类需要根据具体的同步需求来实现这些方法。这些方法主要分为以下几类:
-
状态访问和修改方法:
getState(): 获取当前同步状态。setState(int newState): 设置同步状态。compareAndSetState(int expect, int update): 使用CAS操作原子地设置同步状态,保证线程安全。
-
独占模式相关方法:
tryAcquire(int arg): 尝试以独占模式获取同步状态。如果成功,则返回true,否则返回false。tryRelease(int arg): 尝试释放以独占模式占有的同步状态。如果成功,则返回true,否则返回false。isHeldExclusively(): 如果当前线程以独占模式持有同步状态,则返回true,否则返回false。
-
共享模式相关方法:
tryAcquireShared(int arg): 尝试以共享模式获取同步状态。如果成功,则返回一个非负值,表示剩余可用资源;如果失败,则返回一个负值。tryReleaseShared(int arg): 尝试释放以共享模式占有的同步状态。如果成功,则返回true,否则返回false。
2. AQS的底层原理:同步队列和等待机制
AQS的核心在于维护一个FIFO同步队列,用于管理竞争资源的线程。当线程尝试获取同步状态失败时,会被封装成一个Node节点,并加入到等待队列中。当持有同步状态的线程释放资源时,会唤醒等待队列中的一个或多个线程,让它们重新尝试获取同步状态。
2.1 同步队列的结构
同步队列是一个双向链表,每个节点包含以下信息:
| 属性 | 描述 |
|---|---|
thread |
等待资源的线程引用 |
mode |
节点模式,分为Node.EXCLUSIVE (独占模式) 和 Node.SHARED (共享模式) |
waitStatus |
节点等待状态,包括SIGNAL (需要被唤醒)、CANCELLED (已取消) 等 |
prev |
指向前驱节点 |
next |
指向后继节点 |
2.2 线程入队流程
当线程尝试获取同步状态失败时,AQS会将该线程封装成一个Node节点,并加入到同步队列的尾部。入队过程大致如下:
-
创建Node节点: 根据线程的请求模式(独占或共享)创建一个Node节点,并将线程引用保存在节点中。
Node node = new Node(Thread.currentThread(), mode); -
CAS尝试添加尾节点: 使用CAS操作将新节点添加到队列的尾部。如果队列为空,则需要初始化队列。
Node pred = tail; if (pred == null) { // 队列为空,初始化队列 if (compareAndSetHead(new Node())) tail = head; pred = tail; } if (compareAndSetTail(pred, node)) { pred.next = node; return node; } -
自旋重试: 如果CAS操作失败,说明有其他线程也在尝试添加节点,此时需要自旋重试,直到添加成功。
2.3 线程出队和唤醒流程
当持有同步状态的线程释放资源时,AQS会唤醒等待队列中的一个或多个线程,让它们重新尝试获取同步状态。出队和唤醒过程大致如下:
-
释放资源: 持有同步状态的线程调用
release或releaseShared方法释放资源。 -
唤醒后继节点: AQS会检查头节点的后继节点的等待状态。如果后继节点处于
SIGNAL状态(表示需要被唤醒),则会唤醒该节点对应的线程。if (nodeToUnpark != null) LockSupport.unpark(nodeToUnpark.thread); -
传播唤醒: 对于共享模式,如果唤醒的线程成功获取了同步状态,并且还有剩余可用资源,则会继续唤醒后续节点,直到没有剩余资源或到达队列尾部。
2.4 等待状态的变化
Node节点的waitStatus属性表示节点的等待状态,它的变化直接影响着线程的唤醒和调度。常见的waitStatus值包括:
waitStatus |
描述 |
|---|---|
0 |
初始状态,表示节点正在等待获取同步状态。 |
SIGNAL |
表示当前节点的后继节点需要被唤醒。当一个节点的前驱节点释放资源时,会将后继节点的waitStatus设置为SIGNAL,表示需要唤醒后继节点。 |
CANCELLED |
表示节点已经被取消。当线程中断或超时时,会将对应的节点的waitStatus设置为CANCELLED,表示该节点不再需要参与同步竞争。 |
CONDITION |
表示节点正在等待某个条件成立。通常与Condition对象配合使用,当条件不满足时,线程会被放入Condition对象的等待队列中,并将节点的waitStatus设置为CONDITION。当条件满足时,Condition对象会将等待队列中的节点移动到AQS的同步队列中,等待获取同步状态。 |
PROPAGATE |
仅用于共享模式,表示释放操作需要传播给其他节点。当一个共享模式的节点成功释放资源后,会将后继节点的waitStatus设置为PROPAGATE,表示需要继续传播释放操作,直到没有剩余资源或到达队列尾部。 |
3. 独占锁的实现:以ReentrantLock为例
ReentrantLock是Java中常用的可重入独占锁,它基于AQS实现。ReentrantLock内部维护了一个Sync抽象类,Sync有两个子类:NonfairSync (非公平锁) 和 FairSync (公平锁)。它们分别实现了不同的同步策略。
3.1 NonfairSync (非公平锁) 的实现
非公平锁的特点是,线程尝试获取锁时,会先尝试直接获取锁,如果获取失败,才会加入到等待队列中。这种策略可能会导致某些线程一直无法获取到锁,产生饥饿现象,但通常情况下性能更高。
-
tryAcquire(int arg):final boolean nonfairTryAcquire(int acquires) { final Thread current = Thread.currentThread(); int c = getState(); if (c == 0) { // 如果当前锁未被占用 if (compareAndSetState(0, acquires)) { // 尝试CAS获取锁 setExclusiveOwnerThread(current); // 设置独占线程 return true; } } else if (current == getExclusiveOwnerThread()) { // 如果当前线程已经持有锁,则重入 int nextc = c + acquires; if (nextc < 0) // overflow throw new Error("Maximum lock count exceeded"); setState(nextc); // 增加重入次数 return true; } return false; } -
tryRelease(int arg):protected final boolean tryRelease(int releases) { int c = getState() - releases; if (Thread.currentThread() != getExclusiveOwnerThread()) throw new IllegalMonitorStateException(); boolean free = false; if (c == 0) { // 如果重入次数为0,则释放锁 free = true; setExclusiveOwnerThread(null); // 清空独占线程 } setState(c); // 更新同步状态 return free; }
3.2 FairSync (公平锁) 的实现
公平锁的特点是,线程尝试获取锁时,会先检查等待队列中是否有其他线程正在等待。如果有,则会直接加入到等待队列中,按照FIFO的顺序获取锁。这种策略可以避免饥饿现象,但性能相对较低。
-
tryAcquire(int arg):protected final boolean tryAcquire(int acquires) { final Thread current = Thread.currentThread(); int c = getState(); if (c == 0) { // 如果当前锁未被占用 if (!hasQueuedPredecessors() && // 检查队列中是否有前驱节点 compareAndSetState(0, acquires)) { // 尝试CAS获取锁 setExclusiveOwnerThread(current); // 设置独占线程 return true; } } else if (current == getExclusiveOwnerThread()) { // 如果当前线程已经持有锁,则重入 int nextc = c + acquires; if (nextc < 0) throw new Error("Maximum lock count exceeded"); setState(nextc); return true; } return false; } -
hasQueuedPredecessors():public final boolean hasQueuedPredecessors() { // The correctness of this depends on head being initialized // before tail and on head.next being accurate if the current // head is transient. Node h = head; Node s; return h != tail && ((s = h.next) == null || s.thread != Thread.currentThread()); }
4. 共享锁的实现:以CountDownLatch为例
CountDownLatch是Java中常用的共享锁,它允许一个或多个线程等待一组操作完成。CountDownLatch内部维护了一个Sync类,该类继承自AQS,并实现了共享模式的同步语义。
-
tryAcquireShared(int arg):protected int tryAcquireShared(int acquires) { return (getState() == 0) ? 1 : -1; // 如果state为0,则返回1,表示获取成功;否则返回-1,表示获取失败 } -
tryReleaseShared(int arg):protected boolean tryReleaseShared(int releases) { for (;;) { int c = getState(); if (c == 0) return false; int nextc = c - 1; if (compareAndSetState(c, nextc)) // CAS更新state return nextc == 0; // 如果state变为0,则返回true,表示所有线程都已完成操作 } }
5. 代码示例:使用ReentrantLock实现线程安全的计数器
import java.util.concurrent.locks.ReentrantLock;
public class Counter {
private int count = 0;
private final ReentrantLock lock = new ReentrantLock();
public void increment() {
lock.lock(); // 获取锁
try {
count++;
} finally {
lock.unlock(); // 释放锁
}
}
public int getCount() {
lock.lock();
try {
return count;
} finally {
lock.unlock();
}
}
public static void main(String[] args) throws InterruptedException {
Counter counter = new Counter();
int numThreads = 10;
Thread[] threads = new Thread[numThreads];
for (int i = 0; i < numThreads; i++) {
threads[i] = new Thread(() -> {
for (int j = 0; j < 1000; j++) {
counter.increment();
}
});
threads[i].start();
}
for (int i = 0; i < numThreads; i++) {
threads[i].join();
}
System.out.println("Count: " + counter.getCount()); // 输出 Count: 10000
}
}
6. 代码示例:使用CountDownLatch实现多线程并发执行
import java.util.concurrent.CountDownLatch;
public class CountDownLatchExample {
public static void main(String[] args) throws InterruptedException {
int numThreads = 5;
CountDownLatch latch = new CountDownLatch(numThreads);
for (int i = 0; i < numThreads; i++) {
final int threadId = i;
new Thread(() -> {
try {
System.out.println("Thread " + threadId + " is running...");
Thread.sleep((long) (Math.random() * 2000)); // 模拟线程执行时间
System.out.println("Thread " + threadId + " finished.");
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
latch.countDown(); // 计数器减一
}
}).start();
}
latch.await(); // 等待所有线程完成
System.out.println("All threads finished.");
}
}
7. AQS的优势与局限
AQS作为Java并发编程的基础框架,具有以下优势:
- 灵活性: AQS提供了一套通用的同步机制,可以用于构建各种类型的锁和同步器。
- 高性能: AQS基于CAS操作和FIFO队列,能够有效地管理并发线程,并提供较高的性能。
- 可扩展性: AQS是一个抽象类,可以通过继承和扩展来实现自定义的同步语义。
AQS也存在一些局限:
- 复杂性: AQS的实现较为复杂,需要深入理解其底层原理才能正确使用。
- 适用性: AQS主要适用于构建阻塞式的锁和同步器,对于非阻塞式的同步机制,可能需要采用其他方案。
8. AQS的深入思考
AQS的设计体现了面向对象的设计原则,它将同步逻辑抽象成模板方法,子类通过实现这些方法来实现特定的同步语义。这种设计模式使得AQS具有良好的可扩展性和可维护性。
AQS的核心在于维护一个FIFO同步队列,用于管理竞争资源的线程。这种队列结构保证了线程按照先来后到的顺序获取资源,避免了饥饿现象。
AQS依赖于CAS操作来实现原子性操作,CAS操作是一种无锁的并发控制机制,它能够有效地提高并发程序的性能。
9. 锁的实现原理
我们讨论了AQS的底层实现,包括同步队列的结构,线程入队出队流程,以及等待状态的变化。
10. 独占锁和共享锁的示例
我们通过ReentrantLock和CountDownLatch两个具体的例子,讲解了独占锁和共享锁的实现原理。