Java并发包(java.util.concurrent)详解

好的,各位观众老爷们,欢迎来到“并发世界历险记”!我是你们的向导,名叫并发老司机。今天咱们不飙车,而是要一起深入Java并发包(java.util.concurrent)这个宝藏山,挖掘里面的各种奇珍异宝。准备好了吗?系好安全带,发车咯!

第一站:并发的那些事儿,不得不说的秘密

在进入并发包的奇妙世界之前,咱们先得聊聊并发这档子事。想象一下,你是一位餐厅老板,只有一个厨师。如果顾客络绎不绝,厨师就得忙得像只陀螺,顾客也得饿成嗷嗷待哺的小狼崽。这时候,你灵机一动,雇了几个帮厨,大家齐心协力,效率嗖嗖地就上去了!

这就是并发的魅力:让多个任务(线程)“同时”执行,提高程序的运行效率。但并发也不是万能的,它就像一把双刃剑,用得好能让你飞黄腾达,用不好也能让你栽个大跟头。

  • 并发的优点:

    • 提高响应速度: 就像餐厅多几个厨师,顾客点餐后不用等太久。
    • 提高资源利用率: CPU不再闲着没事干,而是忙着处理多个任务。
    • 程序结构更清晰: 将复杂任务分解成多个小任务,每个线程负责一部分。
  • 并发的缺点:

    • 线程安全问题: 多个线程访问共享资源时,可能会出现数据错乱。比如,一个线程刚要给银行账户加钱,另一个线程却把钱取走了,这可就麻烦大了!
    • 死锁: 线程互相等待对方释放资源,谁也不让谁,就像两辆车在狭窄的路上堵住了,谁也过不去。
    • 上下文切换: CPU在多个线程之间切换,也会消耗一定的资源。就像厨师一会儿炒菜,一会儿切菜,也需要时间来调整。

明白了并发的优缺点,咱们才能更好地利用并发包来解决问题,避免踩坑。

第二站:并发包总览,琳琅满目的宝藏

Java并发包就像一个工具箱,里面装满了各种各样的工具,可以帮助我们轻松地构建并发程序。咱们先来简单地浏览一下这些工具:

工具类型 作用 例子
Executor 线程池框架,管理线程的创建和销毁,避免频繁创建线程的开销。就像餐厅的总调度,负责安排厨师的工作。 ThreadPoolExecutor, ScheduledThreadPoolExecutor
Lock 锁机制,控制对共享资源的访问,保证线程安全。就像餐厅的厨房门,一次只能允许一个厨师进入。 ReentrantLock, ReadWriteLock
Atomic 原子类,提供原子操作,保证操作的完整性。就像餐厅的收款机,一次只能处理一个顾客的付款。 AtomicInteger, AtomicLong, AtomicReference
Concurrent Collections 并发集合类,提供线程安全的集合操作。就像餐厅的菜品清单,多个厨师可以同时查看,但不能同时修改。 ConcurrentHashMap, CopyOnWriteArrayList, BlockingQueue
Condition 条件变量,实现线程间的等待和通知。就像餐厅的服务员,顾客点餐后,服务员会通知厨师做菜,厨师做好后,再通知服务员上菜。 Condition (通常与 ReentrantLock 配合使用)
Phaser 灵活的同步栅栏,协调多个线程的执行。就像餐厅的乐队,所有乐器都准备好后,才能开始演奏。 Phaser
CountDownLatch 倒计时器,等待多个线程完成任务。就像餐厅的食材采购,所有食材都采购齐后,才能开始做菜。 CountDownLatch
CyclicBarrier 循环栅栏,等待多个线程到达同一个点。就像餐厅的开业典礼,所有人员都到齐后,才能剪彩。 CyclicBarrier
Semaphore 信号量,控制对资源的并发访问数量。就像餐厅的停车位,只有有限的几个,超过数量的车辆需要等待。 Semaphore
ForkJoinPool 分而治之的任务分解框架,将大任务分解成小任务并行执行。就像餐厅的流水线,将一道菜的制作分解成多个步骤,每个厨师负责一部分。 ForkJoinPool, RecursiveTask, RecursiveAction

