JAVA AQS底层原理深度解构:独占锁与共享锁的完整实现机制

JAVA AQS底层原理深度解构:独占锁与共享锁的完整实现机制

大家好,今天我们来深入探讨Java并发编程中一个非常重要的组件——AbstractQueuedSynchronizer,也就是大家常说的AQS。AQS是构建锁和同步器的基础框架,理解AQS的底层原理对于编写高效、可靠的并发程序至关重要。我们将从AQS的基本概念出发,逐步剖析其核心机制,并重点分析独占锁和共享锁的实现原理,最后通过具体的代码示例来加深理解。

1. AQS:并发的基石

AQS,抽象队列同步器,是一个抽象类,它提供了一个基于FIFO队列的阻塞锁和相关的同步器框架。AQS的核心思想是使用一个int类型的state变量来表示同步状态,并通过内置的FIFO队列来管理竞争资源的线程。AQS本身并不直接实现任何同步机制,而是定义了一套模板方法,子类通过继承AQS并实现这些模板方法来实现特定的同步语义。

1.1 AQS的核心属性

AQS主要包含以下几个核心属性:

  • state (int): 同步状态,用于表示锁的持有状态或其他同步条件。它是一个volatile变量,保证了可见性。
  • head (Node): FIFO队列的头节点,指向等待队列的第一个节点。
  • tail (Node): FIFO队列的尾节点,指向等待队列的最后一个节点。
  • Node (内部类): 表示等待队列中的一个节点,包含线程引用、等待状态等信息。

1.2 AQS的核心方法

AQS提供了一系列核心方法,子类需要根据具体的同步需求来实现这些方法。这些方法主要分为以下几类:

  • 状态访问和修改方法:

    • getState(): 获取当前同步状态。
    • setState(int newState): 设置同步状态。
    • compareAndSetState(int expect, int update): 使用CAS操作原子地设置同步状态,保证线程安全。
  • 独占模式相关方法:

    • tryAcquire(int arg): 尝试以独占模式获取同步状态。如果成功,则返回true,否则返回false
    • tryRelease(int arg): 尝试释放以独占模式占有的同步状态。如果成功,则返回true,否则返回false
    • isHeldExclusively(): 如果当前线程以独占模式持有同步状态,则返回true,否则返回false
  • 共享模式相关方法:

    • tryAcquireShared(int arg): 尝试以共享模式获取同步状态。如果成功,则返回一个非负值,表示剩余可用资源;如果失败,则返回一个负值。
    • tryReleaseShared(int arg): 尝试释放以共享模式占有的同步状态。如果成功,则返回true,否则返回false

2. AQS的底层原理:同步队列和等待机制

AQS的核心在于维护一个FIFO同步队列,用于管理竞争资源的线程。当线程尝试获取同步状态失败时,会被封装成一个Node节点,并加入到等待队列中。当持有同步状态的线程释放资源时,会唤醒等待队列中的一个或多个线程,让它们重新尝试获取同步状态。

2.1 同步队列的结构

同步队列是一个双向链表,每个节点包含以下信息:

属性 描述
thread 等待资源的线程引用
mode 节点模式,分为Node.EXCLUSIVE (独占模式) 和 Node.SHARED (共享模式)
waitStatus 节点等待状态,包括SIGNAL (需要被唤醒)、CANCELLED (已取消) 等
prev 指向前驱节点
next 指向后继节点

2.2 线程入队流程

当线程尝试获取同步状态失败时,AQS会将该线程封装成一个Node节点,并加入到同步队列的尾部。入队过程大致如下:

  1. 创建Node节点: 根据线程的请求模式(独占或共享)创建一个Node节点,并将线程引用保存在节点中。

    Node node = new Node(Thread.currentThread(), mode);
  2. CAS尝试添加尾节点: 使用CAS操作将新节点添加到队列的尾部。如果队列为空,则需要初始化队列。

    Node pred = tail;
    if (pred == null) { // 队列为空,初始化队列
        if (compareAndSetHead(new Node()))
            tail = head;
        pred = tail;
    }
    if (compareAndSetTail(pred, node)) {
        pred.next = node;
        return node;
    }
  3. 自旋重试: 如果CAS操作失败,说明有其他线程也在尝试添加节点,此时需要自旋重试,直到添加成功。

2.3 线程出队和唤醒流程

