JAVA 线程池拒绝策略该如何选?详解四种策略的实际应用与区别

好的,接下来我将以讲座的形式,详细讲解 Java 线程池的四种拒绝策略,并结合实际应用进行分析。

线程池拒绝策略:保障服务稳定性的最后一道防线

大家好,今天我们要探讨的是 Java 线程池中的拒绝策略。线程池作为并发编程中重要的工具,可以有效地管理线程资源,提高系统性能。然而,当线程池的任务队列已满,且所有线程都在忙碌时,新提交的任务就会面临被拒绝的风险。这时,拒绝策略就发挥作用了,它定义了线程池如何处理这些无法立即执行的任务,是保障系统稳定性的最后一道防线。

Java 提供了四种内置的拒绝策略,每种策略都有其特定的应用场景和优缺点。理解这些策略,并根据实际需求选择合适的策略,对于构建健壮的并发应用至关重要。

四种内置拒绝策略:深入解析

Java 线程池提供了 RejectedExecutionHandler 接口,允许我们自定义拒绝策略。但是,大多数情况下,我们可以直接使用 Java 内置的四种实现,它们分别是:

  1. AbortPolicy: 抛出 RejectedExecutionException 异常。
  2. CallerRunsPolicy: 由提交任务的线程执行该任务。
  3. DiscardPolicy: 直接丢弃该任务,不做任何处理。
  4. DiscardOldestPolicy: 丢弃队列中最老的任务,然后尝试将新任务加入队列。

接下来,我们将逐一深入解析这些策略,并结合代码示例说明其应用场景。

1. AbortPolicy (中止策略): 直接抛出异常

AbortPolicy 是线程池默认的拒绝策略。当线程池无法处理新任务时,它会直接抛出一个 RejectedExecutionException 异常,通知调用者任务被拒绝。

工作原理:

  • execute() 方法尝试提交任务到线程池时,如果线程池已满(任务队列满且所有线程都在忙碌),AbortPolicy 会立即创建一个 RejectedExecutionException 实例。
  • 然后,它会将该异常抛给调用者,让调用者知道任务提交失败。

代码示例:

import java.util.concurrent.*;

public class AbortPolicyExample {

    public static void main(String[] args) {
        // 创建一个固定大小的线程池,核心线程数和最大线程数都为 2
        ThreadPoolExecutor executor = new ThreadPoolExecutor(
                2, // corePoolSize
                2, // maximumPoolSize
                0L, // keepAliveTime
                TimeUnit.MILLISECONDS, // unit
                new LinkedBlockingQueue<>(1), // workQueue, 容量为 1
                new AbortPolicy() // handler
        );

        try {
            // 提交 3 个任务
            executor.execute(new Task("Task 1"));
            executor.execute(new Task("Task 2"));
            executor.execute(new Task("Task 3")); // 队列已满,线程都在忙碌,将会被拒绝
        } catch (RejectedExecutionException e) {
            System.err.println("Task rejected: " + e.getMessage());
        } finally {
            executor.shutdown();
        }
    }

    static class Task implements Runnable {
        private String name;

        public Task(String name) {
            this.name = name;
        }

