Java线程池:自定义RejectedExecutionHandler实现任务的持久化或降级处理

Java线程池:自定义RejectedExecutionHandler实现任务的持久化或降级处理

大家好,今天我们来深入探讨Java线程池的一个重要组成部分:RejectedExecutionHandler。当线程池因为达到饱和状态而无法接受新的任务时,RejectedExecutionHandler就发挥作用了。默认的策略往往比较简单粗暴,例如直接抛出异常或者丢弃任务。但在实际应用中,我们常常需要更精细的控制,例如将任务持久化到数据库,或者进行降级处理。本次讲座将围绕如何自定义RejectedExecutionHandler,实现任务的持久化和降级处理展开。

1. 线程池回顾:为什么需要RejectedExecutionHandler?

在深入RejectedExecutionHandler之前,我们先简单回顾一下线程池的工作原理。Java提供了ExecutorService接口及其实现类ThreadPoolExecutor来管理线程池。线程池的核心参数包括:

  • corePoolSize: 核心线程数,即使线程空闲也会保持存在的线程数量。
  • maximumPoolSize: 最大线程数,线程池允许创建的最大线程数量。
  • keepAliveTime: 空闲线程的存活时间,超过这个时间空闲线程会被回收。
  • unit: keepAliveTime的时间单位。
  • workQueue: 任务队列,用于存放等待执行的任务。
  • threadFactory: 用于创建线程的工厂。
  • handler: 拒绝策略,即RejectedExecutionHandler

当向线程池提交任务时,线程池会按照以下步骤执行:

  1. 如果当前线程数小于corePoolSize,则创建新的线程来执行任务。
  2. 如果当前线程数大于等于corePoolSize,则将任务放入workQueue
  3. 如果workQueue已满,且当前线程数小于maximumPoolSize,则创建新的线程来执行任务。
  4. 如果workQueue已满,且当前线程数大于等于maximumPoolSize,则执行RejectedExecutionHandlerrejectedExecution方法。

因此,RejectedExecutionHandler在线程池达到饱和状态时,负责处理无法被执行的任务。如果没有定义RejectedExecutionHandler,默认的行为会抛出一个RejectedExecutionException

2. Java提供的默认RejectedExecutionHandler

Java提供了几种默认的RejectedExecutionHandler实现:

Handler 行为 适用场景
AbortPolicy 抛出RejectedExecutionException。这是默认策略。 适用于对任务丢失零容忍的场景,需要立即发现并处理线程池饱和的问题。
DiscardPolicy 静默丢弃任务,不抛出任何异常。 适用于可以容忍任务丢失的场景,例如日志记录等。
DiscardOldestPolicy 丢弃队列中最旧的任务,然后尝试重新提交当前任务。 适用于希望优先处理最新任务的场景,例如缓存更新等。
CallerRunsPolicy 由提交任务的线程来执行任务。 适用于希望减缓任务提交速率的场景,通过让提交线程执行任务,可以起到一定的限流作用。

这些默认的策略各有特点,但在很多实际场景下,它们并不能满足我们的需求。例如,我们可能希望将任务持久化到数据库,以便后续重试;或者我们希望对任务进行降级处理,例如返回默认值或者执行更简单的逻辑。

3. 自定义RejectedExecutionHandler:持久化任务

现在我们来实现一个自定义的RejectedExecutionHandler,用于将无法执行的任务持久化到数据库。

首先,我们需要定义一个任务持久化的接口:

public interface TaskPersistenceService {
    void persistTask(Runnable task);
}

然后,我们需要实现这个接口,这里我们简单地使用一个List来模拟数据库:

import java.util.ArrayList;
import java.util.List;

public class InMemoryTaskPersistenceService implements TaskPersistenceService {

    private final List<Runnable> taskQueue = new ArrayList<>();

    @Override
    public void persistTask(Runnable task) {
        taskQueue.add(task);
        System.out.println("Task persisted to in-memory queue: " + task);
    }