当持有同步状态的线程释放资源时,AQS会唤醒等待队列中的一个或多个线程,让它们重新尝试获取同步状态。出队和唤醒过程大致如下:

  1. 释放资源: 持有同步状态的线程调用releasereleaseShared方法释放资源。

  2. 唤醒后继节点: AQS会检查头节点的后继节点的等待状态。如果后继节点处于SIGNAL状态(表示需要被唤醒),则会唤醒该节点对应的线程。

    if (nodeToUnpark != null)
        LockSupport.unpark(nodeToUnpark.thread);
  3. 传播唤醒: 对于共享模式,如果唤醒的线程成功获取了同步状态,并且还有剩余可用资源,则会继续唤醒后续节点,直到没有剩余资源或到达队列尾部。

2.4 等待状态的变化

Node节点的waitStatus属性表示节点的等待状态,它的变化直接影响着线程的唤醒和调度。常见的waitStatus值包括:

waitStatus 描述
0 初始状态,表示节点正在等待获取同步状态。
SIGNAL 表示当前节点的后继节点需要被唤醒。当一个节点的前驱节点释放资源时,会将后继节点的waitStatus设置为SIGNAL,表示需要唤醒后继节点。
CANCELLED 表示节点已经被取消。当线程中断或超时时,会将对应的节点的waitStatus设置为CANCELLED,表示该节点不再需要参与同步竞争。
CONDITION 表示节点正在等待某个条件成立。通常与Condition对象配合使用,当条件不满足时,线程会被放入Condition对象的等待队列中,并将节点的waitStatus设置为CONDITION。当条件满足时,Condition对象会将等待队列中的节点移动到AQS的同步队列中,等待获取同步状态。
PROPAGATE 仅用于共享模式,表示释放操作需要传播给其他节点。当一个共享模式的节点成功释放资源后,会将后继节点的waitStatus设置为PROPAGATE,表示需要继续传播释放操作,直到没有剩余资源或到达队列尾部。

3. 独占锁的实现:以ReentrantLock为例

ReentrantLock是Java中常用的可重入独占锁,它基于AQS实现。ReentrantLock内部维护了一个Sync抽象类,Sync有两个子类:NonfairSync (非公平锁) 和 FairSync (公平锁)。它们分别实现了不同的同步策略。

3.1 NonfairSync (非公平锁) 的实现

非公平锁的特点是,线程尝试获取锁时,会先尝试直接获取锁,如果获取失败,才会加入到等待队列中。这种策略可能会导致某些线程一直无法获取到锁,产生饥饿现象,但通常情况下性能更高。

  • tryAcquire(int arg):

    final boolean nonfairTryAcquire(int acquires) {
        final Thread current = Thread.currentThread();
        int c = getState();
        if (c == 0) { // 如果当前锁未被占用
            if (compareAndSetState(0, acquires)) { // 尝试CAS获取锁
                setExclusiveOwnerThread(current); // 设置独占线程
                return true;
            }
        }
        else if (current == getExclusiveOwnerThread()) { // 如果当前线程已经持有锁,则重入
            int nextc = c + acquires;
            if (nextc < 0) // overflow
                throw new Error("Maximum lock count exceeded");
            setState(nextc); // 增加重入次数
            return true;
        }
        return false;
    }
  • tryRelease(int arg):

    protected final boolean tryRelease(int releases) {
        int c = getState() - releases;
        if (Thread.currentThread() != getExclusiveOwnerThread())
            throw new IllegalMonitorStateException();
        boolean free = false;
        if (c == 0) { // 如果重入次数为0,则释放锁
            free = true;
            setExclusiveOwnerThread(null); // 清空独占线程
        }
        setState(c); // 更新同步状态
        return free;
    }

3.2 FairSync (公平锁) 的实现

公平锁的特点是,线程尝试获取锁时,会先检查等待队列中是否有其他线程正在等待。如果有,则会直接加入到等待队列中,按照FIFO的顺序获取锁。这种策略可以避免饥饿现象,但性能相对较低。

  • tryAcquire(int arg):

    protected final boolean tryAcquire(int acquires) {
        final Thread current = Thread.currentThread();
        int c = getState();
        if (c == 0) { // 如果当前锁未被占用
            if (!hasQueuedPredecessors() && // 检查队列中是否有前驱节点
                compareAndSetState(0, acquires)) { // 尝试CAS获取锁
                setExclusiveOwnerThread(current); // 设置独占线程
                return true;
            }
        }
        else if (current == getExclusiveOwnerThread()) { // 如果当前线程已经持有锁,则重入
            int nextc = c + acquires;
            if (nextc < 0)
                throw new Error("Maximum lock count exceeded");
            setState(nextc);
            return true;
        }
        return false;
    }
  • hasQueuedPredecessors():

    public final boolean hasQueuedPredecessors() {
        // The correctness of this depends on head being initialized
        // before tail and on head.next being accurate if the current
        // head is transient.
        Node h = head;
        Node s;
        return h != tail &&
            ((s = h.next) == null || s.thread != Thread.currentThread());
    }

