JAVA线程池Shutdown导致任务丢失问题的场景复现与解决

JAVA线程池Shutdown导致任务丢失问题的场景复现与解决

各位同学们,大家好!今天我们来聊聊一个在并发编程中经常遇到的问题:Java线程池 shutdown 导致任务丢失。这个问题听起来很简单,但实际应用中却很容易被忽略,导致一些难以排查的Bug。

一、线程池的基本概念回顾

首先,我们简单回顾一下线程池的概念。线程池是一种池化技术,用于管理和复用线程,从而降低线程创建和销毁的开销,提高系统的性能。Java 提供了 java.util.concurrent.ExecutorService 接口和 java.util.concurrent.ThreadPoolExecutor 类来实现线程池。

线程池的主要参数包括:

参数名称 含义
corePoolSize 核心线程数:线程池中保持存活的线程数量,即使它们是空闲的。
maximumPoolSize 最大线程数:线程池中允许存在的最大线程数量。
keepAliveTime 空闲线程存活时间:当线程池中的线程数量超过 corePoolSize 时,多余的空闲线程在keepAliveTime时间内没有新的任务提交,则会被终止。
unit keepAliveTime 的时间单位。
workQueue 任务队列:用于存放等待执行的任务。常见的任务队列有:ArrayBlockingQueue, LinkedBlockingQueue, PriorityBlockingQueue, SynchronousQueue
threadFactory 线程工厂:用于创建新的线程。
rejectedExecutionHandler 拒绝策略:当任务队列已满且线程池中的线程数量达到 maximumPoolSize 时,新的任务将被拒绝。常见的拒绝策略有:AbortPolicy, CallerRunsPolicy, DiscardPolicy, DiscardOldestPolicy

