Java中的公平锁实现:AQS如何通过队列管理确保线程的等待公平性

Java 公平锁实现:AQS 如何通过队列管理确保线程的等待公平性

大家好!今天我们来深入探讨 Java 中公平锁的实现机制,特别是 AQS(AbstractQueuedSynchronizer)如何通过队列管理来保证线程等待的公平性。公平锁是一种同步机制,它确保线程按照请求锁的顺序获得锁,避免了饥饿现象,让每个线程都有机会获得执行。

1. 公平锁的概念与必要性

在并发编程中,多个线程可能同时竞争共享资源。锁机制就是为了解决这种竞争,保证数据的一致性和完整性。公平锁和非公平锁是两种不同的锁获取策略。

  • 非公平锁: 线程尝试获取锁时,如果锁当前未被占用,则立即尝试获取,即使有其他线程正在等待。这可能导致某些线程长时间等待,也就是所谓的“饥饿”现象。 ReentrantLock 默认是非公平锁。
  • 公平锁: 线程只有在没有其他等待线程时才能获取锁,或者它是等待队列中的第一个线程。这保证了线程按照它们请求锁的顺序获得锁。

公平锁的必要性在于,某些应用场景对响应时间和资源分配有严格的要求。例如,在高并发的订单处理系统中,如果使用非公平锁,某些用户的请求可能长时间得不到处理,导致用户体验下降。使用公平锁可以保证每个用户的请求都有机会得到响应,避免优先级反转等问题。

2. AQS 简介:同步器的基石

AQS 是 Java 并发包 java.util.concurrent 中一个核心的抽象类,它为实现各种同步器(如锁、信号量、栅栏等)提供了一个通用的框架。AQS 基于模板方法模式,子类通过继承 AQS 并实现特定的方法来定制同步行为。

AQS 的核心思想是使用一个 int 类型的状态变量 state 来表示同步状态,并维护一个 FIFO(先进先出)的同步队列来管理等待线程。

AQS 提供以下核心方法:

方法名 描述
getState() 获取当前同步状态
setState(int newState) 设置当前同步状态
compareAndSetState(int expect, int update) 使用 CAS 操作原子性地更新同步状态, expect 是期望值, update 是更新值
acquire(int arg) 以独占模式获取同步状态,如果获取失败,则进入等待队列
release(int arg) 释放同步状态,唤醒等待队列中的后继节点
acquireShared(int arg) 以共享模式获取同步状态,如果获取失败,则进入等待队列
releaseShared(int arg) 释放同步状态,唤醒等待队列中的后继节点

AQS 的子类需要实现以下方法来定义同步行为:

方法名 描述
tryAcquire(int arg) 尝试以独占模式获取同步状态,成功返回 true,失败返回 false
tryRelease(int arg) 尝试释放同步状态,成功返回 true,失败返回 false
tryAcquireShared(int arg) 尝试以共享模式获取同步状态,成功返回非负数,失败返回负数。
tryReleaseShared(int arg) 尝试释放同步状态,成功返回 true,失败返回 false
isHeldExclusively() 当前同步器是否在独占模式下被线程占用。

3. AQS 如何实现公平锁

AQS 通过维护一个 FIFO 的同步队列来保证公平性。当一个线程尝试获取锁但获取失败时,它会被放入队列的尾部,并进入等待状态。当锁被释放时,队列中的第一个线程会被唤醒并尝试获取锁。

以下是 AQS 实现公平锁的基本步骤:

  1. 线程尝试获取锁: 线程调用 acquire(int arg) 方法尝试获取锁。
  2. 判断是否可以获取锁:acquire(int arg) 方法中,会调用子类实现的 tryAcquire(int arg) 方法来尝试获取锁。
  3. tryAcquire(int arg) 的实现: 对于公平锁,tryAcquire(int arg) 方法会检查同步队列中是否有等待线程。如果有,则直接返回 false,表示获取锁失败。如果没有,则尝试使用 CAS 操作更新 state 变量来获取锁。
  4. 进入等待队列: 如果 tryAcquire(int arg) 方法返回 false,表示获取锁失败,线程会被放入同步队列的尾部,并进入等待状态。
  5. 锁释放: 当持有锁的线程释放锁时,它会调用 release(int arg) 方法。
  6. release(int arg) 的实现:release(int arg) 方法中,会调用子类实现的 tryRelease(int arg) 方法来尝试释放锁。
  7. 唤醒后继节点: 如果 tryRelease(int arg) 方法返回 true,表示成功释放锁,release(int arg) 方法会唤醒同步队列中的第一个等待线程。
  8. 线程被唤醒: 被唤醒的线程会再次尝试获取锁。由于它是队列中的第一个线程,所以可以成功获取锁。

