理解 Java AQS(AbstractQueuedSynchronizer):剖析并发组件底层实现原理,如 ReentrantLock、Semaphore。

好的,各位观众老爷们,欢迎来到今天的“并发组件底层探秘”专场!今天我们要扒的,是Java并发包里一个重量级的存在——AQS,也就是AbstractQueuedSynchronizer,抽象队列同步器。

别被“抽象”两个字吓跑了,它其实是个非常实在的家伙,像我们常用的ReentrantLock(可重入锁)、Semaphore(信号量)、CountDownLatch(倒计时器)等等并发工具,都或多或少地依赖于它。

一、AQS:并发世界的“万能胶”?

我们可以把AQS想象成并发世界里的“万能胶”,它提供了一个框架,让我们可以更方便地构建各种同步器。但是,它本身并不是一个锁,也不是一个信号量,而是一个模板,一个基类

这就好比盖房子,AQS就是那个打好的地基,搭好的框架,你要盖什么样的房子(锁?信号量?),还得自己往里面添砖加瓦。

为什么需要AQS?

在没有AQS之前,每个并发工具都需要自己去处理线程的阻塞、唤醒、竞争、排队等等问题。这工作量可想而知!而且,不同的开发者实现出来的东西,质量参差不齐,维护起来也痛苦不堪。

AQS的出现,就是为了解决这个问题。它把这些通用的、底层的并发控制逻辑抽象出来,封装成一个框架,让开发者只需要关注具体的同步语义,而不用再去重复造轮子。

二、AQS的核心组件:状态、队列、CAS

AQS主要有三个核心组件:

  1. state(状态): 这是一个volatile修饰的int变量,用来表示同步器的状态。它可以表示锁是否被占用、信号量的剩余数量等等。

    状态的改变通常是通过CAS(Compare and Swap)操作来保证原子性。

  2. CLH队列(队列): 这是一个FIFO(先进先出)的队列,用来存放那些因为竞争资源而阻塞的线程。线程会封装成Node节点加入队列,等待被唤醒。

    CLH队列是一个虚拟的双向队列,它并不实际存在一个Queue对象,而是通过Node节点之间的prev和next指针来维护队列的结构。

  3. CAS(Compare and Swap): 这是一种无锁算法,用来原子性地更新state变量。AQS大量使用了CAS操作来避免锁带来的性能开销。

    CAS操作包含三个操作数:内存地址(V)、期望值(A)和新值(B)。如果内存地址V的值等于期望值A,那么就将内存地址V的值修改为B,否则什么都不做。

    简而言之,CAS就是:我觉得现在是A,如果是A,我就改成B。

三、AQS的工作原理:线程的排队与唤醒

AQS的工作流程大致如下:

  1. 线程尝试获取同步状态: 线程会尝试通过CAS操作来改变state变量,如果成功,就表示获取到了同步状态,可以继续执行。
  2. 获取失败,加入CLH队列: 如果获取同步状态失败,线程就会被封装成一个Node节点,加入到CLH队列的末尾,并被阻塞。
  3. CLH队列中的线程等待被唤醒: CLH队列中的线程会不断地检查自己的前驱节点是否已经释放了同步状态。如果前驱节点释放了同步状态,并且自己是队列中的第一个等待者,那么它就会尝试获取同步状态。
  4. 释放同步状态,唤醒后继节点: 当线程释放同步状态时,它会唤醒CLH队列中的后继节点,让后继节点尝试获取同步状态。

图解AQS的工作流程:

graph LR
    A[线程尝试获取同步状态] --> B{获取成功?}
    B -- 是 --> C[执行临界区代码]
    B -- 否 --> D[创建Node节点,加入CLH队列]
    D --> E[线程阻塞]
    C --> F[释放同步状态]
    F --> G{CLH队列为空?}
    G -- 是 --> H[结束]
    G -- 否 --> I[唤醒CLH队列中的后继节点]
    I --> A

四、AQS的两种模式:独占模式和共享模式

