Java AQS的锁排队机制:CLH队列中Node节点的等待状态与条件等待

Java AQS 的锁排队机制:CLH 队列中 Node 节点的等待状态与条件等待

大家好,今天我们来深入探讨 Java 并发编程中一个至关重要的概念:AbstractQueuedSynchronizer(AQS)框架中的锁排队机制,特别是 CLH 队列中 Node 节点的等待状态以及条件等待的实现。AQS 是构建 Java 并发工具的基础,理解其内部机制对于编写高效、可靠的并发程序至关重要。

AQS 核心概念回顾

AQS 的核心思想是维护一个同步状态 (state) 和一个 FIFO 的等待队列 (CLH 队列)。同步状态表示锁的可用性,而等待队列则管理等待获取锁的线程。线程试图获取锁时,如果同步状态不可用,则会将当前线程封装成一个 Node 节点加入到 CLH 队列中,并进入阻塞状态。当锁被释放时,AQS 会唤醒等待队列中的一个或多个线程,使其尝试获取锁。

CLH 队列:线程排队的基石

CLH 队列是一种隐式的双向链表,用于管理等待获取锁的线程。虽然被称为队列,但它并没有显式的队列结构,而是通过 Node 节点的 prevnext 指针来维护线程的排队顺序。

每个等待锁的线程都会被封装成一个 Node 对象,包含以下关键属性:

  • thread: 代表正在等待锁的线程。
  • waitStatus: 表示节点的等待状态,这是我们今天要重点讨论的内容。
  • prev: 指向队列中的前驱节点。
  • next: 指向队列中的后继节点。

Node 节点的等待状态 (waitStatus)

waitStatus 是一个重要的属性,它反映了节点在 CLH 队列中的状态,直接影响着 AQS 如何处理等待线程。waitStatus 可以取以下几个值:

含义
0 初始状态,表示节点正在等待获取锁。
SIGNAL 表示当前节点的后继节点需要被唤醒。当一个节点释放锁后,会尝试将后继节点的 waitStatus 设置为 SIGNAL,以便后继节点能够被唤醒。
CANCELLED 表示节点对应的线程被取消。通常发生在线程在等待过程中被中断或者超时。
CONDITION 表示节点正在等待条件变量。
PROPAGATE 表示释放操作需要传播给其他节点。 这通常用于共享模式(例如 SemaphoreCountDownLatch),当一个节点释放锁时,不仅需要唤醒它的后继节点,还需要确保后续节点也能够获得锁。

