AQS(AbstractQueuedSynchronizer)框架深度剖析:CLH队列与同步状态管理

AQS(AbstractQueuedSynchronizer)框架深度剖析:CLH队列与同步状态管理

大家好,今天我们来深入探讨并发编程中一个非常重要的框架——AQS (AbstractQueuedSynchronizer)。AQS 是构建许多同步器(例如 ReentrantLock、Semaphore、CountDownLatch 等)的基础。理解 AQS 的原理对于编写高效且可靠的并发程序至关重要。我们将重点关注 AQS 的核心组件:CLH 队列和同步状态管理。

1. AQS 的核心思想

AQS 本质上是一个同步器框架,它提供了一种通用的机制来管理同步状态、阻塞和唤醒线程。它采用了一种基于模板方法的设计模式,允许开发者通过继承 AQS 并重写特定的方法来实现自定义的同步器。

AQS 的核心思想可以概括为以下几点:

  • 同步状态 (state): AQS 使用一个 volatile int 类型的 state 变量来表示同步状态。这个状态可以表示锁的持有者数量、信号量剩余的许可数量等等。
  • CLH 队列: 当线程尝试获取同步状态失败时,AQS 会将这些线程放入一个虚拟的双向队列,称为 CLH 队列。CLH 队列是一种 FIFO (先进先出) 队列,用于管理等待获取同步状态的线程。
  • 模板方法: AQS 提供了一组模板方法,例如 tryAcquire()tryRelease()isHeldExclusively() 等,开发者需要根据自己的同步器语义来实现这些方法。
  • 阻塞和唤醒机制: AQS 使用 LockSupport 类来阻塞和唤醒线程。当线程需要等待时,它会被阻塞并放入 CLH 队列。当同步状态可用时,AQS 会唤醒队列中的一个或多个线程。

2. CLH 队列:线程排队的基石

CLH 队列是 AQS 中一个至关重要的组件。它用于管理那些因无法立即获取同步状态而需要等待的线程。CLH 队列是一种基于链表的 FIFO 队列,但与传统的队列不同,它是一种 虚拟队列,这意味着它并不实际存储线程对象,而是存储包含线程信息的节点。

2.1 CLH 队列的结构

CLH 队列由一系列节点组成,每个节点代表一个等待获取同步状态的线程。每个节点包含以下信息:

  • thread: 等待获取同步状态的线程。
  • waitStatus: 节点的等待状态,可以是以下值之一:
    • SIGNAL: 表示当前节点的后继节点(如果存在)需要被唤醒。
    • CANCELLED: 表示当前节点对应的线程已经被取消或中断。
    • CONDITION: 表示当前节点正在等待一个条件。
    • PROPAGATE: 表示释放操作应该传播给其他节点。
    • 0: 初始状态,表示节点正在等待获取同步状态。
  • prev: 指向前驱节点的引用。
  • next: 指向后继节点的引用。

2.2 CLH 队列的工作原理

当一个线程尝试获取同步状态失败时,AQS 会执行以下步骤:

  1. 创建节点: 创建一个新的节点,并将当前线程的信息存储到该节点中。
  2. 入队: 将新节点添加到 CLH 队列的尾部。这通常通过 CAS (Compare and Swap) 操作来完成,以保证线程安全。
  3. 自旋等待: 线程进入自旋等待状态,不断检查其前驱节点的状态。如果前驱节点释放了同步状态,并且当前节点是队列中的第一个有效等待节点,那么当前线程就可以尝试获取同步状态。
  4. 获取同步状态: 如果线程成功获取了同步状态,它将从 CLH 队列中移除,并开始执行临界区代码。
  5. 唤醒后继节点: 在释放同步状态后,线程会尝试唤醒其后继节点,以便后继节点可以尝试获取同步状态。

2.3 CLH 队列的代码实现 (简化)

虽然 AQS 的 CLH 队列实现非常复杂,涉及到大量的并发优化,但我们可以通过一个简化的例子来理解其基本原理。

import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.LockSupport;

class Node {
    volatile Thread thread;
    volatile int waitStatus;
    volatile Node prev;
    volatile Node next;