AQS支持两种模式:

  • 独占模式(Exclusive): 只有一个线程可以获取同步状态,例如ReentrantLock。
  • 共享模式(Shared): 允许多个线程同时获取同步状态,例如Semaphore、CountDownLatch。

这两种模式的区别主要在于tryAcquire()tryRelease()方法的实现。

  • 独占模式:

    • tryAcquire(int arg):尝试以独占模式获取同步状态,成功返回true,失败返回false。
    • tryRelease(int arg):尝试释放独占模式的同步状态,如果释放成功,则唤醒后继节点。
  • 共享模式:

    • tryAcquireShared(int arg):尝试以共享模式获取同步状态,成功返回一个非负数,失败返回一个负数。
    • tryReleaseShared(int arg):尝试释放共享模式的同步状态,如果释放成功,则唤醒所有后继节点。

五、AQS的核心方法:留给子类去实现的“钩子”

AQS提供了一些核心方法,但是这些方法都是protected类型的,意味着它们是留给子类去实现的。这些方法被称为“钩子方法”,子类通过实现这些方法,来定义自己的同步语义。

方法名 描述
tryAcquire(int arg) 尝试以独占模式获取同步状态。子类必须实现此方法,并根据自己的同步语义来判断是否可以获取同步状态。如果可以获取,则返回true,否则返回false。
tryRelease(int arg) 尝试以独占模式释放同步状态。子类必须实现此方法,并根据自己的同步语义来释放同步状态。如果释放成功,则返回true,否则返回false。
tryAcquireShared(int arg) 尝试以共享模式获取同步状态。子类必须实现此方法,并根据自己的同步语义来判断是否可以获取同步状态。如果可以获取,则返回一个非负数,表示剩余的可用资源数。如果不能获取,则返回一个负数。
tryReleaseShared(int arg) 尝试以共享模式释放同步状态。子类必须实现此方法,并根据自己的同步语义来释放同步状态。如果释放成功,则返回true,否则返回false。
isHeldExclusively() 判断当前线程是否独占同步状态。子类必须实现此方法,并根据自己的同步语义来判断当前线程是否独占同步状态。

六、AQS的典型应用:ReentrantLock、Semaphore、CountDownLatch

现在,让我们来看几个AQS的典型应用,看看它们是如何利用AQS来实现自己的同步语义的。

1. ReentrantLock(可重入锁)

ReentrantLock是AQS的典型应用之一。它实现了独占模式的同步语义,允许同一个线程多次获取锁,而不会被自己阻塞。

  • state: 用来表示锁的持有状态。0表示锁未被持有,大于0表示锁被持有,具体数值表示被同一个线程重入的次数。
  • tryAcquire(int arg):

    • 如果state为0,表示锁未被持有,尝试通过CAS操作将state设置为1,成功则获取锁。
    • 如果state大于0,且当前线程是持有锁的线程,则将state加1,表示重入锁。
    • 如果以上两种情况都不满足,则获取锁失败,返回false。
  • tryRelease(int arg):

    • 将state减1。
    • 如果state减为0,表示锁已经被完全释放,将持有锁的线程设置为null,并返回true。
    • 否则,返回false。

ReentrantLock的源码片段(简化版):

class ReentrantLock {
    private final Sync sync;

    abstract static class Sync extends AbstractQueuedSynchronizer {
        // ...
        @Override
        protected final boolean tryAcquire(int acquires) {
            final Thread current = Thread.currentThread();
            int c = getState();
            if (c == 0) {
                if (compareAndSetState(0, acquires)) {
                    setExclusiveOwnerThread(current);
                    return true;
                }
            }
            else if (current == getExclusiveOwnerThread()) {
                int nextc = c + acquires;
                if (nextc < 0) // overflow
                    throw new Error("Maximum lock count exceeded");
                setState(nextc);
                return true;
            }
            return false;
        }