一个简单的线程池创建示例如下:

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class ThreadPoolExample {

    public static void main(String[] args) throws InterruptedException {
        // 创建一个固定大小的线程池,包含5个线程
        ExecutorService executor = Executors.newFixedThreadPool(5);

        // 提交10个任务
        for (int i = 0; i < 10; i++) {
            final int taskNumber = i;
            executor.submit(() -> {
                System.out.println("Task " + taskNumber + " started by " + Thread.currentThread().getName());
                try {
                    Thread.sleep(1000); // 模拟任务执行时间
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println("Task " + taskNumber + " finished by " + Thread.currentThread().getName());
            });
        }

        // 关闭线程池
        executor.shutdown();
        executor.awaitTermination(60, TimeUnit.SECONDS);

        System.out.println("All tasks finished");
    }
}

二、shutdown()shutdownNow() 的区别

在关闭线程池时,我们通常会使用 shutdown()shutdownNow() 方法。这两个方法的功能有所不同,这也是导致任务丢失问题的关键所在。

  • shutdown(): 启动线程池的优雅关闭过程。它会停止接受新的任务提交,但会等待所有已提交的任务(包括正在执行的和在队列中等待的)执行完成。
  • shutdownNow(): 尝试停止所有正在执行的任务,暂停处理在队列中等待的任务,并返回等待执行的任务列表。

三、任务丢失场景复现:shutdownNow() 的使用

shutdownNow() 的激进关闭方式是导致任务丢失的主要原因。如果我们在任务还没有完成之前调用 shutdownNow(),那么队列中的任务就会被丢弃,正在执行的任务也可能被中断。

以下代码演示了使用 shutdownNow() 导致任务丢失的场景:

import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

public class ShutdownNowExample {

    public static void main(String[] args) throws InterruptedException {
        ExecutorService executor = Executors.newFixedThreadPool(2);

        // 提交5个耗时任务
        for (int i = 0; i < 5; i++) {
            final int taskNumber = i;
            executor.submit(() -> {
                System.out.println("Task " + taskNumber + " started by " + Thread.currentThread().getName());
                try {
                    Thread.sleep(3000); // 模拟耗时任务
                } catch (InterruptedException e) {
                    System.out.println("Task " + taskNumber + " interrupted by " + Thread.currentThread().getName());
                    Thread.currentThread().interrupt();  //重要:重新设置中断标志
                }
                System.out.println("Task " + taskNumber + " finished by " + Thread.currentThread().getName());
            });
        }

        Thread.sleep(1000); // 等待一部分任务开始执行

        // 粗暴关闭线程池
        List<Runnable> droppedTasks = executor.shutdownNow();

        System.out.println("Shutdown initiated. Dropped tasks: " + droppedTasks.size());

        for (Runnable task : droppedTasks) {
            System.out.println("Dropped task: " + task);
        }

        executor.awaitTermination(60, TimeUnit.SECONDS);

        System.out.println("All tasks finished (or cancelled)");
    }
}

在这个例子中,我们提交了5个需要3秒才能完成的任务。在任务执行1秒后,我们调用了 shutdownNow()。结果是,部分任务被中断,队列中的任务被丢弃。shutdownNow() 返回了一个 List<Runnable>,其中包含了被丢弃的任务。

运行结果分析:

你会发现,并非所有任务都能完成,并且 droppedTasks 列表中包含了没有开始执行的任务。正在执行的任务也可能因为 InterruptedException 而提前结束。 注意看输出,被打断的任务会输出 "interrupted by…"。

关键点:

  • shutdownNow() 会尝试中断正在执行的任务,因此任务内部必须妥善处理 InterruptedException,否则可能导致数据不一致或其他问题。
  • shutdownNow() 返回的 List<Runnable> 仅仅是被丢弃的任务,不包括被中断的任务。

四、任务丢失场景复现:未正确处理中断

即使使用了 shutdown(),如果任务内部没有正确处理 InterruptedException,也可能导致任务提前结束,数据丢失。

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class InterruptedExceptionExample {

    public static void main(String[] args) throws InterruptedException {
        ExecutorService executor = Executors.newFixedThreadPool(1);

        executor.submit(() -> {
            System.out.println("Task started by " + Thread.currentThread().getName());
            try {
                for (int i = 0; i < 10; i++) {
                    System.out.println("Processing step " + i);
                    Thread.sleep(500);
                }
                System.out.println("Task completed successfully");
            } catch (InterruptedException e) {
                System.out.println("Task interrupted.  Some steps may not have completed.");
                //e.printStackTrace(); // 错误的做法:直接打印异常
            }
        });

        Thread.sleep(1000);
        executor.shutdownNow(); // 使用 shutdownNow 来模拟中断

        executor.awaitTermination(60, TimeUnit.SECONDS);
        System.out.println("Executor finished");
    }
}

在这个例子中,如果任务在循环执行过程中被中断,InterruptedException 会被捕获,但是程序只是简单地打印了异常信息,而没有重新设置中断标志。这会导致线程认为中断已经处理完毕,后续的循环不会再检查中断状态,从而导致任务提前结束。

正确的做法是在捕获 InterruptedException 之后,重新设置中断标志:

} catch (InterruptedException e) {
    System.out.println("Task interrupted.  Some steps may not have completed.");
    Thread.currentThread().interrupt(); // 重新设置中断标志
}

通过重新设置中断标志,线程可以继续响应中断,并及时退出任务。

五、任务丢失场景复现:线程池提前关闭

还有一种情况可能导致任务丢失,那就是线程池在任务提交完成之前就被关闭了。这种情况通常发生在异步任务提交时,主线程在提交完任务后就直接退出了,而没有等待任务执行完成。

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class EarlyShutdownExample {

    public static void main(String[] args) throws InterruptedException {
        ExecutorService executor = Executors.newFixedThreadPool(2);

        // 提交任务
        for (int i = 0; i < 5; i++) {
            final int taskNumber = i;
            executor.submit(() -> {
                System.out.println("Task " + taskNumber + " started by " + Thread.currentThread().getName());
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println("Task " + taskNumber + " finished by " + Thread.currentThread().getName());
            });
        }

        //  错误的做法:立即关闭线程池
        executor.shutdown();
        //Thread.sleep(500); // 等待一段时间,但仍然可能不够

        // 正确的做法:等待所有任务完成
        executor.awaitTermination(10, TimeUnit.SECONDS);
        System.out.println("All tasks finished");
    }
}