    Node() {
        thread = Thread.currentThread();
    }

    Node(Thread thread) {
        this.thread = thread;
    }
}

class SimpleCLHQueue {
    private final AtomicReference<Node> head = new AtomicReference<>(new Node(null)); // Dummy node
    private final AtomicReference<Node> tail = new AtomicReference<>(head.get());

    public void enqueue(Node node) {
        Node prev = tail.get();
        node.prev = prev;
        while (!tail.compareAndSet(prev, node)) {
            prev = tail.get();
            node.prev = prev;
        }
        prev.next = node;
    }

    public Node dequeue() {
        Node h = head.get();
        Node first = h.next;
        if (first == null) {
            return null; // Queue is empty
        }
        if (head.compareAndSet(h, first)) {
            first.prev = null;
            return first;
        } else {
            return null; // Dequeue failed, try again later
        }
    }

    public static void main(String[] args) throws InterruptedException {
        SimpleCLHQueue queue = new SimpleCLHQueue();

        Thread thread1 = new Thread(() -> {
            Node node = new Node();
            queue.enqueue(node);
            System.out.println("Thread 1 enqueued.");
            LockSupport.park(node); // Block thread 1
            System.out.println("Thread 1 resumed.");
        });

        Thread thread2 = new Thread(() -> {
            Node node = new Node();
            queue.enqueue(node);
            System.out.println("Thread 2 enqueued.");
            LockSupport.park(node); // Block thread 2
            System.out.println("Thread 2 resumed.");
        });

        thread1.start();
        Thread.sleep(100); // Ensure thread1 is enqueued first
        thread2.start();
        Thread.sleep(100); // Ensure thread2 is enqueued

        // Simulate releasing the lock and unparking the first thread
        Node firstNode = queue.dequeue();
        if (firstNode != null) {
            LockSupport.unpark(firstNode.thread);
            System.out.println("Unparked thread 1.");
        }

        Thread.sleep(1000);

        // Simulate releasing the lock and unparking the second thread
        firstNode = queue.dequeue();
        if (firstNode != null) {
            LockSupport.unpark(firstNode.thread);
            System.out.println("Unparked thread 2.");
        }

        thread1.join();
        thread2.join();
    }
}

这个简化的例子演示了 CLH 队列的基本入队和出队操作。请注意,这只是一个非常简化的版本,实际的 AQS 实现要复杂得多,因为它需要处理更多的并发情况和优化。例如,它使用更高级的 CAS 操作,并且使用 waitStatus 字段来管理节点的状态。

2.4 CLH 队列的优势

CLH 队列相比于其他队列实现,具有以下优势:

  • 公平性: CLH 队列保证了线程按照 FIFO 的顺序获取同步状态,从而避免了饥饿现象。
  • 可扩展性: CLH 队列可以很好地处理大量的并发线程。
  • 性能: CLH 队列的入队和出队操作都是 O(1) 的时间复杂度。
  • 减少竞争: 自旋等待主要发生在本地变量上,减少了对共享变量的竞争。线程只关注其前驱节点,避免了全局锁的竞争。

3. 同步状态管理:控制并发访问的核心

AQS 使用一个 volatile int 类型的 state 变量来表示同步状态。这个状态可以表示锁的持有者数量、信号量剩余的许可数量等等。AQS 提供了一组方法来操作这个 state 变量,包括:

  • getState(): 获取当前同步状态的值。
  • setState(int newState): 设置当前同步状态的值。
  • compareAndSetState(int expect, int update): 以原子方式比较并设置同步状态的值。如果当前状态的值等于 expect,则将其更新为 update

3.1 模板方法:自定义同步逻辑

AQS 提供了一组模板方法,开发者需要根据自己的同步器语义来实现这些方法。这些模板方法定义了获取和释放同步状态的逻辑。

