Java AQS 共享模式:构建并发访问控制的基石
大家好,今天我们深入探讨Java AQS(AbstractQueuedSynchronizer)的共享模式,以及如何利用tryAcquireShared()方法来实现资源的并发访问控制。AQS是Java并发包java.util.concurrent的核心基石,理解AQS对于构建高性能、高可靠的并发应用至关重要。
1. AQS 简介:并发同步的抽象框架
AQS是一个抽象类,它提供了一个框架,用于构建锁和相关的同步器。它通过维护一个同步状态(state,一个volatile int变量)和一个FIFO等待队列来实现同步机制。AQS定义了两种模式:
- 独占模式(Exclusive Mode): 只有一个线程可以持有资源。例如,
ReentrantLock。 - 共享模式(Shared Mode): 多个线程可以同时持有资源。例如,
Semaphore、CountDownLatch。
我们今天的重点是共享模式。
2. 共享模式的核心方法:tryAcquireShared()
在共享模式中,tryAcquireShared(int arg) 方法是核心。它的作用是尝试以共享模式获取同步状态。它返回一个int值,这个值决定了后续的操作:
- 正数(>= 1): 表示获取成功,并且剩余资源允许后续线程继续获取。这个正数可以理解为剩余的可用许可数量。
- 零(0): 表示获取成功,但后续线程无法继续获取。通常意味着资源已经耗尽或者达到了某种限制。
- 负数(< 0): 表示获取失败。线程需要进入等待队列。
3. 共享模式的流程
共享模式的获取流程大致如下:
- 尝试获取: 线程调用
tryAcquireShared(int arg)尝试获取资源。 - 判断结果:
- 成功(正数或零): 线程获取到资源,继续执行。如果返回值是正数,会唤醒等待队列中排在前面的线程,让他们也尝试获取资源。
- 失败(负数): 线程进入等待队列,等待被唤醒。
- 唤醒和重试: 当资源可用时,AQS会唤醒等待队列中的线程。被唤醒的线程会再次尝试
tryAcquireShared(int arg)。
4. 代码示例:基于 AQS 实现一个简单的信号量(Semaphore)
为了更好地理解tryAcquireShared(),我们通过一个简单的信号量实现来演示。信号量允许一定数量的线程同时访问共享资源。
import java.util.concurrent.locks.AbstractQueuedSynchronizer;
public class SimpleSemaphore {
private final Sync sync;
public SimpleSemaphore(int permits) {
sync = new Sync(permits);
}
public void acquire() throws InterruptedException {
sync.acquireSharedInterruptibly(1);
}
public void release() {
sync.releaseShared(1);
}
private static final class Sync extends AbstractQueuedSynchronizer {
private static final long serialVersionUID = -32249322224590012L;
Sync(int permits) {
setState(permits); // 初始化同步状态为 permits
}
@Override
protected int tryAcquireShared(int acquires) {
for (;;) {
int available = getState();
int remaining = available - acquires;
if (remaining < 0 || compareAndSetState(available, remaining)) {
return remaining; // 返回剩余许可数量
}
}
}
@Override
protected boolean tryReleaseShared(int releases) {
for (;;) {
int current = getState();
int next = current + releases;
if (next < current) // overflow
throw new Error("Maximum permit count exceeded");
if (compareAndSetState(current, next)) {
return true;
}
}
}
}
public static void main(String[] args) throws InterruptedException {
SimpleSemaphore semaphore = new SimpleSemaphore(3); // 允许3个线程同时访问
for (int i = 0; i < 5; i++) {
final int threadId = i;
new Thread(() -> {
try {
System.out.println("Thread " + threadId + " is waiting to acquire.");
semaphore.acquire();
System.out.println("Thread " + threadId + " acquired.");
Thread.sleep(2000); // 模拟访问共享资源
System.out.println("Thread " + threadId + " releasing.");
semaphore.release();
} catch (InterruptedException e) {
e.printStackTrace();
}
}).start();
Thread.sleep(100); // 错开线程启动时间
}
}
}
代码解释:
SimpleSemaphore类: 封装了信号量的逻辑。Sync类: 继承自AbstractQueuedSynchronizer,实现了同步逻辑。Sync(int permits)构造函数: 初始化同步状态state为permits,表示可用的许可数量。tryAcquireShared(int acquires)方法:- 在一个无限循环中尝试获取许可。
available = getState():获取当前可用的许可数量。remaining = available - acquires:计算剩余的许可数量。if (remaining < 0):如果剩余许可数量小于 0,表示获取失败,返回一个负数。compareAndSetState(available, remaining):使用CAS操作尝试更新state。如果更新成功,表示获取成功,返回剩余的许可数量remaining。- 如果CAS操作失败,表示有其他线程同时也在尝试获取许可,循环重试。
tryReleaseShared(int releases)方法:- 在一个无限循环中尝试释放许可。
current = getState():获取当前的许可数量。next = current + releases:计算释放后的许可数量。compareAndSetState(current, next):使用CAS操作尝试更新state。如果更新成功,表示释放成功,返回true。- 如果CAS操作失败,表示有其他线程同时也在尝试释放许可,循环重试。
acquire()和release()方法: 分别调用acquireSharedInterruptibly()和releaseShared(),这是AQS提供的方法,封装了线程入队和唤醒的逻辑。main()方法: 创建一个信号量,并启动 5 个线程来竞争资源。每个线程获取到信号量后,模拟访问共享资源,然后释放信号量。
运行结果分析:
运行这段代码,你会发现最多只有 3 个线程可以同时输出 "Thread X acquired.",因为我们创建的信号量的许可数量是 3。其他线程会阻塞在 semaphore.acquire() 方法中,直到有线程释放信号量。
5. tryAcquireShared() 的设计要点
- CAS 操作:
tryAcquireShared()通常使用CAS(Compare and Swap)操作来原子性地更新同步状态state。这是避免数据竞争的关键。 - 自旋: 如果 CAS 操作失败,
tryAcquireShared()通常会进行自旋重试,直到成功或者确定无法获取资源。 - 返回值:
tryAcquireShared()的返回值非常重要,它不仅指示了获取是否成功,还指示了后续线程是否可以继续获取资源。 - 公平性: 默认情况下,AQS 是非公平的。这意味着即使有线程在等待队列中,新来的线程仍然可以尝试获取资源。如果需要公平性,可以考虑使用
FairSync,或者在tryAcquireShared()中加入判断逻辑,例如检查当前线程是否是队列的头部节点。
6. 共享模式的应用场景
共享模式在并发编程中有很多应用场景:
- 资源池: 例如数据库连接池、线程池。可以使用信号量来控制同时连接数据库或执行任务的线程数量。
- 流量控制: 限制对某个服务的并发请求数量,防止服务过载。
- 读写锁:
ReentrantReadWriteLock中的读锁就是基于 AQS 共享模式实现的。多个线程可以同时持有读锁,但只有一个线程可以持有写锁。 - 栅栏(CyclicBarrier): 允许一组线程互相等待,直到所有线程都到达某个点,然后所有线程才可以继续执行。
7. AQS 共享模式的优势
- 灵活性: AQS 提供了一个高度灵活的框架,可以自定义同步逻辑。
- 高性能: AQS 使用高效的 CAS 操作和等待队列来管理线程的同步和阻塞。
- 可扩展性: AQS 可以很容易地扩展到支持更复杂的同步需求。
- 避免死锁: 通过合理的线程入队和唤醒机制,AQS 可以有效地避免死锁的发生。
8. tryAcquireShared() 的源码分析(以ReentrantReadWriteLock的ReadLock为例)
虽然我们自己实现了一个简单的Semaphore,但要真正理解tryAcquireShared(),研究Java并发包中现有类的实现是最好的方式。我们以ReentrantReadWriteLock的ReadLock的tryAcquireShared()方法为例进行分析。
protected final int tryAcquireShared(int unused) {
Thread current = Thread.currentThread();
int c = getState();
if (exclusiveCount(c) != 0 &&
getExclusiveOwnerThread() != current)
return -1; //如果存在写锁,并且不是当前线程持有,则获取失败
int r = sharedCount(c);
if (r == MAX)
throw new Error("Maximum number of shared locks exceeded");
if (readerShouldBlock()) //判断读线程是否需要阻塞
return -1;
if (compareAndSetState(c, c + SHARED_UNIT)) { //尝试增加读锁计数器
HoldCounter holdCounter = holdCounter;
if (holdCounter == null) {
firstReader = current;
holdCounter = new HoldCounter();
} else if (holdCounter.tid != getThreadId(current)) {
firstReader = current;
holdCounter = new HoldCounter();
} else {
holdCounter.count++;
}
this.holdCounter = holdCounter;
return 1;
}
else {
//自旋重试,直到成功或者readerShouldBlock返回true
for (;;) {
c = getState();
if (exclusiveCount(c) != 0 &&
getExclusiveOwnerThread() != current)
return -1;
if (readerShouldBlock())
return -1;
if (sharedCount(c) == MAX)
throw new Error("Maximum number of shared locks exceeded");
if (compareAndSetState(c, c + SHARED_UNIT))
break;
}
ThreadLocalHoldCounter holdCounter = cachedHoldCounter;
if (holdCounter == null)
cachedHoldCounter = holdCounter = new ThreadLocalHoldCounter();
HoldCounter h = holdCounter.get();
if (h == null) {
holdCounter.set(h = new HoldCounter());
} else if (h.tid != getThreadId(current)) {
holdCounter.set(h = new HoldCounter());
} else {
h.count++;
}
firstReader = current;
return 1;
}
}
代码解释:
exclusiveCount(c): 用于从同步状态c中提取写锁的计数。如果写锁计数不为 0,说明有写锁被持有。getExclusiveOwnerThread(): 返回持有写锁的线程。sharedCount(c): 用于从同步状态c中提取读锁的计数。readerShouldBlock(): 这是一个策略方法,用于判断当前线程是否应该被阻塞。这与读写锁的公平性策略有关。如果读写锁是非公平的,那么即使有等待的写线程,新来的读线程也可能尝试获取读锁。如果读写锁是公平的,那么新来的读线程必须等待前面的写线程释放写锁。SHARED_UNIT: 表示读锁计数器的增量,通常为 1。HoldCounter和ThreadLocalHoldCounter: 用于记录每个线程持有读锁的次数,这对于支持可重入的读锁是必要的。
分析要点:
- 优先判断写锁:
tryAcquireShared()首先判断是否存在写锁,并且当前线程是否持有写锁。如果存在写锁,并且不是当前线程持有,那么获取读锁失败。这是读写锁的基本原则:写锁优先于读锁。 - readerShouldBlock(): 这个方法体现了读写锁的公平性策略。如果
readerShouldBlock()返回true,那么即使没有写锁,当前线程也应该被阻塞,等待前面的写线程释放写锁。 - CAS 操作和自旋:
tryAcquireShared()使用 CAS 操作来增加读锁计数器。如果 CAS 操作失败,说明有其他线程也在尝试获取或释放读锁,需要自旋重试。 - HoldCounter: 为了支持可重入的读锁,需要记录每个线程持有读锁的次数。
HoldCounter和ThreadLocalHoldCounter就是用于实现这个功能的。
9. AQS 共享模式的注意事项
- 正确处理返回值:
tryAcquireShared()的返回值至关重要,必须根据返回值来判断获取是否成功,以及是否需要唤醒其他线程。 - 避免死锁: 在使用 AQS 时,需要仔细考虑死锁的可能性。例如,避免循环等待的情况。
- 性能优化: 合理选择同步策略(例如公平性),避免不必要的竞争。
- 理解
readerShouldBlock()的作用: 在使用ReentrantReadWriteLock时,要理解readerShouldBlock()方法的作用,根据实际需求选择合适的公平性策略。
10. 总结:AQS 共享模式是并发控制的强大工具
今天我们详细学习了 AQS 的共享模式,并通过一个简单的信号量和 ReentrantReadWriteLock 的源码分析,深入理解了 tryAcquireShared() 方法的作用和实现。掌握 AQS 共享模式,能够帮助我们构建更加灵活、高效、可靠的并发应用。
资源共享,并发控制,AQS共享模式的核心价值
AQS 共享模式提供了强大的并发访问控制能力,通过 tryAcquireShared() 的灵活设计,可以应对各种复杂的并发场景。深入理解和熟练运用 AQS,是成为并发编程专家的必经之路。