        @Override
        public void run() {
            try {
                System.out.println(Thread.currentThread().getName() + " executing " + name);
                Thread.sleep(1000); // 模拟任务执行时间
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}

运行结果:

pool-1-thread-1 executing Task 1
pool-1-thread-2 executing Task 2
Task rejected: java.util.concurrent.RejectedExecutionException: Task AbortPolicyExample$Task@24d46ca6 rejected from java.util.concurrent.ThreadPoolExecutor@4517d9a3[Running, pool size = 2, active threads = 2, queued tasks = 1, completed tasks = 0]

应用场景:

  • 对任务丢失零容忍的场景: AbortPolicy 适用于那些对任务丢失零容忍的场景。当任务被拒绝时,调用者可以立即感知到,并采取相应的补救措施,例如重新提交任务、记录日志或者通知管理员。
  • 需要快速失败的场景: 在某些情况下,我们希望系统能够快速失败,而不是继续尝试执行可能会失败的任务。AbortPolicy 可以帮助我们实现这一点。

优点:

  • 能够立即通知调用者任务被拒绝。
  • 方便进行错误处理和日志记录。

缺点:

  • 可能导致系统不稳定,因为未处理的 RejectedExecutionException 可能导致程序崩溃。
  • 对调用者有一定的侵入性,调用者需要捕获并处理 RejectedExecutionException 异常。

2. CallerRunsPolicy (调用者运行策略): 借用调用者线程执行

CallerRunsPolicy 是一种比较温和的拒绝策略。当线程池无法处理新任务时,它不会抛出异常,而是将任务交给提交任务的线程来执行。

工作原理:

  • execute() 方法尝试提交任务到线程池时,如果线程池已满,CallerRunsPolicy 会判断当前调用 execute() 方法的线程是否处于线程池中。
  • 如果调用线程不是线程池中的线程,那么它会直接使用该线程来执行被拒绝的任务。
  • 如果调用线程是线程池中的线程, 那么这个任务会直接被丢弃(不会抛出异常,也不会执行该任务), 避免死锁.

代码示例:

import java.util.concurrent.*;

public class CallerRunsPolicyExample {

    public static void main(String[] args) {
        ThreadPoolExecutor executor = new ThreadPoolExecutor(
                2,
                2,
                0L,
                TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<>(1),
                new CallerRunsPolicy()
        );

        try {
            for (int i = 0; i < 5; i++) {
                executor.execute(new Task("Task " + i));
            }
        } finally {
            executor.shutdown();
        }
    }

    static class Task implements Runnable {
        private String name;

        public Task(String name) {
            this.name = name;
        }

        @Override
        public void run() {
            try {
                System.out.println(Thread.currentThread().getName() + " executing " + name);
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}

运行结果 (近似结果,线程执行顺序可能不同):

pool-1-thread-1 executing Task 0
pool-1-thread-2 executing Task 1
pool-1-thread-1 executing Task 2
main executing Task 3
main executing Task 4

应用场景:

  • 不希望丢失任务,但允许延迟执行的场景: CallerRunsPolicy 适用于那些不希望丢失任务,但允许任务延迟执行的场景。通过将任务交给调用者线程执行,可以保证任务最终会被执行,但可能会影响调用者的性能。
  • 流量削峰的场景: 当系统负载过高时,CallerRunsPolicy 可以将一部分任务交给调用者线程执行,从而降低线程池的压力,起到流量削峰的作用。

优点:

  • 不会丢弃任务,保证任务最终会被执行。
  • 可以起到流量削峰的作用。

缺点:

  • 可能会影响调用者线程的性能,导致调用者线程响应变慢。
  • 不适合对响应时间要求非常高的场景。

3. DiscardPolicy (丢弃策略): 无声丢弃任务

DiscardPolicy 是一种最简单的拒绝策略。当线程池无法处理新任务时,它会直接丢弃该任务,不做任何处理,也不会抛出异常。

工作原理:

  • execute() 方法尝试提交任务到线程池时,如果线程池已满,DiscardPolicy 会直接忽略该任务,不会将其加入任务队列,也不会通知调用者。

代码示例:

import java.util.concurrent.*;

public class DiscardPolicyExample {

    public static void main(String[] args) {
        ThreadPoolExecutor executor = new ThreadPoolExecutor(
                2,
                2,
                0L,
                TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<>(1),
                new DiscardPolicy()
        );

        try {
            for (int i = 0; i < 5; i++) {
                executor.execute(new Task("Task " + i));
            }
        } finally {
            executor.shutdown();
        }
    }

    static class Task implements Runnable {
        private String name;

        public Task(String name) {
            this.name = name;
        }

        @Override
        public void run() {
            try {
                System.out.println(Thread.currentThread().getName() + " executing " + name);
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}

运行结果 (近似结果,线程执行顺序可能不同):

pool-1-thread-1 executing Task 0
pool-1-thread-2 executing Task 1
pool-1-thread-1 executing Task 2

应用场景:

  • 可以容忍任务丢失的场景: DiscardPolicy 适用于那些可以容忍任务丢失的场景。例如,某些日志记录任务,即使丢失一些日志,也不会对系统造成严重影响。
  • 不重要任务的场景: 对于一些不重要的任务,例如某些统计任务,即使被丢弃,也不会对业务造成影响。

优点:

  • 实现简单,不会对系统造成额外的负担。
  • 不会抛出异常,避免影响主流程。

缺点:

  • 任务会被直接丢弃,可能会导致数据丢失或功能不完整。
  • 调用者无法感知任务被拒绝,难以进行错误处理。

4. DiscardOldestPolicy (丢弃最旧策略): 牺牲旧任务,保证新任务

DiscardOldestPolicy 是一种相对公平的拒绝策略。当线程池无法处理新任务时,它会丢弃任务队列中最老的任务,然后尝试将新任务加入队列。

工作原理:

  • execute() 方法尝试提交任务到线程池时,如果线程池已满,DiscardOldestPolicy 会从任务队列的头部(最老的任务)移除一个任务。
  • 然后,它会尝试将新任务加入任务队列。如果加入成功,则任务被接受;如果加入失败(例如,队列已满),则该任务也会被丢弃。

代码示例:

import java.util.concurrent.*;

public class DiscardOldestPolicyExample {

    public static void main(String[] args) throws InterruptedException {
        ThreadPoolExecutor executor = new ThreadPoolExecutor(
                2,
                2,
                0L,
                TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<>(1),
                new DiscardOldestPolicy()
        );

        try {
            // 先提交三个任务,让队列满
            executor.execute(new Task("Task 0"));
            executor.execute(new Task("Task 1"));
            Thread.sleep(50); // 确保 Task 0 和 Task 1 已经进入队列

            // 再提交两个任务,触发拒绝策略
            executor.execute(new Task("Task 2")); // Task 0 被丢弃,Task 2 加入队列
            executor.execute(new Task("Task 3")); // Task 1 被丢弃,Task 3 加入队列

            Thread.sleep(2000); // 等待任务执行完成

            // 再次提交,因为队列被之前的任务填满, 则Task 4 会被丢弃
            executor.execute(new Task("Task 4"));
            Thread.sleep(2000);

        } finally {
            executor.shutdown();
        }
    }

    static class Task implements Runnable {
        private String name;

        public Task(String name) {
            this.name = name;
        }

        @Override
        public void run() {
            try {
                System.out.println(Thread.currentThread().getName() + " executing " + name);
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}

运行结果 (近似结果,线程执行顺序可能不同):

pool-1-thread-1 executing Task 0
pool-1-thread-2 executing Task 1
pool-1-thread-1 executing Task 2
pool-1-thread-2 executing Task 3
pool-1-thread-1 executing Task 2
pool-1-thread-2 executing Task 3

应用场景:

  • 希望优先处理新任务的场景: DiscardOldestPolicy 适用于那些希望优先处理新任务,而可以容忍丢弃旧任务的场景。例如,在缓存系统中,我们可能更关心最新的数据,而对过期的数据不太关心。
  • 任务具有时效性的场景: 对于一些具有时效性的任务,例如某些实时数据分析任务,旧的数据可能已经失去价值,可以被丢弃。

优点:

  • 可以保证任务队列中始终包含最新的任务。
  • 相对公平,避免某些任务一直被阻塞。

缺点:

  • 可能会导致旧任务被丢弃,丢失部分数据。
  • 需要小心处理任务之间的依赖关系,避免丢弃某些任务导致其他任务无法执行。

拒绝策略的选择:综合考量

选择合适的拒绝策略需要综合考虑以下因素:

  • 任务的重要性: 如果任务非常重要,不能丢失,那么应该避免使用 DiscardPolicyDiscardOldestPolicy
  • 系统的稳定性: 如果系统对稳定性要求很高,应该避免使用 AbortPolicy,因为未处理的 RejectedExecutionException 可能导致系统崩溃。
  • 性能要求: 如果系统对响应时间要求很高,应该避免使用 CallerRunsPolicy,因为它可能会影响调用者线程的性能。
  • 任务的时效性: 如果任务具有时效性,可以考虑使用 DiscardOldestPolicy,优先处理新任务。
  • 可接受的任务丢失率: 需要评估业务上能够容忍的任务丢失率,如果可以容忍,可以选择 DiscardPolicy或者 DiscardOldestPolicy

下面是一个表格,总结了四种拒绝策略的特点:

拒绝策略 行为 优点 缺点 适用场景
AbortPolicy 抛出 RejectedExecutionException 异常 能够立即通知调用者任务被拒绝,方便进行错误处理和日志记录。 可能导致系统不稳定,对调用者有一定的侵入性。 对任务丢失零容忍的场景,需要快速失败的场景。
CallerRunsPolicy 由提交任务的线程执行该任务 不会丢弃任务,可以起到流量削峰的作用。 可能会影响调用者线程的性能,不适合对响应时间要求非常高的场景。 不希望丢失任务,但允许延迟执行的场景,流量削峰的场景。
DiscardPolicy 直接丢弃该任务,不做任何处理 实现简单,不会对系统造成额外的负担,不会抛出异常。 任务会被直接丢弃,调用者无法感知任务被拒绝。 可以容忍任务丢失的场景,不重要任务的场景。
DiscardOldestPolicy 丢弃队列中最老的任务,然后尝试将新任务加入队列 可以保证任务队列中始终包含最新的任务,相对公平。 可能会导致旧任务被丢弃,需要小心处理任务之间的依赖关系。 希望优先处理新任务的场景,任务具有时效性的场景。

自定义拒绝策略:灵活应对复杂场景

除了使用 Java 内置的拒绝策略之外,我们还可以通过实现 RejectedExecutionHandler 接口来创建自定义的拒绝策略。这可以让我们更加灵活地应对复杂的场景。

代码示例:

import java.util.concurrent.*;

public class CustomRejectedExecutionHandler implements RejectedExecutionHandler {

    @Override
    public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
        System.err.println("Task " + r.toString() +
                " rejected from " +
                executor.toString());
        // 在这里可以进行自定义的处理,例如:
        // 1. 将任务持久化到数据库
        // 2. 将任务发送到消息队列
        // 3. 发送告警邮件或短信
    }
}

// 使用自定义拒绝策略
public class CustomRejectedExecutionExample {

    public static void main(String[] args) {
        ThreadPoolExecutor executor = new ThreadPoolExecutor(
                2,
                2,
                0L,
                TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<>(1),
                new CustomRejectedExecutionHandler()
        );

        try {
            for (int i = 0; i < 5; i++) {
                executor.execute(new Task("Task " + i));
            }
        } finally {
            executor.shutdown();
        }
    }

    static class Task implements Runnable {
        private String name;

        public Task(String name) {
            this.name = name;
        }

        @Override
        public void run() {
            try {
                System.out.println(Thread.currentThread().getName() + " executing " + name);
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}

在自定义的 rejectedExecution() 方法中,我们可以根据实际需求进行各种处理,例如:

  • 将任务持久化到数据库: 如果任务非常重要,不能丢失,可以将任务信息保存到数据库,以便后续重新执行。
  • 将任务发送到消息队列: 可以将任务发送到消息队列,由其他消费者来处理。
  • 发送告警邮件或短信: 可以发送告警邮件或短信,通知管理员系统负载过高。
  • 记录日志: 可以记录详细的日志信息,方便排查问题。

总结与思考

今天我们详细讲解了 Java 线程池的四种内置拒绝策略,并探讨了如何根据实际场景选择合适的策略,以及如何自定义拒绝策略。选择合适的拒绝策略是构建健壮的并发应用的关键一步。

核心要点回顾: 线程池的拒绝策略是保障系统稳定性的最后一道防线,需要根据任务的重要性、系统的稳定性、性能要求和任务的时效性等因素综合考量,选择合适的策略。

持续学习与实践: 理解并灵活运用这些拒绝策略,能够帮助我们更好地管理线程资源,提高系统性能,并保障系统的稳定性。

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注