Java AQS(AbstractQueuedSynchronizer)

好的,各位观众老爷们,今天咱们要聊的可是Java并发编程里的一位重量级选手——AQS(AbstractQueuedSynchronizer),江湖人称“抽象队列同步器”。 别看名字这么硬核,其实它就像一个“万能插座”,各种锁、信号量、倒计时器都得靠它才能插上电,发光发热。🔥

准备好了吗?咱们这就开始一场AQS的深度历险记!

一、开场白:并发世界的“水电站”

在并发编程的世界里,多个线程就像一群嗷嗷待哺的小鸟,都想抢着吃东西(访问共享资源)。如果没有一个好的“分配机制”,那场面可就乱了套了,轻则数据错乱,重则程序崩溃。

AQS,就是并发世界的“水电站”,它负责协调各个线程对共享资源的访问,保证线程安全,让大家井然有序地排队、获取资源、释放资源,最终实现并发控制。

你可以把AQS想象成一个“线程调度中心”,它手里拿着两样法宝:

  • state(状态): 这是一个整数变量,用来表示共享资源的状态。比如,锁是否被占用,信号量还剩多少个许可等等。
  • FIFO队列(CLH队列的变体): 这是一个先进先出的队列,用来存放那些没抢到资源的线程。这些线程会乖乖地排队等待,直到轮到自己。

二、AQS的核心思想:模板方法模式 + 状态管理

AQS之所以强大,是因为它采用了“模板方法模式”。它定义了一套通用的并发控制流程,但是把一些关键的步骤(比如获取资源、释放资源)留给子类去实现。

这就好比一个“积木玩具”,AQS提供了积木的基本框架,你可以根据自己的需求,用不同的积木块(子类)来搭建出各种各样的并发工具。🧱

AQS的核心思想可以用一句话概括:基于状态(state)来实现同步,利用FIFO队列来管理等待线程。

三、AQS的主要组件:状态、队列、同步方法

要理解AQS,我们需要深入了解它的三个主要组件:

  1. 状态(State):

    • 这是一个volatile int类型的变量,用来表示共享资源的状态。
    • AQS提供了三个方法来操作状态:
      • getState(): 获取当前状态值。
      • setState(int newState): 设置新的状态值。
      • compareAndSetState(int expect, int update): 使用CAS(Compare and Swap)原子操作来更新状态值。

    状态就像一个“交通信号灯”,不同的状态值代表不同的含义。比如,0表示空闲,1表示被占用。🚦

  2. 队列(Queue):

    • 这是一个FIFO双向队列(CLH队列的变体),用来存放等待获取资源的线程。
    • 队列中的每个节点(Node)都代表一个等待线程。
    • AQS使用headtail指针来指向队列的头部和尾部。

    队列就像一个“停车场”,没抢到车位的车辆就得排队等待。🚗🚕🚙

  3. 同步方法:

    AQS定义了一组同步方法,这些方法会被子类重写,用来实现具体的并发控制逻辑。这些方法主要包括:

    方法名 作用
    tryAcquire(int arg) 尝试获取独占资源。成功返回true,失败返回false
    tryRelease(int arg) 尝试释放独占资源。成功返回true,失败返回false
    tryAcquireShared(int arg) 尝试获取共享资源。返回值大于等于0表示获取成功,小于0表示获取失败。
    tryReleaseShared(int arg) 尝试释放共享资源。如果释放后允许唤醒后续节点,则返回true,否则返回false
    isHeldExclusively() 当前同步器是否在独占模式下被当前线程占用。通常在Condition实现中使用。

    这些同步方法就像“交通规则”,规定了线程如何获取和释放资源。📜

四、AQS的工作流程:线程排队、获取资源、释放资源

AQS的工作流程可以用一个简单的例子来说明:假设我们要实现一个简单的独占锁(类似于ReentrantLock):

  1. 线程尝试获取锁: 线程调用acquire(int arg)方法来尝试获取锁。这个方法会调用子类重写的tryAcquire(int arg)方法。
  2. tryAcquire(int arg)尝试获取锁:
    • 如果当前锁是空闲的(state为0),则使用CAS操作将state设置为1,表示锁被当前线程占用。如果CAS操作成功,则表示获取锁成功,tryAcquire(int arg)返回true
    • 如果当前锁已经被其他线程占用(state为1),或者CAS操作失败,则表示获取锁失败,tryAcquire(int arg)返回false
  3. 如果获取锁失败,则进入队列等待: 如果tryAcquire(int arg)返回false,则线程会被封装成一个Node节点,并添加到AQS的队列中。
  4. 队列中的线程被阻塞: 队列中的线程会被阻塞,进入等待状态。
  5. 当锁被释放时: 当持有锁的线程调用release(int arg)方法释放锁时,release(int arg)方法会调用子类重写的tryRelease(int arg)方法。
  6. tryRelease(int arg)尝试释放锁:
    • tryRelease(int arg)会将state设置为0,表示锁被释放。
    • tryRelease(int arg)会唤醒队列中的第一个线程(head节点的下一个节点)。
  7. 被唤醒的线程再次尝试获取锁: 被唤醒的线程会再次尝试获取锁,如果获取成功,则从队列中移除,并开始执行自己的任务。

整个流程就像一个“过山车”,线程们排队等待,当轮到自己的时候,才能登上过山车(获取资源),体验一把刺激的旅程。🎢

五、AQS的两种模式:独占模式和共享模式

AQS支持两种模式:

  • 独占模式(Exclusive): 在独占模式下,同一时刻只有一个线程可以获取资源。比如ReentrantLock就是独占锁。
  • 共享模式(Shared): 在共享模式下,多个线程可以同时获取资源。比如SemaphoreCountDownLatch就是共享锁。

