好的,我们开始。
Java线程池:自定义RejectedExecutionHandler实现任务的持久化或降级处理
大家好,今天我们来深入探讨Java线程池中一个非常重要的组件:RejectedExecutionHandler。线程池作为并发编程中常用的工具,能够有效地管理和调度线程资源,提高系统的性能和稳定性。然而,当线程池的任务队列已满,且线程池中的线程都在忙碌时,新的任务提交将会被拒绝。RejectedExecutionHandler正是用来处理这种被拒绝的任务的策略接口。
默认情况下,Java提供了几种内置的拒绝策略,如AbortPolicy、DiscardPolicy、DiscardOldestPolicy和CallerRunsPolicy。但是,在实际应用中,这些策略可能无法完全满足我们的需求。例如,我们可能希望将被拒绝的任务持久化到数据库,或者进行降级处理,以保证系统的可用性。这时候,就需要自定义RejectedExecutionHandler来实现我们的特定业务逻辑。
1. 为什么需要自定义RejectedExecutionHandler?
在理解自定义RejectedExecutionHandler的必要性之前,我们先来回顾一下Java线程池的工作原理。
- 任务提交: 当我们向线程池提交一个任务时,线程池会首先尝试将其放入任务队列中。
- 任务队列: 任务队列是一个缓冲区,用于存放等待执行的任务。常见的任务队列有
ArrayBlockingQueue、LinkedBlockingQueue、PriorityBlockingQueue等。 - 线程执行: 线程池中的线程会不断地从任务队列中取出任务并执行。
- 拒绝策略: 当任务队列已满,且线程池中的线程都在忙碌时,新的任务提交将会被拒绝。此时,
RejectedExecutionHandler将会被调用。
Java内置的拒绝策略虽然简单易用,但在某些场景下存在明显的不足:
- AbortPolicy: 直接抛出
RejectedExecutionException,导致程序中断。这种策略过于粗暴,不适合对任务丢失敏感的场景。 - DiscardPolicy: 直接丢弃被拒绝的任务,没有任何提示。这种策略可能会导致重要数据丢失,不适合关键业务场景。
- DiscardOldestPolicy: 丢弃任务队列中最老的任务,然后尝试重新提交当前任务。这种策略可能会导致部分任务永远无法执行,且存在公平性问题。
- CallerRunsPolicy: 由提交任务的线程来执行被拒绝的任务。这种策略会阻塞提交任务的线程,可能导致系统响应变慢。
因此,我们需要根据具体的业务需求,自定义RejectedExecutionHandler,以便更好地处理被拒绝的任务。
2. RejectedExecutionHandler接口
RejectedExecutionHandler接口定义如下:
public interface RejectedExecutionHandler {
/**
* Method that may be invoked by a {@link ThreadPoolExecutor} when
* {@link ThreadPoolExecutor#execute execute} cannot accept a
* task. This may occur when no more threads or queue slots are
* available because their bounds are reached.
*
* @param r the runnable task requested to be executed
* @param executor the executor attempting to execute this task
* @throws RejectedExecutionException if there is no remedy
*/
void rejectedExecution(Runnable r, ThreadPoolExecutor executor);
}
该接口只有一个方法rejectedExecution(Runnable r, ThreadPoolExecutor executor),该方法接收两个参数:
r: 被拒绝的任务,是一个Runnable对象。executor: 尝试执行该任务的ThreadPoolExecutor对象。
在rejectedExecution方法中,我们可以编写自定义的逻辑来处理被拒绝的任务。
3. 自定义RejectedExecutionHandler:任务持久化
假设我们需要将被拒绝的任务持久化到数据库,以便后续重新执行。我们可以创建一个名为PersistenceRejectedExecutionHandler的类,实现RejectedExecutionHandler接口。
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;
public class PersistenceRejectedExecutionHandler implements RejectedExecutionHandler {
private final String jdbcUrl;
private final String username;
private final String password;
private final String tableName;
public PersistenceRejectedExecutionHandler(String jdbcUrl, String username, String password, String tableName) {
this.jdbcUrl = jdbcUrl;
this.username = username;
this.password = password;
this.tableName = tableName;
}
@Override
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
try (Connection connection = DriverManager.getConnection(jdbcUrl, username, password)) {
String sql = "INSERT INTO " + tableName + " (task_data) VALUES (?)"; // 简化,实际可能需要更多字段
PreparedStatement preparedStatement = connection.prepareStatement(sql);
//将Runnable对象序列化成字节数组,存储到数据库中
byte[] taskData = serializeRunnable(r);
preparedStatement.setBytes(1, taskData);
preparedStatement.executeUpdate();
System.out.println("Task persisted to database: " + r.toString());
} catch (SQLException e) {
System.err.println("Failed to persist task to database: " + e.getMessage());
// 可以选择抛出异常,或者进行其他处理,例如记录日志
throw new RuntimeException("Failed to persist task", e); // or log and continue
}
}
// 将Runnable对象序列化成字节数组
private byte[] serializeRunnable(Runnable r) {
// 实现序列化逻辑,例如使用ObjectOutputStream
// 注意:Runnable对象必须是可序列化的
try (java.io.ByteArrayOutputStream bos = new java.io.ByteArrayOutputStream();
java.io.ObjectOutputStream oos = new java.io.ObjectOutputStream(bos)) {
oos.writeObject(r);
return bos.toByteArray();
} catch (java.io.IOException e) {
throw new RuntimeException("Failed to serialize task", e);
}
}
// 反序列化Runnable对象
public static Runnable deserializeRunnable(byte[] data) {
try (java.io.ByteArrayInputStream bis = new java.io.ByteArrayInputStream(data);
java.io.ObjectInputStream ois = new java.io.ObjectInputStream(bis)) {
return (Runnable) ois.readObject();
} catch (java.io.IOException | ClassNotFoundException e) {
throw new RuntimeException("Failed to deserialize task", e);
}
}
public static void main(String[] args) throws InterruptedException {
// 数据库连接信息
String jdbcUrl = "jdbc:mysql://localhost:3306/testdb";
String username = "root";
String password = "password";
String tableName = "rejected_tasks";
// 创建自定义的RejectedExecutionHandler
PersistenceRejectedExecutionHandler handler = new PersistenceRejectedExecutionHandler(jdbcUrl, username, password, tableName);
// 创建线程池
java.util.concurrent.ThreadPoolExecutor executor = new java.util.concurrent.ThreadPoolExecutor(
2, // corePoolSize
4, // maximumPoolSize
60, // keepAliveTime
java.util.concurrent.TimeUnit.SECONDS,
new java.util.concurrent.ArrayBlockingQueue<>(2), // workQueue
handler
);
// 提交任务
for (int i = 0; i < 10; i++) {
final int taskNumber = i;
executor.execute(() -> {
try {
System.out.println("Running task: " + taskNumber + " - Thread: " + Thread.currentThread().getName());
Thread.sleep(1000); // 模拟任务执行时间
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
});
}
// 关闭线程池
executor.shutdown();
executor.awaitTermination(5, java.util.concurrent.TimeUnit.SECONDS);
System.out.println("All tasks submitted. Attempting to retrieve tasks from DB.");
// 从数据库恢复任务(示例)
try (Connection connection = DriverManager.getConnection(jdbcUrl, username, password)) {
String sql = "SELECT task_data FROM " + tableName;
PreparedStatement preparedStatement = connection.prepareStatement(sql);
java.sql.ResultSet resultSet = preparedStatement.executeQuery();
while (resultSet.next()) {
byte[] taskData = resultSet.getBytes("task_data");
Runnable task = deserializeRunnable(taskData);
if (task != null) {
System.out.println("Retrieved task from DB and executing...");
new Thread(task).start(); // 在新线程中执行反序列化的任务
}
}
} catch (SQLException e) {
System.err.println("Failed to retrieve tasks from database: " + e.getMessage());
}
}
}
注意:
- 在实际应用中,需要根据具体的数据库类型和表结构来修改SQL语句。
Runnable对象必须是可序列化的,否则会抛出NotSerializableException异常。- 上述代码只是一个简单的示例,实际应用中需要考虑更多的因素,例如事务处理、异常处理、重试机制等。
- 数据库连接信息和表名应从配置文件中读取,避免硬编码。
- 必须处理
serializeRunnable和deserializeRunnable方法中可能抛出的IOException,可以使用try-with-resources语句确保资源得到正确关闭。
数据库表结构示例:
CREATE TABLE rejected_tasks (
id INT AUTO_INCREMENT PRIMARY KEY,
task_data BLOB
);
4. 自定义RejectedExecutionHandler:任务降级
除了持久化任务之外,我们还可以使用RejectedExecutionHandler来实现任务降级。例如,当线程池无法处理新的任务时,我们可以将任务发送到消息队列,由其他系统来处理。
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;
public class DegradationRejectedExecutionHandler implements RejectedExecutionHandler {
private final MessageQueueService messageQueueService; // 假设有一个消息队列服务
public DegradationRejectedExecutionHandler(MessageQueueService messageQueueService) {
this.messageQueueService = messageQueueService;
}
@Override
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
try {
messageQueueService.sendMessage(r); // 将任务发送到消息队列
System.out.println("Task sent to message queue for degradation processing: " + r.toString());
} catch (Exception e) {
System.err.println("Failed to send task to message queue: " + e.getMessage());
// 可以选择抛出异常,或者进行其他处理,例如记录日志
throw new RuntimeException("Failed to send task to message queue", e);
}
}
// 假设的消息队列服务接口
interface MessageQueueService {
void sendMessage(Runnable task);
}
// 简单实现,实际使用时应替换为真正的消息队列服务
static class SimpleMessageQueueService implements MessageQueueService {
@Override
public void sendMessage(Runnable task) {
System.out.println("Simulating sending task to message queue: " + task.toString());
}
}
public static void main(String[] args) throws InterruptedException {
// 创建消息队列服务
MessageQueueService messageQueueService = new SimpleMessageQueueService();
// 创建自定义的RejectedExecutionHandler
DegradationRejectedExecutionHandler handler = new DegradationRejectedExecutionHandler(messageQueueService);
// 创建线程池
java.util.concurrent.ThreadPoolExecutor executor = new java.util.concurrent.ThreadPoolExecutor(
2, // corePoolSize
4, // maximumPoolSize
60, // keepAliveTime
java.util.concurrent.TimeUnit.SECONDS,
new java.util.concurrent.ArrayBlockingQueue<>(2), // workQueue
handler
);
// 提交任务
for (int i = 0; i < 10; i++) {
final int taskNumber = i;
executor.execute(() -> {
try {
System.out.println("Running task: " + taskNumber + " - Thread: " + Thread.currentThread().getName());
Thread.sleep(1000); // 模拟任务执行时间
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
});
}
// 关闭线程池
executor.shutdown();
executor.awaitTermination(5, java.util.concurrent.TimeUnit.SECONDS);
}
}
注意:
- 需要根据具体的消息队列服务来修改代码。
- 需要处理发送消息可能抛出的异常。
- 消息队列的选择需要根据具体的业务需求来决定。
5. 选择合适的RejectedExecutionHandler
选择合适的RejectedExecutionHandler需要根据具体的业务需求来决定。以下是一些常用的选择策略:
| 场景 | 推荐的RejectedExecutionHandler |
|---|---|
| 任务丢失不可接受 | 自定义RejectedExecutionHandler,将任务持久化到数据库或消息队列。 |
| 可以容忍少量任务丢失 | DiscardPolicy,直接丢弃被拒绝的任务。 |
| 希望降低系统负载 | CallerRunsPolicy,由提交任务的线程来执行被拒绝的任务。 |
| 需要快速失败 | AbortPolicy,直接抛出RejectedExecutionException。 |
| 任务有优先级,希望尽快执行重要任务 | 可以使用PriorityBlockingQueue作为任务队列,并自定义RejectedExecutionHandler,将被拒绝的任务重新放入队列中,并提高其优先级。 (需要谨慎设计,避免重要任务一直被拒绝导致死循环) |
| 需要熔断机制 | 自定义RejectedExecutionHandler,当拒绝的任务数量达到一定阈值时,触发熔断器,阻止新的任务提交。 |
6. 总结
通过自定义RejectedExecutionHandler,我们可以灵活地处理线程池中被拒绝的任务,从而提高系统的可用性和可靠性。在实际应用中,我们需要根据具体的业务需求选择合适的拒绝策略,并进行充分的测试,以确保系统的稳定运行。
选择合适的拒绝策略至关重要,自定义能满足特定需求
自定义RejectedExecutionHandler能够灵活地处理被拒绝的任务,从而提高系统的可用性。需要根据具体的业务需求选择合适的拒绝策略,并进行充分的测试。