探索Java中的条件变量(Condition):比Object.wait/notify更精细的线程协作

好的,我们开始。

Java中的条件变量(Condition):比Object.wait/notify更精细的线程协作

各位朋友,大家好。今天我们来深入探讨一个在并发编程中至关重要的概念:条件变量(Condition)。在Java中,Condition接口提供了比传统的Object.wait()/Object.notify()/Object.notifyAll()更精细、更灵活的线程协作机制。我们将从Object.wait/notify的局限性出发,逐步过渡到Condition的优势,并通过代码示例详细讲解其用法。

1. Object.wait/notify的局限性

在Java并发编程的早期,Object.wait()Object.notify()Object.notifyAll()是实现线程间协作的主要手段。它们基于内置锁(intrinsic lock)或称监视器锁(monitor lock)工作。简单来说,线程在获取了对象的锁之后,可以调用wait()方法释放锁并进入等待状态;其他线程在获取锁后,可以通过notify()notifyAll()方法唤醒等待中的线程。

然而,这种机制存在一些局限性:

  • 假唤醒(Spurious Wakeups): 即使没有线程调用notify()notifyAll(),线程也可能被唤醒。这是因为JVM规范允许假唤醒的发生。因此,使用wait()时,必须在一个循环中检查条件是否满足,以防止处理错误的唤醒。

  • 单等待队列: 所有的等待线程都放在同一个等待队列中。当调用notify()时,只有一个线程会被随机唤醒,而调用notifyAll()则会唤醒所有线程。这在某些场景下可能导致不必要的上下文切换和性能损失,特别是当只有一部分线程满足唤醒条件时。

  • 条件不明确: wait()notify()方法本身并不携带任何关于等待条件的语义信息。程序员需要在代码中显式地维护和检查等待条件。这增加了代码的复杂性和出错的可能性。

为了更清晰地说明问题,我们来看一个经典的生产者-消费者模型的例子,用Object.wait/notify实现:

import java.util.LinkedList;
import java.util.Queue;

public class ProducerConsumer {

    private final Queue<Integer> buffer = new LinkedList<>();
    private final int maxSize;
    private final Object lock = new Object();

    public ProducerConsumer(int maxSize) {
        this.maxSize = maxSize;
    }

    public void produce(int value) throws InterruptedException {
        synchronized (lock) {
            while (buffer.size() == maxSize) {
                System.out.println("Buffer is full, producer waiting...");
                lock.wait(); // 等待消费者消费
            }

            buffer.offer(value);
            System.out.println("Produced: " + value);
            lock.notifyAll(); // 唤醒所有等待的线程(包括消费者和生产者)
        }
    }

    public int consume() throws InterruptedException {
        synchronized (lock) {
            while (buffer.isEmpty()) {
                System.out.println("Buffer is empty, consumer waiting...");
                lock.wait(); // 等待生产者生产
            }

            int value = buffer.poll();
            System.out.println("Consumed: " + value);
            lock.notifyAll(); // 唤醒所有等待的线程(包括消费者和生产者)
            return value;
        }
    }

