Condition对象源码解析:实现比wait/notify更精细的线程等待与唤醒

Condition 对象源码解析:实现比 wait/notify 更精细的线程等待与唤醒

大家好,今天我们来深入探讨 Java 并发编程中一个非常重要的工具:Condition 对象。我们都知道 Object 类提供了 wait()notify()notifyAll() 方法来实现线程间的同步和通信。然而,Condition 对象在某些场景下能够提供更精细的控制,实现更灵活的线程等待和唤醒机制。本次讲座将从以下几个方面展开:

  1. wait/notify 的局限性
  2. Condition 接口概览
  3. AbstractQueuedSynchronizer (AQS) 基础
  4. ConditionObject 源码剖析
  5. Condition 的使用场景和最佳实践
  6. 对比与总结

1. wait/notify 的局限性

wait()notify() 方法是 Java 中最基础的线程同步机制。它们允许线程在特定条件不满足时进入等待状态,并在其他线程满足条件时被唤醒。然而,这种机制存在一些局限性:

  • 无差别唤醒: notify() 方法会随机唤醒一个等待的线程,而 notifyAll() 会唤醒所有等待的线程。如果多个线程等待不同的条件,这种无差别唤醒会导致不必要的线程竞争和上下文切换,降低性能。
  • 条件判断的缺失: 当线程被唤醒时,它需要重新检查条件是否满足。这是因为 notify() 仅仅是通知线程条件可能已经改变,但并不能保证条件一定满足。
  • 难以实现优先级调度: wait/notify 无法对等待线程进行优先级排序,所有线程都处于同等地位。

为了克服这些局限性,Java 提供了 Condition 接口,它与 Lock 接口配合使用,可以实现更细粒度的线程同步和通信。

2. Condition 接口概览

Condition 接口是 Java 并发包 java.util.concurrent.locks 中的一个接口,它提供了一种比 wait/notify 更灵活的线程等待和唤醒机制。Condition 接口的主要方法包括:

方法 描述
await() 使当前线程进入等待状态,直到被 signal()signalAll() 唤醒,或者被中断。与 Object.wait() 类似,但必须在关联的 Lock 的保护范围内调用。
await(long time, TimeUnit unit) 使当前线程进入等待状态,直到被 signal()signalAll() 唤醒,或者被中断,或者超时。与 Object.wait(long timeout) 类似。
awaitNanos(long nanosTimeout) 使当前线程进入等待状态,直到被 signal()signalAll() 唤醒,或者被中断,或者超时。超时时间以纳秒为单位。
awaitUninterruptibly() 使当前线程进入等待状态,直到被 signal()signalAll() 唤醒。与 await() 不同,此方法不会响应中断。
awaitUntil(Date deadline) 使当前线程进入等待状态,直到被 signal()signalAll() 唤醒,或者被中断,或者到达指定的时间。
signal() 唤醒一个等待在 Condition 上的线程。与 Object.notify() 类似,但必须在关联的 Lock 的保护范围内调用。
signalAll() 唤醒所有等待在 Condition 上的线程。与 Object.notifyAll() 类似,但必须在关联的 Lock 的保护范围内调用。

wait/notify 相比,Condition 的优势在于:

  • Lock 接口配合使用: Condition 对象必须从 Lock 对象中获得,这保证了线程在等待和唤醒时对共享资源的互斥访问。
  • 可以创建多个 Condition 对象: 一个 Lock 对象可以关联多个 Condition 对象,每个 Condition 对象可以对应不同的等待条件,从而实现更精细的线程控制。

3. AbstractQueuedSynchronizer (AQS) 基础

在深入了解 ConditionObject 的源码之前,我们需要先了解 AbstractQueuedSynchronizer (AQS)AQS 是 Java 并发包中的一个核心类,它提供了一个通用的框架,用于构建锁和相关的同步器。ReentrantLockReentrantReadWriteLock 等常用的锁都是基于 AQS 实现的。

