好的,接下来我将以讲座的形式,详细讲解 Java 线程池的四种拒绝策略,并结合实际应用进行分析。
线程池拒绝策略:保障服务稳定性的最后一道防线
大家好,今天我们要探讨的是 Java 线程池中的拒绝策略。线程池作为并发编程中重要的工具,可以有效地管理线程资源,提高系统性能。然而,当线程池的任务队列已满,且所有线程都在忙碌时,新提交的任务就会面临被拒绝的风险。这时,拒绝策略就发挥作用了,它定义了线程池如何处理这些无法立即执行的任务,是保障系统稳定性的最后一道防线。
Java 提供了四种内置的拒绝策略,每种策略都有其特定的应用场景和优缺点。理解这些策略,并根据实际需求选择合适的策略,对于构建健壮的并发应用至关重要。
四种内置拒绝策略:深入解析
Java 线程池提供了 RejectedExecutionHandler 接口,允许我们自定义拒绝策略。但是,大多数情况下,我们可以直接使用 Java 内置的四种实现,它们分别是:
AbortPolicy: 抛出RejectedExecutionException异常。CallerRunsPolicy: 由提交任务的线程执行该任务。DiscardPolicy: 直接丢弃该任务,不做任何处理。DiscardOldestPolicy: 丢弃队列中最老的任务,然后尝试将新任务加入队列。
接下来,我们将逐一深入解析这些策略,并结合代码示例说明其应用场景。
1. AbortPolicy (中止策略): 直接抛出异常
AbortPolicy 是线程池默认的拒绝策略。当线程池无法处理新任务时,它会直接抛出一个 RejectedExecutionException 异常,通知调用者任务被拒绝。
工作原理:
- 当 
execute()方法尝试提交任务到线程池时,如果线程池已满(任务队列满且所有线程都在忙碌),AbortPolicy会立即创建一个RejectedExecutionException实例。 - 然后,它会将该异常抛给调用者,让调用者知道任务提交失败。
 