怎么样,是不是感觉眼花缭乱?别担心,咱们接下来会逐一深入地了解这些工具,并结合实际例子,让你彻底掌握它们的使用方法。

第三站:线程池,让线程不再“野蛮生长”

线程池是并发包中最重要的工具之一,它可以有效地管理线程的生命周期,避免频繁创建和销毁线程的开销。

  • 为什么要使用线程池?

    • 降低资源消耗: 重复利用已创建的线程,减少线程创建和销毁的开销。
    • 提高响应速度: 当任务到达时,无需等待线程创建即可立即执行。
    • 提高线程的可管理性: 统一管理线程,可以更好地控制并发数量。
  • 线程池的核心参数:

    • corePoolSize: 核心线程数,线程池中始终保持的线程数量,即使它们是空闲的。
    • maximumPoolSize: 最大线程数,线程池中允许的最大线程数量。
    • keepAliveTime: 线程空闲时间,超过这个时间,空闲线程会被销毁,直到线程池中的线程数量等于 corePoolSize。
    • unit: keepAliveTime 的时间单位,例如 TimeUnit.SECONDS。
    • workQueue: 阻塞队列,用于存放等待执行的任务。
    • threadFactory: 线程工厂,用于创建新的线程。
    • handler: 拒绝策略,当任务队列已满,且线程池中的线程数量达到 maximumPoolSize 时,用于处理新提交的任务。
  • 常见的线程池类型:

    • FixedThreadPool: 固定大小的线程池,核心线程数等于最大线程数。
    • CachedThreadPool: 可缓存的线程池,线程数量不固定,可以根据需要动态创建和销毁线程。
    • SingleThreadExecutor: 单线程的线程池,只有一个线程执行任务。
    • ScheduledThreadPoolExecutor: 可调度的线程池,可以定时或延时执行任务。

