AQS(AbstractQueuedSynchronizer)框架深度剖析:CLH队列与同步状态管理
大家好,今天我们来深入探讨并发编程中一个非常重要的框架——AQS (AbstractQueuedSynchronizer)。AQS 是构建许多同步器(例如 ReentrantLock、Semaphore、CountDownLatch 等)的基础。理解 AQS 的原理对于编写高效且可靠的并发程序至关重要。我们将重点关注 AQS 的核心组件:CLH 队列和同步状态管理。
1. AQS 的核心思想
AQS 本质上是一个同步器框架,它提供了一种通用的机制来管理同步状态、阻塞和唤醒线程。它采用了一种基于模板方法的设计模式,允许开发者通过继承 AQS 并重写特定的方法来实现自定义的同步器。
AQS 的核心思想可以概括为以下几点:
- 同步状态 (state): AQS 使用一个
volatile int类型的state变量来表示同步状态。这个状态可以表示锁的持有者数量、信号量剩余的许可数量等等。 - CLH 队列: 当线程尝试获取同步状态失败时,AQS 会将这些线程放入一个虚拟的双向队列,称为 CLH 队列。CLH 队列是一种 FIFO (先进先出) 队列,用于管理等待获取同步状态的线程。
- 模板方法: AQS 提供了一组模板方法,例如
tryAcquire()、tryRelease()、isHeldExclusively()等,开发者需要根据自己的同步器语义来实现这些方法。 - 阻塞和唤醒机制: AQS 使用
LockSupport类来阻塞和唤醒线程。当线程需要等待时,它会被阻塞并放入 CLH 队列。当同步状态可用时,AQS 会唤醒队列中的一个或多个线程。
2. CLH 队列:线程排队的基石
CLH 队列是 AQS 中一个至关重要的组件。它用于管理那些因无法立即获取同步状态而需要等待的线程。CLH 队列是一种基于链表的 FIFO 队列,但与传统的队列不同,它是一种 虚拟队列,这意味着它并不实际存储线程对象,而是存储包含线程信息的节点。
2.1 CLH 队列的结构
CLH 队列由一系列节点组成,每个节点代表一个等待获取同步状态的线程。每个节点包含以下信息:
thread: 等待获取同步状态的线程。waitStatus: 节点的等待状态,可以是以下值之一:SIGNAL: 表示当前节点的后继节点(如果存在)需要被唤醒。CANCELLED: 表示当前节点对应的线程已经被取消或中断。CONDITION: 表示当前节点正在等待一个条件。PROPAGATE: 表示释放操作应该传播给其他节点。0: 初始状态,表示节点正在等待获取同步状态。
prev: 指向前驱节点的引用。next: 指向后继节点的引用。
2.2 CLH 队列的工作原理
当一个线程尝试获取同步状态失败时,AQS 会执行以下步骤:
- 创建节点: 创建一个新的节点,并将当前线程的信息存储到该节点中。
- 入队: 将新节点添加到 CLH 队列的尾部。这通常通过 CAS (Compare and Swap) 操作来完成,以保证线程安全。
- 自旋等待: 线程进入自旋等待状态,不断检查其前驱节点的状态。如果前驱节点释放了同步状态,并且当前节点是队列中的第一个有效等待节点,那么当前线程就可以尝试获取同步状态。
- 获取同步状态: 如果线程成功获取了同步状态,它将从 CLH 队列中移除,并开始执行临界区代码。
- 唤醒后继节点: 在释放同步状态后,线程会尝试唤醒其后继节点,以便后继节点可以尝试获取同步状态。
2.3 CLH 队列的代码实现 (简化)
虽然 AQS 的 CLH 队列实现非常复杂,涉及到大量的并发优化,但我们可以通过一个简化的例子来理解其基本原理。
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.LockSupport;
class Node {
volatile Thread thread;
volatile int waitStatus;
volatile Node prev;
volatile Node next;
Node() {
thread = Thread.currentThread();
}
Node(Thread thread) {
this.thread = thread;
}
}
class SimpleCLHQueue {
private final AtomicReference<Node> head = new AtomicReference<>(new Node(null)); // Dummy node
private final AtomicReference<Node> tail = new AtomicReference<>(head.get());
public void enqueue(Node node) {
Node prev = tail.get();
node.prev = prev;
while (!tail.compareAndSet(prev, node)) {
prev = tail.get();
node.prev = prev;
}
prev.next = node;
}
public Node dequeue() {
Node h = head.get();
Node first = h.next;
if (first == null) {
return null; // Queue is empty
}
if (head.compareAndSet(h, first)) {
first.prev = null;
return first;
} else {
return null; // Dequeue failed, try again later
}
}
public static void main(String[] args) throws InterruptedException {
SimpleCLHQueue queue = new SimpleCLHQueue();
Thread thread1 = new Thread(() -> {
Node node = new Node();
queue.enqueue(node);
System.out.println("Thread 1 enqueued.");
LockSupport.park(node); // Block thread 1
System.out.println("Thread 1 resumed.");
});
Thread thread2 = new Thread(() -> {
Node node = new Node();
queue.enqueue(node);
System.out.println("Thread 2 enqueued.");
LockSupport.park(node); // Block thread 2
System.out.println("Thread 2 resumed.");
});
thread1.start();
Thread.sleep(100); // Ensure thread1 is enqueued first
thread2.start();
Thread.sleep(100); // Ensure thread2 is enqueued
// Simulate releasing the lock and unparking the first thread
Node firstNode = queue.dequeue();
if (firstNode != null) {
LockSupport.unpark(firstNode.thread);
System.out.println("Unparked thread 1.");
}
Thread.sleep(1000);
// Simulate releasing the lock and unparking the second thread
firstNode = queue.dequeue();
if (firstNode != null) {
LockSupport.unpark(firstNode.thread);
System.out.println("Unparked thread 2.");
}
thread1.join();
thread2.join();
}
}
这个简化的例子演示了 CLH 队列的基本入队和出队操作。请注意,这只是一个非常简化的版本,实际的 AQS 实现要复杂得多,因为它需要处理更多的并发情况和优化。例如,它使用更高级的 CAS 操作,并且使用 waitStatus 字段来管理节点的状态。
2.4 CLH 队列的优势
CLH 队列相比于其他队列实现,具有以下优势:
- 公平性: CLH 队列保证了线程按照 FIFO 的顺序获取同步状态,从而避免了饥饿现象。
- 可扩展性: CLH 队列可以很好地处理大量的并发线程。
- 性能: CLH 队列的入队和出队操作都是 O(1) 的时间复杂度。
- 减少竞争: 自旋等待主要发生在本地变量上,减少了对共享变量的竞争。线程只关注其前驱节点,避免了全局锁的竞争。
3. 同步状态管理:控制并发访问的核心
AQS 使用一个 volatile int 类型的 state 变量来表示同步状态。这个状态可以表示锁的持有者数量、信号量剩余的许可数量等等。AQS 提供了一组方法来操作这个 state 变量,包括:
getState(): 获取当前同步状态的值。setState(int newState): 设置当前同步状态的值。compareAndSetState(int expect, int update): 以原子方式比较并设置同步状态的值。如果当前状态的值等于expect,则将其更新为update。
3.1 模板方法:自定义同步逻辑
AQS 提供了一组模板方法,开发者需要根据自己的同步器语义来实现这些方法。这些模板方法定义了获取和释放同步状态的逻辑。
| 方法名 | 描述 |
|---|---|
tryAcquire(int arg) |
尝试获取同步状态。如果成功获取,则返回 true,否则返回 false。arg 参数通常用于传递获取同步状态所需的参数,例如锁的重入次数。 |
tryRelease(int arg) |
尝试释放同步状态。如果成功释放,则返回 true,否则返回 false。arg 参数通常用于传递释放同步状态所需的参数,例如锁的重入次数。 |
isHeldExclusively() |
如果当前线程独占式地持有同步状态,则返回 true,否则返回 false。这个方法通常用于判断当前线程是否已经持有锁。 |
tryAcquireShared(int arg) |
尝试以共享模式获取同步状态。如果成功获取,则返回一个非负值,否则返回一个负值。 |
tryReleaseShared(int arg) |
尝试以共享模式释放同步状态。如果成功释放,则返回 true,否则返回 false。 |
isHeldExclusively() |
用于判断当前同步器是否被当前线程独占持有。通常在使用 Condition 对象时需要实现此方法。 |
开发者需要根据自己的同步器语义来实现这些方法。例如,对于一个互斥锁,tryAcquire() 方法可能会检查当前同步状态是否为 0,如果是,则将其设置为 1,并返回 true。tryRelease() 方法可能会将同步状态设置为 0,并返回 true。
3.2 同步状态的获取和释放
AQS 提供了两种获取同步状态的模式:
- 独占模式 (Exclusive Mode): 只有一个线程可以获取同步状态。例如,ReentrantLock 就是以独占模式获取同步状态的。
- 共享模式 (Shared Mode): 多个线程可以同时获取同步状态。例如,Semaphore 和 CountDownLatch 就是以共享模式获取同步状态的。
AQS 提供了一组方法来获取和释放同步状态,包括:
acquire(int arg): 以独占模式获取同步状态。如果获取失败,则线程会被阻塞,直到同步状态可用。acquireInterruptibly(int arg): 以独占模式获取同步状态。如果获取失败,则线程会被阻塞,但可以被中断。tryAcquireNanos(int arg, long nanosTimeout): 以独占模式获取同步状态。如果获取失败,则线程会被阻塞,但有一个超时时间。release(int arg): 以独占模式释放同步状态。acquireShared(int arg): 以共享模式获取同步状态。如果获取失败,则线程会被阻塞,直到同步状态可用。acquireSharedInterruptibly(int arg): 以共享模式获取同步状态。如果获取失败,则线程会被阻塞,但可以被中断。tryAcquireSharedNanos(int arg, long nanosTimeout): 以共享模式获取同步状态。如果获取失败,则线程会被阻塞,但有一个超时时间。releaseShared(int arg): 以共享模式释放同步状态。
这些方法内部会调用开发者实现的模板方法来获取和释放同步状态。例如,acquire(int arg) 方法会首先调用 tryAcquire(int arg) 方法来尝试获取同步状态。如果 tryAcquire(int arg) 方法返回 true,则表示获取成功,acquire(int arg) 方法直接返回。如果 tryAcquire(int arg) 方法返回 false,则表示获取失败,acquire(int arg) 方法会将当前线程放入 CLH 队列,并阻塞线程,直到同步状态可用。
3.3 一个简单的互斥锁示例
下面是一个使用 AQS 实现的简单互斥锁的例子:
import java.util.concurrent.locks.AbstractQueuedSynchronizer;
class SimpleMutex {
private static class Sync extends AbstractQueuedSynchronizer {
@Override
protected boolean tryAcquire(int arg) {
if (compareAndSetState(0, 1)) {
setExclusiveOwnerThread(Thread.currentThread());
return true;
}
return false;
}
@Override
protected boolean tryRelease(int arg) {
if (!isHeldExclusively()) {
throw new IllegalMonitorStateException();
}
setExclusiveOwnerThread(null);
setState(0);
return true;
}
@Override
protected boolean isHeldExclusively() {
return getExclusiveOwnerThread() == Thread.currentThread();
}
}
private final Sync sync = new Sync();
public void lock() {
sync.acquire(1);
}
public void unlock() {
sync.release(1);
}
public static void main(String[] args) throws InterruptedException {
SimpleMutex mutex = new SimpleMutex();
Thread thread1 = new Thread(() -> {
mutex.lock();
try {
System.out.println("Thread 1 acquired the lock.");
Thread.sleep(1000); // Simulate some work
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
mutex.unlock();
System.out.println("Thread 1 released the lock.");
}
});
Thread thread2 = new Thread(() -> {
mutex.lock();
try {
System.out.println("Thread 2 acquired the lock.");
Thread.sleep(1000); // Simulate some work
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
mutex.unlock();
System.out.println("Thread 2 released the lock.");
}
});
thread1.start();
Thread.sleep(100); // Ensure thread1 acquires the lock first
thread2.start();
thread1.join();
thread2.join();
}
}
在这个例子中,SimpleMutex 类使用 AQS 的 AbstractQueuedSynchronizer 类来实现互斥锁。Sync 类继承了 AbstractQueuedSynchronizer 类,并实现了 tryAcquire()、tryRelease() 和 isHeldExclusively() 方法。lock() 方法调用 sync.acquire(1) 方法来获取锁,unlock() 方法调用 sync.release(1) 方法来释放锁。
4. Condition:等待/通知机制的实现
AQS 还提供了一个 Condition 类,用于实现等待/通知机制。Condition 对象与锁相关联,允许线程在等待特定条件时释放锁,并在条件满足时重新获取锁。
4.1 Condition 的工作原理
Condition 对象内部维护一个等待队列,用于存储等待特定条件的线程。当线程调用 Condition.await() 方法时,它会执行以下步骤:
- 释放锁: 线程释放与
Condition对象关联的锁。 - 进入等待队列: 线程被放入
Condition对象的等待队列中。 - 阻塞: 线程被阻塞,直到被其他线程唤醒。
当其他线程调用 Condition.signal() 或 Condition.signalAll() 方法时,它会执行以下步骤:
- 从等待队列中移除线程:
signal()方法会从等待队列中移除一个线程,signalAll()方法会从等待队列中移除所有线程。 - 唤醒线程: 被移除的线程会被唤醒。
- 重新获取锁: 线程会尝试重新获取与
Condition对象关联的锁。
4.2 Condition 的使用示例
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
class BoundedBuffer {
final Lock lock = new ReentrantLock();
final Condition notFull = lock.newCondition();
final Condition notEmpty = lock.newCondition();
final Object[] items = new Object[10];
int putptr, takeptr, count;
public void put(Object x) throws InterruptedException {
lock.lock();
try {
while (count == items.length)
notFull.await();
items[putptr] = x;
if (++putptr == items.length) putptr = 0;
++count;
notEmpty.signal();
} finally {
lock.unlock();
}
}
public Object take() throws InterruptedException {
lock.lock();
try {
while (count == 0)
notEmpty.await();
Object x = items[takeptr];
if (++takeptr == items.length) takeptr = 0;
--count;
notFull.signal();
return x;
} finally {
lock.unlock();
}
}
}
在这个例子中,BoundedBuffer 类使用 Condition 对象来实现一个有界缓冲区。notFull 条件表示缓冲区未满,notEmpty 条件表示缓冲区非空。put() 方法会在缓冲区满时等待 notFull 条件,take() 方法会在缓冲区空时等待 notEmpty 条件。
5. AQS 的重要性
AQS 是 Java 并发包中许多同步器的基石。理解 AQS 的原理对于编写高效且可靠的并发程序至关重要。AQS 提供了一种通用的机制来管理同步状态、阻塞和唤醒线程,并允许开发者通过继承 AQS 并重写特定的方法来实现自定义的同步器。
6. 总结:AQS 是并发编程的强大工具
AQS 通过 CLH 队列管理等待线程,通过同步状态控制并发访问,并使用 Condition 实现等待/通知机制,为构建各种同步器提供了坚实的基础。掌握 AQS 的原理对于理解 Java 并发包的底层机制至关重要。