Java中的非阻塞队列实现:ArrayBlockingQueue、LinkedBlockingQueue的原理对比

Java非阻塞队列实现:ArrayBlockingQueue与LinkedBlockingQueue原理对比

大家好,今天我们来深入探讨Java并发编程中常用的两种阻塞队列实现:ArrayBlockingQueue和LinkedBlockingQueue。它们都是java.util.concurrent包下的重要成员,用于解决多线程环境下的生产者-消费者问题。虽然它们都实现了BlockingQueue接口,但在内部实现和适用场景上存在显著差异。我们将从原理、性能、内存占用等方面进行详细对比,帮助大家在实际开发中选择合适的队列。

1. 阻塞队列的必要性:生产者-消费者问题

在多线程编程中,生产者-消费者问题是一个经典模型。生产者线程负责生产数据并放入队列,消费者线程负责从队列中取出数据并进行处理。如果没有队列作为缓冲区,生产者和消费者必须直接通信,这会导致以下问题:

  • 耦合度高: 生产者和消费者必须知道彼此的存在和状态,增加了代码的复杂性。
  • 效率低下: 如果生产者生产速度快于消费者消费速度,生产者需要等待消费者,反之亦然,造成资源浪费。

阻塞队列通过提供一个线程安全的数据缓冲区,有效地解耦了生产者和消费者。当队列满时,尝试向队列中添加元素的线程会被阻塞,直到队列有空闲空间。当队列为空时,尝试从队列中取出元素的线程会被阻塞,直到队列中有新的元素。

2. ArrayBlockingQueue:基于数组的有界阻塞队列

ArrayBlockingQueue是基于数组实现的有界阻塞队列。这意味着它在创建时必须指定队列的容量,且容量在创建后不能更改。

2.1 内部结构

ArrayBlockingQueue使用一个循环数组来存储元素。它维护以下关键变量:

  • items: 用于存储元素的数组。
  • takeIndex: 下一个要被take()poll()操作移除的元素的索引。
  • putIndex: 下一个要被put()offer()操作添加的元素的索引。
  • count: 队列中当前元素的数量。
  • lock: 用于保护队列状态的重入锁(ReentrantLock)。
  • notEmpty: 条件变量,用于在队列为空时阻塞take()poll()操作的线程。
  • notFull: 条件变量,用于在队列满时阻塞put()offer()操作的线程。

2.2 核心方法

  • put(E e): 将指定的元素插入此队列的尾部,如果队列已满,则等待可用的空间。

    public void put(E e) throws InterruptedException {
        checkNotNull(e);
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly(); // 可中断的锁
        try {
            while (count == items.length) // 队列已满
                notFull.await(); // 阻塞当前线程,直到队列有空闲空间
            enqueue(e); // 将元素添加到队列尾部
        } finally {
            lock.unlock();
        }
    }
    
    private void enqueue(E x) {
        final Object[] items = this.items;
        items[putIndex] = x;
        if (++putIndex == items.length)
            putIndex = 0; // 循环数组
        count++;
        notEmpty.signal(); // 唤醒等待的消费者线程
    }
  • take(): 从此队列的头部移除并返回一个元素,如果队列为空,则等待一个元素变为可用。

    public E take() throws InterruptedException {
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            while (count == 0) // 队列为空
                notEmpty.await(); // 阻塞当前线程,直到队列有元素
            return dequeue(); // 从队列头部移除并返回一个元素
        } finally {
            lock.unlock();
        }
    }
    
    private E dequeue() {
        final Object[] items = this.items;
        @SuppressWarnings("unchecked")
        E x = (E) items[takeIndex];
        items[takeIndex] = null; // help GC
        if (++takeIndex == items.length)
            takeIndex = 0; // 循环数组
        count--;
        if (itrs != null)
            itrs.elementDequeued();
        notFull.signal(); // 唤醒等待的生产者线程
        return x;
    }
  • offer(E e): 将指定的元素插入此队列的尾部,如果该队列已满,则立即返回false。这是一个非阻塞方法。

  • poll(): 从此队列的头部移除并返回一个元素,如果该队列为空,则立即返回null。这是一个非阻塞方法。