代码示例:
import java.util.concurrent.*;
public class AbortPolicyExample {
    public static void main(String[] args) {
        // 创建一个固定大小的线程池,核心线程数和最大线程数都为 2
        ThreadPoolExecutor executor = new ThreadPoolExecutor(
                2, // corePoolSize
                2, // maximumPoolSize
                0L, // keepAliveTime
                TimeUnit.MILLISECONDS, // unit
                new LinkedBlockingQueue<>(1), // workQueue, 容量为 1
                new AbortPolicy() // handler
        );
        try {
            // 提交 3 个任务
            executor.execute(new Task("Task 1"));
            executor.execute(new Task("Task 2"));
            executor.execute(new Task("Task 3")); // 队列已满,线程都在忙碌,将会被拒绝
        } catch (RejectedExecutionException e) {
            System.err.println("Task rejected: " + e.getMessage());
        } finally {
            executor.shutdown();
        }
    }
    static class Task implements Runnable {
        private String name;
        public Task(String name) {
            this.name = name;
        }
        @Override
        public void run() {
            try {
                System.out.println(Thread.currentThread().getName() + " executing " + name);
                Thread.sleep(1000); // 模拟任务执行时间
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}
运行结果:
pool-1-thread-1 executing Task 1
pool-1-thread-2 executing Task 2
Task rejected: java.util.concurrent.RejectedExecutionException: Task AbortPolicyExample$Task@24d46ca6 rejected from java.util.concurrent.ThreadPoolExecutor@4517d9a3[Running, pool size = 2, active threads = 2, queued tasks = 1, completed tasks = 0]
应用场景:
- 对任务丢失零容忍的场景:  
AbortPolicy适用于那些对任务丢失零容忍的场景。当任务被拒绝时,调用者可以立即感知到,并采取相应的补救措施,例如重新提交任务、记录日志或者通知管理员。 - 需要快速失败的场景:  在某些情况下,我们希望系统能够快速失败,而不是继续尝试执行可能会失败的任务。
AbortPolicy可以帮助我们实现这一点。 
优点:
- 能够立即通知调用者任务被拒绝。
 - 方便进行错误处理和日志记录。
 
缺点:
- 可能导致系统不稳定,因为未处理的 
RejectedExecutionException可能导致程序崩溃。 - 对调用者有一定的侵入性,调用者需要捕获并处理 
RejectedExecutionException异常。 
2. CallerRunsPolicy (调用者运行策略): 借用调用者线程执行
CallerRunsPolicy 是一种比较温和的拒绝策略。当线程池无法处理新任务时,它不会抛出异常,而是将任务交给提交任务的线程来执行。
工作原理:
- 当 
execute()方法尝试提交任务到线程池时,如果线程池已满,CallerRunsPolicy会判断当前调用execute()方法的线程是否处于线程池中。 - 如果调用线程不是线程池中的线程,那么它会直接使用该线程来执行被拒绝的任务。
 - 如果调用线程是线程池中的线程, 那么这个任务会直接被丢弃(不会抛出异常,也不会执行该任务), 避免死锁.
 
代码示例:
import java.util.concurrent.*;
public class CallerRunsPolicyExample {
    public static void main(String[] args) {
        ThreadPoolExecutor executor = new ThreadPoolExecutor(
                2,
                2,
                0L,
                TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<>(1),
                new CallerRunsPolicy()
        );
        try {
            for (int i = 0; i < 5; i++) {
                executor.execute(new Task("Task " + i));
            }
        } finally {
            executor.shutdown();
        }
    }
    static class Task implements Runnable {
        private String name;
        public Task(String name) {
            this.name = name;
        }
        @Override
        public void run() {
            try {
                System.out.println(Thread.currentThread().getName() + " executing " + name);
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}
运行结果 (近似结果,线程执行顺序可能不同):
pool-1-thread-1 executing Task 0
pool-1-thread-2 executing Task 1
pool-1-thread-1 executing Task 2
main executing Task 3
main executing Task 4
应用场景:
- 不希望丢失任务,但允许延迟执行的场景:  
CallerRunsPolicy适用于那些不希望丢失任务,但允许任务延迟执行的场景。通过将任务交给调用者线程执行,可以保证任务最终会被执行,但可能会影响调用者的性能。 - 流量削峰的场景:  当系统负载过高时,
CallerRunsPolicy可以将一部分任务交给调用者线程执行,从而降低线程池的压力,起到流量削峰的作用。 
优点:
- 不会丢弃任务,保证任务最终会被执行。
 - 可以起到流量削峰的作用。
 
缺点:
- 可能会影响调用者线程的性能,导致调用者线程响应变慢。
 - 不适合对响应时间要求非常高的场景。
 
3. DiscardPolicy (丢弃策略): 无声丢弃任务
DiscardPolicy 是一种最简单的拒绝策略。当线程池无法处理新任务时,它会直接丢弃该任务,不做任何处理,也不会抛出异常。
工作原理:
- 当 
execute()方法尝试提交任务到线程池时,如果线程池已满,DiscardPolicy会直接忽略该任务,不会将其加入任务队列,也不会通知调用者。 
代码示例:
import java.util.concurrent.*;
public class DiscardPolicyExample {
    public static void main(String[] args) {
        ThreadPoolExecutor executor = new ThreadPoolExecutor(
                2,
                2,
                0L,
                TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<>(1),
                new DiscardPolicy()
        );
        try {
            for (int i = 0; i < 5; i++) {
                executor.execute(new Task("Task " + i));
            }
        } finally {
            executor.shutdown();
        }
    }
    static class Task implements Runnable {
        private String name;
        public Task(String name) {
            this.name = name;
        }
        @Override
        public void run() {
            try {
                System.out.println(Thread.currentThread().getName() + " executing " + name);
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}
运行结果 (近似结果,线程执行顺序可能不同):
pool-1-thread-1 executing Task 0
pool-1-thread-2 executing Task 1
pool-1-thread-1 executing Task 2
应用场景:
- 可以容忍任务丢失的场景:  
DiscardPolicy适用于那些可以容忍任务丢失的场景。例如,某些日志记录任务,即使丢失一些日志,也不会对系统造成严重影响。 - 不重要任务的场景: 对于一些不重要的任务,例如某些统计任务,即使被丢弃,也不会对业务造成影响。
 
优点:
- 实现简单,不会对系统造成额外的负担。
 - 不会抛出异常,避免影响主流程。
 
缺点:
- 任务会被直接丢弃,可能会导致数据丢失或功能不完整。
 - 调用者无法感知任务被拒绝,难以进行错误处理。
 
4. DiscardOldestPolicy (丢弃最旧策略): 牺牲旧任务,保证新任务
DiscardOldestPolicy 是一种相对公平的拒绝策略。当线程池无法处理新任务时,它会丢弃任务队列中最老的任务,然后尝试将新任务加入队列。
工作原理:
- 当 
execute()方法尝试提交任务到线程池时,如果线程池已满,DiscardOldestPolicy会从任务队列的头部(最老的任务)移除一个任务。 - 然后,它会尝试将新任务加入任务队列。如果加入成功,则任务被接受;如果加入失败(例如,队列已满),则该任务也会被丢弃。
 
代码示例:
import java.util.concurrent.*;
public class DiscardOldestPolicyExample {
    public static void main(String[] args) throws InterruptedException {
        ThreadPoolExecutor executor = new ThreadPoolExecutor(
                2,
                2,
                0L,
                TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<>(1),
                new DiscardOldestPolicy()
        );
        try {
            // 先提交三个任务,让队列满
            executor.execute(new Task("Task 0"));
            executor.execute(new Task("Task 1"));
            Thread.sleep(50); // 确保 Task 0 和 Task 1 已经进入队列
            // 再提交两个任务,触发拒绝策略
            executor.execute(new Task("Task 2")); // Task 0 被丢弃,Task 2 加入队列
            executor.execute(new Task("Task 3")); // Task 1 被丢弃,Task 3 加入队列
            Thread.sleep(2000); // 等待任务执行完成
            // 再次提交,因为队列被之前的任务填满, 则Task 4 会被丢弃
            executor.execute(new Task("Task 4"));
            Thread.sleep(2000);
        } finally {
            executor.shutdown();
        }
    }
    static class Task implements Runnable {
        private String name;
        public Task(String name) {
            this.name = name;
        }
        @Override
        public void run() {
            try {
                System.out.println(Thread.currentThread().getName() + " executing " + name);
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}
运行结果 (近似结果,线程执行顺序可能不同):
pool-1-thread-1 executing Task 0
pool-1-thread-2 executing Task 1
pool-1-thread-1 executing Task 2
pool-1-thread-2 executing Task 3
pool-1-thread-1 executing Task 2
pool-1-thread-2 executing Task 3
应用场景:
- 希望优先处理新任务的场景:  
DiscardOldestPolicy适用于那些希望优先处理新任务,而可以容忍丢弃旧任务的场景。例如,在缓存系统中,我们可能更关心最新的数据,而对过期的数据不太关心。 - 任务具有时效性的场景: 对于一些具有时效性的任务,例如某些实时数据分析任务,旧的数据可能已经失去价值,可以被丢弃。
 
优点:
- 可以保证任务队列中始终包含最新的任务。
 - 相对公平,避免某些任务一直被阻塞。
 
缺点:
- 可能会导致旧任务被丢弃,丢失部分数据。
 - 需要小心处理任务之间的依赖关系,避免丢弃某些任务导致其他任务无法执行。
 
拒绝策略的选择:综合考量
选择合适的拒绝策略需要综合考虑以下因素:
- 任务的重要性:  如果任务非常重要,不能丢失,那么应该避免使用 
DiscardPolicy和DiscardOldestPolicy。 - 系统的稳定性:  如果系统对稳定性要求很高,应该避免使用 
AbortPolicy,因为未处理的RejectedExecutionException可能导致系统崩溃。 - 性能要求:  如果系统对响应时间要求很高,应该避免使用 
CallerRunsPolicy,因为它可能会影响调用者线程的性能。 - 任务的时效性:  如果任务具有时效性,可以考虑使用 
DiscardOldestPolicy,优先处理新任务。 - 可接受的任务丢失率: 需要评估业务上能够容忍的任务丢失率,如果可以容忍,可以选择 
DiscardPolicy或者DiscardOldestPolicy。 
下面是一个表格,总结了四种拒绝策略的特点:
| 拒绝策略 | 行为 | 优点 | 缺点 | 适用场景 | 
|---|---|---|---|---|
AbortPolicy | 
抛出 RejectedExecutionException 异常 | 
能够立即通知调用者任务被拒绝,方便进行错误处理和日志记录。 | 可能导致系统不稳定,对调用者有一定的侵入性。 | 对任务丢失零容忍的场景,需要快速失败的场景。 | 
CallerRunsPolicy | 
由提交任务的线程执行该任务 | 不会丢弃任务,可以起到流量削峰的作用。 | 可能会影响调用者线程的性能,不适合对响应时间要求非常高的场景。 | 不希望丢失任务,但允许延迟执行的场景,流量削峰的场景。 | 
DiscardPolicy | 
直接丢弃该任务,不做任何处理 | 实现简单,不会对系统造成额外的负担,不会抛出异常。 | 任务会被直接丢弃,调用者无法感知任务被拒绝。 | 可以容忍任务丢失的场景,不重要任务的场景。 | 
DiscardOldestPolicy | 
丢弃队列中最老的任务,然后尝试将新任务加入队列 | 可以保证任务队列中始终包含最新的任务,相对公平。 | 可能会导致旧任务被丢弃,需要小心处理任务之间的依赖关系。 | 希望优先处理新任务的场景,任务具有时效性的场景。 | 
自定义拒绝策略:灵活应对复杂场景
除了使用 Java 内置的拒绝策略之外,我们还可以通过实现 RejectedExecutionHandler 接口来创建自定义的拒绝策略。这可以让我们更加灵活地应对复杂的场景。
代码示例:
import java.util.concurrent.*;
public class CustomRejectedExecutionHandler implements RejectedExecutionHandler {
    @Override
    public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
        System.err.println("Task " + r.toString() +
                " rejected from " +
                executor.toString());
        // 在这里可以进行自定义的处理,例如:
        // 1. 将任务持久化到数据库
        // 2. 将任务发送到消息队列
        // 3. 发送告警邮件或短信
    }
}
// 使用自定义拒绝策略
public class CustomRejectedExecutionExample {
    public static void main(String[] args) {
        ThreadPoolExecutor executor = new ThreadPoolExecutor(
                2,
                2,
                0L,
                TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<>(1),
                new CustomRejectedExecutionHandler()
        );
        try {
            for (int i = 0; i < 5; i++) {
                executor.execute(new Task("Task " + i));
            }
        } finally {
            executor.shutdown();
        }
    }
    static class Task implements Runnable {
        private String name;
        public Task(String name) {
            this.name = name;
        }
        @Override
        public void run() {
            try {
                System.out.println(Thread.currentThread().getName() + " executing " + name);
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}
在自定义的 rejectedExecution() 方法中,我们可以根据实际需求进行各种处理,例如:
- 将任务持久化到数据库: 如果任务非常重要,不能丢失,可以将任务信息保存到数据库,以便后续重新执行。
 - 将任务发送到消息队列: 可以将任务发送到消息队列,由其他消费者来处理。
 - 发送告警邮件或短信: 可以发送告警邮件或短信,通知管理员系统负载过高。
 - 记录日志: 可以记录详细的日志信息,方便排查问题。
 
总结与思考
今天我们详细讲解了 Java 线程池的四种内置拒绝策略,并探讨了如何根据实际场景选择合适的策略,以及如何自定义拒绝策略。选择合适的拒绝策略是构建健壮的并发应用的关键一步。
核心要点回顾: 线程池的拒绝策略是保障系统稳定性的最后一道防线,需要根据任务的重要性、系统的稳定性、性能要求和任务的时效性等因素综合考量,选择合适的策略。
持续学习与实践: 理解并灵活运用这些拒绝策略,能够帮助我们更好地管理线程资源,提高系统性能,并保障系统的稳定性。