举个栗子:

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class ThreadPoolExample {
    public static void main(String[] args) {
        // 创建一个固定大小的线程池,包含 5 个线程
        ExecutorService executor = Executors.newFixedThreadPool(5);

        // 提交 10 个任务
        for (int i = 0; i < 10; i++) {
            final int taskNumber = i;
            executor.execute(() -> {
                System.out.println("Task " + taskNumber + " is running in thread: " + Thread.currentThread().getName());
                try {
                    Thread.sleep(1000); // 模拟任务执行时间
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            });
        }

        // 关闭线程池
        executor.shutdown();
    }
}

在这个例子中,我们创建了一个包含 5 个线程的固定大小的线程池。然后,我们提交了 10 个任务到线程池中。线程池会自动分配任务给空闲的线程执行。当所有任务都执行完毕后,我们关闭了线程池。

第四站:锁,守护共享资源的卫士

锁是并发编程中不可或缺的工具,它可以保证多个线程对共享资源的互斥访问,避免数据竞争和死锁等问题。

  • synchronized 关键字:

    synchronized 是 Java 中最基本的锁机制,它可以修饰方法或代码块,保证同一时刻只有一个线程可以执行被 synchronized 修饰的代码。

    public class Counter {
        private int count = 0;
    
        public synchronized void increment() {
            count++;
        }
    
        public int getCount() {
            return count;
        }
    }

    在这个例子中,increment() 方法被 synchronized 修饰,这意味着同一时刻只有一个线程可以执行 increment() 方法,从而保证 count 变量的线程安全。

  • ReentrantLock:

    ReentrantLock 是一个可重入的互斥锁,它提供了比 synchronized 关键字更灵活的锁机制。

    import java.util.concurrent.locks.Lock;
    import java.util.concurrent.locks.ReentrantLock;
    
    public class Counter {
        private int count = 0;
        private Lock lock = new ReentrantLock();
    
        public void increment() {
            lock.lock(); // 获取锁
            try {
                count++;
            } finally {
                lock.unlock(); // 释放锁
            }
        }
    
        public int getCount() {
            return count;
        }
    }

    在这个例子中,我们使用 ReentrantLock 来保护 count 变量的线程安全。需要注意的是,在使用 ReentrantLock 时,一定要在 finally 块中释放锁,以避免死锁。

  • ReadWriteLock:

    ReadWriteLock 是一种读写锁,它允许多个线程同时读取共享资源,但只允许一个线程写入共享资源。

    import java.util.concurrent.locks.ReadWriteLock;
    import java.util.concurrent.locks.ReentrantReadWriteLock;
    
    public class DataContainer {
        private String data = "Initial Data";
        private ReadWriteLock lock = new ReentrantReadWriteLock();
    
        public String readData() {
            lock.readLock().lock(); // 获取读锁
            try {
                System.out.println("Thread " + Thread.currentThread().getName() + " is reading data: " + data);
                return data;
            } finally {
                lock.readLock().unlock(); // 释放读锁
            }
        }
    
        public void writeData(String newData) {
            lock.writeLock().lock(); // 获取写锁
            try {
                System.out.println("Thread " + Thread.currentThread().getName() + " is writing data: " + newData);
                data = newData;
            } finally {
                lock.writeLock().unlock(); // 释放写锁
            }
        }
    }

    在这个例子中,我们使用 ReadWriteLock 来保护 data 变量的线程安全。多个线程可以同时读取 data 变量,但只有一个线程可以写入 data 变量。

第五站:原子类,让操作“一步到位”

原子类提供原子操作,保证操作的完整性,避免数据竞争。

  • AtomicInteger:

    AtomicInteger 提供原子性的整型操作,例如 get(), set(), incrementAndGet(), decrementAndGet() 等。

    import java.util.concurrent.atomic.AtomicInteger;
    
    public class Counter {
        private AtomicInteger count = new AtomicInteger(0);
    
        public int increment() {
            return count.incrementAndGet(); // 原子性递增
        }
    
        public int getCount() {
            return count.get();
        }
    }

    在这个例子中,我们使用 AtomicInteger 来保证 count 变量的原子性递增。

  • AtomicLong:

    AtomicLong 提供原子性的长整型操作,类似于 AtomicInteger

  • AtomicReference:

    AtomicReference 提供原子性的对象引用操作,可以用于实现无锁的数据结构。

第六站:并发集合,线程安全的容器

并发集合类提供线程安全的集合操作,避免手动加锁的麻烦。

  • ConcurrentHashMap:

    ConcurrentHashMap 是一个线程安全的哈希表,它允许多个线程同时读取和写入数据,而无需手动加锁。

  • CopyOnWriteArrayList:

    CopyOnWriteArrayList 是一个线程安全的列表,它在修改数据时,会复制一份新的列表,然后在新的列表上进行修改,最后将旧的列表替换为新的列表。这种方式可以保证读操作的线程安全,但写操作的性能较低。

  • BlockingQueue:

    BlockingQueue 是一个阻塞队列,它提供了线程安全的队列操作,例如 put(), take(), offer(), poll() 等。当队列为空时,take() 方法会阻塞线程,直到队列中有元素可用。当队列已满时,put() 方法会阻塞线程,直到队列中有空闲位置可用。

第七站:同步工具,协调线程的步调

同步工具可以帮助我们协调多个线程的执行顺序,实现复杂的并发逻辑。

  • CountDownLatch:

    CountDownLatch 是一个倒计时器,它可以让一个或多个线程等待其他线程完成任务。

    import java.util.concurrent.CountDownLatch;
    
    public class TaskRunner {
        public static void main(String[] args) throws InterruptedException {
            int numberOfTasks = 3;
            CountDownLatch latch = new CountDownLatch(numberOfTasks);
    
            for (int i = 0; i < numberOfTasks; i++) {
                new Thread(() -> {
                    System.out.println("Task started by " + Thread.currentThread().getName());
                    try {
                        Thread.sleep((long) (Math.random() * 2000)); // Simulate work
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    System.out.println("Task completed by " + Thread.currentThread().getName());
                    latch.countDown(); // Decrement the latch
                }).start();
            }
    
            latch.await(); // Wait for all tasks to complete
            System.out.println("All tasks completed! Main thread continuing.");
        }
    }

    在这个例子中,我们创建了一个 CountDownLatch,初始值为 3。然后,我们启动了 3 个线程执行任务。每个线程完成任务后,都会调用 latch.countDown() 方法将 CountDownLatch 的值减 1。主线程调用 latch.await() 方法等待 CountDownLatch 的值变为 0,表示所有任务都已完成。

  • CyclicBarrier:

    CyclicBarrier 是一个循环栅栏,它可以让多个线程等待彼此到达同一个点,然后一起继续执行。

    import java.util.concurrent.CyclicBarrier;
    
    public class CyclicBarrierExample {
        public static void main(String[] args) {
            int numberOfThreads = 3;
            CyclicBarrier barrier = new CyclicBarrier(numberOfThreads, () -> {
                System.out.println("All threads have reached the barrier. Let's proceed together!");
            });
    
            for (int i = 0; i < numberOfThreads; i++) {
                new Thread(() -> {
                    try {
                        System.out.println(Thread.currentThread().getName() + " is doing some initial work.");
                        Thread.sleep((long) (Math.random() * 2000));
                        System.out.println(Thread.currentThread().getName() + " is waiting at the barrier.");
                        barrier.await(); // Wait for all threads to reach the barrier
                        System.out.println(Thread.currentThread().getName() + " continues after the barrier.");
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                }).start();
            }
        }
    }

    在这个例子中,我们创建了一个 CyclicBarrier,初始值为 3。然后,我们启动了 3 个线程执行任务。每个线程执行到 barrier.await() 方法时,会等待其他线程也到达 barrier.await() 方法。当所有线程都到达 barrier.await() 方法时,CyclicBarrier 会执行一个 Runnable 任务(在这个例子中是打印一条消息),然后所有线程一起继续执行。

  • Semaphore:

    Semaphore 是一个信号量,它可以控制对资源的并发访问数量。

    import java.util.concurrent.Semaphore;
    
    public class SemaphoreExample {
        public static void main(String[] args) {
            int numberOfPermits = 2; // Number of available permits
            Semaphore semaphore = new Semaphore(numberOfPermits);
    
            for (int i = 0; i < 5; i++) {
                new Thread(() -> {
                    try {
                        System.out.println(Thread.currentThread().getName() + " is requesting a permit.");
                        semaphore.acquire(); // Acquire a permit
                        System.out.println(Thread.currentThread().getName() + " has acquired a permit and is working.");
                        Thread.sleep((long) (Math.random() * 2000)); // Simulate work
                        System.out.println(Thread.currentThread().getName() + " is releasing the permit.");
                        semaphore.release(); // Release the permit
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }).start();
            }
        }
    }

    在这个例子中,我们创建了一个 Semaphore,初始值为 2。然后,我们启动了 5 个线程执行任务。每个线程在执行任务之前,都需要调用 semaphore.acquire() 方法获取一个许可。如果 Semaphore 的许可数量已经用完,线程会阻塞等待,直到有其他线程释放许可。线程完成任务后,会调用 semaphore.release() 方法释放许可。

  • Phaser:

    Phaser 是一个灵活的同步栅栏,它可以协调多个线程的执行,并且可以动态地调整参与同步的线程数量。

第八站:ForkJoinPool,分而治之的利器

ForkJoinPool 是一个分而治之的任务分解框架,它可以将大任务分解成小任务并行执行,提高程序的运行效率。

  • RecursiveTask:

    RecursiveTask 是一个可以递归分解的任务,它有返回值。

  • RecursiveAction:

    RecursiveAction 是一个可以递归分解的任务,它没有返回值。

总结:并发世界,任你驰骋

经过这次“并发世界历险记”,相信各位观众老爷们对 Java 并发包已经有了更深入的了解。并发编程虽然复杂,但只要掌握了并发包中的各种工具,并结合实际例子进行练习,就能轻松地构建高效、稳定的并发程序。

记住,并发编程就像烹饪,需要耐心、细心和不断地尝试。希望各位观众老爷们都能成为并发世界的大厨,烹饪出美味的并发佳肴!

最后,送大家一句并发箴言:谨慎使用锁,拥抱原子操作,善用并发集合,巧用同步工具,驾驭 ForkJoinPool!

咱们下次再见! 🚗💨

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注