好的,我们现在开始深入探讨Java AQS(AbstractQueuedSynchronizer)框架中的独占模式和共享模式,并以ReentrantLock和CountDownLatch为例,剖析它们在实现上的差异。
AQS:同步器的骨架
AQS是Java并发包java.util.concurrent的核心基石。它提供了一个框架,用于构建锁和同步器,极大地简化了并发编程的复杂性。AQS维护一个同步状态(state)和一个FIFO等待队列。同步状态是一个整数,可以使用原子操作进行修改。等待队列则用于管理那些试图获取同步状态但被阻塞的线程。
AQS定义了两种同步模式:
- 独占模式(Exclusive Mode): 每次只允许一个线程持有锁。ReentrantLock就是基于独占模式实现的。
- 共享模式(Shared Mode): 允许多个线程同时持有锁。Semaphore和CountDownLatch就是基于共享模式实现的。
ReentrantLock:独占模式的典型代表
ReentrantLock是一个可重入的互斥锁。这意味着如果一个线程已经持有了锁,它可以再次获取该锁而不会被阻塞。ReentrantLock的实现是基于AQS的独占模式。
ReentrantLock的AQS实现
ReentrantLock内部通常会包含一个Sync类,该类继承自AQS。Sync类又会有公平锁(FairSync)和非公平锁(NonfairSync)两种实现。我们先来看非公平锁。
import java.util.concurrent.locks.AbstractQueuedSynchronizer;
import java.util.concurrent.locks.ReentrantLock;
public class NonfairReentrantLockExample {
static class NonfairSync extends AbstractQueuedSynchronizer {
@Override
protected boolean tryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
if (compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
} else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
@Override
protected boolean tryRelease(int releases) {
int c = getState() - releases;
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
if (c == 0) {
free = true;
setExclusiveOwnerThread(null);
}
setState(c);
return free;
}
@Override
protected boolean isHeldExclusively() {
return getExclusiveOwnerThread() == Thread.currentThread();
}
}
private final NonfairSync sync = new NonfairSync();
public void lock() {
sync.acquire(1);
}
public void unlock() {
sync.release(1);
}
public static void main(String[] args) throws InterruptedException {
NonfairReentrantLockExample lockExample = new NonfairReentrantLockExample();
Thread t1 = new Thread(() -> {
lockExample.lock();
try {
System.out.println("Thread 1 acquired lock");
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
lockExample.unlock();
System.out.println("Thread 1 released lock");
}
});
Thread t2 = new Thread(() -> {
lockExample.lock();
try {
System.out.println("Thread 2 acquired lock");
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
lockExample.unlock();
System.out.println("Thread 2 released lock");
}
});
t1.start();
Thread.sleep(100); // 让t1先获取锁
t2.start();
t1.join();
t2.join();
System.out.println("Done");
}
}
核心方法分析:
tryAcquire(int acquires): 尝试获取独占锁。- 如果state为0,表示锁未被占用,尝试使用CAS(Compare and Swap)操作将state设置为
acquires(通常为1),并将当前线程设置为独占线程。如果CAS成功,则获取锁成功。 - 如果state不为0,且当前线程是独占线程,表示重入锁,将state增加
acquires。 - 如果state不为0,且当前线程不是独占线程,则获取锁失败。
- 如果state为0,表示锁未被占用,尝试使用CAS(Compare and Swap)操作将state设置为
tryRelease(int releases): 尝试释放独占锁。- 将state减去
releases。 - 如果state变为0,表示锁完全释放,将独占线程设置为null,并返回true。
- 否则,更新state,并返回false。
- 将state减去
isHeldExclusively(): 判断当前线程是否持有独占锁。
acquire(int acquires)和release(int releases):
ReentrantLock的lock()方法调用sync.acquire(1),unlock()方法调用sync.release(1)。acquire()和release()是AQS提供的方法,它们封装了获取和释放锁的逻辑,包括阻塞和唤醒线程。
公平锁的实现 (FairSync)
公平锁的实现与非公平锁的主要区别在于tryAcquire()方法。公平锁会先检查等待队列中是否有等待时间更长的线程,如果有,则不会尝试获取锁,从而保证了公平性。
static class FairSync extends AbstractQueuedSynchronizer {
@Override
protected boolean tryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
if (!hasQueuedPredecessors() && // 关键:检查队列中是否有等待时间更长的线程
compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
} else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
// 其他方法与 NonfairSync 相同
}
hasQueuedPredecessors(): AQS提供的方法,用于检查等待队列中是否有等待时间更长的线程。
CountDownLatch:共享模式的经典案例
CountDownLatch是一个同步工具类,允许一个或多个线程等待直到在其他线程中执行的一组操作完成。CountDownLatch维护一个计数器,该计数器被初始化为一个正整数。当计数器达到零时,所有等待线程将被释放。
CountDownLatch的AQS实现
CountDownLatch的实现是基于AQS的共享模式。
import java.util.concurrent.locks.AbstractQueuedSynchronizer;
import java.util.concurrent.CountDownLatch;
public class CountDownLatchExample {
static class Sync extends AbstractQueuedSynchronizer {
Sync(int count) {
setState(count);
}
@Override
protected int tryAcquireShared(int acquires) {
return (getState() == 0) ? 1 : -1;
}
@Override
protected boolean tryReleaseShared(int releases) {
for (;;) {
int c = getState();
if (c == 0)
return false;
int nextc = c - 1;
if (compareAndSetState(c, nextc))
return nextc == 0;
}
}
}
private final Sync sync;
public CountDownLatchExample(int count) {
sync = new Sync(count);
}
public void await() throws InterruptedException {
sync.acquireSharedInterruptibly(1);
}
public void countDown() {
sync.releaseShared(1);
}
public long getCount() {
return sync.getState();
}
public static void main(String[] args) throws InterruptedException {
int numberOfThreads = 3;
CountDownLatchExample latch = new CountDownLatchExample(numberOfThreads);
for (int i = 0; i < numberOfThreads; i++) {
final int threadId = i;
new Thread(() -> {
try {
System.out.println("Thread " + threadId + " is working...");
Thread.sleep((long) (Math.random() * 2000)); // 模拟工作
System.out.println("Thread " + threadId + " finished.");
latch.countDown(); // 倒计时
} catch (InterruptedException e) {
e.printStackTrace();
}
}).start();
}
latch.await(); // 主线程等待
System.out.println("All threads have finished. Main thread continues.");
}
}
核心方法分析:
tryAcquireShared(int acquires): 尝试获取共享锁。- 如果state为0,表示计数器已经为0,允许所有等待线程通过,返回一个大于等于0的值(通常为1)。
- 如果state不为0,表示计数器还未归零,不允许线程通过,返回一个小于0的值(通常为-1)。
tryReleaseShared(int releases): 尝试释放共享锁(减少计数器)。- 使用CAS操作将state减1。
- 如果state变为0,表示计数器已经为0,返回true,唤醒所有等待线程。
- 否则,返回false。
acquireSharedInterruptibly(int acquires)和releaseShared(int releases):
CountDownLatch的await()方法调用sync.acquireSharedInterruptibly(1),countDown()方法调用sync.releaseShared(1)。acquireSharedInterruptibly()和releaseShared()是AQS提供的方法,它们封装了获取和释放共享锁的逻辑,包括阻塞和唤醒线程。 acquireSharedInterruptibly 允许在等待过程中被中断。
独占模式 vs. 共享模式:实现差异对比
| 特性 | 独占模式 (ReentrantLock) | 共享模式 (CountDownLatch) |
|---|---|---|
| 获取锁 | tryAcquire() 返回 true/false,表示是否成功获取锁。 |
tryAcquireShared() 返回 >=0 或 <0,表示是否允许通过。 |
| 释放锁 | tryRelease() 返回 true/false,表示是否完全释放锁。 |
tryReleaseShared() 返回 true/false,表示是否需要唤醒其他线程。 |
| 适用场景 | 互斥访问共享资源。 | 等待一组线程完成操作。 |
| 状态 (State) | 表示锁的重入次数。 | 表示计数器的值。 |
| 获取线程数 | 每次只允许一个线程获取锁。 | 允许多个线程同时获取锁 (当状态满足条件时)。 |
AQS核心方法的差异:
tryAcquirevs.tryAcquireShared: 独占模式使用tryAcquire尝试获取锁,返回布尔值表示是否成功。共享模式使用tryAcquireShared尝试获取锁,返回整数值,其正负表示获取是否成功,更灵活地表达状态。tryReleasevs.tryReleaseShared: 独占模式使用tryRelease释放锁,返回布尔值表示是否完全释放锁。共享模式使用tryReleaseShared释放锁,返回布尔值表示是否需要唤醒其他线程。
选择合适的同步模式
选择独占模式还是共享模式取决于具体的并发场景。
- 如果需要互斥访问共享资源,应该选择独占模式,例如ReentrantLock。
- 如果需要等待一组线程完成操作,应该选择共享模式,例如CountDownLatch。
- 如果需要限制同时访问共享资源的线程数量,可以选择Semaphore(也是基于AQS的共享模式实现的)。
AQS的设计优势
AQS框架的设计具有以下优势:
- 代码复用: AQS提供了一个通用的同步框架,可以避免重复编写同步逻辑。
- 可扩展性: 可以通过继承AQS并重写其抽象方法来实现自定义的同步器。
- 性能优化: AQS内部使用了CAS操作和FIFO队列,可以有效地提高并发性能。
- 灵活性: AQS支持独占模式和共享模式,可以满足不同的并发需求。
总结:理解独占与共享模式的关键差异
ReentrantLock和CountDownLatch虽然都基于AQS框架,但它们分别代表了独占模式和共享模式。独占模式保证了对资源的互斥访问,而共享模式则允许多个线程并发执行。理解这两种模式的差异,有助于我们选择合适的同步工具,并编写高效、可靠的并发程序。
核心在于状态的控制
AQS的状态管理是区分独占和共享模式的关键。独占模式中,状态通常表示锁的持有者和重入次数;而在共享模式中,状态则代表某种共享资源的可利用程度或者计数器的剩余值。通过对状态的巧妙控制,AQS能够灵活地支持各种复杂的并发场景。
AQS的强大之处在于它的可定制性
AQS的强大之处在于它的可定制性。开发者可以通过继承AQS并重写其抽象方法,来实现各种自定义的同步器,满足不同的并发需求。这种灵活性使得AQS成为Java并发包中不可或缺的核心组件。