    public List<Runnable> getPersistedTasks() {
        return new ArrayList<>(taskQueue); // 返回一个拷贝,防止外部修改
    }

    public void clearPersistedTasks() {
        taskQueue.clear();
    }
}

接下来,我们可以实现自定义的RejectedExecutionHandler

import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;

public class PersistenceRejectedExecutionHandler implements RejectedExecutionHandler {

    private final TaskPersistenceService taskPersistenceService;

    public PersistenceRejectedExecutionHandler(TaskPersistenceService taskPersistenceService) {
        this.taskPersistenceService = taskPersistenceService;
    }

    @Override
    public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
        taskPersistenceService.persistTask(r);
    }
}

在这个RejectedExecutionHandler中,我们将无法执行的任务传递给TaskPersistenceService进行持久化。

最后,我们可以创建一个使用这个RejectedExecutionHandler的线程池:

import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class ThreadPoolExample {

    public static void main(String[] args) throws InterruptedException {
        TaskPersistenceService taskPersistenceService = new InMemoryTaskPersistenceService();
        PersistenceRejectedExecutionHandler handler = new PersistenceRejectedExecutionHandler(taskPersistenceService);

        ThreadPoolExecutor executor = new ThreadPoolExecutor(
                2, // corePoolSize
                4, // maximumPoolSize
                60, // keepAliveTime
                TimeUnit.SECONDS, // unit
                new LinkedBlockingQueue<>(2), // workQueue
                handler // handler
        );

        // 提交多个任务,超过线程池的处理能力
        for (int i = 0; i < 10; i++) {
            final int taskNumber = i;
            executor.execute(() -> {
                try {
                    System.out.println("Executing task: " + taskNumber + " by thread: " + Thread.currentThread().getName());
                    Thread.sleep(1000); // 模拟任务执行时间
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            });
        }

        executor.shutdown();
        executor.awaitTermination(5, TimeUnit.SECONDS);

        // 检查是否有任务被持久化
        InMemoryTaskPersistenceService inMemoryTaskPersistenceService = (InMemoryTaskPersistenceService) taskPersistenceService;
        System.out.println("Persisted tasks: " + inMemoryTaskPersistenceService.getPersistedTasks().size());

        // 展示持久化的任务
        inMemoryTaskPersistenceService.getPersistedTasks().forEach(task -> System.out.println("Persisted Task: " + task));
    }
}

在这个例子中,我们创建了一个核心线程数为2,最大线程数为4,队列长度为2的线程池。然后我们提交了10个任务。由于线程池的处理能力有限,一部分任务会被拒绝执行,并被持久化到数据库中。

4. 自定义RejectedExecutionHandler:降级处理

除了持久化任务,我们还可以使用RejectedExecutionHandler进行降级处理。例如,当线程池饱和时,我们可以返回一个默认值,或者执行一个更简单的逻辑。

import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;

public class DegradeRejectedExecutionHandler implements RejectedExecutionHandler {

    private final Runnable fallbackTask;

    public DegradeRejectedExecutionHandler(Runnable fallbackTask) {
        this.fallbackTask = fallbackTask;
    }

    @Override
    public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
        System.out.println("Thread pool is full, executing fallback task.");
        fallbackTask.run(); // 执行降级任务
    }
}

在这个RejectedExecutionHandler中,当任务被拒绝执行时,我们会执行一个fallbackTask,这个fallbackTask可以是一个返回默认值的操作,或者一个执行更简单的逻辑的操作。

