深入理解Java并发编程:AQS框架原理、锁机制优化与高并发实践
各位同学,大家好!今天我们来深入探讨Java并发编程中的核心基石——AQS框架,以及如何利用它进行锁机制优化和高并发实践。AQS(AbstractQueuedSynchronizer)是Java并发包java.util.concurrent
中最核心的组件之一,它为构建锁、同步器等并发工具提供了一个通用的框架。掌握AQS,能够帮助我们更好地理解并发原理,并能根据实际场景定制高性能的并发组件。
一、AQS框架原理:理解同步状态与等待队列
AQS本质上是一个同步器模板,它定义了一套标准的同步操作流程,开发者可以通过继承AQS并重写特定的方法来实现自定义的同步器。AQS的核心概念包含两个部分:
-
同步状态(State): AQS内部维护一个
volatile int state
变量,用来表示同步状态。这个状态的含义由具体的同步器决定。例如,对于ReentrantLock,state表示锁被重入的次数;对于Semaphore,state表示剩余的许可数量。volatile
关键字保证了多线程环境下对state变量的可见性。 -
FIFO等待队列(CLH队列): 当线程尝试获取同步状态失败时,AQS会将该线程封装成一个Node节点,并加入到CLH队列中。CLH队列是一个虚拟的双向队列,实际上并不存在一个真实的队列对象,而是通过Node节点的prev和next指针维护节点之间的关系。队列的头节点是当前持有锁的线程,其他节点则处于等待状态。
AQS的核心方法:
AQS定义了一系列protected方法,子类需要根据自身的同步语义来实现这些方法。最常用的方法包括:
方法 | 描述 |
---|---|
tryAcquire(int arg) |
尝试获取同步状态。如果成功,返回true;否则,返回false。 |
tryRelease(int arg) |
尝试释放同步状态。如果成功,返回true;否则,返回false。 |
tryAcquireShared(int arg) |
共享模式下尝试获取同步状态。返回正数表示成功,0表示失败,负数表示应该阻塞。 |
tryReleaseShared(int arg) |
共享模式下尝试释放同步状态。如果成功,返回true;否则,返回false。 |
isHeldExclusively() |
当前同步器是否被当前线程独占。 |
AQS同步流程:
AQS的同步流程主要分为获取同步状态和释放同步状态两个阶段。
-
获取同步状态:
- 线程尝试通过
tryAcquire()
或tryAcquireShared()
方法获取同步状态。 - 如果获取成功,则直接返回。
- 如果获取失败,则将当前线程封装成Node节点,并加入到CLH队列的尾部。
- 加入队列后,线程会进入阻塞状态,直到被前驱节点唤醒或者被中断。
- 被唤醒后,线程会再次尝试获取同步状态,如果成功则退出阻塞,否则继续阻塞。
- 线程尝试通过
-
释放同步状态:
- 线程通过
tryRelease()
或tryReleaseShared()
方法释放同步状态。 - 如果释放成功,则唤醒CLH队列中的后继节点。
- 被唤醒的节点会尝试获取同步状态。
- 线程通过
AQS的内部类Node:
AQS的内部类Node
是CLH队列中的节点,它包含了以下重要的属性:
waitStatus
:表示节点的等待状态。常见的状态包括:SIGNAL
:表示后继节点处于等待状态,当前节点释放同步状态后需要唤醒后继节点。CANCELLED
:表示节点已经被取消。CONDITION
:表示节点正在等待条件变量。PROPAGATE
:表示释放操作需要传播给后继节点。0
:初始状态。
thread
:表示与该节点关联的线程。prev
:指向前驱节点的指针。next
:指向后继节点的指针。
二、基于AQS构建锁:独占锁与共享锁
AQS提供了两种同步模式:独占模式和共享模式。独占模式下,同一时刻只允许一个线程持有同步状态;共享模式下,允许多个线程同时持有同步状态。我们可以基于AQS构建独占锁(如ReentrantLock)和共享锁(如Semaphore)。
1. 独占锁(ReentrantLock)实现:
ReentrantLock是可重入的独占锁,它允许同一个线程多次获取锁,而不会造成死锁。ReentrantLock的实现依赖于AQS的独占模式。
import java.util.concurrent.locks.AbstractQueuedSynchronizer;
import java.util.concurrent.locks.ReentrantLock;
public class MyReentrantLock {
private final Sync sync = new Sync();
public void lock() {
sync.acquire(1);
}
public void unlock() {
sync.release(1);
}
private static class Sync extends AbstractQueuedSynchronizer {
@Override
protected boolean tryAcquire(int acquires) {
Thread current = Thread.currentThread();
int c = getState();
if (c == 0) { // 锁未被占用
if (compareAndSetState(0, acquires)) { // CAS尝试获取锁
setExclusiveOwnerThread(current); // 设置当前线程为独占锁持有者
return true;
}
} else if (current == getExclusiveOwnerThread()) { // 当前线程已经持有锁,重入
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc); // 增加重入次数
return true;
}
return false; // 获取锁失败
}
@Override
protected boolean tryRelease(int releases) {
int c = getState() - releases;
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
if (c == 0) { // 锁完全释放
free = true;
setExclusiveOwnerThread(null); // 清空独占锁持有者
}
setState(c); // 减少重入次数
return free;
}
@Override
protected boolean isHeldExclusively() {
return getExclusiveOwnerThread() == Thread.currentThread();
}
}
public static void main(String[] args) throws InterruptedException {
MyReentrantLock lock = new MyReentrantLock();
Thread t1 = new Thread(() -> {
lock.lock();
try {
System.out.println("Thread 1 acquired lock");
Thread.sleep(1000);
lock.lock(); //重入锁
System.out.println("Thread 1 re-acquired lock");
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
lock.unlock();
lock.unlock(); //对应重入锁的释放
System.out.println("Thread 1 released lock");
}
});
Thread t2 = new Thread(() -> {
lock.lock();
try {
System.out.println("Thread 2 acquired lock");
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
lock.unlock();
System.out.println("Thread 2 released lock");
}
});
t1.start();
Thread.sleep(500); // 确保t1先获取锁
t2.start();
t1.join();
t2.join();
System.out.println("Done");
}
}
代码解释:
Sync
类继承了AbstractQueuedSynchronizer
,实现了tryAcquire
和tryRelease
方法。tryAcquire
方法首先判断锁是否被占用。如果未被占用,则通过CAS操作尝试获取锁,并将当前线程设置为独占锁持有者。如果锁已被占用,并且当前线程就是锁的持有者,则增加重入次数。tryRelease
方法减少重入次数,如果重入次数变为0,则释放锁,并清空独占锁持有者。
2. 共享锁(Semaphore)实现:
Semaphore用于控制对共享资源的并发访问数量。Semaphore的实现依赖于AQS的共享模式。
import java.util.concurrent.locks.AbstractQueuedSynchronizer;
import java.util.concurrent.Semaphore;
public class MySemaphore {
private final Sync sync;
public MySemaphore(int permits) {
sync = new Sync(permits);
}
public void acquire() throws InterruptedException {
sync.acquireSharedInterruptibly(1);
}
public void release() {
sync.releaseShared(1);
}
private static class Sync extends AbstractQueuedSynchronizer {
Sync(int permits) {
setState(permits);
}
@Override
protected int tryAcquireShared(int acquires) {
for (;;) {
int available = getState();
int remaining = available - acquires;
if (remaining < 0 || compareAndSetState(available, remaining))
return remaining;
}
}
@Override
protected boolean tryReleaseShared(int releases) {
for (;;) {
int current = getState();
int next = current + releases;
if (next < current) // overflow
throw new Error("Maximum permits exceeded");
if (compareAndSetState(current, next))
return true;
}
}
}
public static void main(String[] args) throws InterruptedException {
MySemaphore semaphore = new MySemaphore(3); // 允许3个线程同时访问
for (int i = 0; i < 5; i++) {
final int threadNum = i;
new Thread(() -> {
try {
semaphore.acquire();
System.out.println("Thread " + threadNum + " acquired semaphore");
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
semaphore.release();
System.out.println("Thread " + threadNum + " released semaphore");
}
}).start();
}
}
}
代码解释:
Sync
类继承了AbstractQueuedSynchronizer
,实现了tryAcquireShared
和tryReleaseShared
方法。tryAcquireShared
方法通过CAS操作尝试减少许可数量。如果剩余许可数量大于等于0,则获取成功,返回剩余许可数量;否则,返回负数,表示获取失败。tryReleaseShared
方法通过CAS操作尝试增加许可数量。
三、锁机制优化:提升并发性能
锁机制是并发编程中不可或缺的一部分,但过度使用锁会降低并发性能。以下是一些常用的锁机制优化技巧:
- 减少锁的持有时间: 只在必要的时候才持有锁,尽快释放锁。
- 缩小锁的范围: 使用更细粒度的锁,减少锁的竞争。例如,将一个大的锁分解成多个小的锁,每个锁保护不同的资源。
- 使用读写锁: 读写锁允许多个线程同时读取共享资源,但只允许一个线程写入共享资源。读写锁适用于读多写少的场景,可以提高并发性能。
- 使用CAS操作: CAS(Compare and Swap)是一种无锁算法,它可以原子地更新共享变量。CAS操作避免了锁的开销,可以提高并发性能。
- 使用ThreadLocal: ThreadLocal为每个线程提供一个独立的变量副本,避免了多线程之间的竞争。ThreadLocal适用于线程之间不需要共享数据的场景。
读写锁的实现 (ReadWriteLock):
import java.util.concurrent.locks.AbstractQueuedSynchronizer;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReadWriteLock;
public class MyReadWriteLock implements ReadWriteLock {
private final ReadLock readLock;
private final WriteLock writeLock;
private final Sync sync;
public MyReadWriteLock() {
sync = new Sync();
readLock = new ReadLock(sync);
writeLock = new WriteLock(sync);
}
@Override
public ReadLock readLock() {
return readLock;
}
@Override
public WriteLock writeLock() {
return writeLock;
}
private static class Sync extends AbstractQueuedSynchronizer {
static final int SHARED_BITS = 16;
static final int MAX_COUNT = (1 << SHARED_BITS) - 1;
static int exclusiveCount(int c) { return c & MAX_COUNT; }
@Override
protected boolean tryRelease(int releases) {
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
int nextc = getState() - releases;
if (exclusiveCount(nextc) == 0) {
setExclusiveOwnerThread(null);
setState(nextc);
return true;
}
setState(nextc);
return false;
}
@Override
protected int tryAcquireShared(int acquires) {
for (;;) {
int c = getState();
int w = exclusiveCount(c);
if (w > 0 && getExclusiveOwnerThread() != Thread.currentThread()) //有写锁占用,并且不是当前线程
return -1;
int r = sharedCount(c);
if (r >= MAX_COUNT) throw new Error("Maximum number of readers exceeded"); // 超过读锁数量上限
int nextr = r + acquires;
if(nextr > MAX_COUNT) throw new Error("Maximum number of readers exceeded");
if (compareAndSetState(c, c + acquires)) {
return 1;
}
}
}
@Override
protected boolean tryAcquire(int acquires) {
Thread current = Thread.currentThread();
int c = getState();
int w = exclusiveCount(c);
if (c != 0) { //存在读锁或者写锁
if (w == 0 || current != getExclusiveOwnerThread()) //存在读锁 或者 写锁不是当前线程持有
return false;
if (w + exclusiveCount(acquires) > MAX_COUNT)
throw new Error("Maximum lock count exceeded");
setState(c + acquires);
return true;
}
if (compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
setState(acquires);
return true;
}
return false;
}
static int sharedCount(int c) { return c >>> SHARED_BITS; }
}
public static class ReadLock implements java.util.concurrent.locks.Lock {
private final Sync sync;
protected ReadLock(Sync sync) {
this.sync = sync;
}
@Override
public void lock() {
sync.acquireShared(1);
}
@Override
public void unlock() {
sync.releaseShared(1);
}
@Override
public Condition newCondition() {
throw new UnsupportedOperationException();
}
// 其他方法省略 (lockInterruptibly, tryLock 等)
@Override
public void lockInterruptibly() throws InterruptedException {}
@Override
public boolean tryLock() { return false; }
@Override
public boolean tryLock(long time, java.util.concurrent.TimeUnit unit) throws InterruptedException { return false;}
}
public static class WriteLock implements java.util.concurrent.locks.Lock {
private final Sync sync;
protected WriteLock(Sync sync) {
this.sync = sync;
}
@Override
public void lock() {
sync.acquire(1);
}
@Override
public void unlock() {
sync.release(1);
}
@Override
public Condition newCondition() {
return sync.new ConditionObject();
}
// 其他方法省略 (lockInterruptibly, tryLock 等)
@Override
public void lockInterruptibly() throws InterruptedException {}
@Override
public boolean tryLock() { return false; }
@Override
public boolean tryLock(long time, java.util.concurrent.TimeUnit unit) throws InterruptedException { return false;}
}
public static void main(String[] args) throws InterruptedException {
MyReadWriteLock rwLock = new MyReadWriteLock();
ReadLock readLock = rwLock.readLock();
WriteLock writeLock = rwLock.writeLock();
// 模拟读多写少的场景
new Thread(() -> {
writeLock.lock();
try {
System.out.println("Write Thread: Writing data...");
Thread.sleep(2000);
System.out.println("Write Thread: Data written.");
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
writeLock.unlock();
}
}).start();
Thread.sleep(100); // 确保写线程先启动
for (int i = 0; i < 5; i++) {
new Thread(() -> {
readLock.lock();
try {
System.out.println("Read Thread: Reading data...");
Thread.sleep(500);
System.out.println("Read Thread: Data read.");
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
readLock.unlock();
}
}).start();
}
}
}
代码解释:
Sync
类继承了AbstractQueuedSynchronizer
,用于管理读写锁的状态。 状态分成高16位和低16位,高16位代表共享锁(读锁)的数量,低16位代表独占锁(写锁)的数量ReadLock
和WriteLock
分别实现了java.util.concurrent.locks.Lock
接口,提供了lock
和unlock
方法。- 读锁通过
acquireShared
方法获取共享锁,写锁通过acquire
方法获取独占锁。
四、高并发实践:利用AQS构建高性能并发组件
AQS不仅可以用于构建锁,还可以用于构建各种高性能的并发组件,例如:
- CountDownLatch: 允许一个或多个线程等待其他线程完成操作。
- CyclicBarrier: 允许一组线程互相等待,直到所有线程都到达某个屏障点。
- FutureTask: 用于异步计算,可以获取计算结果或取消计算。
- RateLimiter: 用于限制请求的速率,防止系统被过载。
CountDownLatch的实现:
import java.util.concurrent.locks.AbstractQueuedSynchronizer;
import java.util.concurrent.CountDownLatch;
public class MyCountDownLatch {
private final Sync sync;
public MyCountDownLatch(int count) {
sync = new Sync(count);
}
public void await() throws InterruptedException {
sync.acquireSharedInterruptibly(1);
}
public void countDown() {
sync.releaseShared(1);
}
public long getCount() {
return sync.getCount();
}
private static class Sync extends AbstractQueuedSynchronizer {
Sync(int count) {
setState(count);
}
long getCount() {
return getState();
}
@Override
protected int tryAcquireShared(int acquires) {
return (getState() == 0) ? 1 : -1;
}
@Override
protected boolean tryReleaseShared(int releases) {
for (;;) {
int c = getState();
if (c == 0)
return false;
int nextc = c - 1;
if (compareAndSetState(c, nextc))
return nextc == 0;
}
}
}
public static void main(String[] args) throws InterruptedException {
MyCountDownLatch latch = new MyCountDownLatch(3);
for (int i = 0; i < 3; i++) {
final int threadNum = i;
new Thread(() -> {
try {
System.out.println("Thread " + threadNum + " is running");
Thread.sleep(1000);
System.out.println("Thread " + threadNum + " finished");
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
latch.countDown();
}
}).start();
}
latch.await(); // 等待所有线程完成
System.out.println("All threads finished");
}
}
代码解释:
Sync
类继承了AbstractQueuedSynchronizer
,使用state来表示需要等待的线程数量。tryAcquireShared
方法判断state是否为0,如果为0,则表示所有线程都已完成,返回1,允许等待线程继续执行;否则,返回-1,表示需要等待。tryReleaseShared
方法减少state的值,直到state变为0。
五、总结与应用
AQS是一个强大的并发框架,它为构建各种锁和同步器提供了基础。通过深入理解AQS的原理,我们可以更好地掌握并发编程的核心概念,并能够根据实际场景定制高性能的并发组件。理解AQS是深入理解Java并发编程的关键一步,它能够让你在面对复杂并发场景时更加得心应手。掌握AQS及其应用,能够帮助我们写出更高效、更可靠的并发程序。