    public static void main(String[] args) throws InterruptedException {
        ProducerConsumer pc = new ProducerConsumer(5);

        Thread producer = new Thread(() -> {
            try {
                for (int i = 0; i < 10; i++) {
                    pc.produce(i);
                    Thread.sleep((long) (Math.random() * 100));
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });

        Thread consumer = new Thread(() -> {
            try {
                for (int i = 0; i < 10; i++) {
                    pc.consume();
                    Thread.sleep((long) (Math.random() * 100));
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });

        producer.start();
        consumer.start();

        producer.join();
        consumer.join();
    }
}

在这个例子中,producerconsumer线程都在同一个锁对象lock上等待。当缓冲区满时,producer等待;当缓冲区空时,consumer等待。notifyAll()唤醒所有线程,这可能导致不必要的线程竞争和上下文切换。

2. Condition接口的引入

java.util.concurrent.locks.Condition接口是Java并发包(java.util.concurrent)提供的一个更高级的线程协作机制。它与Lock接口紧密配合,提供了比Object.wait/notify更灵活、更精细的控制。

Condition接口的主要方法包括:

方法名 描述
await() 类似于Object.wait()。释放锁,使当前线程进入等待状态,直到被signal或中断。
await(long time, TimeUnit unit) 类似于Object.wait(long timeout)。释放锁,使当前线程进入等待状态,直到被signal、中断或超时。
awaitNanos(long nanosTimeout) 类似于await(long time, TimeUnit unit),但使用纳秒作为超时单位。
awaitUninterruptibly() 类似于await(),但不会响应中断。
awaitUntil(Date deadline) 释放锁,使当前线程进入等待状态,直到被signal、中断或到达指定的截止时间。
signal() 类似于Object.notify()。唤醒一个等待在当前Condition上的线程。
signalAll() 类似于Object.notifyAll()。唤醒所有等待在当前Condition上的线程。

3. Condition的优势

  • 多个等待队列: 一个Lock对象可以关联多个Condition对象,每个Condition对象都有自己的等待队列。这允许我们将等待线程根据不同的条件进行分组,从而实现更精细的控制。

  • 明确的等待条件: 通过使用不同的Condition对象,我们可以更清晰地表达线程等待的条件。例如,在一个生产者-消费者模型中,可以使用一个Condition对象表示缓冲区为空的条件,另一个Condition对象表示缓冲区已满的条件。

  • 避免不必要的唤醒: 使用Condition时,我们可以只唤醒满足特定条件的线程,避免了notifyAll()可能导致的不必要的线程竞争和上下文切换。

4. 使用Condition实现生产者-消费者模型

下面是使用Condition重新实现的生产者-消费者模型:

import java.util.LinkedList;
import java.util.Queue;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

public class ProducerConsumerWithCondition {

    private final Queue<Integer> buffer = new LinkedList<>();
    private final int maxSize;
    private final Lock lock = new ReentrantLock();
    private final Condition notFull = lock.newCondition();  // 缓冲区未满的条件
    private final Condition notEmpty = lock.newCondition(); // 缓冲区非空的条件

    public ProducerConsumerWithCondition(int maxSize) {
        this.maxSize = maxSize;
    }

    public void produce(int value) throws InterruptedException {
        lock.lock();
        try {
            while (buffer.size() == maxSize) {
                System.out.println("Buffer is full, producer waiting...");
                notFull.await(); // 等待缓冲区未满
            }

            buffer.offer(value);
            System.out.println("Produced: " + value);
            notEmpty.signal(); // 唤醒消费者
        } finally {
            lock.unlock();
        }
    }

    public int consume() throws InterruptedException {
        lock.lock();
        try {
            while (buffer.isEmpty()) {
                System.out.println("Buffer is empty, consumer waiting...");
                notEmpty.await(); // 等待缓冲区非空
            }

            int value = buffer.poll();
            System.out.println("Consumed: " + value);
            notFull.signal(); // 唤醒生产者
            return value;
        } finally {
            lock.unlock();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        ProducerConsumerWithCondition pc = new ProducerConsumerWithCondition(5);

        Thread producer = new Thread(() -> {
            try {
                for (int i = 0; i < 10; i++) {
                    pc.produce(i);
                    Thread.sleep((long) (Math.random() * 100));
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });

        Thread consumer = new Thread(() -> {
            try {
                for (int i = 0; i < 10; i++) {
                    pc.consume();
                    Thread.sleep((long) (Math.random() * 100));
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });

        producer.start();
        consumer.start();

        producer.join();
        consumer.join();
    }
}

在这个例子中,我们使用了ReentrantLock作为锁,并创建了两个Condition对象:notFullnotEmptyproducernotFull上等待,consumernotEmpty上等待。当producer生产了一个元素后,它只唤醒notEmpty上的等待线程(即consumer);当consumer消费了一个元素后,它只唤醒notFull上的等待线程(即producer)。这样就避免了不必要的线程唤醒,提高了程序的效率。

5. Condition的注意事项

  • 必须在锁的保护下使用: Condition的方法(如await()signal()signalAll())必须在获取锁之后调用。否则,会抛出IllegalMonitorStateException。这是因为Condition依赖于Lock来保证线程安全。

  • 循环检查条件: 类似于Object.wait()Condition.await()也可能发生假唤醒。因此,必须在一个循环中检查等待条件是否满足。

  • 正确处理中断: Condition.await()方法会响应中断。如果线程在等待过程中被中断,会抛出InterruptedException。需要在代码中正确处理这个异常,通常的做法是重新设置中断状态,以便更高层的代码能够处理中断。

6. 其他Condition的使用场景

除了生产者-消费者模型,Condition还可以用于其他各种线程协作场景,例如:

  • 有界阻塞队列: 可以使用Condition来实现一个有界阻塞队列,其中put()操作在队列满时等待,take()操作在队列空时等待。

  • 读写锁: 可以使用Condition来实现一个读写锁,其中多个线程可以同时读取共享资源,但只有一个线程可以写入共享资源。

  • 任务调度器: 可以使用Condition来实现一个任务调度器,其中线程在没有任务时等待,当有任务到达时被唤醒。

7. ReentrantReadWriteLock 中的 Condition 使用

ReentrantReadWriteLock 类也使用了 Condition 来管理等待读锁和写锁的线程。它内部使用两个 Condition 对象:一个用于等待写锁的线程(writeCondition),另一个用于等待读锁的线程(readCondition)。

  • 写锁的获取: 当一个线程尝试获取写锁时,如果当前有其他线程持有读锁或写锁,则该线程会被放入 writeCondition 的等待队列中。
  • 读锁的获取: 当一个线程尝试获取读锁时,如果当前有其他线程持有写锁,则该线程会被放入 readCondition 的等待队列中。
  • 锁的释放: 当一个线程释放写锁时,它会唤醒 writeCondition 中的一个等待线程,使其尝试获取写锁。当一个线程释放读锁时,如果当前没有其他线程持有写锁,它会唤醒 readCondition 中的所有等待线程,使它们尝试获取读锁。

这使得 ReentrantReadWriteLock 能够有效地管理并发的读写操作,提高程序的性能。

import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReadLock;
import java.util.concurrent.locks.WriteLock;

public class ReadWriteLockExample {

    private final ReentrantReadWriteLock rwl = new ReentrantReadWriteLock();
    private final ReadLock readLock = rwl.readLock();
    private final WriteLock writeLock = rwl.writeLock();

    private String data = "Initial Data";

    public String readData() {
        readLock.lock();
        try {
            System.out.println(Thread.currentThread().getName() + " is reading data.");
            return data;
        } finally {
            readLock.unlock();
            System.out.println(Thread.currentThread().getName() + " finished reading.");
        }
    }

    public void writeData(String newData) {
        writeLock.lock();
        try {
            System.out.println(Thread.currentThread().getName() + " is writing data.");
            data = newData;
        } finally {
            writeLock.unlock();
            System.out.println(Thread.currentThread().getName() + " finished writing.");
        }
    }

    public static void main(String[] args) {
        ReadWriteLockExample example = new ReadWriteLockExample();

        // Multiple reader threads
        for (int i = 0; i < 3; i++) {
            new Thread(() -> {
                System.out.println(example.readData());
            }, "Reader-" + i).start();
        }

        // Writer thread
        new Thread(() -> {
            example.writeData("Updated Data");
        }, "Writer").start();
    }
}

8. 与LockSupport类的关系

LockSupport类是Java并发工具类,它提供了一些底层的原语,用于创建锁和其他同步工具。Condition接口实际上是基于LockSupport类来实现的。LockSupport.park()LockSupport.unpark()方法分别用于阻塞线程和解除阻塞线程,这是构建Condition等待/通知机制的基础。 虽然我们通常不会直接使用LockSupport,但了解它与Condition的关系有助于深入理解Java并发机制的底层实现。

9. 总结

Condition接口是Java并发编程中一个强大的工具,它提供了比Object.wait/notify更精细、更灵活的线程协作机制。通过使用多个Condition对象,我们可以将等待线程根据不同的条件进行分组,避免不必要的线程唤醒,提高程序的效率。在实际开发中,可以根据具体的场景选择合适的线程协作方式,充分利用Condition的优势。

精细的线程控制,避免不必要的唤醒,提高程序效率。

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注