下面是一个使用 AQS 实现公平锁的示例代码:

import java.util.concurrent.locks.AbstractQueuedSynchronizer;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.Condition;

public class FairLock implements Lock {

    private final Sync sync = new Sync(true); // true 表示公平锁

    private static class Sync extends AbstractQueuedSynchronizer {

        private static final long serialVersionUID = -1L;

        Sync(boolean fair) {
            super();
        }

        @Override
        protected final boolean tryAcquire(int acquires) {
            final Thread current = Thread.currentThread();
            int c = getState();
            if (c == 0) {
                // 只有当队列为空或者当前线程是队列的第一个节点时,才能尝试获取锁
                if (!hasQueuedPredecessors() &&
                    compareAndSetState(0, acquires)) {
                    setExclusiveOwnerThread(current);
                    return true;
                }
            } else if (current == getExclusiveOwnerThread()) {
                int nextc = c + acquires;
                if (nextc < 0) // overflow
                    throw new Error("Maximum lock count exceeded");
                setState(nextc);
                return true;
            }
            return false;
        }

        @Override
        protected final boolean tryRelease(int releases) {
            int c = getState() - releases;
            if (Thread.currentThread() != getExclusiveOwnerThread())
                throw new IllegalMonitorStateException();
            boolean free = false;
            if (c == 0) {
                free = true;
                setExclusiveOwnerThread(null);
            }
            setState(c);
            return free;
        }

        final ConditionObject newCondition() {
            return new ConditionObject();
        }

        @Override
        protected final boolean isHeldExclusively() {
            return getExclusiveOwnerThread() == Thread.currentThread();
        }
    }

    @Override
    public void lock() {
        sync.acquire(1);
    }

    @Override
    public void lockInterruptibly() throws InterruptedException {
        sync.acquireInterruptibly(1);
    }

    @Override
    public boolean tryLock() {
        return sync.tryAcquire(1);
    }

    @Override
    public boolean tryLock(long time, java.util.concurrent.TimeUnit unit)
        throws InterruptedException {
        return sync.tryAcquireNanos(1, unit.toNanos(time));
    }

    @Override
    public void unlock() {
        sync.release(1);
    }

    @Override
    public Condition newCondition() {
        return sync.newCondition();
    }

    public boolean isFair() {
        return true;
    }

    public boolean isLocked() {
        return sync.isHeldExclusively();
    }
}

在这个例子中,FairLock 类实现了 Lock 接口,并使用 AQS 的子类 Sync 来实现公平锁的逻辑。

  • Sync 构造函数接受一个 boolean 类型的参数 fair,用于指定是否是公平锁。在这里,我们传入 true,表示创建一个公平锁。
  • tryAcquire(int acquires) 方法首先检查同步队列中是否有等待线程(通过 hasQueuedPredecessors() 方法)。如果有,则直接返回 false,表示获取锁失败。如果没有,则尝试使用 CAS 操作更新 state 变量来获取锁。
  • tryRelease(int releases) 方法用于释放锁。
  • isHeldExclusively() 方法用于判断当前线程是否持有锁。

hasQueuedPredecessors() 方法是 AQS 中一个非常重要的方法,它用于判断当前线程前面是否有等待线程。对于公平锁,只有当没有等待线程时,线程才能尝试获取锁。hasQueuedPredecessors() 方法的具体实现比较复杂,它需要遍历同步队列,判断是否有节点在当前线程的前面。

4. 公平锁的性能考量

虽然公平锁可以保证线程的公平性,但它也带来了一定的性能开销。因为公平锁需要维护一个 FIFO 的同步队列,并且在每次获取锁之前都要检查队列中是否有等待线程。这增加了锁的竞争开销,降低了吞吐量。

在实际应用中,需要根据具体的场景来选择使用公平锁还是非公平锁。如果对响应时间和资源分配有严格的要求,可以使用公平锁。如果对吞吐量有更高的要求,可以使用非公平锁。