AQS 的核心思想是使用一个 volatile int state 变量来表示同步状态,并使用一个 FIFO 队列来管理等待线程。AQS 定义了一系列方法,用于获取和释放同步状态,以及管理等待队列。这些方法通常由子类来实现,以适应不同的同步需求。

ConditionObjectAQS 的一个内部类,它实现了 Condition 接口。ConditionObject 利用 AQS 的同步队列来实现线程的等待和唤醒。

4. ConditionObject 源码剖析

ConditionObjectAQS 的一个内部类,它实现了 Condition 接口。下面我们来分析 ConditionObject 的主要源码:

4.1 ConditionObject 的结构

public class ConditionObject implements Condition, java.io.Serializable {
    /** First node of condition queue. */
    private transient Node firstWaiter;
    /** Last node of condition queue. */
    private transient Node lastWaiter;

    // ...
}

ConditionObject 维护了一个等待队列,用于存储等待在 Condition 上的线程。firstWaiterlastWaiter 分别指向队列的头节点和尾节点。这个队列与AQS的同步队列不同。

4.2 await() 方法

await() 方法用于使当前线程进入等待状态。其主要步骤如下:

  1. 释放锁: 首先,await() 方法会原子地释放与 Condition 关联的 Lock。这是为了让其他线程有机会获取锁,并改变等待条件。
  2. 将当前线程添加到等待队列: 创建一个新的 Node 对象,并将当前线程包装在其中,然后将该节点添加到 Condition 的等待队列中。
  3. 阻塞线程: 使用 LockSupport.park() 方法阻塞当前线程,直到被 signal()signalAll() 唤醒,或者被中断。
  4. 重新获取锁: 当线程被唤醒时,它需要重新获取与 Condition 关联的 Lock
  5. 处理中断: 如果线程在等待期间被中断,await() 方法会抛出 InterruptedException 异常。
public final void await() throws InterruptedException {
    if (Thread.interrupted())
        throw new InterruptedException();
    Node node = addConditionWaiter(); // 将当前线程添加到等待队列
    int savedState = fullyRelease(node); // 释放锁
    int interruptMode = 0;
    while (!isOnSyncQueue(node)) { // 检查是否在同步队列
        LockSupport.park(this); // 阻塞线程
        if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
            break;
    }
    if (acquireQueued(node, savedState) && interruptMode != THROW)
        interruptMode = REINTERRUPT;
    if (node.nextWaiter != null) // clean up if cancelled
        unlinkCancelledWaiters();
    if (interruptMode != 0)
        reportInterruptAfterWait(interruptMode);
}

4.3 signal() 方法

signal() 方法用于唤醒一个等待在 Condition 上的线程。其主要步骤如下:

  1. 从等待队列中移除头节点:Condition 的等待队列中移除头节点,该节点对应于等待时间最长的线程。
  2. 将节点添加到同步队列: 将移除的节点添加到 AQS 的同步队列中,使其有机会重新获取锁。
  3. 唤醒线程: 使用 LockSupport.unpark() 方法唤醒与该节点关联的线程。
public final void signal() {
    if (!isHeldExclusively())
        throw new IllegalMonitorStateException();
    Node first = firstWaiter;
    if (first != null)
        doSignal(first);
}

private void doSignal(Node first) {
    do {
        if (firstWaiter == first)
            firstWaiter = first.nextWaiter;
        first.nextWaiter = null;
        if (!transferForSignal(first)) // 将节点转移到同步队列
            signalNext(first);
    } while (firstWaiter != null && firstWaiter.status > 0);
}

final boolean transferForSignal(Node node) {
    /*
     * If cannot change waiter status, the waiter is cancelled.
     * Propagate to unparkSuccessor so that getExclusiveQueuedFor(node)
     * returns true.
     */
    if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
        return false;

    /*
     * Splice onto queue and try to set waitStatus of predecessor to
     * indicate that thread is (probably) waiting. If even that fails
     * return to signalNext only propagating the signal that it is safe
     * to retry.
     */
    Node p = enq(node); // 将节点添加到同步队列
    int ws = p.waitStatus;
    if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
        LockSupport.unpark(node.thread); // 唤醒线程
    return true;
}