方法名 描述
tryAcquire(int arg) 尝试获取同步状态。如果成功获取,则返回 true,否则返回 falsearg 参数通常用于传递获取同步状态所需的参数,例如锁的重入次数。
tryRelease(int arg) 尝试释放同步状态。如果成功释放,则返回 true,否则返回 falsearg 参数通常用于传递释放同步状态所需的参数,例如锁的重入次数。
isHeldExclusively() 如果当前线程独占式地持有同步状态,则返回 true,否则返回 false。这个方法通常用于判断当前线程是否已经持有锁。
tryAcquireShared(int arg) 尝试以共享模式获取同步状态。如果成功获取,则返回一个非负值,否则返回一个负值。
tryReleaseShared(int arg) 尝试以共享模式释放同步状态。如果成功释放,则返回 true,否则返回 false
isHeldExclusively() 用于判断当前同步器是否被当前线程独占持有。通常在使用 Condition 对象时需要实现此方法。

开发者需要根据自己的同步器语义来实现这些方法。例如,对于一个互斥锁,tryAcquire() 方法可能会检查当前同步状态是否为 0,如果是,则将其设置为 1,并返回 truetryRelease() 方法可能会将同步状态设置为 0,并返回 true

3.2 同步状态的获取和释放

AQS 提供了两种获取同步状态的模式:

  • 独占模式 (Exclusive Mode): 只有一个线程可以获取同步状态。例如,ReentrantLock 就是以独占模式获取同步状态的。
  • 共享模式 (Shared Mode): 多个线程可以同时获取同步状态。例如,Semaphore 和 CountDownLatch 就是以共享模式获取同步状态的。

AQS 提供了一组方法来获取和释放同步状态,包括:

  • acquire(int arg): 以独占模式获取同步状态。如果获取失败,则线程会被阻塞,直到同步状态可用。
  • acquireInterruptibly(int arg): 以独占模式获取同步状态。如果获取失败,则线程会被阻塞,但可以被中断。
  • tryAcquireNanos(int arg, long nanosTimeout): 以独占模式获取同步状态。如果获取失败,则线程会被阻塞,但有一个超时时间。
  • release(int arg): 以独占模式释放同步状态。
  • acquireShared(int arg): 以共享模式获取同步状态。如果获取失败,则线程会被阻塞,直到同步状态可用。
  • acquireSharedInterruptibly(int arg): 以共享模式获取同步状态。如果获取失败,则线程会被阻塞,但可以被中断。
  • tryAcquireSharedNanos(int arg, long nanosTimeout): 以共享模式获取同步状态。如果获取失败,则线程会被阻塞,但有一个超时时间。
  • releaseShared(int arg): 以共享模式释放同步状态。

这些方法内部会调用开发者实现的模板方法来获取和释放同步状态。例如,acquire(int arg) 方法会首先调用 tryAcquire(int arg) 方法来尝试获取同步状态。如果 tryAcquire(int arg) 方法返回 true,则表示获取成功,acquire(int arg) 方法直接返回。如果 tryAcquire(int arg) 方法返回 false,则表示获取失败,acquire(int arg) 方法会将当前线程放入 CLH 队列,并阻塞线程,直到同步状态可用。

3.3 一个简单的互斥锁示例

下面是一个使用 AQS 实现的简单互斥锁的例子:

import java.util.concurrent.locks.AbstractQueuedSynchronizer;

class SimpleMutex {

    private static class Sync extends AbstractQueuedSynchronizer {
        @Override
        protected boolean tryAcquire(int arg) {
            if (compareAndSetState(0, 1)) {
                setExclusiveOwnerThread(Thread.currentThread());
                return true;
            }
            return false;
        }

        @Override
        protected boolean tryRelease(int arg) {
            if (!isHeldExclusively()) {
                throw new IllegalMonitorStateException();
            }
            setExclusiveOwnerThread(null);
            setState(0);
            return true;
        }

        @Override
        protected boolean isHeldExclusively() {
            return getExclusiveOwnerThread() == Thread.currentThread();
        }
    }

    private final Sync sync = new Sync();

    public void lock() {
        sync.acquire(1);
    }

    public void unlock() {
        sync.release(1);
    }

