Java线程池:自定义RejectedExecutionHandler实现任务的持久化或降级处理
大家好,今天我们来深入探讨Java线程池的一个重要组成部分:RejectedExecutionHandler。当线程池因为达到饱和状态而无法接受新的任务时,RejectedExecutionHandler就发挥作用了。默认的策略往往比较简单粗暴,例如直接抛出异常或者丢弃任务。但在实际应用中,我们常常需要更精细的控制,例如将任务持久化到数据库,或者进行降级处理。本次讲座将围绕如何自定义RejectedExecutionHandler,实现任务的持久化和降级处理展开。
1. 线程池回顾:为什么需要RejectedExecutionHandler?
在深入RejectedExecutionHandler之前,我们先简单回顾一下线程池的工作原理。Java提供了ExecutorService接口及其实现类ThreadPoolExecutor来管理线程池。线程池的核心参数包括:
- corePoolSize: 核心线程数,即使线程空闲也会保持存在的线程数量。
- maximumPoolSize: 最大线程数,线程池允许创建的最大线程数量。
- keepAliveTime: 空闲线程的存活时间,超过这个时间空闲线程会被回收。
- unit:
keepAliveTime的时间单位。 - workQueue: 任务队列,用于存放等待执行的任务。
- threadFactory: 用于创建线程的工厂。
- handler: 拒绝策略,即
RejectedExecutionHandler。
当向线程池提交任务时,线程池会按照以下步骤执行:
- 如果当前线程数小于
corePoolSize,则创建新的线程来执行任务。 - 如果当前线程数大于等于
corePoolSize,则将任务放入workQueue。 - 如果
workQueue已满,且当前线程数小于maximumPoolSize,则创建新的线程来执行任务。 - 如果
workQueue已满,且当前线程数大于等于maximumPoolSize,则执行RejectedExecutionHandler的rejectedExecution方法。
因此,RejectedExecutionHandler在线程池达到饱和状态时,负责处理无法被执行的任务。如果没有定义RejectedExecutionHandler,默认的行为会抛出一个RejectedExecutionException。
2. Java提供的默认RejectedExecutionHandler
Java提供了几种默认的RejectedExecutionHandler实现:
| Handler | 行为 | 适用场景 |
|---|---|---|
AbortPolicy |
抛出RejectedExecutionException。这是默认策略。 |
适用于对任务丢失零容忍的场景,需要立即发现并处理线程池饱和的问题。 |
DiscardPolicy |
静默丢弃任务,不抛出任何异常。 | 适用于可以容忍任务丢失的场景,例如日志记录等。 |
DiscardOldestPolicy |
丢弃队列中最旧的任务,然后尝试重新提交当前任务。 | 适用于希望优先处理最新任务的场景,例如缓存更新等。 |
CallerRunsPolicy |
由提交任务的线程来执行任务。 | 适用于希望减缓任务提交速率的场景,通过让提交线程执行任务,可以起到一定的限流作用。 |
这些默认的策略各有特点,但在很多实际场景下,它们并不能满足我们的需求。例如,我们可能希望将任务持久化到数据库,以便后续重试;或者我们希望对任务进行降级处理,例如返回默认值或者执行更简单的逻辑。
3. 自定义RejectedExecutionHandler:持久化任务
现在我们来实现一个自定义的RejectedExecutionHandler,用于将无法执行的任务持久化到数据库。
首先,我们需要定义一个任务持久化的接口:
public interface TaskPersistenceService {
void persistTask(Runnable task);
}
然后,我们需要实现这个接口,这里我们简单地使用一个List来模拟数据库:
import java.util.ArrayList;
import java.util.List;
public class InMemoryTaskPersistenceService implements TaskPersistenceService {
private final List<Runnable> taskQueue = new ArrayList<>();
@Override
public void persistTask(Runnable task) {
taskQueue.add(task);
System.out.println("Task persisted to in-memory queue: " + task);
}
public List<Runnable> getPersistedTasks() {
return new ArrayList<>(taskQueue); // 返回一个拷贝,防止外部修改
}
public void clearPersistedTasks() {
taskQueue.clear();
}
}
接下来,我们可以实现自定义的RejectedExecutionHandler:
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;
public class PersistenceRejectedExecutionHandler implements RejectedExecutionHandler {
private final TaskPersistenceService taskPersistenceService;
public PersistenceRejectedExecutionHandler(TaskPersistenceService taskPersistenceService) {
this.taskPersistenceService = taskPersistenceService;
}
@Override
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
taskPersistenceService.persistTask(r);
}
}
在这个RejectedExecutionHandler中,我们将无法执行的任务传递给TaskPersistenceService进行持久化。
最后,我们可以创建一个使用这个RejectedExecutionHandler的线程池:
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
public class ThreadPoolExample {
public static void main(String[] args) throws InterruptedException {
TaskPersistenceService taskPersistenceService = new InMemoryTaskPersistenceService();
PersistenceRejectedExecutionHandler handler = new PersistenceRejectedExecutionHandler(taskPersistenceService);
ThreadPoolExecutor executor = new ThreadPoolExecutor(
2, // corePoolSize
4, // maximumPoolSize
60, // keepAliveTime
TimeUnit.SECONDS, // unit
new LinkedBlockingQueue<>(2), // workQueue
handler // handler
);
// 提交多个任务,超过线程池的处理能力
for (int i = 0; i < 10; i++) {
final int taskNumber = i;
executor.execute(() -> {
try {
System.out.println("Executing task: " + taskNumber + " by thread: " + Thread.currentThread().getName());
Thread.sleep(1000); // 模拟任务执行时间
} catch (InterruptedException e) {
e.printStackTrace();
}
});
}
executor.shutdown();
executor.awaitTermination(5, TimeUnit.SECONDS);
// 检查是否有任务被持久化
InMemoryTaskPersistenceService inMemoryTaskPersistenceService = (InMemoryTaskPersistenceService) taskPersistenceService;
System.out.println("Persisted tasks: " + inMemoryTaskPersistenceService.getPersistedTasks().size());
// 展示持久化的任务
inMemoryTaskPersistenceService.getPersistedTasks().forEach(task -> System.out.println("Persisted Task: " + task));
}
}
在这个例子中,我们创建了一个核心线程数为2,最大线程数为4,队列长度为2的线程池。然后我们提交了10个任务。由于线程池的处理能力有限,一部分任务会被拒绝执行,并被持久化到数据库中。
4. 自定义RejectedExecutionHandler:降级处理
除了持久化任务,我们还可以使用RejectedExecutionHandler进行降级处理。例如,当线程池饱和时,我们可以返回一个默认值,或者执行一个更简单的逻辑。
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;
public class DegradeRejectedExecutionHandler implements RejectedExecutionHandler {
private final Runnable fallbackTask;
public DegradeRejectedExecutionHandler(Runnable fallbackTask) {
this.fallbackTask = fallbackTask;
}
@Override
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
System.out.println("Thread pool is full, executing fallback task.");
fallbackTask.run(); // 执行降级任务
}
}
在这个RejectedExecutionHandler中,当任务被拒绝执行时,我们会执行一个fallbackTask,这个fallbackTask可以是一个返回默认值的操作,或者一个执行更简单的逻辑的操作。
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
public class ThreadPoolDegradeExample {
public static void main(String[] args) throws InterruptedException {
Runnable fallbackTask = () -> System.out.println("Fallback task executed.");
DegradeRejectedExecutionHandler handler = new DegradeRejectedExecutionHandler(fallbackTask);
ThreadPoolExecutor executor = new ThreadPoolExecutor(
2, // corePoolSize
4, // maximumPoolSize
60, // keepAliveTime
TimeUnit.SECONDS, // unit
new LinkedBlockingQueue<>(2), // workQueue
handler // handler
);
// 提交多个任务,超过线程池的处理能力
for (int i = 0; i < 10; i++) {
final int taskNumber = i;
executor.execute(() -> {
try {
System.out.println("Executing task: " + taskNumber + " by thread: " + Thread.currentThread().getName());
Thread.sleep(1000); // 模拟任务执行时间
} catch (InterruptedException e) {
e.printStackTrace();
}
});
}
executor.shutdown();
executor.awaitTermination(5, TimeUnit.SECONDS);
}
}
在这个例子中,我们创建了一个核心线程数为2,最大线程数为4,队列长度为2的线程池。然后我们提交了10个任务。由于线程池的处理能力有限,一部分任务会被拒绝执行,并执行fallbackTask。
5. 最佳实践和注意事项
- 选择合适的策略: 根据实际业务场景选择合适的
RejectedExecutionHandler。如果对任务丢失零容忍,应该选择AbortPolicy并捕获RejectedExecutionException进行处理。如果可以容忍任务丢失,可以选择DiscardPolicy或者自定义的降级策略。 - 避免阻塞: 在
rejectedExecution方法中避免执行耗时操作,否则会阻塞提交任务的线程。如果需要执行耗时操作,可以考虑将操作放入另一个线程池中异步执行。 - 监控和告警: 监控线程池的状态,包括队列长度、活跃线程数、已完成任务数等。当线程池达到饱和状态时,应该及时发出告警,以便及时处理。
- 幂等性: 如果使用持久化策略,需要保证任务的幂等性,即多次执行同一个任务的结果应该相同。
- 事务: 在持久化任务时,需要考虑事务的一致性。如果持久化失败,需要进行回滚,避免数据不一致。
- 资源释放: 在任务执行完毕后,需要及时释放资源,例如关闭数据库连接、释放文件句柄等。
6. 更复杂的例子:结合消息队列实现异步重试
除了直接持久化到数据库,我们还可以将拒绝的任务发送到消息队列,例如RabbitMQ或Kafka,然后由消费者从消息队列中获取任务并进行重试。这样可以进一步解耦任务提交和任务执行,提高系统的可用性和可伸缩性。
首先,我们需要引入消息队列的依赖。以RabbitMQ为例,我们需要添加以下依赖:
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>5.16.0</version> <!-- 请使用最新的版本 -->
</dependency>
然后,我们需要实现一个消息队列服务:
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeoutException;
public class RabbitMQTaskQueueService {
private static final String QUEUE_NAME = "rejected_tasks";
private final Connection connection;
private final Channel channel;
public RabbitMQTaskQueueService(String host) throws IOException, TimeoutException {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost(host);
connection = factory.newConnection();
channel = connection.createChannel();
channel.queueDeclare(QUEUE_NAME, true, false, false, null); // durable: true
}
public void enqueueTask(Runnable task) throws IOException {
String message = task.getClass().getName();
channel.basicPublish("", QUEUE_NAME, null, message.getBytes(StandardCharsets.UTF_8));
System.out.println("Task enqueued to RabbitMQ: " + task.getClass().getName());
}
public void close() throws IOException, TimeoutException {
channel.close();
connection.close();
}
}
在这个RabbitMQTaskQueueService中,我们将任务的类名作为消息发送到RabbitMQ队列中。
接下来,我们可以实现自定义的RejectedExecutionHandler:
import java.io.IOException;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;
public class RabbitMQRejectedExecutionHandler implements RejectedExecutionHandler {
private final RabbitMQTaskQueueService taskQueueService;
public RabbitMQRejectedExecutionHandler(RabbitMQTaskQueueService taskQueueService) {
this.taskQueueService = taskQueueService;
}
@Override
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
try {
taskQueueService.enqueueTask(r);
} catch (IOException e) {
System.err.println("Failed to enqueue task to RabbitMQ: " + e.getMessage());
e.printStackTrace();
// 可以考虑降级处理,例如记录日志或者抛出异常
}
}
}
在这个RejectedExecutionHandler中,我们将无法执行的任务发送到RabbitMQ队列中。
最后,我们需要创建一个消费者,从RabbitMQ队列中获取任务并执行:
import com.rabbitmq.client.*;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeoutException;
public class RabbitMQTaskConsumer {
private static final String QUEUE_NAME = "rejected_tasks";
public static void main(String[] args) throws IOException, TimeoutException {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost"); // 根据实际情况修改
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(QUEUE_NAME, true, false, false, null);
System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), StandardCharsets.UTF_8);
System.out.println(" [x] Received task: " + message);
try {
// 根据类名创建任务实例并执行
Class<?> taskClass = Class.forName(message);
Runnable task = (Runnable) taskClass.getDeclaredConstructor().newInstance();
task.run(); // 注意:这里直接在当前线程执行任务,可以考虑使用线程池
} catch (Exception e) {
System.err.println("Failed to execute task: " + e.getMessage());
e.printStackTrace();
} finally {
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false); // 确认消息
}
};
channel.basicConsume(QUEUE_NAME, false, deliverCallback, consumerTag -> { }); // autoAck: false
}
}
在这个消费者中,我们从RabbitMQ队列中获取消息,并根据消息中的类名创建任务实例并执行。
现在,我们可以创建一个使用这个RabbitMQRejectedExecutionHandler的线程池:
import java.io.IOException;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
public class ThreadPoolRabbitMQExample {
public static void main(String[] args) throws InterruptedException, IOException, TimeoutException {
RabbitMQTaskQueueService taskQueueService = new RabbitMQTaskQueueService("localhost"); // 根据实际情况修改
RabbitMQRejectedExecutionHandler handler = new RabbitMQRejectedExecutionHandler(taskQueueService);
ThreadPoolExecutor executor = new ThreadPoolExecutor(
2, // corePoolSize
4, // maximumPoolSize
60, // keepAliveTime
TimeUnit.SECONDS, // unit
new LinkedBlockingQueue<>(2), // workQueue
handler // handler
);
// 提交多个任务,超过线程池的处理能力
for (int i = 0; i < 10; i++) {
final int taskNumber = i;
executor.execute(() -> {
try {
System.out.println("Executing task: " + taskNumber + " by thread: " + Thread.currentThread().getName());
Thread.sleep(1000); // 模拟任务执行时间
} catch (InterruptedException e) {
e.printStackTrace();
}
});
}
executor.shutdown();
executor.awaitTermination(5, TimeUnit.SECONDS);
taskQueueService.close();
}
}
在这个例子中,我们创建了一个核心线程数为2,最大线程数为4,队列长度为2的线程池。然后我们提交了10个任务。由于线程池的处理能力有限,一部分任务会被拒绝执行,并发送到RabbitMQ队列中,等待消费者执行。
7. 总结一下刚才讲的内容
RejectedExecutionHandler在线程池饱和时处理被拒绝的任务。- 可以自定义
RejectedExecutionHandler实现任务持久化、降级处理等。 - 持久化可以结合数据库或消息队列实现,需要考虑幂等性和事务。
8. 选择合适的策略,优化线程池使用
自定义RejectedExecutionHandler是优化Java线程池使用的关键一步。通过选择合适的策略,并结合实际业务场景进行定制,可以更好地控制任务的执行流程,提高系统的可用性和可靠性。希望今天的讲解能对你有所帮助。