在这个例子中,主线程在提交完任务后立即调用了 executor.shutdown()。由于任务需要一定的时间才能执行完成,因此一部分任务可能还没有开始执行就被线程池关闭了。

正确的做法是使用 awaitTermination() 方法等待所有任务执行完成:

executor.shutdown();
executor.awaitTermination(10, TimeUnit.SECONDS); // 等待所有任务完成

awaitTermination() 方法会阻塞当前线程,直到所有任务执行完成,或者超时时间到达。

六、如何避免任务丢失

为了避免任务丢失,我们可以采取以下措施:

  1. 使用 shutdown() 进行优雅关闭: 尽可能使用 shutdown() 方法,而不是 shutdownNow()shutdown() 允许线程池完成所有已提交的任务,从而避免任务丢失。

  2. 正确处理 InterruptedException: 在任务内部,要正确处理 InterruptedException,并在捕获异常后重新设置中断标志。

  3. 使用 awaitTermination() 等待任务完成: 在关闭线程池之前,使用 awaitTermination() 方法等待所有任务执行完成。

  4. 监控任务执行状态: 可以使用 Future 对象来监控任务的执行状态,并在必要时取消任务。

  5. 合理的线程池配置: 根据实际需求,合理配置线程池的参数,例如核心线程数、最大线程数、任务队列等。

  6. 使用 try-finally 确保资源释放: 确保在任务执行完成后,释放所有资源,例如数据库连接、文件句柄等。即使任务被中断,也要保证资源能够被正确释放。

  7. 使用有界队列: 使用 ArrayBlockingQueue 或其他有界队列可以防止任务无限制地堆积,避免内存溢出。 当队列满时,可以使用合适的 RejectedExecutionHandler 来处理被拒绝的任务。

  8. 日志记录: 记录任务的开始、结束和中断信息,方便排查问题。

七、示例:改进后的代码

下面是一个改进后的代码示例,它使用了 shutdown() 进行优雅关闭,并正确处理了 InterruptedException

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class SafeShutdownExample {

    public static void main(String[] args) throws InterruptedException {
        ExecutorService executor = Executors.newFixedThreadPool(2);

        for (int i = 0; i < 5; i++) {
            final int taskNumber = i;
            executor.submit(() -> {
                System.out.println("Task " + taskNumber + " started by " + Thread.currentThread().getName());
                try {
                    for (int j = 0; j < 5; j++) {
                        System.out.println("Task " + taskNumber + " processing step " + j);
                        Thread.sleep(500);
                    }
                    System.out.println("Task " + taskNumber + " completed successfully");
                } catch (InterruptedException e) {
                    System.out.println("Task " + taskNumber + " interrupted");
                    Thread.currentThread().interrupt(); // 重新设置中断标志
                }
            });
        }

        executor.shutdown();
        executor.awaitTermination(60, TimeUnit.SECONDS);

        System.out.println("All tasks finished");
    }
}

八、关于 RejectedExecutionHandler 的补充说明

当线程池的任务队列已满,且线程池中的线程数量达到最大值时,新的任务将被拒绝。RejectedExecutionHandler 接口定义了处理被拒绝任务的策略。Java 提供了以下几种默认的拒绝策略:

  • AbortPolicy (默认): 抛出 RejectedExecutionException 异常。
  • CallerRunsPolicy: 由提交任务的线程直接执行该任务。
  • DiscardPolicy: 默默地丢弃该任务。
  • DiscardOldestPolicy: 丢弃队列中最老的未处理任务,然后尝试重新提交该任务。

你也可以自定义 RejectedExecutionHandler 来实现自己的拒绝策略。

九、总结:谨慎关闭,正确处理中断,耐心等待

线程池的 shutdown 操作需要谨慎处理,shutdownNow() 容易导致任务丢失,应尽量避免使用。 确保任务内部正确处理 InterruptedException 并重新设置中断标志,使用 awaitTermination() 方法耐心等待所有任务完成,方能保证任务不丢失。

希望今天的讲座对大家有所帮助!谢谢!

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注