以下是公平锁和非公平锁的性能对比:

特性 公平锁 非公平锁
公平性 保证线程按照请求锁的顺序获得锁 不保证线程按照请求锁的顺序获得锁
性能 性能较低,因为需要维护一个 FIFO 的同步队列,并且在每次获取锁之前都要检查队列中是否有等待线程 性能较高,因为线程可以立即尝试获取锁,不需要检查队列
适用场景 对响应时间和资源分配有严格要求的场景 对吞吐量有更高要求的场景

5. AQS队列管理的细节

AQS的队列管理是其实现公平锁(以及其他同步机制)的关键。让我们更深入地了解AQS如何管理这个队列,包括节点(Node)的结构、入队和出队操作,以及如何处理取消(cancel)的情况。

5.1 节点(Node)结构

AQS队列是一个CLH(Craig, Landin, and Hagersten)变体的双向链表。队列中的每个元素都是一个Node对象。Node对象包含以下关键字段:

  • waitStatus: 表示当前节点的状态。它可以是以下值之一:
    • SIGNAL: 表示当前节点的后继节点处于等待状态,当前节点释放锁时需要唤醒后继节点。
    • CANCELLED: 表示当前节点由于超时或中断而被取消。
    • CONDITION: 表示当前节点正在等待某个条件。
    • PROPAGATE: 表示当前节点释放锁后,需要将唤醒操作传播给后继节点(用于共享模式)。
    • 0: 初始状态,表示节点正在等待获取锁。
  • thread: 与当前节点关联的线程。
  • prev: 指向前驱节点的指针。
  • next: 指向后继节点的指针。
  • nextWaiter: 用于CONDITION队列,这里不深入讨论。

5.2 入队操作

当线程尝试获取锁失败时,AQS会将该线程封装成一个Node对象,并将其添加到同步队列的尾部。入队操作通过enq(Node node)方法完成。

private Node enq(final Node node) {
    for (;;) {
        Node t = tail;
        if (t == null) { // Must initialize
            if (compareAndSetHead(null, new Node()))
                tail = head;
        } else {
            node.prev = t;
            if (compareAndSetTail(t, node)) {
                t.next = node;
                return t;
            }
        }
    }
}

这个方法是一个自旋操作,它会不断尝试将节点添加到队列的尾部,直到成功为止。

  1. 队列初始化: 如果队列为空(tail == null),则先创建一个空的头节点,并设置headtail都指向它。
  2. 添加到尾部: 如果队列不为空,则将新节点的prev指针指向当前的尾节点t。然后,使用CAS操作尝试将新节点设置为尾节点。如果CAS操作成功,则将原尾节点的next指针指向新节点,完成入队操作。如果CAS操作失败,则说明有其他线程正在修改队列,需要重新尝试。

5.3 出队操作

当持有锁的线程释放锁时,AQS会唤醒队列中的第一个等待线程。被唤醒的线程会尝试获取锁,如果获取成功,则将自己设置为新的头节点,并从队列中移除。出队操作涉及到更新head节点。

当一个节点成为新的head节点之后, 它的thread域会被设置为null, 这是为了帮助GC回收不再需要的Thread对象。

private void setHead(Node node) {
    head = node;
    node.thread = null;
    node.prev = null; // Help GC
}

5.4 取消(Cancel)操作

如果一个线程在等待获取锁的过程中被中断或超时,AQS会将该线程对应的节点设置为取消状态(waitStatus = CANCELLED)。取消状态的节点不会再被唤醒,并且会被从队列中移除。

AQS通过cancelAcquire(Node node)方法来处理取消操作。