import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class ThreadPoolDegradeExample {

    public static void main(String[] args) throws InterruptedException {
        Runnable fallbackTask = () -> System.out.println("Fallback task executed.");
        DegradeRejectedExecutionHandler handler = new DegradeRejectedExecutionHandler(fallbackTask);

        ThreadPoolExecutor executor = new ThreadPoolExecutor(
                2, // corePoolSize
                4, // maximumPoolSize
                60, // keepAliveTime
                TimeUnit.SECONDS, // unit
                new LinkedBlockingQueue<>(2), // workQueue
                handler // handler
        );

        // 提交多个任务,超过线程池的处理能力
        for (int i = 0; i < 10; i++) {
            final int taskNumber = i;
            executor.execute(() -> {
                try {
                    System.out.println("Executing task: " + taskNumber + " by thread: " + Thread.currentThread().getName());
                    Thread.sleep(1000); // 模拟任务执行时间
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            });
        }

        executor.shutdown();
        executor.awaitTermination(5, TimeUnit.SECONDS);
    }
}

在这个例子中,我们创建了一个核心线程数为2,最大线程数为4,队列长度为2的线程池。然后我们提交了10个任务。由于线程池的处理能力有限,一部分任务会被拒绝执行,并执行fallbackTask

5. 最佳实践和注意事项

  • 选择合适的策略: 根据实际业务场景选择合适的RejectedExecutionHandler。如果对任务丢失零容忍,应该选择AbortPolicy并捕获RejectedExecutionException进行处理。如果可以容忍任务丢失,可以选择DiscardPolicy或者自定义的降级策略。
  • 避免阻塞:rejectedExecution方法中避免执行耗时操作,否则会阻塞提交任务的线程。如果需要执行耗时操作,可以考虑将操作放入另一个线程池中异步执行。
  • 监控和告警: 监控线程池的状态,包括队列长度、活跃线程数、已完成任务数等。当线程池达到饱和状态时,应该及时发出告警,以便及时处理。
  • 幂等性: 如果使用持久化策略,需要保证任务的幂等性,即多次执行同一个任务的结果应该相同。
  • 事务: 在持久化任务时,需要考虑事务的一致性。如果持久化失败,需要进行回滚,避免数据不一致。
  • 资源释放: 在任务执行完毕后,需要及时释放资源,例如关闭数据库连接、释放文件句柄等。

6. 更复杂的例子:结合消息队列实现异步重试

除了直接持久化到数据库,我们还可以将拒绝的任务发送到消息队列,例如RabbitMQ或Kafka,然后由消费者从消息队列中获取任务并进行重试。这样可以进一步解耦任务提交和任务执行,提高系统的可用性和可伸缩性。

首先,我们需要引入消息队列的依赖。以RabbitMQ为例,我们需要添加以下依赖:

<dependency>
    <groupId>com.rabbitmq</groupId>
    <artifactId>amqp-client</artifactId>
    <version>5.16.0</version> <!-- 请使用最新的版本 -->
</dependency>

然后,我们需要实现一个消息队列服务:

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeoutException;

public class RabbitMQTaskQueueService {

    private static final String QUEUE_NAME = "rejected_tasks";
    private final Connection connection;
    private final Channel channel;

    public RabbitMQTaskQueueService(String host) throws IOException, TimeoutException {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost(host);
        connection = factory.newConnection();
        channel = connection.createChannel();
        channel.queueDeclare(QUEUE_NAME, true, false, false, null); // durable: true
    }

    public void enqueueTask(Runnable task) throws IOException {
        String message = task.getClass().getName();
        channel.basicPublish("", QUEUE_NAME, null, message.getBytes(StandardCharsets.UTF_8));
        System.out.println("Task enqueued to RabbitMQ: " + task.getClass().getName());
    }

    public void close() throws IOException, TimeoutException {
        channel.close();
        connection.close();
    }
}

在这个RabbitMQTaskQueueService中,我们将任务的类名作为消息发送到RabbitMQ队列中。

接下来,我们可以实现自定义的RejectedExecutionHandler

import java.io.IOException;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;

public class RabbitMQRejectedExecutionHandler implements RejectedExecutionHandler {

    private final RabbitMQTaskQueueService taskQueueService;

    public RabbitMQRejectedExecutionHandler(RabbitMQTaskQueueService taskQueueService) {
        this.taskQueueService = taskQueueService;
    }

    @Override
    public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
        try {
            taskQueueService.enqueueTask(r);
        } catch (IOException e) {
            System.err.println("Failed to enqueue task to RabbitMQ: " + e.getMessage());
            e.printStackTrace();
            // 可以考虑降级处理,例如记录日志或者抛出异常
        }
    }
}

