Java 公平锁实现:AQS 如何通过队列管理确保线程的等待公平性
大家好!今天我们来深入探讨 Java 中公平锁的实现机制,特别是 AQS(AbstractQueuedSynchronizer)如何通过队列管理来保证线程等待的公平性。公平锁是一种同步机制,它确保线程按照请求锁的顺序获得锁,避免了饥饿现象,让每个线程都有机会获得执行。
1. 公平锁的概念与必要性
在并发编程中,多个线程可能同时竞争共享资源。锁机制就是为了解决这种竞争,保证数据的一致性和完整性。公平锁和非公平锁是两种不同的锁获取策略。
- 非公平锁: 线程尝试获取锁时,如果锁当前未被占用,则立即尝试获取,即使有其他线程正在等待。这可能导致某些线程长时间等待,也就是所谓的“饥饿”现象。
ReentrantLock默认是非公平锁。 - 公平锁: 线程只有在没有其他等待线程时才能获取锁,或者它是等待队列中的第一个线程。这保证了线程按照它们请求锁的顺序获得锁。
公平锁的必要性在于,某些应用场景对响应时间和资源分配有严格的要求。例如,在高并发的订单处理系统中,如果使用非公平锁,某些用户的请求可能长时间得不到处理,导致用户体验下降。使用公平锁可以保证每个用户的请求都有机会得到响应,避免优先级反转等问题。
2. AQS 简介:同步器的基石
AQS 是 Java 并发包 java.util.concurrent 中一个核心的抽象类,它为实现各种同步器(如锁、信号量、栅栏等)提供了一个通用的框架。AQS 基于模板方法模式,子类通过继承 AQS 并实现特定的方法来定制同步行为。
AQS 的核心思想是使用一个 int 类型的状态变量 state 来表示同步状态,并维护一个 FIFO(先进先出)的同步队列来管理等待线程。
AQS 提供以下核心方法:
| 方法名 | 描述 |
|---|---|
getState() |
获取当前同步状态 |
setState(int newState) |
设置当前同步状态 |
compareAndSetState(int expect, int update) |
使用 CAS 操作原子性地更新同步状态, expect 是期望值, update 是更新值 |
acquire(int arg) |
以独占模式获取同步状态,如果获取失败,则进入等待队列 |
release(int arg) |
释放同步状态,唤醒等待队列中的后继节点 |
acquireShared(int arg) |
以共享模式获取同步状态,如果获取失败,则进入等待队列 |
releaseShared(int arg) |
释放同步状态,唤醒等待队列中的后继节点 |
AQS 的子类需要实现以下方法来定义同步行为:
| 方法名 | 描述 |
|---|---|
tryAcquire(int arg) |
尝试以独占模式获取同步状态,成功返回 true,失败返回 false。 |
tryRelease(int arg) |
尝试释放同步状态,成功返回 true,失败返回 false。 |
tryAcquireShared(int arg) |
尝试以共享模式获取同步状态,成功返回非负数,失败返回负数。 |
tryReleaseShared(int arg) |
尝试释放同步状态,成功返回 true,失败返回 false。 |
isHeldExclusively() |
当前同步器是否在独占模式下被线程占用。 |
3. AQS 如何实现公平锁
AQS 通过维护一个 FIFO 的同步队列来保证公平性。当一个线程尝试获取锁但获取失败时,它会被放入队列的尾部,并进入等待状态。当锁被释放时,队列中的第一个线程会被唤醒并尝试获取锁。
以下是 AQS 实现公平锁的基本步骤:
- 线程尝试获取锁: 线程调用
acquire(int arg)方法尝试获取锁。 - 判断是否可以获取锁: 在
acquire(int arg)方法中,会调用子类实现的tryAcquire(int arg)方法来尝试获取锁。 tryAcquire(int arg)的实现: 对于公平锁,tryAcquire(int arg)方法会检查同步队列中是否有等待线程。如果有,则直接返回false,表示获取锁失败。如果没有,则尝试使用 CAS 操作更新state变量来获取锁。- 进入等待队列: 如果
tryAcquire(int arg)方法返回false,表示获取锁失败,线程会被放入同步队列的尾部,并进入等待状态。 - 锁释放: 当持有锁的线程释放锁时,它会调用
release(int arg)方法。 release(int arg)的实现: 在release(int arg)方法中,会调用子类实现的tryRelease(int arg)方法来尝试释放锁。- 唤醒后继节点: 如果
tryRelease(int arg)方法返回true,表示成功释放锁,release(int arg)方法会唤醒同步队列中的第一个等待线程。 - 线程被唤醒: 被唤醒的线程会再次尝试获取锁。由于它是队列中的第一个线程,所以可以成功获取锁。
下面是一个使用 AQS 实现公平锁的示例代码:
import java.util.concurrent.locks.AbstractQueuedSynchronizer;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.Condition;
public class FairLock implements Lock {
private final Sync sync = new Sync(true); // true 表示公平锁
private static class Sync extends AbstractQueuedSynchronizer {
private static final long serialVersionUID = -1L;
Sync(boolean fair) {
super();
}
@Override
protected final boolean tryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
// 只有当队列为空或者当前线程是队列的第一个节点时,才能尝试获取锁
if (!hasQueuedPredecessors() &&
compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
} else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
@Override
protected final boolean tryRelease(int releases) {
int c = getState() - releases;
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
if (c == 0) {
free = true;
setExclusiveOwnerThread(null);
}
setState(c);
return free;
}
final ConditionObject newCondition() {
return new ConditionObject();
}
@Override
protected final boolean isHeldExclusively() {
return getExclusiveOwnerThread() == Thread.currentThread();
}
}
@Override
public void lock() {
sync.acquire(1);
}
@Override
public void lockInterruptibly() throws InterruptedException {
sync.acquireInterruptibly(1);
}
@Override
public boolean tryLock() {
return sync.tryAcquire(1);
}
@Override
public boolean tryLock(long time, java.util.concurrent.TimeUnit unit)
throws InterruptedException {
return sync.tryAcquireNanos(1, unit.toNanos(time));
}
@Override
public void unlock() {
sync.release(1);
}
@Override
public Condition newCondition() {
return sync.newCondition();
}
public boolean isFair() {
return true;
}
public boolean isLocked() {
return sync.isHeldExclusively();
}
}
在这个例子中,FairLock 类实现了 Lock 接口,并使用 AQS 的子类 Sync 来实现公平锁的逻辑。
Sync构造函数接受一个boolean类型的参数fair,用于指定是否是公平锁。在这里,我们传入true,表示创建一个公平锁。tryAcquire(int acquires)方法首先检查同步队列中是否有等待线程(通过hasQueuedPredecessors()方法)。如果有,则直接返回false,表示获取锁失败。如果没有,则尝试使用 CAS 操作更新state变量来获取锁。tryRelease(int releases)方法用于释放锁。isHeldExclusively()方法用于判断当前线程是否持有锁。
hasQueuedPredecessors() 方法是 AQS 中一个非常重要的方法,它用于判断当前线程前面是否有等待线程。对于公平锁,只有当没有等待线程时,线程才能尝试获取锁。hasQueuedPredecessors() 方法的具体实现比较复杂,它需要遍历同步队列,判断是否有节点在当前线程的前面。
4. 公平锁的性能考量
虽然公平锁可以保证线程的公平性,但它也带来了一定的性能开销。因为公平锁需要维护一个 FIFO 的同步队列,并且在每次获取锁之前都要检查队列中是否有等待线程。这增加了锁的竞争开销,降低了吞吐量。
在实际应用中,需要根据具体的场景来选择使用公平锁还是非公平锁。如果对响应时间和资源分配有严格的要求,可以使用公平锁。如果对吞吐量有更高的要求,可以使用非公平锁。
以下是公平锁和非公平锁的性能对比:
| 特性 | 公平锁 | 非公平锁 |
|---|---|---|
| 公平性 | 保证线程按照请求锁的顺序获得锁 | 不保证线程按照请求锁的顺序获得锁 |
| 性能 | 性能较低,因为需要维护一个 FIFO 的同步队列,并且在每次获取锁之前都要检查队列中是否有等待线程 | 性能较高,因为线程可以立即尝试获取锁,不需要检查队列 |
| 适用场景 | 对响应时间和资源分配有严格要求的场景 | 对吞吐量有更高要求的场景 |
5. AQS队列管理的细节
AQS的队列管理是其实现公平锁(以及其他同步机制)的关键。让我们更深入地了解AQS如何管理这个队列,包括节点(Node)的结构、入队和出队操作,以及如何处理取消(cancel)的情况。
5.1 节点(Node)结构
AQS队列是一个CLH(Craig, Landin, and Hagersten)变体的双向链表。队列中的每个元素都是一个Node对象。Node对象包含以下关键字段:
waitStatus: 表示当前节点的状态。它可以是以下值之一:SIGNAL: 表示当前节点的后继节点处于等待状态,当前节点释放锁时需要唤醒后继节点。CANCELLED: 表示当前节点由于超时或中断而被取消。CONDITION: 表示当前节点正在等待某个条件。PROPAGATE: 表示当前节点释放锁后,需要将唤醒操作传播给后继节点(用于共享模式)。0: 初始状态,表示节点正在等待获取锁。
thread: 与当前节点关联的线程。prev: 指向前驱节点的指针。next: 指向后继节点的指针。nextWaiter: 用于CONDITION队列,这里不深入讨论。
5.2 入队操作
当线程尝试获取锁失败时,AQS会将该线程封装成一个Node对象,并将其添加到同步队列的尾部。入队操作通过enq(Node node)方法完成。
private Node enq(final Node node) {
for (;;) {
Node t = tail;
if (t == null) { // Must initialize
if (compareAndSetHead(null, new Node()))
tail = head;
} else {
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
这个方法是一个自旋操作,它会不断尝试将节点添加到队列的尾部,直到成功为止。
- 队列初始化: 如果队列为空(
tail == null),则先创建一个空的头节点,并设置head和tail都指向它。 - 添加到尾部: 如果队列不为空,则将新节点的
prev指针指向当前的尾节点t。然后,使用CAS操作尝试将新节点设置为尾节点。如果CAS操作成功,则将原尾节点的next指针指向新节点,完成入队操作。如果CAS操作失败,则说明有其他线程正在修改队列,需要重新尝试。
5.3 出队操作
当持有锁的线程释放锁时,AQS会唤醒队列中的第一个等待线程。被唤醒的线程会尝试获取锁,如果获取成功,则将自己设置为新的头节点,并从队列中移除。出队操作涉及到更新head节点。
当一个节点成为新的head节点之后, 它的thread域会被设置为null, 这是为了帮助GC回收不再需要的Thread对象。
private void setHead(Node node) {
head = node;
node.thread = null;
node.prev = null; // Help GC
}
5.4 取消(Cancel)操作
如果一个线程在等待获取锁的过程中被中断或超时,AQS会将该线程对应的节点设置为取消状态(waitStatus = CANCELLED)。取消状态的节点不会再被唤醒,并且会被从队列中移除。
AQS通过cancelAcquire(Node node)方法来处理取消操作。
private void cancelAcquire(Node node) {
// Ignore if node doesn't exist
if (node == null)
return;
node.thread = null;
// Skip cancelled predecessors
Node pred = node.prev;
while (pred.waitStatus > 0)
node.prev = pred = pred.prev;
Node predNext = pred.next;
// Can use unconditional write of next field of pred.
// After this atomic step, other Nodes can skip past us.
// Before, we are free to do other stuff.
node.waitStatus = Node.CANCELLED;
// If we are the tail, remove ourselves.
if (node == tail && compareAndSetTail(node, pred)) {
compareAndSetNext(pred, predNext, null);
} else {
// If successor needs signal, wake up to fix pointers.
int ws;
if (pred != head &&
((ws = pred.waitStatus) == Node.SIGNAL ||
(ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) &&
pred.thread != null) {
Node next = node.next;
if (next != null && next.waitStatus <= 0)
compareAndSetNext(pred, predNext, next);
} else {
unparkSuccessor(node);
}
node.next = node; // help GC
}
}
- 设置节点状态: 将节点的状态设置为
CANCELLED,并将其关联的线程设置为null,以便GC回收。 - 跳过取消的前驱节点: 从当前节点开始,向前遍历队列,跳过所有状态为
CANCELLED的前驱节点,找到最近一个状态不是CANCELLED的前驱节点。 - 移除节点:
- 如果当前节点是尾节点,则尝试将尾节点设置为前驱节点。
- 如果当前节点不是尾节点,则需要调整前驱节点和后继节点的指针,将当前节点从队列中移除。
5.5 公平锁的实现关键在于 hasQueuedPredecessors()
hasQueuedPredecessors()是AQS中一个关键方法,用于判断当前线程是否应该获取锁。对于公平锁,只有当队列为空或者当前线程是队列的第一个节点时,才能尝试获取锁。
public final boolean hasQueuedPredecessors() {
// head 指向的是一个 dummy 节点,所以需要判断是否 head 的下一个节点不为空
Node h = head;
Node t = tail;
Node s;
return h != t && // 队列不为空
((s = h.next) == null || s.thread != Thread.currentThread()); // 存在前驱节点
}
6. 使用 ReentrantLock 实现公平锁
Java 并发包中提供了 ReentrantLock 类,它也支持公平锁和非公平锁。可以通过在构造函数中传入 true 来创建一个公平锁。
import java.util.concurrent.locks.ReentrantLock;
public class FairLockExample {
private static final ReentrantLock lock = new ReentrantLock(true); // 创建公平锁
public static void main(String[] args) {
for (int i = 0; i < 5; i++) {
new Thread(() -> {
lock.lock();
try {
System.out.println(Thread.currentThread().getName() + " 获取了锁");
Thread.sleep(100); // 模拟持有锁的时间
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
lock.unlock();
System.out.println(Thread.currentThread().getName() + " 释放了锁");
}
}, "Thread-" + i).start();
}
}
}
在这个例子中,我们创建了一个公平锁 lock。当多个线程同时竞争锁时,它们会按照请求锁的顺序获得锁。
7. 公平锁的应用场景
公平锁在以下场景中比较适用:
- 避免饥饿: 当某些线程的优先级较低,或者总是无法获得锁时,可以使用公平锁来保证它们有机会获得执行。
- 保证服务质量(QoS): 在需要保证每个请求都有机会得到响应的系统中,可以使用公平锁来避免某些请求长时间等待。
- 资源分配公平性: 在需要公平分配资源的系统中,可以使用公平锁来保证每个线程都有机会获得资源。
8. 使用公平锁的注意事项
- 公平锁会降低吞吐量,因为它需要维护一个 FIFO 的同步队列,并且在每次获取锁之前都要检查队列中是否有等待线程。
- 在某些情况下,非公平锁可能更适合,例如当对响应时间的要求不高,但对吞吐量有更高的要求时。
- 需要根据具体的应用场景来选择使用公平锁还是非公平锁。
9. AQS 的公平队列管理,重要且易错
AQS通过维护一个先进先出(FIFO)的同步队列来确保线程的等待公平性。队列中的每个节点都对应一个等待获取锁的线程。当一个线程请求锁但未能立即获得时,它会被添加到队列的尾部,并进入阻塞状态。当锁被释放时,队列头部的线程会被唤醒并尝试获取锁。AQS的公平性主要体现在tryAcquire方法中,它会检查当前线程是否是队列中的第一个等待线程,只有在这种情况下,线程才能尝试获取锁。
AQS的队列管理是实现公平锁的关键,但也带来了一定的性能开销。因此,在实际应用中,需要根据具体的场景来选择使用公平锁还是非公平锁。理解AQS的队列管理机制对于深入理解Java并发编程至关重要。