AQS(AbstractQueuedSynchronizer)源码:Node节点的CLH队列排队机制

好的,我们开始吧。

AQS(AbstractQueuedSynchronizer)源码:Node节点的CLH队列排队机制

大家好!今天我们深入探讨Java并发编程中至关重要的基石——AbstractQueuedSynchronizer (AQS) 的核心机制:Node节点的CLH队列排队机制。AQS是构建锁和其他同步组件的关键抽象,理解其内部原理对于编写高效、可靠的并发程序至关重要。本次讲座将从CLH队列的理论基础入手,结合AQS源码,详细剖析Node节点在AQS中的作用,以及排队、唤醒等关键操作的实现。

1. CLH队列:理论基础

CLH队列(Craig, Landin, and Hagersten queue)是一种基于链表的自旋锁队列,用于解决多线程并发访问共享资源时的排队问题。它具有以下关键特性:

  • FIFO(First-In, First-Out): 线程按照请求锁的顺序排队,先请求的线程先获得锁,保证公平性。
  • 链表结构: 线程封装成节点(Node),通过前驱节点(predecessor)和后继节点(successor)连接成一个链表。
  • 自旋等待: 线程在等待锁时,不会阻塞,而是不断循环检查前驱节点的状态,直到前驱节点释放锁。
  • 隐式队列: CLH队列的实际存在是通过节点之间的链接关系来体现的,无需显式的队列数据结构。

CLH队列的优点在于:

  • 公平性: 保证线程按照请求顺序获得锁。
  • 性能: 自旋等待减少了线程切换的开销,提高了并发性能。
  • 可扩展性: 易于扩展到大规模并发环境。

2. AQS中的Node节点

在AQS中,Node节点是CLH队列的基本组成单元,每个试图获取同步状态(例如锁)的线程都会被封装成一个Node节点,并加入到CLH队列中。Node节点包含以下关键属性:

属性 类型 描述
waitStatus volatile int 节点的状态,用于指示节点的等待状态。
prev volatile Node 指向前驱节点。
next volatile Node 指向后继节点。
thread Thread 与该节点关联的线程。
nextWaiter Node 用于条件队列,这里我们暂不涉及。

waitStatus 属性是Node节点的核心,它表示节点当前的等待状态,可以取以下值:

  • 0:初始状态,表示节点正在等待获取锁。
  • CANCELLED:值为 1,表示节点已经被取消,不再参与锁竞争。
  • SIGNAL:值为 -1,表示当前节点的后继节点需要被唤醒。当一个节点释放锁时,它会尝试唤醒它的后继节点。
  • CONDITION:值为 -2,表示节点正在等待条件变量,与条件队列相关,这里我们暂不讨论。
  • PROPAGATE:值为 -3,表示释放锁的操作需要传播给其他节点,通常用于共享模式。

3. AQS源码分析:入队操作

当一个线程尝试获取同步状态失败时,AQS会将该线程封装成一个Node节点,并加入到CLH队列的尾部。入队操作主要涉及以下几个步骤:

  1. 创建Node节点: 使用当前线程创建一个新的Node节点,node = new Node(Thread.currentThread(), mode),其中mode可以是EXCLUSIVE(独占模式)或SHARED(共享模式)。
  2. CAS设置尾节点: 使用CAS(Compare and Swap)操作尝试将新节点设置为队列的尾节点。
  3. 处理并发情况: 如果CAS操作失败,说明有其他线程也在尝试入队,需要进入循环重试。

下面是AQS中enq(Node node)方法的源码,该方法负责将节点加入到CLH队列的尾部:

private Node enq(final Node node) {
    for (;;) {
        Node t = tail;
        if (t == null) { // Must initialize
            if (compareAndSetHead(null, new Node()))
                tail = head;
        } else {
            node.prev = t;
            if (compareAndSetTail(t, node)) {
                t.next = node;
                return t;
            }
        }
    }
}

代码解释:

  • for (;;):一个无限循环,直到成功将节点加入队列为止。
  • Node t = tail;:获取当前队列的尾节点。
  • if (t == null):如果尾节点为空,说明队列为空,需要初始化。
  • compareAndSetHead(null, new Node()):使用CAS操作将一个空的Node节点设置为头节点。
  • tail = head;:将尾节点指向头节点。
  • node.prev = t;:将新节点的前驱节点设置为当前尾节点。
  • compareAndSetTail(t, node):使用CAS操作将新节点设置为尾节点。
  • t.next = node;:将原尾节点的后继节点设置为新节点。
  • return t;:返回原尾节点。

4. AQS源码分析:acquireQueued方法