在这个RejectedExecutionHandler中,我们将无法执行的任务发送到RabbitMQ队列中。

最后,我们需要创建一个消费者,从RabbitMQ队列中获取任务并执行:

import com.rabbitmq.client.*;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeoutException;

public class RabbitMQTaskConsumer {

    private static final String QUEUE_NAME = "rejected_tasks";

    public static void main(String[] args) throws IOException, TimeoutException {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost"); // 根据实际情况修改
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();

        channel.queueDeclare(QUEUE_NAME, true, false, false, null);
        System.out.println(" [*] Waiting for messages. To exit press CTRL+C");

        DeliverCallback deliverCallback = (consumerTag, delivery) -> {
            String message = new String(delivery.getBody(), StandardCharsets.UTF_8);
            System.out.println(" [x] Received task: " + message);
            try {
                // 根据类名创建任务实例并执行
                Class<?> taskClass = Class.forName(message);
                Runnable task = (Runnable) taskClass.getDeclaredConstructor().newInstance();
                task.run(); // 注意:这里直接在当前线程执行任务,可以考虑使用线程池
            } catch (Exception e) {
                System.err.println("Failed to execute task: " + e.getMessage());
                e.printStackTrace();
            } finally {
                channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false); // 确认消息
            }
        };
        channel.basicConsume(QUEUE_NAME, false, deliverCallback, consumerTag -> { }); // autoAck: false
    }
}

在这个消费者中,我们从RabbitMQ队列中获取消息,并根据消息中的类名创建任务实例并执行。

现在,我们可以创建一个使用这个RabbitMQRejectedExecutionHandler的线程池:

import java.io.IOException;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class ThreadPoolRabbitMQExample {

    public static void main(String[] args) throws InterruptedException, IOException, TimeoutException {
        RabbitMQTaskQueueService taskQueueService = new RabbitMQTaskQueueService("localhost"); // 根据实际情况修改
        RabbitMQRejectedExecutionHandler handler = new RabbitMQRejectedExecutionHandler(taskQueueService);

        ThreadPoolExecutor executor = new ThreadPoolExecutor(
                2, // corePoolSize
                4, // maximumPoolSize
                60, // keepAliveTime
                TimeUnit.SECONDS, // unit
                new LinkedBlockingQueue<>(2), // workQueue
                handler // handler
        );

        // 提交多个任务,超过线程池的处理能力
        for (int i = 0; i < 10; i++) {
            final int taskNumber = i;
            executor.execute(() -> {
                try {
                    System.out.println("Executing task: " + taskNumber + " by thread: " + Thread.currentThread().getName());
                    Thread.sleep(1000); // 模拟任务执行时间
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            });
        }

        executor.shutdown();
        executor.awaitTermination(5, TimeUnit.SECONDS);

        taskQueueService.close();
    }
}

在这个例子中,我们创建了一个核心线程数为2,最大线程数为4,队列长度为2的线程池。然后我们提交了10个任务。由于线程池的处理能力有限,一部分任务会被拒绝执行,并发送到RabbitMQ队列中,等待消费者执行。

7. 总结一下刚才讲的内容

  • RejectedExecutionHandler在线程池饱和时处理被拒绝的任务。
  • 可以自定义RejectedExecutionHandler实现任务持久化、降级处理等。
  • 持久化可以结合数据库或消息队列实现,需要考虑幂等性和事务。

8. 选择合适的策略,优化线程池使用

自定义RejectedExecutionHandler是优化Java线程池使用的关键一步。通过选择合适的策略,并结合实际业务场景进行定制,可以更好地控制任务的执行流程,提高系统的可用性和可靠性。希望今天的讲解能对你有所帮助。

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注