2.3 构造方法

ArrayBlockingQueue提供了多种构造方法:

  • ArrayBlockingQueue(int capacity): 创建一个具有给定容量的ArrayBlockingQueue,并使用默认的访问策略(非公平锁)。
  • ArrayBlockingQueue(int capacity, boolean fair): 创建一个具有给定容量的ArrayBlockingQueue,并使用指定的访问策略。fair参数为true时,使用公平锁,否则使用非公平锁。公平锁倾向于让等待时间最长的线程优先获取锁。
  • ArrayBlockingQueue(int capacity, boolean fair, Collection<? extends E> c): 创建一个具有给定容量的ArrayBlockingQueue,并使用指定的访问策略,最初包含给定集合的元素,元素按集合迭代器的遍历顺序添加。

2.4 代码示例

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

public class ArrayBlockingQueueExample {

    public static void main(String[] args) throws InterruptedException {
        BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(5); // 创建一个容量为5的ArrayBlockingQueue

        // 生产者线程
        Thread producer = new Thread(() -> {
            try {
                for (int i = 0; i < 10; i++) {
                    queue.put(i); // 将元素添加到队列
                    System.out.println("Producer added: " + i);
                    Thread.sleep(100); // 模拟生产过程
                }
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        });

        // 消费者线程
        Thread consumer = new Thread(() -> {
            try {
                while (true) {
                    Integer value = queue.take(); // 从队列中取出元素
                    System.out.println("Consumer consumed: " + value);
                    Thread.sleep(200); // 模拟消费过程
                }
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        });

        producer.start();
        consumer.start();

        producer.join(); // 等待生产者线程结束
        //consumer.join(); // 不要等待消费者结束,让它一直消费
    }
}

3. LinkedBlockingQueue:基于链表的无界/有界阻塞队列

LinkedBlockingQueue是基于链表实现的阻塞队列。它可以是无界的,也可以是有界的。如果不指定容量,则默认为无界队列(实际上,Integer.MAX_VALUE),这意味着理论上它可以容纳无限数量的元素。但是,在实际使用中,应该避免使用无界队列,因为过多的元素可能会导致内存溢出。

3.1 内部结构

LinkedBlockingQueue使用链表节点来存储元素。它维护以下关键变量:

  • head: 链表的头节点。
  • last: 链表的尾节点。
  • capacity: 队列的容量。
  • count: 队列中当前元素的数量。
  • takeLock: 用于保护take()poll()操作的锁(ReentrantLock)。
  • putLock: 用于保护put()offer()操作的锁(ReentrantLock)。
  • notEmpty: 条件变量,用于在队列为空时阻塞take()poll()操作的线程。
  • notFull: 条件变量,用于在队列满时阻塞put()offer()操作的线程。

ArrayBlockingQueue不同,LinkedBlockingQueue使用了两个独立的锁:takeLockputLock。这使得生产者和消费者可以并发地进行添加和移除操作,从而提高并发性能。

3.2 核心方法

  • put(E e): 将指定的元素插入此队列的尾部,如果队列已满,则等待可用的空间。

    public void put(E e) throws InterruptedException {
        if (e == null) throw new NullPointerException();
        int c = -1;
        Node<E> node = new Node<E>(e);
        final ReentrantLock putLock = this.putLock;
        final AtomicInteger count = this.count;
        putLock.lockInterruptibly(); // 可中断的锁
        try {
            while (count.get() == capacity) { // 队列已满
                notFull.await(); // 阻塞当前线程,直到队列有空闲空间
            }
            enqueue(node); // 将元素添加到队列尾部
            c = count.getAndIncrement();
            if (c + 1 < capacity)
                notFull.signal(); // 唤醒等待的生产者线程
        } finally {
            putLock.unlock();
        }
        if (c == 0)
            signalNotEmpty(); // 唤醒等待的消费者线程
    }
    
    private void enqueue(Node<E> node) {
        last.next = node;
        last = node;
    }
  • take(): 从此队列的头部移除并返回一个元素,如果队列为空,则等待一个元素变为可用。

    public E take() throws InterruptedException {
        E x;
        int c = -1;
        final AtomicInteger count = this.count;
        final ReentrantLock takeLock = this.takeLock;
        takeLock.lockInterruptibly(); // 可中断的锁
        try {
            while (count.get() == 0) { // 队列为空
                notEmpty.await(); // 阻塞当前线程,直到队列有元素
            }
            x = dequeue(); // 从队列头部移除并返回一个元素
            c = count.getAndDecrement();
            if (c > 1)
                notEmpty.signal(); // 唤醒等待的消费者线程
        } finally {
            takeLock.unlock();
        }
        if (c == capacity)
            signalNotFull(); // 唤醒等待的生产者线程
        return x;
    }
    
    private E dequeue() {
        Node<E> h = head;
        Node<E> first = h.next;
        h.next = h; // help GC
        head = first;
        E x = first.item;
        first.item = null;
        return x;
    }
  • offer(E e): 将指定的元素插入此队列的尾部,如果该队列已满,则立即返回false。这是一个非阻塞方法。

  • poll(): 从此队列的头部移除并返回一个元素,如果该队列为空,则立即返回null。这是一个非阻塞方法。

3.3 构造方法

LinkedBlockingQueue提供了多种构造方法:

  • LinkedBlockingQueue(): 创建一个容量为Integer.MAX_VALUELinkedBlockingQueue
  • LinkedBlockingQueue(int capacity): 创建一个具有给定容量的LinkedBlockingQueue
  • LinkedBlockingQueue(Collection<? extends E> c): 创建一个容量为Integer.MAX_VALUELinkedBlockingQueue,最初包含给定集合的元素,元素按集合迭代器的遍历顺序添加。

3.4 代码示例

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

public class LinkedBlockingQueueExample {

    public static void main(String[] args) throws InterruptedException {
        BlockingQueue<Integer> queue = new LinkedBlockingQueue<>(5); // 创建一个容量为5的LinkedBlockingQueue

        // 生产者线程
        Thread producer = new Thread(() -> {
            try {
                for (int i = 0; i < 10; i++) {
                    queue.put(i); // 将元素添加到队列
                    System.out.println("Producer added: " + i);
                    Thread.sleep(100); // 模拟生产过程
                }
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        });

        // 消费者线程
        Thread consumer = new Thread(() -> {
            try {
                while (true) {
                    Integer value = queue.take(); // 从队列中取出元素
                    System.out.println("Consumer consumed: " + value);
                    Thread.sleep(200); // 模拟消费过程
                }
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        });

        producer.start();
        consumer.start();

        producer.join(); // 等待生产者线程结束
        //consumer.join(); // 不要等待消费者结束,让它一直消费
    }
}

4. ArrayBlockingQueue vs. LinkedBlockingQueue:对比分析

特性 ArrayBlockingQueue LinkedBlockingQueue
内部实现 循环数组 链表
容量 有界 有界/无界(默认无界)
锁机制 单一的ReentrantLock 两个独立的ReentrantLock (takeLock, putLock)
并发性能 较低(竞争同一把锁) 较高(生产者和消费者可以并发操作)
内存占用 预先分配固定大小的数组,可能存在空间浪费 动态分配链表节点,更灵活
适用场景 容量固定,对内存占用敏感的场景 容量变化大,并发性能要求高的场景
GC压力 较低 较高 (需要频繁创建和销毁节点)

4.1 性能对比

  • 并发性能: LinkedBlockingQueue通常具有更高的并发性能,因为它使用了两个独立的锁,允许生产者和消费者并发地进行添加和移除操作。ArrayBlockingQueue使用单一的锁,生产者和消费者必须竞争同一把锁,导致并发性能较低。
  • 吞吐量: 在并发量很高的情况下,LinkedBlockingQueue通常具有更高的吞吐量。
  • 延迟: ArrayBlockingQueue的延迟通常更稳定,因为数组的访问速度更快,且内存是连续的。LinkedBlockingQueue的延迟可能更高,因为链表的访问需要遍历节点,且内存可能是不连续的。

4.2 内存占用

  • ArrayBlockingQueue: ArrayBlockingQueue在创建时需要预先分配固定大小的数组,即使队列中没有元素,数组也会占用内存。如果容量设置过大,可能会造成内存浪费。
  • LinkedBlockingQueue: LinkedBlockingQueue使用链表节点来存储元素,只有在添加元素时才会分配内存。这使得LinkedBlockingQueue的内存占用更加灵活,但也会增加GC的压力。

4.3 选择建议

  • 如果队列的容量是固定的,并且对内存占用比较敏感,可以选择ArrayBlockingQueue
  • 如果队列的容量变化较大,并且对并发性能要求较高,可以选择LinkedBlockingQueue
  • 在实际使用中,应该避免使用无界的LinkedBlockingQueue,因为过多的元素可能会导致内存溢出。建议始终指定一个合理的容量。
  • 如果需要公平的锁策略,可以使用ArrayBlockingQueue(int capacity, boolean fair)构造方法来创建一个公平锁的ArrayBlockingQueue
  • 选择哪种队列最终取决于具体的应用场景和性能需求。建议在实际环境中进行性能测试,以确定哪种队列更适合。

5. 深入源码:理解公平锁与非公平锁在ArrayBlockingQueue中的应用

ArrayBlockingQueue允许在构造时指定是否使用公平锁。理解公平锁和非公平锁的区别对于优化并发性能至关重要。

  • 公平锁: 公平锁会按照线程请求锁的顺序来分配锁。也就是说,等待时间最长的线程会优先获取锁。虽然公平锁可以避免线程饥饿,但由于需要维护等待队列,性能通常比非公平锁低。

  • 非公平锁: 非公平锁允许线程在等待队列中直接尝试获取锁,而无需按照请求顺序。这意味着即使有其他线程在等待,新到的线程也可能直接获取锁。非公平锁的性能通常比公平锁高,但可能会导致线程饥饿。

ArrayBlockingQueue中,公平锁是通过ReentrantLock的构造方法来指定的。如果fair参数为true,则使用公平锁,否则使用非公平锁。

public ArrayBlockingQueue(int capacity, boolean fair) {
    if (capacity <= 0)
        throw new IllegalArgumentException();
    this.items = new Object[capacity];
    lock = new ReentrantLock(fair); // 指定是否使用公平锁
    notEmpty = lock.newCondition();
    notFull =  lock.newCondition();
}

公平锁的实现会增加额外的开销,因为它需要维护一个等待队列,并按照请求顺序来分配锁。在ArrayBlockingQueue中,这意味着在put()take()方法中,线程需要先检查是否有其他线程在等待,才能获取锁。

非公平锁的实现则更加简单,线程可以直接尝试获取锁,而无需检查等待队列。这可以提高并发性能,但也可能导致某些线程长时间无法获取锁。

6. 总结:选择适合的阻塞队列

总而言之,ArrayBlockingQueueLinkedBlockingQueue都是强大的并发工具,各自具有不同的优势和适用场景。ArrayBlockingQueue适合于固定容量、内存敏感的场景,而LinkedBlockingQueue则更适合于容量变化大、并发性能要求高的场景。理解它们的内部实现和性能特点,可以帮助我们更好地选择和使用它们,从而构建高效、可靠的并发应用程序。根据实际需求,选择合适的队列,并在必要时进行性能测试,是优化并发程序性能的关键。

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注