4. 共享锁的实现:以CountDownLatch为例

CountDownLatch是Java中常用的共享锁,它允许一个或多个线程等待一组操作完成。CountDownLatch内部维护了一个Sync类,该类继承自AQS,并实现了共享模式的同步语义。

  • tryAcquireShared(int arg):

    protected int tryAcquireShared(int acquires) {
        return (getState() == 0) ? 1 : -1; // 如果state为0,则返回1,表示获取成功;否则返回-1,表示获取失败
    }
  • tryReleaseShared(int arg):

    protected boolean tryReleaseShared(int releases) {
        for (;;) {
            int c = getState();
            if (c == 0)
                return false;
            int nextc = c - 1;
            if (compareAndSetState(c, nextc)) // CAS更新state
                return nextc == 0; // 如果state变为0,则返回true,表示所有线程都已完成操作
        }
    }

5. 代码示例:使用ReentrantLock实现线程安全的计数器

import java.util.concurrent.locks.ReentrantLock;

public class Counter {
    private int count = 0;
    private final ReentrantLock lock = new ReentrantLock();

    public void increment() {
        lock.lock(); // 获取锁
        try {
            count++;
        } finally {
            lock.unlock(); // 释放锁
        }
    }

    public int getCount() {
        lock.lock();
        try {
            return count;
        } finally {
            lock.unlock();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        Counter counter = new Counter();
        int numThreads = 10;
        Thread[] threads = new Thread[numThreads];

        for (int i = 0; i < numThreads; i++) {
            threads[i] = new Thread(() -> {
                for (int j = 0; j < 1000; j++) {
                    counter.increment();
                }
            });
            threads[i].start();
        }

        for (int i = 0; i < numThreads; i++) {
            threads[i].join();
        }

        System.out.println("Count: " + counter.getCount()); // 输出 Count: 10000
    }
}

6. 代码示例:使用CountDownLatch实现多线程并发执行

import java.util.concurrent.CountDownLatch;

public class CountDownLatchExample {
    public static void main(String[] args) throws InterruptedException {
        int numThreads = 5;
        CountDownLatch latch = new CountDownLatch(numThreads);

        for (int i = 0; i < numThreads; i++) {
            final int threadId = i;
            new Thread(() -> {
                try {
                    System.out.println("Thread " + threadId + " is running...");
                    Thread.sleep((long) (Math.random() * 2000)); // 模拟线程执行时间
                    System.out.println("Thread " + threadId + " finished.");
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } finally {
                    latch.countDown(); // 计数器减一
                }
            }).start();
        }

        latch.await(); // 等待所有线程完成
        System.out.println("All threads finished.");
    }
}

7. AQS的优势与局限

AQS作为Java并发编程的基础框架,具有以下优势:

  • 灵活性: AQS提供了一套通用的同步机制,可以用于构建各种类型的锁和同步器。
  • 高性能: AQS基于CAS操作和FIFO队列,能够有效地管理并发线程,并提供较高的性能。
  • 可扩展性: AQS是一个抽象类,可以通过继承和扩展来实现自定义的同步语义。

AQS也存在一些局限:

  • 复杂性: AQS的实现较为复杂,需要深入理解其底层原理才能正确使用。
  • 适用性: AQS主要适用于构建阻塞式的锁和同步器,对于非阻塞式的同步机制,可能需要采用其他方案。

8. AQS的深入思考

AQS的设计体现了面向对象的设计原则,它将同步逻辑抽象成模板方法,子类通过实现这些方法来实现特定的同步语义。这种设计模式使得AQS具有良好的可扩展性和可维护性。

AQS的核心在于维护一个FIFO同步队列,用于管理竞争资源的线程。这种队列结构保证了线程按照先来后到的顺序获取资源,避免了饥饿现象。

AQS依赖于CAS操作来实现原子性操作,CAS操作是一种无锁的并发控制机制,它能够有效地提高并发程序的性能。

9. 锁的实现原理

我们讨论了AQS的底层实现,包括同步队列的结构,线程入队出队流程,以及等待状态的变化。

10. 独占锁和共享锁的示例

我们通过ReentrantLockCountDownLatch两个具体的例子,讲解了独占锁和共享锁的实现原理。

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注