        @Override
        protected final boolean tryRelease(int releases) {
            int c = getState() - releases;
            if (Thread.currentThread() != getExclusiveOwnerThread())
                throw new IllegalMonitorStateException();
            boolean free = false;
            if (c == 0) {
                free = true;
                setExclusiveOwnerThread(null);
            }
            setState(c);
            return free;
        }

        @Override
        protected final boolean isHeldExclusively() {
            return getExclusiveOwnerThread() == Thread.currentThread();
        }
    }

    // ...
}

2. Semaphore(信号量)

Semaphore也是AQS的应用,它实现了共享模式的同步语义,用来控制同时访问某个资源的线程数量。

  • state: 用来表示剩余的可用资源数。
  • tryAcquireShared(int arg):

    • 循环尝试减少state的值。
    • 如果state大于等于0,表示有可用资源,获取成功,返回剩余的可用资源数。
    • 如果state小于0,表示没有可用资源,获取失败,返回负数。
  • tryReleaseShared(int arg):

    • 循环尝试增加state的值。
    • 如果增加成功,则唤醒CLH队列中的后继节点。

Semaphore的源码片段(简化版):

class Semaphore {
    private final Sync sync;

    abstract static class Sync extends AbstractQueuedSynchronizer {
        // ...
        @Override
        protected int tryAcquireShared(int acquires) {
            for (;;) {
                int available = getState();
                int remaining = available - acquires;
                if (remaining < 0 ||
                    compareAndSetState(available, remaining))
                    return remaining;
            }
        }

        @Override
        protected boolean tryReleaseShared(int releases) {
            for (;;) {
                int current = getState();
                int next = current + releases;
                if (next < current) // overflow
                    throw new Error("Maximum permit count exceeded");
                if (compareAndSetState(current, next))
                    return true;
            }
        }
    }
    // ...
}

3. CountDownLatch(倒计时器)

CountDownLatch也是AQS的应用,它实现了共享模式的同步语义,用来等待一组线程完成操作。

  • state: 用来表示需要等待的线程数量。
  • tryAcquireShared(int arg):

    • 如果state为0,表示所有线程已经完成,获取成功,返回1。
    • 否则,获取失败,返回-1。
  • tryReleaseShared(int arg):

    • 将state减1。
    • 如果state减为0,表示所有线程已经完成,唤醒所有等待的线程。

CountDownLatch的源码片段(简化版):

class CountDownLatch {
    private final Sync sync;

    private static final class Sync extends AbstractQueuedSynchronizer {
        // ...
        @Override
        protected int tryAcquireShared(int acquires) {
            return (getState() == 0) ? 1 : -1;
        }

        @Override
        protected boolean tryReleaseShared(int releases) {
            for (;;) {
                int c = getState();
                if (c == 0)
                    return false;
                int nextc = c-1;
                if (compareAndSetState(c, nextc))
                    return nextc == 0;
            }
        }
    }
    // ...
}

七、AQS的优点与缺点

优点:

  • 简化并发工具的开发: AQS提供了一个通用的并发控制框架,让开发者只需要关注具体的同步语义,而不用再去重复造轮子。
  • 提高并发性能: AQS大量使用了CAS操作来避免锁带来的性能开销。
  • 提高代码的可维护性: AQS将通用的并发控制逻辑封装成一个框架,使得代码结构更加清晰,易于维护。

缺点:

  • 学习曲线陡峭: AQS的原理比较复杂,需要一定的并发编程基础才能理解。
  • 使用不当可能导致死锁: 如果子类实现的同步语义不正确,可能会导致死锁。

八、总结

AQS是Java并发包里一个非常重要的组件,它为各种并发工具的实现提供了基础。理解AQS的原理,可以帮助我们更好地理解Java并发包,更好地使用并发工具,也可以帮助我们更好地解决并发编程中的问题。

希望今天的讲解能让你对AQS有一个更深入的了解。记住,AQS不是万能的,但它绝对是并发编程中的一把利器!💪

最后,送大家一句话:并发编程,路漫漫其修远兮,吾将上下而求索!

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注