好的,我们开始今天的讲座,主题是利用 Condition 对象实现更精细的线程等待/通知模式,这相比传统的 wait/notify 提供了更强大的控制能力。
1. 传统 wait/notify 的局限性
在多线程编程中,wait/notify (或 wait/notifyAll) 是实现线程间同步和通信的经典机制。 它的基本原理是:一个线程可以调用对象的 wait() 方法进入等待状态,释放对象锁;另一个线程可以通过调用同一个对象的 notify() 或 notifyAll() 方法来唤醒一个或所有等待的线程。
然而,wait/notify 存在一些固有的局限性,尤其是在复杂的并发场景下:
-
盲目唤醒 (Spurious Wakeups): 线程可能在没有接收到任何通知的情况下被唤醒。 这是由于 JVM 实现细节或其他系统事件引起的。 虽然规范建议在
wait()返回后检查等待条件,但这增加了代码的复杂性。 -
单一条件:
wait/notify只能与一个锁关联,这意味着所有等待线程都在等待 同一个 条件。 如果线程需要等待不同的条件,则必须使用额外的逻辑来区分,这很容易出错。 -
缺乏公平性: 被
notify()唤醒的线程是随机的。 无法保证等待时间最长的线程会被优先唤醒,这可能导致某些线程一直处于饥饿状态。 -
容易出错: 正确使用
wait/notify需要非常小心,特别是要保证在修改共享变量之前持有锁,并在等待之前和之后检查条件。 稍有不慎就可能导致死锁或活锁。
2. Condition 对象的优势
java.util.concurrent.locks.Condition 接口提供了一种更强大、更灵活的方式来实现线程等待/通知。 Condition 对象与 Lock 接口关联,允许我们创建多个等待队列,每个队列对应于不同的等待条件。
Condition 接口的主要方法包括:
| 方法 | 描述 |
|---|---|
await() |
使当前线程进入等待状态,释放与 Condition 关联的 Lock。 必须在 Lock.lock() 和 Lock.unlock() 之间调用。 |
awaitUninterruptibly() |
与 await() 类似,但不会响应中断。 |
awaitNanos(long time) |
使当前线程进入等待状态,最多等待指定的时间(纳秒)。 |
await(long time, TimeUnit unit) |
使当前线程进入等待状态,最多等待指定的时间。 |
awaitUntil(Date deadline) |
使当前线程进入等待状态,直到指定的截止时间。 |
signal() |
唤醒一个等待在 Condition 上的线程。 |
signalAll() |
唤醒所有等待在 Condition 上的线程。 |
与 wait/notify 相比,Condition 对象具有以下优势:
-
多个等待队列: 可以为不同的等待条件创建不同的
Condition对象,从而避免了盲目唤醒和复杂的条件判断。 -
更强的控制:
Condition提供了更多的方法来控制线程的等待和唤醒,例如可以指定等待时间、响应中断等。 -
与 Lock 配合:
Condition必须与Lock接口一起使用,这强制我们显式地获取和释放锁,提高了代码的可读性和可维护性。
3. Condition 对象的使用示例:生产者-消费者模型
让我们通过一个经典的生产者-消费者模型来演示如何使用 Condition 对象。 假设我们有一个缓冲区,生产者线程向缓冲区添加数据,消费者线程从缓冲区取出数据。 当缓冲区满时,生产者线程需要等待;当缓冲区为空时,消费者线程需要等待。
import java.util.LinkedList;
import java.util.Queue;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
public class ProducerConsumer {
private final Lock lock = new ReentrantLock();
private final Condition notFull = lock.newCondition();
private final Condition notEmpty = lock.newCondition();
private final Queue<Integer> queue = new LinkedList<>();
private final int capacity;
public ProducerConsumer(int capacity) {
this.capacity = capacity;
}
public void produce(int data) throws InterruptedException {
lock.lock();
try {
while (queue.size() == capacity) {
System.out.println("Producer is waiting... Queue is full.");
notFull.await(); // 等待缓冲区不满
}
queue.offer(data);
System.out.println("Produced: " + data);
notEmpty.signal(); // 唤醒等待的消费者
} finally {
lock.unlock();
}
}
public int consume() throws InterruptedException {
lock.lock();
try {
while (queue.isEmpty()) {
System.out.println("Consumer is waiting... Queue is empty.");
notEmpty.await(); // 等待缓冲区不空
}
int data = queue.poll();
System.out.println("Consumed: " + data);
notFull.signal(); // 唤醒等待的生产者
return data;
} finally {
lock.unlock();
}
}
public static void main(String[] args) throws InterruptedException {
ProducerConsumer pc = new ProducerConsumer(5);
Thread producer1 = new Thread(() -> {
try {
for (int i = 0; i < 10; i++) {
pc.produce(i);
Thread.sleep(100); // 模拟生产时间
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
});
Thread producer2 = new Thread(() -> {
try {
for (int i = 100; i < 110; i++) {
pc.produce(i);
Thread.sleep(150); // 模拟生产时间
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
});
Thread consumer1 = new Thread(() -> {
try {
for (int i = 0; i < 10; i++) {
pc.consume();
Thread.sleep(200); // 模拟消费时间
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
});
Thread consumer2 = new Thread(() -> {
try {
for (int i = 0; i < 10; i++) {
pc.consume();
Thread.sleep(250); // 模拟消费时间
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
});
producer1.start();
producer2.start();
consumer1.start();
consumer2.start();
producer1.join();
producer2.join();
consumer1.join();
consumer2.join();
System.out.println("Finished.");
}
}
在这个例子中:
-
lock是一个ReentrantLock对象,用于保护共享资源(队列)。 -
notFull和notEmpty是两个Condition对象,分别表示缓冲区不满和缓冲区不空这两个条件。 -
produce()方法在添加数据之前,会检查缓冲区是否已满。 如果已满,则调用notFull.await()进入等待状态,释放锁。 当消费者线程从缓冲区取出数据后,会调用notFull.signal()唤醒一个等待的生产者线程。 -
consume()方法在取出数据之前,会检查缓冲区是否为空。 如果为空,则调用notEmpty.await()进入等待状态,释放锁。 当生产者线程向缓冲区添加数据后,会调用notEmpty.signal()唤醒一个等待的消费者线程。
通过使用 Condition 对象,我们可以将生产者和消费者线程分别置于不同的等待队列中,从而避免了盲目唤醒,并提高了程序的效率。
4. Condition 对象的高级用法:公平锁和优先级
Condition 对象可以与公平锁一起使用,以确保线程按照它们请求锁的顺序获得锁。 这可以避免某些线程一直处于饥饿状态。
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
public class FairLockExample {
private final ReentrantLock fairLock = new ReentrantLock(true); // 使用公平锁
private final Condition condition = fairLock.newCondition();
public void accessResource(String threadName) throws InterruptedException {
fairLock.lock();
try {
System.out.println(threadName + " is waiting for the condition...");
condition.await();
System.out.println(threadName + " is accessing the resource.");
// 模拟资源访问
Thread.sleep(100);
} finally {
System.out.println(threadName + " is releasing the lock.");
fairLock.unlock();
}
}
public void signalWaitingThread() {
fairLock.lock();
try {
System.out.println("Signaling a waiting thread...");
condition.signal();
} finally {
fairLock.unlock();
}
}
public static void main(String[] args) throws InterruptedException {
FairLockExample example = new FairLockExample();
Thread thread1 = new Thread(() -> {
try {
example.accessResource("Thread-1");
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
});
Thread thread2 = new Thread(() -> {
try {
example.accessResource("Thread-2");
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
});
thread1.start();
Thread.sleep(50); // 确保 Thread-1 先进入等待状态
thread2.start();
Thread.sleep(1000); // 等待一段时间
example.signalWaitingThread(); // 唤醒 Thread-1
Thread.sleep(1000);
example.signalWaitingThread(); //唤醒 Thread-2
thread1.join();
thread2.join();
System.out.println("Finished.");
}
}
在这个例子中,ReentrantLock 被创建为公平锁 (new ReentrantLock(true))。 这意味着线程将按照它们请求锁的顺序获得锁。 当调用 condition.signal() 时,等待时间最长的线程(即 Thread-1)将被唤醒。
虽然 Condition 本身没有内置的优先级队列,但我们可以通过自定义逻辑来实现类似的功能。 例如,我们可以使用一个 PriorityQueue 来存储等待线程,并根据线程的优先级来决定唤醒哪个线程。 这需要更复杂的代码实现,但可以提供更细粒度的控制。
5. Condition 对象的注意事项
在使用 Condition 对象时,需要注意以下几点:
-
必须与 Lock 关联:
Condition对象必须与Lock接口一起使用。await()、signal()和signalAll()方法必须在Lock.lock()和Lock.unlock()之间调用。 -
等待条件必须检查: 在
await()返回后,必须再次检查等待条件。 这是为了防止盲目唤醒。 -
避免死锁: 确保以正确的顺序获取和释放锁,以避免死锁。
-
异常处理: 在
try-finally块中释放锁,以确保即使发生异常,锁也能被正确释放。 -
选择合适的唤醒策略:
signal()唤醒一个等待线程,而signalAll()唤醒所有等待线程。 根据具体情况选择合适的唤醒策略,以提高程序的效率。 通常,如果只有一个线程可以满足条件,则使用signal();如果有多个线程可以满足条件,则使用signalAll()。
6. Condition 的一些替代方案
虽然 Condition 对象提供了比 wait/notify 更精细的控制,但在某些情况下,可能存在更合适的替代方案,例如:
-
BlockingQueue: 对于生产者-消费者模型,BlockingQueue提供了更简洁的 API 和更好的性能。BlockingQueue内部使用了锁和条件变量,但将这些细节隐藏起来,使代码更易于理解和维护。 -
CountDownLatch: 用于等待一组线程完成操作。 -
CyclicBarrier: 用于同步一组线程,使它们在达到某个屏障点之前互相等待。 -
Semaphore: 用于控制对有限资源的访问。
选择哪种并发工具取决于具体的应用场景和需求。 Condition 对象适用于需要高度定制化的等待/通知模式的场景。
7.总结来说
Condition对象相较于传统的wait/notify提供了更精细的线程控制能力,允许创建多个等待队列,与Lock接口配合使用,并能与公平锁结合使用,实现更公平的线程调度。在复杂的并发场景下,Condition对象是强大的同步工具。
今天的讲座到此结束。希望大家通过今天的学习,能够更好地理解和使用 Condition 对象,从而编写出更健壮、更高效的并发程序。