Java线程池:自定义RejectedExecutionHandler实现任务的持久化或降级处理

好的,我们开始。

Java线程池:自定义RejectedExecutionHandler实现任务的持久化或降级处理

大家好,今天我们来深入探讨Java线程池中一个非常重要的组件:RejectedExecutionHandler。线程池作为并发编程中常用的工具,能够有效地管理和调度线程资源,提高系统的性能和稳定性。然而,当线程池的任务队列已满,且线程池中的线程都在忙碌时,新的任务提交将会被拒绝。RejectedExecutionHandler正是用来处理这种被拒绝的任务的策略接口。

默认情况下,Java提供了几种内置的拒绝策略,如AbortPolicyDiscardPolicyDiscardOldestPolicyCallerRunsPolicy。但是,在实际应用中,这些策略可能无法完全满足我们的需求。例如,我们可能希望将被拒绝的任务持久化到数据库,或者进行降级处理,以保证系统的可用性。这时候,就需要自定义RejectedExecutionHandler来实现我们的特定业务逻辑。

1. 为什么需要自定义RejectedExecutionHandler?

在理解自定义RejectedExecutionHandler的必要性之前,我们先来回顾一下Java线程池的工作原理。

  • 任务提交: 当我们向线程池提交一个任务时,线程池会首先尝试将其放入任务队列中。
  • 任务队列: 任务队列是一个缓冲区,用于存放等待执行的任务。常见的任务队列有ArrayBlockingQueueLinkedBlockingQueuePriorityBlockingQueue等。
  • 线程执行: 线程池中的线程会不断地从任务队列中取出任务并执行。
  • 拒绝策略: 当任务队列已满,且线程池中的线程都在忙碌时,新的任务提交将会被拒绝。此时,RejectedExecutionHandler将会被调用。

Java内置的拒绝策略虽然简单易用,但在某些场景下存在明显的不足:

  • AbortPolicy: 直接抛出RejectedExecutionException,导致程序中断。这种策略过于粗暴,不适合对任务丢失敏感的场景。
  • DiscardPolicy: 直接丢弃被拒绝的任务,没有任何提示。这种策略可能会导致重要数据丢失,不适合关键业务场景。
  • DiscardOldestPolicy: 丢弃任务队列中最老的任务,然后尝试重新提交当前任务。这种策略可能会导致部分任务永远无法执行,且存在公平性问题。
  • CallerRunsPolicy: 由提交任务的线程来执行被拒绝的任务。这种策略会阻塞提交任务的线程,可能导致系统响应变慢。

因此,我们需要根据具体的业务需求,自定义RejectedExecutionHandler,以便更好地处理被拒绝的任务。

2. RejectedExecutionHandler接口

RejectedExecutionHandler接口定义如下:

public interface RejectedExecutionHandler {
    /**
     * Method that may be invoked by a {@link ThreadPoolExecutor} when
     * {@link ThreadPoolExecutor#execute execute} cannot accept a
     * task.  This may occur when no more threads or queue slots are
     * available because their bounds are reached.
     *
     * @param r the runnable task requested to be executed
     * @param executor the executor attempting to execute this task
     * @throws RejectedExecutionException if there is no remedy
     */
    void rejectedExecution(Runnable r, ThreadPoolExecutor executor);
}

该接口只有一个方法rejectedExecution(Runnable r, ThreadPoolExecutor executor),该方法接收两个参数:

  • r: 被拒绝的任务,是一个Runnable对象。
  • executor: 尝试执行该任务的ThreadPoolExecutor对象。

rejectedExecution方法中,我们可以编写自定义的逻辑来处理被拒绝的任务。

3. 自定义RejectedExecutionHandler:任务持久化

假设我们需要将被拒绝的任务持久化到数据库,以便后续重新执行。我们可以创建一个名为PersistenceRejectedExecutionHandler的类,实现RejectedExecutionHandler接口。

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;

public class PersistenceRejectedExecutionHandler implements RejectedExecutionHandler {

    private final String jdbcUrl;
    private final String username;
    private final String password;
    private final String tableName;

    public PersistenceRejectedExecutionHandler(String jdbcUrl, String username, String password, String tableName) {
        this.jdbcUrl = jdbcUrl;
        this.username = username;
        this.password = password;
        this.tableName = tableName;
    }