private void cancelAcquire(Node node) {
    // Ignore if node doesn't exist
    if (node == null)
        return;

    node.thread = null;

    // Skip cancelled predecessors
    Node pred = node.prev;
    while (pred.waitStatus > 0)
        node.prev = pred = pred.prev;

    Node predNext = pred.next;

    // Can use unconditional write of next field of pred.
    // After this atomic step, other Nodes can skip past us.
    // Before, we are free to do other stuff.
    node.waitStatus = Node.CANCELLED;

    // If we are the tail, remove ourselves.
    if (node == tail && compareAndSetTail(node, pred)) {
        compareAndSetNext(pred, predNext, null);
    } else {
        // If successor needs signal, wake up to fix pointers.
        int ws;
        if (pred != head &&
            ((ws = pred.waitStatus) == Node.SIGNAL ||
             (ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) &&
            pred.thread != null) {
            Node next = node.next;
            if (next != null && next.waitStatus <= 0)
                compareAndSetNext(pred, predNext, next);
        } else {
            unparkSuccessor(node);
        }

        node.next = node; // help GC
    }
}
  1. 设置节点状态: 将节点的状态设置为CANCELLED,并将其关联的线程设置为null,以便GC回收。
  2. 跳过取消的前驱节点: 从当前节点开始,向前遍历队列,跳过所有状态为CANCELLED的前驱节点,找到最近一个状态不是CANCELLED的前驱节点。
  3. 移除节点:
    • 如果当前节点是尾节点,则尝试将尾节点设置为前驱节点。
    • 如果当前节点不是尾节点,则需要调整前驱节点和后继节点的指针,将当前节点从队列中移除。

5.5 公平锁的实现关键在于 hasQueuedPredecessors()

hasQueuedPredecessors()是AQS中一个关键方法,用于判断当前线程是否应该获取锁。对于公平锁,只有当队列为空或者当前线程是队列的第一个节点时,才能尝试获取锁。

public final boolean hasQueuedPredecessors() {
    // head 指向的是一个 dummy 节点,所以需要判断是否 head 的下一个节点不为空
    Node h = head;
    Node t = tail;
    Node s;
    return h != t && // 队列不为空
        ((s = h.next) == null || s.thread != Thread.currentThread()); // 存在前驱节点
}

6. 使用 ReentrantLock 实现公平锁

Java 并发包中提供了 ReentrantLock 类,它也支持公平锁和非公平锁。可以通过在构造函数中传入 true 来创建一个公平锁。

import java.util.concurrent.locks.ReentrantLock;

public class FairLockExample {

    private static final ReentrantLock lock = new ReentrantLock(true); // 创建公平锁

    public static void main(String[] args) {
        for (int i = 0; i < 5; i++) {
            new Thread(() -> {
                lock.lock();
                try {
                    System.out.println(Thread.currentThread().getName() + " 获取了锁");
                    Thread.sleep(100); // 模拟持有锁的时间
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } finally {
                    lock.unlock();
                    System.out.println(Thread.currentThread().getName() + " 释放了锁");
                }
            }, "Thread-" + i).start();
        }
    }
}

在这个例子中,我们创建了一个公平锁 lock。当多个线程同时竞争锁时,它们会按照请求锁的顺序获得锁。

7. 公平锁的应用场景

公平锁在以下场景中比较适用:

  • 避免饥饿: 当某些线程的优先级较低,或者总是无法获得锁时,可以使用公平锁来保证它们有机会获得执行。
  • 保证服务质量(QoS): 在需要保证每个请求都有机会得到响应的系统中,可以使用公平锁来避免某些请求长时间等待。
  • 资源分配公平性: 在需要公平分配资源的系统中,可以使用公平锁来保证每个线程都有机会获得资源。

8. 使用公平锁的注意事项

  • 公平锁会降低吞吐量,因为它需要维护一个 FIFO 的同步队列,并且在每次获取锁之前都要检查队列中是否有等待线程。
  • 在某些情况下,非公平锁可能更适合,例如当对响应时间的要求不高,但对吞吐量有更高的要求时。
  • 需要根据具体的应用场景来选择使用公平锁还是非公平锁。

9. AQS 的公平队列管理,重要且易错

AQS通过维护一个先进先出(FIFO)的同步队列来确保线程的等待公平性。队列中的每个节点都对应一个等待获取锁的线程。当一个线程请求锁但未能立即获得时,它会被添加到队列的尾部,并进入阻塞状态。当锁被释放时,队列头部的线程会被唤醒并尝试获取锁。AQS的公平性主要体现在tryAcquire方法中,它会检查当前线程是否是队列中的第一个等待线程,只有在这种情况下,线程才能尝试获取锁。

AQS的队列管理是实现公平锁的关键,但也带来了一定的性能开销。因此,在实际应用中,需要根据具体的场景来选择使用公平锁还是非公平锁。理解AQS的队列管理机制对于深入理解Java并发编程至关重要。

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注