Java非阻塞队列实现:ArrayBlockingQueue与LinkedBlockingQueue原理对比
大家好,今天我们来深入探讨Java并发编程中常用的两种阻塞队列实现:ArrayBlockingQueue和LinkedBlockingQueue。它们都是java.util.concurrent包下的重要成员,用于解决多线程环境下的生产者-消费者问题。虽然它们都实现了BlockingQueue接口,但在内部实现和适用场景上存在显著差异。我们将从原理、性能、内存占用等方面进行详细对比,帮助大家在实际开发中选择合适的队列。
1. 阻塞队列的必要性:生产者-消费者问题
在多线程编程中,生产者-消费者问题是一个经典模型。生产者线程负责生产数据并放入队列,消费者线程负责从队列中取出数据并进行处理。如果没有队列作为缓冲区,生产者和消费者必须直接通信,这会导致以下问题:
- 耦合度高: 生产者和消费者必须知道彼此的存在和状态,增加了代码的复杂性。
- 效率低下: 如果生产者生产速度快于消费者消费速度,生产者需要等待消费者,反之亦然,造成资源浪费。
阻塞队列通过提供一个线程安全的数据缓冲区,有效地解耦了生产者和消费者。当队列满时,尝试向队列中添加元素的线程会被阻塞,直到队列有空闲空间。当队列为空时,尝试从队列中取出元素的线程会被阻塞,直到队列中有新的元素。
2. ArrayBlockingQueue:基于数组的有界阻塞队列
ArrayBlockingQueue是基于数组实现的有界阻塞队列。这意味着它在创建时必须指定队列的容量,且容量在创建后不能更改。
2.1 内部结构
ArrayBlockingQueue使用一个循环数组来存储元素。它维护以下关键变量:
items: 用于存储元素的数组。takeIndex: 下一个要被take()或poll()操作移除的元素的索引。putIndex: 下一个要被put()或offer()操作添加的元素的索引。count: 队列中当前元素的数量。lock: 用于保护队列状态的重入锁(ReentrantLock)。notEmpty: 条件变量,用于在队列为空时阻塞take()或poll()操作的线程。notFull: 条件变量,用于在队列满时阻塞put()或offer()操作的线程。
2.2 核心方法
-
put(E e): 将指定的元素插入此队列的尾部,如果队列已满,则等待可用的空间。public void put(E e) throws InterruptedException { checkNotNull(e); final ReentrantLock lock = this.lock; lock.lockInterruptibly(); // 可中断的锁 try { while (count == items.length) // 队列已满 notFull.await(); // 阻塞当前线程,直到队列有空闲空间 enqueue(e); // 将元素添加到队列尾部 } finally { lock.unlock(); } } private void enqueue(E x) { final Object[] items = this.items; items[putIndex] = x; if (++putIndex == items.length) putIndex = 0; // 循环数组 count++; notEmpty.signal(); // 唤醒等待的消费者线程 } -
take(): 从此队列的头部移除并返回一个元素,如果队列为空,则等待一个元素变为可用。public E take() throws InterruptedException { final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { while (count == 0) // 队列为空 notEmpty.await(); // 阻塞当前线程,直到队列有元素 return dequeue(); // 从队列头部移除并返回一个元素 } finally { lock.unlock(); } } private E dequeue() { final Object[] items = this.items; @SuppressWarnings("unchecked") E x = (E) items[takeIndex]; items[takeIndex] = null; // help GC if (++takeIndex == items.length) takeIndex = 0; // 循环数组 count--; if (itrs != null) itrs.elementDequeued(); notFull.signal(); // 唤醒等待的生产者线程 return x; } -
offer(E e): 将指定的元素插入此队列的尾部,如果该队列已满,则立即返回false。这是一个非阻塞方法。 -
poll(): 从此队列的头部移除并返回一个元素,如果该队列为空,则立即返回null。这是一个非阻塞方法。
2.3 构造方法
ArrayBlockingQueue提供了多种构造方法:
ArrayBlockingQueue(int capacity): 创建一个具有给定容量的ArrayBlockingQueue,并使用默认的访问策略(非公平锁)。ArrayBlockingQueue(int capacity, boolean fair): 创建一个具有给定容量的ArrayBlockingQueue,并使用指定的访问策略。fair参数为true时,使用公平锁,否则使用非公平锁。公平锁倾向于让等待时间最长的线程优先获取锁。ArrayBlockingQueue(int capacity, boolean fair, Collection<? extends E> c): 创建一个具有给定容量的ArrayBlockingQueue,并使用指定的访问策略,最初包含给定集合的元素,元素按集合迭代器的遍历顺序添加。
2.4 代码示例
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
public class ArrayBlockingQueueExample {
public static void main(String[] args) throws InterruptedException {
BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(5); // 创建一个容量为5的ArrayBlockingQueue
// 生产者线程
Thread producer = new Thread(() -> {
try {
for (int i = 0; i < 10; i++) {
queue.put(i); // 将元素添加到队列
System.out.println("Producer added: " + i);
Thread.sleep(100); // 模拟生产过程
}
} catch (InterruptedException e) {
e.printStackTrace();
}
});
// 消费者线程
Thread consumer = new Thread(() -> {
try {
while (true) {
Integer value = queue.take(); // 从队列中取出元素
System.out.println("Consumer consumed: " + value);
Thread.sleep(200); // 模拟消费过程
}
} catch (InterruptedException e) {
e.printStackTrace();
}
});
producer.start();
consumer.start();
producer.join(); // 等待生产者线程结束
//consumer.join(); // 不要等待消费者结束,让它一直消费
}
}
3. LinkedBlockingQueue:基于链表的无界/有界阻塞队列
LinkedBlockingQueue是基于链表实现的阻塞队列。它可以是无界的,也可以是有界的。如果不指定容量,则默认为无界队列(实际上,Integer.MAX_VALUE),这意味着理论上它可以容纳无限数量的元素。但是,在实际使用中,应该避免使用无界队列,因为过多的元素可能会导致内存溢出。
3.1 内部结构
LinkedBlockingQueue使用链表节点来存储元素。它维护以下关键变量:
head: 链表的头节点。last: 链表的尾节点。capacity: 队列的容量。count: 队列中当前元素的数量。takeLock: 用于保护take()和poll()操作的锁(ReentrantLock)。putLock: 用于保护put()和offer()操作的锁(ReentrantLock)。notEmpty: 条件变量,用于在队列为空时阻塞take()或poll()操作的线程。notFull: 条件变量,用于在队列满时阻塞put()或offer()操作的线程。
与ArrayBlockingQueue不同,LinkedBlockingQueue使用了两个独立的锁:takeLock和putLock。这使得生产者和消费者可以并发地进行添加和移除操作,从而提高并发性能。
3.2 核心方法
-
put(E e): 将指定的元素插入此队列的尾部,如果队列已满,则等待可用的空间。public void put(E e) throws InterruptedException { if (e == null) throw new NullPointerException(); int c = -1; Node<E> node = new Node<E>(e); final ReentrantLock putLock = this.putLock; final AtomicInteger count = this.count; putLock.lockInterruptibly(); // 可中断的锁 try { while (count.get() == capacity) { // 队列已满 notFull.await(); // 阻塞当前线程,直到队列有空闲空间 } enqueue(node); // 将元素添加到队列尾部 c = count.getAndIncrement(); if (c + 1 < capacity) notFull.signal(); // 唤醒等待的生产者线程 } finally { putLock.unlock(); } if (c == 0) signalNotEmpty(); // 唤醒等待的消费者线程 } private void enqueue(Node<E> node) { last.next = node; last = node; } -
take(): 从此队列的头部移除并返回一个元素,如果队列为空,则等待一个元素变为可用。public E take() throws InterruptedException { E x; int c = -1; final AtomicInteger count = this.count; final ReentrantLock takeLock = this.takeLock; takeLock.lockInterruptibly(); // 可中断的锁 try { while (count.get() == 0) { // 队列为空 notEmpty.await(); // 阻塞当前线程,直到队列有元素 } x = dequeue(); // 从队列头部移除并返回一个元素 c = count.getAndDecrement(); if (c > 1) notEmpty.signal(); // 唤醒等待的消费者线程 } finally { takeLock.unlock(); } if (c == capacity) signalNotFull(); // 唤醒等待的生产者线程 return x; } private E dequeue() { Node<E> h = head; Node<E> first = h.next; h.next = h; // help GC head = first; E x = first.item; first.item = null; return x; } -
offer(E e): 将指定的元素插入此队列的尾部,如果该队列已满,则立即返回false。这是一个非阻塞方法。 -
poll(): 从此队列的头部移除并返回一个元素,如果该队列为空,则立即返回null。这是一个非阻塞方法。
3.3 构造方法
LinkedBlockingQueue提供了多种构造方法:
LinkedBlockingQueue(): 创建一个容量为Integer.MAX_VALUE的LinkedBlockingQueue。LinkedBlockingQueue(int capacity): 创建一个具有给定容量的LinkedBlockingQueue。LinkedBlockingQueue(Collection<? extends E> c): 创建一个容量为Integer.MAX_VALUE的LinkedBlockingQueue,最初包含给定集合的元素,元素按集合迭代器的遍历顺序添加。
3.4 代码示例
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
public class LinkedBlockingQueueExample {
public static void main(String[] args) throws InterruptedException {
BlockingQueue<Integer> queue = new LinkedBlockingQueue<>(5); // 创建一个容量为5的LinkedBlockingQueue
// 生产者线程
Thread producer = new Thread(() -> {
try {
for (int i = 0; i < 10; i++) {
queue.put(i); // 将元素添加到队列
System.out.println("Producer added: " + i);
Thread.sleep(100); // 模拟生产过程
}
} catch (InterruptedException e) {
e.printStackTrace();
}
});
// 消费者线程
Thread consumer = new Thread(() -> {
try {
while (true) {
Integer value = queue.take(); // 从队列中取出元素
System.out.println("Consumer consumed: " + value);
Thread.sleep(200); // 模拟消费过程
}
} catch (InterruptedException e) {
e.printStackTrace();
}
});
producer.start();
consumer.start();
producer.join(); // 等待生产者线程结束
//consumer.join(); // 不要等待消费者结束,让它一直消费
}
}
4. ArrayBlockingQueue vs. LinkedBlockingQueue:对比分析
| 特性 | ArrayBlockingQueue | LinkedBlockingQueue |
|---|---|---|
| 内部实现 | 循环数组 | 链表 |
| 容量 | 有界 | 有界/无界(默认无界) |
| 锁机制 | 单一的ReentrantLock |
两个独立的ReentrantLock (takeLock, putLock) |
| 并发性能 | 较低(竞争同一把锁) | 较高(生产者和消费者可以并发操作) |
| 内存占用 | 预先分配固定大小的数组,可能存在空间浪费 | 动态分配链表节点,更灵活 |
| 适用场景 | 容量固定,对内存占用敏感的场景 | 容量变化大,并发性能要求高的场景 |
| GC压力 | 较低 | 较高 (需要频繁创建和销毁节点) |
4.1 性能对比
- 并发性能:
LinkedBlockingQueue通常具有更高的并发性能,因为它使用了两个独立的锁,允许生产者和消费者并发地进行添加和移除操作。ArrayBlockingQueue使用单一的锁,生产者和消费者必须竞争同一把锁,导致并发性能较低。 - 吞吐量: 在并发量很高的情况下,
LinkedBlockingQueue通常具有更高的吞吐量。 - 延迟:
ArrayBlockingQueue的延迟通常更稳定,因为数组的访问速度更快,且内存是连续的。LinkedBlockingQueue的延迟可能更高,因为链表的访问需要遍历节点,且内存可能是不连续的。
4.2 内存占用
- ArrayBlockingQueue:
ArrayBlockingQueue在创建时需要预先分配固定大小的数组,即使队列中没有元素,数组也会占用内存。如果容量设置过大,可能会造成内存浪费。 - LinkedBlockingQueue:
LinkedBlockingQueue使用链表节点来存储元素,只有在添加元素时才会分配内存。这使得LinkedBlockingQueue的内存占用更加灵活,但也会增加GC的压力。
4.3 选择建议
- 如果队列的容量是固定的,并且对内存占用比较敏感,可以选择
ArrayBlockingQueue。 - 如果队列的容量变化较大,并且对并发性能要求较高,可以选择
LinkedBlockingQueue。 - 在实际使用中,应该避免使用无界的
LinkedBlockingQueue,因为过多的元素可能会导致内存溢出。建议始终指定一个合理的容量。 - 如果需要公平的锁策略,可以使用
ArrayBlockingQueue(int capacity, boolean fair)构造方法来创建一个公平锁的ArrayBlockingQueue。 - 选择哪种队列最终取决于具体的应用场景和性能需求。建议在实际环境中进行性能测试,以确定哪种队列更适合。
5. 深入源码:理解公平锁与非公平锁在ArrayBlockingQueue中的应用
ArrayBlockingQueue允许在构造时指定是否使用公平锁。理解公平锁和非公平锁的区别对于优化并发性能至关重要。
-
公平锁: 公平锁会按照线程请求锁的顺序来分配锁。也就是说,等待时间最长的线程会优先获取锁。虽然公平锁可以避免线程饥饿,但由于需要维护等待队列,性能通常比非公平锁低。
-
非公平锁: 非公平锁允许线程在等待队列中直接尝试获取锁,而无需按照请求顺序。这意味着即使有其他线程在等待,新到的线程也可能直接获取锁。非公平锁的性能通常比公平锁高,但可能会导致线程饥饿。
在ArrayBlockingQueue中,公平锁是通过ReentrantLock的构造方法来指定的。如果fair参数为true,则使用公平锁,否则使用非公平锁。
public ArrayBlockingQueue(int capacity, boolean fair) {
if (capacity <= 0)
throw new IllegalArgumentException();
this.items = new Object[capacity];
lock = new ReentrantLock(fair); // 指定是否使用公平锁
notEmpty = lock.newCondition();
notFull = lock.newCondition();
}
公平锁的实现会增加额外的开销,因为它需要维护一个等待队列,并按照请求顺序来分配锁。在ArrayBlockingQueue中,这意味着在put()和take()方法中,线程需要先检查是否有其他线程在等待,才能获取锁。
非公平锁的实现则更加简单,线程可以直接尝试获取锁,而无需检查等待队列。这可以提高并发性能,但也可能导致某些线程长时间无法获取锁。
6. 总结:选择适合的阻塞队列
总而言之,ArrayBlockingQueue和LinkedBlockingQueue都是强大的并发工具,各自具有不同的优势和适用场景。ArrayBlockingQueue适合于固定容量、内存敏感的场景,而LinkedBlockingQueue则更适合于容量变化大、并发性能要求高的场景。理解它们的内部实现和性能特点,可以帮助我们更好地选择和使用它们,从而构建高效、可靠的并发应用程序。根据实际需求,选择合适的队列,并在必要时进行性能测试,是优化并发程序性能的关键。