    public static void main(String[] args) throws InterruptedException {
        SimpleMutex mutex = new SimpleMutex();

        Thread thread1 = new Thread(() -> {
            mutex.lock();
            try {
                System.out.println("Thread 1 acquired the lock.");
                Thread.sleep(1000); // Simulate some work
            } catch (InterruptedException e) {
                e.printStackTrace();
            } finally {
                mutex.unlock();
                System.out.println("Thread 1 released the lock.");
            }
        });

        Thread thread2 = new Thread(() -> {
            mutex.lock();
            try {
                System.out.println("Thread 2 acquired the lock.");
                Thread.sleep(1000); // Simulate some work
            } catch (InterruptedException e) {
                e.printStackTrace();
            } finally {
                mutex.unlock();
                System.out.println("Thread 2 released the lock.");
            }
        });

        thread1.start();
        Thread.sleep(100); // Ensure thread1 acquires the lock first
        thread2.start();

        thread1.join();
        thread2.join();
    }
}

在这个例子中,SimpleMutex 类使用 AQS 的 AbstractQueuedSynchronizer 类来实现互斥锁。Sync 类继承了 AbstractQueuedSynchronizer 类,并实现了 tryAcquire()tryRelease()isHeldExclusively() 方法。lock() 方法调用 sync.acquire(1) 方法来获取锁,unlock() 方法调用 sync.release(1) 方法来释放锁。

4. Condition:等待/通知机制的实现

AQS 还提供了一个 Condition 类,用于实现等待/通知机制。Condition 对象与锁相关联,允许线程在等待特定条件时释放锁,并在条件满足时重新获取锁。

4.1 Condition 的工作原理

Condition 对象内部维护一个等待队列,用于存储等待特定条件的线程。当线程调用 Condition.await() 方法时,它会执行以下步骤:

  1. 释放锁: 线程释放与 Condition 对象关联的锁。
  2. 进入等待队列: 线程被放入 Condition 对象的等待队列中。
  3. 阻塞: 线程被阻塞,直到被其他线程唤醒。

当其他线程调用 Condition.signal()Condition.signalAll() 方法时,它会执行以下步骤:

  1. 从等待队列中移除线程: signal() 方法会从等待队列中移除一个线程,signalAll() 方法会从等待队列中移除所有线程。
  2. 唤醒线程: 被移除的线程会被唤醒。
  3. 重新获取锁: 线程会尝试重新获取与 Condition 对象关联的锁。

4.2 Condition 的使用示例

import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

class BoundedBuffer {
    final Lock lock = new ReentrantLock();
    final Condition notFull  = lock.newCondition();
    final Condition notEmpty = lock.newCondition();

    final Object[] items = new Object[10];
    int putptr, takeptr, count;

    public void put(Object x) throws InterruptedException {
        lock.lock();
        try {
            while (count == items.length)
                notFull.await();
            items[putptr] = x;
            if (++putptr == items.length) putptr = 0;
            ++count;
            notEmpty.signal();
        } finally {
            lock.unlock();
        }
    }

    public Object take() throws InterruptedException {
        lock.lock();
        try {
            while (count == 0)
                notEmpty.await();
            Object x = items[takeptr];
            if (++takeptr == items.length) takeptr = 0;
            --count;
            notFull.signal();
            return x;
        } finally {
            lock.unlock();
        }
    }
}

在这个例子中,BoundedBuffer 类使用 Condition 对象来实现一个有界缓冲区。notFull 条件表示缓冲区未满,notEmpty 条件表示缓冲区非空。put() 方法会在缓冲区满时等待 notFull 条件,take() 方法会在缓冲区空时等待 notEmpty 条件。

5. AQS 的重要性

AQS 是 Java 并发包中许多同步器的基石。理解 AQS 的原理对于编写高效且可靠的并发程序至关重要。AQS 提供了一种通用的机制来管理同步状态、阻塞和唤醒线程,并允许开发者通过继承 AQS 并重写特定的方法来实现自定义的同步器。

6. 总结:AQS 是并发编程的强大工具

AQS 通过 CLH 队列管理等待线程,通过同步状态控制并发访问,并使用 Condition 实现等待/通知机制,为构建各种同步器提供了坚实的基础。掌握 AQS 的原理对于理解 Java 并发包的底层机制至关重要。

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注