4.4 signalAll() 方法

signalAll() 方法用于唤醒所有等待在 Condition 上的线程。其主要步骤如下:

  1. 遍历等待队列: 遍历 Condition 的等待队列,将所有节点依次移除并添加到 AQS 的同步队列中。
  2. 唤醒所有线程: 使用 LockSupport.unpark() 方法唤醒与所有节点关联的线程。
public final void signalAll() {
    if (!isHeldExclusively())
        throw new IllegalMonitorStateException();
    Node first = firstWaiter;
    if (first != null)
        doSignalAll(first);
}

private void doSignalAll(Node first) {
    lastWaiter = firstWaiter = null;
    do {
        Node next = first.nextWaiter;
        first.nextWaiter = null;
        if (!transferForSignal(first))
            signalNext(first);
        first = next;
    } while (first != null);
}

4.5 关键代码逻辑解释

  • addConditionWaiter(): 这个方法负责创建一个新的Node(类型为Node.CONDITION),并将当前线程包装进去,然后添加到Condition的等待队列的末尾。如果等待队列为空,则新节点同时是头节点和尾节点。如果队列不为空,则将新节点链接到当前的尾节点之后。
  • fullyRelease(Node node): 这个方法负责完全释放锁,并返回锁释放之前的state值。Conditionawait操作需要先释放锁,才能让其他线程有机会获取锁,从而改变等待的条件。
  • isOnSyncQueue(Node node): 这个方法检查给定的节点是否已经存在于AQS的同步队列中。当一个线程被signal后,它对应的节点会被转移到同步队列中,等待重新获取锁。
  • acquireQueued(Node node, int savedState): 这个方法负责让节点在同步队列中排队,并尝试获取锁。如果获取成功,则返回。如果在等待过程中被中断,则会设置中断标志。
  • transferForSignal(Node node): 这个方法是signal操作的核心。它尝试将等待队列中的节点转移到同步队列中。首先,它会尝试将节点的waitStatusCONDITION修改为0。如果修改失败,说明节点已经被取消了。如果修改成功,则将节点加入到同步队列中。然后,检查前驱节点的状态,如果前驱节点的状态大于0或者修改前驱节点的状态为SIGNAL失败,则直接unpark该节点对应的线程。
  • unlinkCancelledWaiters(): 这个方法用于清理等待队列中被取消的节点。

4.6 ConditionObject的等待队列和AQS的同步队列

ConditionObject维护一个等待队列(也称为条件队列),而AQS维护一个同步队列(也称为CLH队列)。这两个队列是不同的,用于不同的目的。

特性 ConditionObject 的等待队列 AQS 的同步队列
目的 存储等待特定条件的线程。线程在这个队列中等待,直到条件满足。 存储等待获取锁的线程。线程在这个队列中按照FIFO的顺序等待获取锁。
节点类型 节点类型为Node.CONDITION 节点类型为Node.EXCLUSIVE(独占模式)或Node.SHARED(共享模式)。
状态 节点的状态通常为Node.CONDITION,表示线程正在等待。 节点的状态可以是SIGNAL(表示后继节点需要被唤醒)、CANCELLED(表示节点被取消)、0(初始状态)、PROPAGATE(表示需要传播唤醒信号)等。
队列属性 是一个简单的链表,只包含firstWaiterlastWaiter指针。 是一个CLH队列,每个节点都有指向前驱节点和后继节点的指针。
线程转移 当条件满足时,线程会从等待队列转移到同步队列。这个过程通过transferForSignal方法实现。 线程在同步队列中等待获取锁,一旦锁被释放,队列中的线程会按照FIFO的顺序尝试获取锁。
唤醒机制 通过signalsignalAll方法唤醒等待队列中的线程,然后将它们转移到同步队列。 通过unparkSuccessor方法唤醒同步队列中的后继节点对应的线程。
应用场景 用于实现更精细的线程等待和唤醒机制,允许线程在不同的条件下等待。例如,生产者-消费者模型中,生产者等待队列用于存储等待生产的线程,消费者等待队列用于存储等待消费的线程。 用于实现锁的公平性和同步,保证线程按照一定的顺序获取锁,避免饥饿现象。