深入解析 waitStatus 的作用

  1. 0 (初始状态): 当一个线程首次尝试获取锁失败,并且被加入到 CLH 队列时,其对应的 Node 节点的 waitStatus 初始化为 0。这表示该节点处于一个“未激活”的等待状态,还没有收到任何唤醒信号。

  2. SIGNAL: SIGNAL 状态是最重要的状态之一。当一个节点成为队列中的头节点的后继节点,并且头节点释放锁时,头节点会尝试将后继节点的 waitStatus 设置为 SIGNAL。这表示后继节点(即当前节点)应该被唤醒,尝试获取锁。

    • 设置 SIGNAL 的时机: 主要发生在 unparkSuccessor() 方法中。该方法在释放锁时被调用,用于唤醒后继节点。
    • 作用: 确保当锁可用时,等待队列中的线程能够被及时唤醒。
    // AQS 中 unparkSuccessor() 方法的部分代码
    private void unparkSuccessor(Node node) {
        /*
         * If status is negative (i.e., possibly needing signal) try
         * to clear in anticipation of signalling.  It is OK if this
         * fails or if status is changed by waiting thread.
         */
        int ws = node.waitStatus;
        if (ws < 0)
            compareAndSetWaitStatus(node, ws, 0);
    
        /*
         * Thread to unpark is held in successor, which is normally
         * just the next node.  But if cancelled or apparently null,
         * traverse backwards from tail to find the actual
         * non-cancelled successor.
         */
        Node s = node.next;
        if (s == null || s.waitStatus > 0) {
            s = null;
            for (Node t = tail; t != null && t != node; t = t.prev)
                if (t.waitStatus <= 0)
                    s = t;
        }
        if (s != null)
            LockSupport.unpark(s.thread);
    }
  3. CANCELLED: CANCELLED 状态表示与该节点关联的线程已经被取消。这通常发生在以下情况:

    • 线程在等待锁的过程中被中断 (Thread.interrupt())。
    • 线程在等待锁的过程中超时。

    当一个节点的状态变为 CANCELLED 时,AQS 会将其从队列中移除,避免不必要的唤醒操作。

    • 处理 CANCELLED 节点: AQS 会在尝试获取锁之前检查当前节点是否被取消。如果被取消,则会尝试将其从队列中移除,并重新尝试获取锁。
    // AQS 中 acquireQueued() 方法的部分代码,用于处理 CANCELLED 节点
    final boolean acquireQueued(final Node node, int arg) {
        boolean failed = true;
        try {
            boolean interrupted = false;
            for (;;) {
                final Node p = node.predecessor();
                if (p == head && tryAcquire(arg)) {
                    setHead(node);
                    p.next = null; // help GC
                    failed = false;
                    return interrupted;
                }
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    interrupted = true;
            }
        } finally {
            if (failed)
                cancelAcquire(node); // 如果获取失败,则取消获取
        }
    }
    
    private void cancelAcquire(Node node) {
        // Ignore if node doesn't exist
        if (node == null)
            return;
    
        node.thread = null;
    
        // Skip cancelled predecessors
        Node pred = node.prev;
        while (pred.waitStatus > 0)
            node.prev = pred = pred.prev;
    
        // predNext is the apparent node to unsplice. CASes below will
        // fail if not, so lost races are handled gracefully
        Node predNext = pred.next;
    
        // Can use unconditional write instead of CAS if predCorrectlyRefersTo(pred, node)
        // i.e., pred is still valid
        node.waitStatus = Node.CANCELLED;
    
        // If we are the tail, remove ourselves.
        if (node == tail && compareAndSetTail(node, pred)) {
            compareAndSetNext(pred, predNext, null);
        } else {
            // If successor needs signal, wake up to dispose of us
            int ws;
            if (pred != head &&
                ((ws = pred.waitStatus) == Node.SIGNAL ||
                 (ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) &&
                (pred.thread != null)) {
                Node next = node.next;
                if (next != null && next.waitStatus <= 0)
                    compareAndSetNext(pred, predNext, next);
            } else {
                unparkSuccessor(node);
            }
        }
    }
  4. CONDITION: CONDITION 状态用于支持条件变量。当一个线程调用 Condition.await() 方法时,该线程会被封装成一个 Node 节点,并将其 waitStatus 设置为 CONDITION,然后加入到与该 Condition 对象关联的等待队列中。

    • 与条件变量配合使用: CONDITION 状态的节点不会直接参与到锁的竞争中,它们只有在收到 Condition.signal()Condition.signalAll() 信号时才会被移动到 CLH 队列,参与锁的竞争。
  5. PROPAGATE: PROPAGATE 状态用于共享模式,例如 SemaphoreCountDownLatch。当一个节点释放锁时,如果其 waitStatusPROPAGATE,则表示释放操作需要传播给其他节点,以确保后续节点也能够获得锁。

    • 共享锁的传播: 在共享模式下,多个线程可以同时持有锁。当一个线程释放锁时,需要确保所有等待的线程都能获得锁,因此需要将释放操作传播给所有等待的线程。

条件等待 (Condition Waiting)

AQS 的条件等待机制允许线程在获取锁之后,因为某些条件不满足而释放锁并进入等待状态,直到其他线程发出信号通知该条件已经满足。

  • ConditionObject: AQS 提供了一个内部类 ConditionObject 来实现条件等待。每个 ConditionObject 维护一个独立的等待队列,用于存储等待特定条件的线程。

  • await() 方法: 当一个线程调用 Condition.await() 方法时,会执行以下操作:

    1. 释放当前持有的锁。
    2. 将当前线程封装成一个 Node 节点,并将其 waitStatus 设置为 CONDITION,然后加入到与该 Condition 对象关联的等待队列中。
    3. 线程进入阻塞状态,等待被唤醒。
  • signal()signalAll() 方法: 当一个线程调用 Condition.signal()Condition.signalAll() 方法时,会执行以下操作:

    1. 从与该 Condition 对象关联的等待队列中移除一个或所有节点。
    2. 将这些节点加入到 CLH 队列中,并将它们的 waitStatus 设置为 0,以便它们能够参与锁的竞争。

条件等待的代码示例

import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

public class ConditionExample {

    private final Lock lock = new ReentrantLock();
    private final Condition condition = lock.newCondition();
    private boolean isReady = false;

    public void awaitSignal() throws InterruptedException {
        lock.lock();
        try {
            while (!isReady) {
                System.out.println(Thread.currentThread().getName() + " 等待信号...");
                condition.await();
            }
            System.out.println(Thread.currentThread().getName() + " 收到信号,继续执行...");
        } finally {
            lock.unlock();
        }
    }

    public void signalAll() {
        lock.lock();
        try {
            isReady = true;
            System.out.println(Thread.currentThread().getName() + " 发送信号...");
            condition.signalAll();
        } finally {
            lock.unlock();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        ConditionExample example = new ConditionExample();

        Thread t1 = new Thread(() -> {
            try {
                example.awaitSignal();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }, "Thread-1");

        Thread t2 = new Thread(() -> {
            try {
                example.awaitSignal();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }, "Thread-2");

        Thread t3 = new Thread(() -> {
            example.signalAll();
        }, "Thread-3");

        t1.start();
        t2.start();
        Thread.sleep(1000); // 确保 t1 和 t2 进入等待状态
        t3.start();
    }
}

在这个例子中,Thread-1Thread-2 会调用 awaitSignal() 方法,因为 isReady 初始值为 false,所以它们会进入等待状态。Thread-3 会调用 signalAll() 方法,将 isReady 设置为 true,并发送信号唤醒所有等待的线程。

AQS 流程图

虽然不能在这里插入图片,但可以简单描述AQS的流程:

  1. 线程尝试获取锁 (tryAcquire)。
  2. 如果获取成功,直接返回。
  3. 如果获取失败,将当前线程封装成 Node,加入 CLH 队列。
  4. Node 节点进入等待状态,waitStatus 为 0。
  5. 当锁被释放 (tryRelease) 时,头节点尝试唤醒后继节点 (如果 waitStatusSIGNAL)。
  6. 被唤醒的节点重新尝试获取锁。
  7. 如果线程在等待过程中被中断,waitStatus 会被设置为 CANCELLED,并从队列中移除。
  8. 条件等待通过 ConditionObject 实现,线程在条件不满足时释放锁并进入条件队列,等待信号。

总结 Node 状态与 AQS 的关系

Node 节点的 waitStatus 是 AQS 实现锁排队和条件等待机制的关键。通过维护 waitStatus,AQS 能够有效地管理等待线程的状态,并根据不同的状态采取相应的行动,例如唤醒线程、取消线程或将其加入到条件队列中。理解 waitStatus 的含义和作用,对于深入理解 AQS 的工作原理至关重要。

清晰理解 AQS 的重要性

通过今天的学习,我们深入了解了 AQS 框架中 CLH 队列的 Node 节点的等待状态以及条件等待的实现。掌握这些概念对于理解 Java 并发工具的底层机制,编写高效、可靠的并发程序至关重要。AQS 是 Java 并发编程的基石,理解它能让我们更好地应对复杂的并发场景。

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注