    @Override
    public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
        try (Connection connection = DriverManager.getConnection(jdbcUrl, username, password)) {
            String sql = "INSERT INTO " + tableName + " (task_data) VALUES (?)"; // 简化,实际可能需要更多字段
            PreparedStatement preparedStatement = connection.prepareStatement(sql);

            //将Runnable对象序列化成字节数组,存储到数据库中
            byte[] taskData = serializeRunnable(r);

            preparedStatement.setBytes(1, taskData);
            preparedStatement.executeUpdate();

            System.out.println("Task persisted to database: " + r.toString());

        } catch (SQLException e) {
            System.err.println("Failed to persist task to database: " + e.getMessage());
            // 可以选择抛出异常,或者进行其他处理,例如记录日志
            throw new RuntimeException("Failed to persist task", e); // or log and continue
        }
    }

    // 将Runnable对象序列化成字节数组
    private byte[] serializeRunnable(Runnable r) {
        // 实现序列化逻辑,例如使用ObjectOutputStream
        // 注意:Runnable对象必须是可序列化的
        try (java.io.ByteArrayOutputStream bos = new java.io.ByteArrayOutputStream();
             java.io.ObjectOutputStream oos = new java.io.ObjectOutputStream(bos)) {
            oos.writeObject(r);
            return bos.toByteArray();
        } catch (java.io.IOException e) {
            throw new RuntimeException("Failed to serialize task", e);
        }
    }

    // 反序列化Runnable对象
    public static Runnable deserializeRunnable(byte[] data) {
        try (java.io.ByteArrayInputStream bis = new java.io.ByteArrayInputStream(data);
             java.io.ObjectInputStream ois = new java.io.ObjectInputStream(bis)) {
            return (Runnable) ois.readObject();
        } catch (java.io.IOException | ClassNotFoundException e) {
            throw new RuntimeException("Failed to deserialize task", e);
        }
    }

    public static void main(String[] args) throws InterruptedException {
        // 数据库连接信息
        String jdbcUrl = "jdbc:mysql://localhost:3306/testdb";
        String username = "root";
        String password = "password";
        String tableName = "rejected_tasks";

        // 创建自定义的RejectedExecutionHandler
        PersistenceRejectedExecutionHandler handler = new PersistenceRejectedExecutionHandler(jdbcUrl, username, password, tableName);

        // 创建线程池
        java.util.concurrent.ThreadPoolExecutor executor = new java.util.concurrent.ThreadPoolExecutor(
                2, // corePoolSize
                4, // maximumPoolSize
                60, // keepAliveTime
                java.util.concurrent.TimeUnit.SECONDS,
                new java.util.concurrent.ArrayBlockingQueue<>(2), // workQueue
                handler
        );

        // 提交任务
        for (int i = 0; i < 10; i++) {
            final int taskNumber = i;
            executor.execute(() -> {
                try {
                    System.out.println("Running task: " + taskNumber + " - Thread: " + Thread.currentThread().getName());
                    Thread.sleep(1000); // 模拟任务执行时间
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
        }

        // 关闭线程池
        executor.shutdown();
        executor.awaitTermination(5, java.util.concurrent.TimeUnit.SECONDS);

        System.out.println("All tasks submitted.  Attempting to retrieve tasks from DB.");

        // 从数据库恢复任务(示例)
        try (Connection connection = DriverManager.getConnection(jdbcUrl, username, password)) {
            String sql = "SELECT task_data FROM " + tableName;
            PreparedStatement preparedStatement = connection.prepareStatement(sql);
            java.sql.ResultSet resultSet = preparedStatement.executeQuery();

            while (resultSet.next()) {
                byte[] taskData = resultSet.getBytes("task_data");
                Runnable task = deserializeRunnable(taskData);
                if (task != null) {
                    System.out.println("Retrieved task from DB and executing...");
                    new Thread(task).start(); // 在新线程中执行反序列化的任务
                }
            }
        } catch (SQLException e) {
            System.err.println("Failed to retrieve tasks from database: " + e.getMessage());
        }
    }
}

注意:

  • 在实际应用中,需要根据具体的数据库类型和表结构来修改SQL语句。
  • Runnable对象必须是可序列化的,否则会抛出NotSerializableException异常。
  • 上述代码只是一个简单的示例,实际应用中需要考虑更多的因素,例如事务处理、异常处理、重试机制等。
  • 数据库连接信息和表名应从配置文件中读取,避免硬编码。
  • 必须处理serializeRunnabledeserializeRunnable 方法中可能抛出的 IOException,可以使用try-with-resources语句确保资源得到正确关闭。

数据库表结构示例:

CREATE TABLE rejected_tasks (
    id INT AUTO_INCREMENT PRIMARY KEY,
    task_data BLOB
);

4. 自定义RejectedExecutionHandler:任务降级

除了持久化任务之外,我们还可以使用RejectedExecutionHandler来实现任务降级。例如,当线程池无法处理新的任务时,我们可以将任务发送到消息队列,由其他系统来处理。

import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;

public class DegradationRejectedExecutionHandler implements RejectedExecutionHandler {

    private final MessageQueueService messageQueueService; // 假设有一个消息队列服务

    public DegradationRejectedExecutionHandler(MessageQueueService messageQueueService) {
        this.messageQueueService = messageQueueService;
    }

    @Override
    public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
        try {
            messageQueueService.sendMessage(r); // 将任务发送到消息队列
            System.out.println("Task sent to message queue for degradation processing: " + r.toString());
        } catch (Exception e) {
            System.err.println("Failed to send task to message queue: " + e.getMessage());
            // 可以选择抛出异常,或者进行其他处理,例如记录日志
            throw new RuntimeException("Failed to send task to message queue", e);
        }
    }

    // 假设的消息队列服务接口
    interface MessageQueueService {
        void sendMessage(Runnable task);
    }

    // 简单实现,实际使用时应替换为真正的消息队列服务
    static class SimpleMessageQueueService implements MessageQueueService {
        @Override
        public void sendMessage(Runnable task) {
            System.out.println("Simulating sending task to message queue: " + task.toString());
        }
    }

    public static void main(String[] args) throws InterruptedException {
        // 创建消息队列服务
        MessageQueueService messageQueueService = new SimpleMessageQueueService();

        // 创建自定义的RejectedExecutionHandler
        DegradationRejectedExecutionHandler handler = new DegradationRejectedExecutionHandler(messageQueueService);

        // 创建线程池
        java.util.concurrent.ThreadPoolExecutor executor = new java.util.concurrent.ThreadPoolExecutor(
                2, // corePoolSize
                4, // maximumPoolSize
                60, // keepAliveTime
                java.util.concurrent.TimeUnit.SECONDS,
                new java.util.concurrent.ArrayBlockingQueue<>(2), // workQueue
                handler
        );

        // 提交任务
        for (int i = 0; i < 10; i++) {
            final int taskNumber = i;
            executor.execute(() -> {
                try {
                    System.out.println("Running task: " + taskNumber + " - Thread: " + Thread.currentThread().getName());
                    Thread.sleep(1000); // 模拟任务执行时间
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
        }

        // 关闭线程池
        executor.shutdown();
        executor.awaitTermination(5, java.util.concurrent.TimeUnit.SECONDS);
    }
}

注意:

  • 需要根据具体的消息队列服务来修改代码。
  • 需要处理发送消息可能抛出的异常。
  • 消息队列的选择需要根据具体的业务需求来决定。

5. 选择合适的RejectedExecutionHandler

选择合适的RejectedExecutionHandler需要根据具体的业务需求来决定。以下是一些常用的选择策略:

场景 推荐的RejectedExecutionHandler
任务丢失不可接受 自定义RejectedExecutionHandler,将任务持久化到数据库或消息队列。
可以容忍少量任务丢失 DiscardPolicy,直接丢弃被拒绝的任务。
希望降低系统负载 CallerRunsPolicy,由提交任务的线程来执行被拒绝的任务。
需要快速失败 AbortPolicy,直接抛出RejectedExecutionException
任务有优先级,希望尽快执行重要任务 可以使用PriorityBlockingQueue作为任务队列,并自定义RejectedExecutionHandler,将被拒绝的任务重新放入队列中,并提高其优先级。 (需要谨慎设计,避免重要任务一直被拒绝导致死循环)
需要熔断机制 自定义RejectedExecutionHandler,当拒绝的任务数量达到一定阈值时,触发熔断器,阻止新的任务提交。

6. 总结

通过自定义RejectedExecutionHandler,我们可以灵活地处理线程池中被拒绝的任务,从而提高系统的可用性和可靠性。在实际应用中,我们需要根据具体的业务需求选择合适的拒绝策略,并进行充分的测试,以确保系统的稳定运行。

选择合适的拒绝策略至关重要,自定义能满足特定需求
自定义RejectedExecutionHandler能够灵活地处理被拒绝的任务,从而提高系统的可用性。需要根据具体的业务需求选择合适的拒绝策略,并进行充分的测试。

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注