acquireQueued(final Node node, int arg)方法是AQS的核心方法之一,它负责让线程在CLH队列中排队等待,直到获取到同步状态。该方法的主要逻辑如下:

  1. 自旋等待: 线程进入自旋等待状态,不断检查是否可以获取同步状态。
  2. 检查前驱节点: 只有当前驱节点是头节点时,线程才有资格尝试获取同步状态。
  3. 尝试获取同步状态: 调用tryAcquire(arg)方法尝试获取同步状态,如果成功,则将当前节点设置为头节点,并返回。
  4. 设置waitStatus: 如果获取同步状态失败,则根据情况设置节点的waitStatus属性。
  5. 阻塞等待: 如果需要阻塞等待,则调用LockSupport.park(this)方法阻塞当前线程。
  6. 中断处理: 如果线程被中断,则取消节点,并退出等待。

下面是acquireQueued(final Node node, int arg)方法的源码:

final boolean acquireQueued(final Node node, int arg) {
    boolean failed = true;
    try {
        boolean interrupted = false;
        for (;;) {
            final Node p = node.predecessor();
            if (p == head && tryAcquire(arg)) {
                setHead(node);
                p.next = null; // help GC
                failed = false;
                return interrupted;
            }
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                interrupted = true;
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}

代码解释:

  • failed = true;:设置失败标志,如果在方法执行过程中出现异常,则需要取消节点。
  • boolean interrupted = false;:记录线程是否被中断。
  • for (;;):一个无限循环,直到成功获取同步状态或被取消为止。
  • final Node p = node.predecessor();:获取当前节点的前驱节点。
  • if (p == head && tryAcquire(arg)):如果前驱节点是头节点,并且成功获取同步状态。
  • setHead(node);:将当前节点设置为头节点。
  • p.next = null;:将原头节点的后继节点设置为null,帮助GC。
  • failed = false;:设置失败标志为false,表示成功获取同步状态。
  • return interrupted;:返回线程是否被中断。
  • if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()):如果需要阻塞等待,并且线程被中断。
  • interrupted = true;:设置线程被中断标志。
  • if (failed):如果方法执行过程中出现异常。
  • cancelAcquire(node);:取消节点。

shouldParkAfterFailedAcquire(Node pred, Node node)方法用于判断是否需要阻塞等待:

private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
    int ws = pred.waitStatus;
    if (ws == Node.SIGNAL)
        /*
         * This node has already set status asking a release
         * to signal it, so it can safely park.
         */
        return true;
    if (ws > 0) {
        /*
         * Predecessor was cancelled. Skip over predecessors and
         * indicate retry.
         */
        do {
            node.prev = pred = pred.prev;
        } while (pred != null && pred.waitStatus > 0);
        pred.next = node;
    } else {
        /*
         * waitStatus must be 0 or PROPAGATE.  Indicate that we
         * need a signal, but don't park yet.  Caller will need to
         * retry to make sure it doesn't miss a signal.
         */
        compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
    }
    return false;
}

代码解释:

  • int ws = pred.waitStatus;:获取前驱节点的waitStatus属性。
  • if (ws == Node.SIGNAL):如果前驱节点的waitStatusSIGNAL,表示可以安全地阻塞等待。
  • if (ws > 0):如果前驱节点的waitStatus大于0,表示前驱节点已经被取消,需要跳过该节点。
  • compareAndSetWaitStatus(pred, ws, Node.SIGNAL);:使用CAS操作将前驱节点的waitStatus设置为SIGNAL,表示需要被唤醒。

parkAndCheckInterrupt()方法用于阻塞当前线程,并检查是否被中断:

private final boolean parkAndCheckInterrupt() {
    LockSupport.park(this);
    return Thread.interrupted();
}

代码解释:

  • LockSupport.park(this);:阻塞当前线程。
  • Thread.interrupted();:检查线程是否被中断,并清除中断标志。

5. AQS源码分析:释放同步状态

当线程释放同步状态时,AQS需要唤醒CLH队列中的后继节点,使其有机会获取同步状态。释放操作主要涉及以下几个步骤:

  1. 尝试释放同步状态: 调用tryRelease(arg)方法尝试释放同步状态,如果成功,则继续执行。
  2. 唤醒后继节点: 如果头节点的waitStatusSIGNAL,则唤醒后继节点。

下面是AQS中release(int arg)方法的源码:

public final boolean release(int arg) {
    if (tryRelease(arg)) {
        Node h = head;
        if (h != null && h.waitStatus != 0)
            unparkSuccessor(h);
        return true;
    }
    return false;
}

代码解释:

  • if (tryRelease(arg)):调用tryRelease(arg)方法尝试释放同步状态,如果成功,则继续执行。
  • Node h = head;:获取头节点。
  • if (h != null && h.waitStatus != 0):如果头节点不为空,并且waitStatus不为0,表示需要唤醒后继节点。
  • unparkSuccessor(h);:唤醒后继节点。
  • return true;:返回true,表示成功释放同步状态。

unparkSuccessor(Node node)方法用于唤醒后继节点:

private void unparkSuccessor(Node node) {
    /*
     * If status is negative (i.e., possibly needing signal) try
     * to clear in anticipation of signaling.  It is OK if this
     * fails or if status is changed by waiting thread.
     */
    int ws = node.waitStatus;
    if (ws < 0)
        compareAndSetWaitStatus(node, ws, 0);

    /*
     * Thread to unpark is held in successor, which is normally
     * just the next node.  But if cancelled or apparently null,
     * traverse backwards from tail to find the actual
     * non-cancelled successor.
     */
    Node s = node.next;
    if (s == null || s.waitStatus > 0) {
        s = null;
        for (Node t = tail; t != null && t != node; t = t.prev)
            if (t.waitStatus <= 0)
                s = t;
    }
    if (s != null)
        LockSupport.unpark(s.thread);
}

代码解释:

  • int ws = node.waitStatus;:获取节点的waitStatus属性。
  • if (ws < 0):如果waitStatus小于0,表示需要清空waitStatus
  • compareAndSetWaitStatus(node, ws, 0);:使用CAS操作将waitStatus设置为0。
  • Node s = node.next;:获取后继节点。
  • if (s == null || s.waitStatus > 0):如果后继节点为空,或者waitStatus大于0,表示后继节点已经被取消,需要从尾节点开始向前遍历,找到第一个未被取消的节点。
  • LockSupport.unpark(s.thread);:唤醒后继节点对应的线程。

6. 独占模式与共享模式

AQS支持两种同步模式:独占模式和共享模式。

  • 独占模式: 只有一个线程可以获取同步状态,例如ReentrantLock。
  • 共享模式: 多个线程可以同时获取同步状态,例如CountDownLatch。

在独占模式下,tryAcquire(arg)方法尝试独占式地获取同步状态,如果获取失败,则将当前线程加入到CLH队列中,并阻塞等待。在共享模式下,tryAcquireShared(arg)方法尝试共享式地获取同步状态,如果获取失败,则同样将当前线程加入到CLH队列中,但不需要阻塞等待,而是继续尝试获取同步状态,直到获取成功或被取消。

7. 取消等待

当线程在等待同步状态时,可能会被中断或超时,此时需要取消等待。取消等待的操作主要包括以下几个步骤:

  1. 设置waitStatus为CANCELLED: 将节点的waitStatus属性设置为CANCELLED
  2. 从队列中移除节点: 将节点从CLH队列中移除。
  3. 唤醒后继节点: 如果节点有后继节点,则唤醒后继节点。

AQS中cancelAcquire(Node node) 方法用于取消获取:

private void cancelAcquire(Node node) {
    // Ignore if node doesn't exist
    if (node == null)
        return;

    node.thread = null;

    // Skip cancelled predecessors
    Node pred = node.prev;
    while (pred.waitStatus > 0)
        node.prev = pred = pred.prev;

    // predNext is the apparent node to unsplice. CASes below will
    // fail if not, in which case, we lost race vs another cancel
    // or signal, so no further action is necessary.
    Node predNext = pred.next;

    // Can use unconditional write instead of CAS if predCorrectlyRefersTo
    // node.firstWaiter, but can't ensure this will always be true.
    if (pred != node.prev)
        return;

    node.waitStatus = Node.CANCELLED;

    // If we are the tail, remove ourselves.
    if (node == tail && compareAndSetTail(node, pred)) {
        compareAndSetNext(pred, predNext, null);
    } else {
        // If successor needs signal, try to clear node's signal
        // bit, else unpark successor.
        int ws;
        if (pred != head &&
            ((ws = pred.waitStatus) == Node.SIGNAL ||
             (ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) &&
            (pred.thread != null)) {
            Node next = node.next;
            if (next != null && next.waitStatus <= 0)
                unparkSuccessor(next);
        } else {
            signalNext(node);
        }
    }
}

8. AQS的优势与局限

AQS作为Java并发编程的基础框架,具有以下优势:

  • 灵活性: 可以用于构建各种同步组件,例如锁、信号量、CountDownLatch等。
  • 可扩展性: 易于扩展到大规模并发环境。
  • 公平性: 可以保证线程按照请求顺序获得同步状态。

AQS也存在一些局限性:

  • 复杂性: 源码较为复杂,理解起来有一定难度。
  • 性能开销: 自旋等待和CAS操作会带来一定的性能开销。
  • 死锁风险: 如果使用不当,可能会导致死锁。

Node节点排队机制:AQS并发同步的基石

通过本次讲座,我们深入了解了AQS中Node节点的CLH队列排队机制。Node节点作为AQS并发同步的核心,在线程排队、唤醒和取消等方面发挥着至关重要的作用。掌握Node节点排队机制的原理,有助于我们更好地理解AQS,并编写高效、可靠的并发程序。

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注