这两种模式就像“单行道”和“多车道”,单行道一次只能通过一辆车,多车道一次可以同时通过多辆车。🛣️

六、AQS在Java并发工具类中的应用:

AQS是Java并发工具类的基石,很多常用的并发工具类都是基于AQS实现的。比如:

  • ReentrantLock: 可重入锁,基于AQS的独占模式实现。
  • ReentrantReadWriteLock: 可重入读写锁,基于AQS的独占模式和共享模式实现。
  • Semaphore: 信号量,基于AQS的共享模式实现。
  • CountDownLatch: 倒计时器,基于AQS的共享模式实现。
  • CyclicBarrier: 循环栅栏,虽然不是直接基于AQS实现,但是其内部也使用了锁和条件变量来实现线程同步。

这些工具类就像“乐高积木”,你可以用它们来搭建出各种各样的并发应用。🧱

七、AQS源码分析(简化版):

为了更深入地理解AQS,我们来分析一下AQS的源码(简化版):

public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer implements java.io.Serializable {

    private volatile int state; // 共享资源的状态

    private transient volatile Node head; // 队列头节点

    private transient volatile Node tail; // 队列尾节点

    // 获取当前状态
    protected final int getState() {
        return state;
    }

    // 设置当前状态
    protected final void setState(int newState) {
        state = newState;
    }

    // CAS更新状态
    protected final boolean compareAndSetState(int expect, int update) {
        // Unsafe类提供的原子操作
        return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
    }

    // 尝试获取独占资源(需要子类实现)
    protected boolean tryAcquire(int arg) {
        throw new UnsupportedOperationException();
    }

    // 尝试释放独占资源(需要子类实现)
    protected boolean tryRelease(int arg) {
        throw new UnsupportedOperationException();
    }

    // 获取独占资源
    public final void acquire(int arg) {
        if (!tryAcquire(arg) &&
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();
    }

    // 将线程添加到队列中
    private Node addWaiter(Node mode) {
        Node node = new Node(Thread.currentThread(), mode);
        // 快速尝试将节点添加到队列尾部
        Node pred = tail;
        if (pred != null) {
            node.prev = pred;
            if (compareAndSetTail(pred, node)) {
                pred.next = node;
                return node;
            }
        }
        // 如果快速尝试失败,则使用enq方法
        enq(node);
        return node;
    }

    // 将节点添加到队列尾部
    private Node enq(final Node node) {
        for (;;) {
            Node t = tail;
            if (t == null) { // 队列为空,初始化
                if (compareAndSetHead(new Node()))
                    tail = head;
            } else {
                node.prev = t;
                if (compareAndSetTail(t, node)) {
                    t.next = node;
                    return t;
                }
            }
        }
    }

    // 线程在队列中等待
    final boolean acquireQueued(final Node node, int arg) {
        boolean failed = true;
        try {
            boolean interrupted = false;
            for (;;) {
                final Node p = node.predecessor();
                if (p == head && tryAcquire(arg)) { // 前驱节点是head,且获取锁成功
                    setHead(node);
                    p.next = null; // help GC
                    failed = false;
                    return interrupted;
                }
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    interrupted = true;
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }

    // 判断是否应该park线程
    private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
        int ws = pred.waitStatus;
        if (ws == Node.SIGNAL)
            return true;
        if (ws > 0) {
            do {
                node.prev = pred = pred.prev;
            } while (pred != null && pred.waitStatus > 0);
            if (pred != null)
                pred.next = node;
        } else {
            compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
        }
        return false;
    }

    // park线程并检查中断
    private final boolean parkAndCheckInterrupt() {
        LockSupport.park(this);
        return Thread.interrupted();
    }

    // 释放独占资源
    public final boolean release(int arg) {
        if (tryRelease(arg)) {
            Node h = head;
            if (h != null && h.waitStatus != 0)
                unparkSuccessor(h);
            return true;
        }
        return false;
    }

    // 唤醒后继节点
    private void unparkSuccessor(Node node) {
        int ws = node.waitStatus;
        if (ws < 0)
            compareAndSetWaitStatus(node, ws, 0);
        Node s = node.next;
        if (s == null || s.waitStatus > 0) {
            s = null;
            for (Node t = tail; t != null && t != node; t = t.prev)
                if (t.waitStatus <= 0)
                    s = t;
        }
        if (s != null)
            LockSupport.unpark(s.thread);
    }

    // 其他方法...
}

这段代码展示了AQS的核心流程:线程尝试获取锁,如果获取失败,则进入队列等待,当锁被释放时,唤醒队列中的线程。

八、AQS的优缺点:

优点:

  • 灵活性: AQS提供了高度的灵活性,可以用来实现各种各样的并发工具。
  • 可扩展性: 可以通过继承AQS来定制自己的同步器。
  • 高性能: AQS使用CAS操作和FIFO队列来保证并发性能。

缺点:

  • 复杂性: AQS的源码比较复杂,理解起来有一定的难度。
  • 需要一定的并发编程基础: 使用AQS需要对并发编程有一定的了解。

九、总结:AQS是并发编程的“瑞士军刀”

AQS是Java并发编程中一个非常重要的工具,它就像一把“瑞士军刀”,可以用来解决各种各样的并发问题。虽然AQS的源码比较复杂,但是只要掌握了它的核心思想,就可以灵活地使用它来构建自己的并发工具。

希望今天的讲解能够帮助大家更好地理解AQS,并在实际的并发编程中灵活运用。 如果觉得有所收获,别忘了点赞👍、收藏⭐、转发↗️哦! 😉

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注