总结来说,ConditionObject的等待队列用于管理等待特定条件的线程,而AQS的同步队列用于管理等待获取锁的线程。Condition机制允许线程在等待队列和同步队列之间转移,从而实现更灵活的线程同步和通信。

5. Condition 的使用场景和最佳实践

Condition 对象在以下场景中特别有用:

  • 生产者-消费者问题: 使用多个 Condition 对象可以分别管理等待生产的线程和等待消费的线程,避免不必要的线程竞争。
  • 有界缓冲区: 当缓冲区满时,生产者线程可以等待;当缓冲区空时,消费者线程可以等待。
  • 多条件同步: 当线程需要等待多个条件满足时,可以使用多个 Condition 对象分别管理这些条件。

最佳实践:

  • 始终在 Lock 的保护范围内使用 Condition 对象。
  • await() 方法返回后,始终重新检查等待条件。
  • 使用 signalAll() 方法可以避免死锁,但可能会降低性能。
  • 避免长时间持有锁,以免影响其他线程的执行。

下面是一个使用 Condition 对象实现生产者-消费者问题的示例:

import java.util.LinkedList;
import java.util.Queue;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

public class ProducerConsumer {

    private final Lock lock = new ReentrantLock();
    private final Condition notFull = lock.newCondition();
    private final Condition notEmpty = lock.newCondition();

    private final Queue<Integer> queue = new LinkedList<>();
    private final int capacity;

    public ProducerConsumer(int capacity) {
        this.capacity = capacity;
    }

    public void produce(int item) throws InterruptedException {
        lock.lock();
        try {
            while (queue.size() == capacity) {
                notFull.await(); // 缓冲区已满,等待消费者消费
            }
            queue.offer(item);
            System.out.println("Produced: " + item);
            notEmpty.signal(); // 通知消费者可以消费了
        } finally {
            lock.unlock();
        }
    }

    public int consume() throws InterruptedException {
        lock.lock();
        try {
            while (queue.isEmpty()) {
                notEmpty.await(); // 缓冲区为空,等待生产者生产
            }
            int item = queue.poll();
            System.out.println("Consumed: " + item);
            notFull.signal(); // 通知生产者可以生产了
            return item;
        } finally {
            lock.unlock();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        ProducerConsumer pc = new ProducerConsumer(5);

        Thread producer = new Thread(() -> {
            try {
                for (int i = 0; i < 10; i++) {
                    pc.produce(i);
                    Thread.sleep((long) (Math.random() * 100));
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });

        Thread consumer = new Thread(() -> {
            try {
                for (int i = 0; i < 10; i++) {
                    pc.consume();
                    Thread.sleep((long) (Math.random() * 100));
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });

        producer.start();
        consumer.start();

        producer.join();
        consumer.join();
    }
}

6. 总结

Condition 对象是 Java 并发编程中一个强大的工具,它提供了一种比 wait/notify 更灵活的线程等待和唤醒机制。Condition 对象与 Lock 接口配合使用,可以实现更细粒度的线程同步和通信。理解 Condition 对象的源码和使用场景,可以帮助我们编写更高效、更可靠的并发程序。

通过对源码的分析,我们知道了:

  • ConditionObject通过维护一个等待队列来实现线程的等待和唤醒。
  • Condition 必须配合 Lock 使用,实现互斥。
  • signal()signalAll() 方法用于唤醒等待的线程,await() 方法用于使线程进入等待状态。

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注