JAVA ThreadPoolExecutor拒绝策略选型错误导致数据丢失的解决方案

JAVA ThreadPoolExecutor拒绝策略选型错误导致数据丢失的解决方案

大家好,今天我们来聊聊Java ThreadPoolExecutor 的拒绝策略,以及选型不当可能导致的数据丢失问题。这个问题在并发编程中非常常见,理解其原理并掌握正确的解决方案至关重要。

ThreadPoolExecutor 的基本概念

ThreadPoolExecutor 是 Java 并发包中一个核心类,它实现了线程池的功能。线程池可以有效地管理和复用线程,从而提高程序的性能和资源利用率。 它的核心参数包括:

  • corePoolSize: 核心线程数,即使线程池空闲,也会保持的线程数量。
  • maximumPoolSize: 最大线程数,线程池允许创建的最大线程数量。
  • keepAliveTime: 线程空闲时间,超过这个时间,多余的空闲线程会被销毁。
  • unit: keepAliveTime 的时间单位。
  • workQueue: 任务队列,用于存放等待执行的任务。
  • threadFactory: 线程工厂,用于创建线程。
  • rejectedExecutionHandler: 拒绝策略,当任务队列满了且线程池中的线程数达到 maximumPoolSize 时,用于处理新提交的任务。

拒绝策略的作用与类型

rejectedExecutionHandler,也就是拒绝策略,是 ThreadPoolExecutor 在无法接受新任务时采取的处理方式。 常见的拒绝策略有以下几种:

拒绝策略 描述 行为
AbortPolicy 默认策略,直接抛出 RejectedExecutionException 异常。 抛出异常,调用者可以捕获并处理。
CallerRunsPolicy 由提交任务的线程执行该任务。 将任务返回给提交任务的线程执行,降低了新任务的提交速度,减缓了任务提交的速度,相当于“节流”。 适用于对提交速度敏感,但对任务处理时间不敏感的场景。
DiscardPolicy 直接丢弃任务,不做任何处理。 无声无息地丢弃任务,不抛出异常,也不通知调用者。
DiscardOldestPolicy 丢弃队列中最老的任务,然后尝试重新提交新任务。 丢弃队列中最老的任务,可能会导致某些任务永远无法执行。
自定义 RejectedExecutionHandler 可以根据实际需求,实现自定义的拒绝策略。 提供极大的灵活性,可以记录日志、发送警报、持久化任务等等。

数据丢失的场景分析

数据丢失通常发生在以下场景:

  1. 使用了 DiscardPolicyDiscardOldestPolicy: 这两种策略都会直接丢弃任务,如果任务包含重要的数据操作,就会导致数据丢失。DiscardPolicy 直接丢弃新任务,而 DiscardOldestPolicy 丢弃队列中最老的任务,这两种方式都可能造成数据的不完整或不一致。

  2. 未正确处理 AbortPolicy 抛出的异常: AbortPolicy 会抛出 RejectedExecutionException 异常,如果调用者没有捕获并处理这个异常,任务就会被丢失,数据操作无法完成。

  3. CallerRunsPolicy 导致的资源竞争: 虽然 CallerRunsPolicy 不会直接丢弃任务,但如果提交任务的线程本身也承担着其他重要职责,执行被拒绝的任务可能会导致资源竞争,影响整体性能,甚至间接导致数据不一致。

解决方案与代码示例

针对上述场景,我们可以采取以下解决方案:

  1. 避免使用 DiscardPolicyDiscardOldestPolicy: 除非你能容忍数据丢失,否则应尽量避免使用这两种策略。

  2. 正确处理 RejectedExecutionException: 在使用 AbortPolicy 时,务必捕获 RejectedExecutionException 异常,并进行相应的处理,例如:

    • 重试任务: 将任务重新提交到线程池,但需要设置重试次数上限,防止无限循环。

      import java.util.concurrent.*;
      
      public class RetryRejectedExecutionHandler implements RejectedExecutionHandler {
      
          private final int maxRetries;
          private final long retryIntervalMillis;
      
          public RetryRejectedExecutionHandler(int maxRetries, long retryIntervalMillis) {
              this.maxRetries = maxRetries;
              this.retryIntervalMillis = retryIntervalMillis;
          }
      
          @Override
          public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
              int retries = 0;
              while (retries < maxRetries) {
                  try {
                      Thread.sleep(retryIntervalMillis);
                      executor.execute(r);
                      System.out.println("Task re-submitted successfully after " + (retries + 1) + " retries.");
                      return;
                  } catch (InterruptedException e) {
                      Thread.currentThread().interrupt();
                      System.err.println("Interrupted while retrying task.");
                      return;
                  } catch (RejectedExecutionException e) {
                      retries++;
                      System.err.println("Task re-submission failed, retry attempt: " + retries);
                  }
              }
              System.err.println("Task failed after " + maxRetries + " retries.  Giving up.");
              // You might want to log this failure or take other appropriate actions.
          }
      
          public static void main(String[] args) throws InterruptedException {
              int corePoolSize = 2;
              int maxPoolSize = 4;
              long keepAliveTime = 60L;
              BlockingQueue<Runnable> workQueue = new ArrayBlockingQueue<>(2);
              int maxRetries = 3;
              long retryIntervalMillis = 100;
      
              ThreadPoolExecutor executor = new ThreadPoolExecutor(
                      corePoolSize,
                      maxPoolSize,
                      keepAliveTime,
                      TimeUnit.SECONDS,
                      workQueue,
                      new RetryRejectedExecutionHandler(maxRetries, retryIntervalMillis)
              );
      
              for (int i = 0; i < 10; i++) {
                  final int taskNumber = i;
                  executor.execute(() -> {
                      System.out.println("Executing task: " + taskNumber + " by thread: " + Thread.currentThread().getName());
                      try {
                          Thread.sleep(500); // Simulate some work
                      } catch (InterruptedException e) {
                          Thread.currentThread().interrupt();
                      }
                  });
              }
      
              executor.shutdown();
              executor.awaitTermination(1, TimeUnit.MINUTES);
          }
      }
    • 持久化任务: 将任务信息持久化到数据库、消息队列或其他存储介质中,稍后重新执行。

      import java.util.concurrent.*;
      import java.util.ArrayList;
      import java.util.List;
      
      interface TaskPersistenceService {
          void saveTask(Runnable task);
      }
      
      class InMemoryTaskPersistenceService implements TaskPersistenceService {
          private final List<Runnable> tasks = new ArrayList<>();
      
          @Override
          public synchronized void saveTask(Runnable task) {
              tasks.add(task);
              System.out.println("Task persisted in memory.");
          }
      
          public synchronized List<Runnable> getTasks() {
              return new ArrayList<>(tasks); // Return a copy to avoid concurrent modification issues
          }
      
          public synchronized void clearTasks() {
              tasks.clear();
          }
      }
      
      public class PersistenceRejectedExecutionHandler implements RejectedExecutionHandler {
      
          private final TaskPersistenceService persistenceService;
      
          public PersistenceRejectedExecutionHandler(TaskPersistenceService persistenceService) {
              this.persistenceService = persistenceService;
          }
      
          @Override
          public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
              persistenceService.saveTask(r);
          }
      
          public static void main(String[] args) throws InterruptedException {
              int corePoolSize = 2;
              int maxPoolSize = 4;
              long keepAliveTime = 60L;
              BlockingQueue<Runnable> workQueue = new ArrayBlockingQueue<>(2);
      
              InMemoryTaskPersistenceService persistenceService = new InMemoryTaskPersistenceService();
              PersistenceRejectedExecutionHandler rejectedExecutionHandler = new PersistenceRejectedExecutionHandler(persistenceService);
      
              ThreadPoolExecutor executor = new ThreadPoolExecutor(
                      corePoolSize,
                      maxPoolSize,
                      keepAliveTime,
                      TimeUnit.SECONDS,
                      workQueue,
                      rejectedExecutionHandler
              );
      
              for (int i = 0; i < 10; i++) {
                  final int taskNumber = i;
                  executor.execute(() -> {
                      System.out.println("Executing task: " + taskNumber + " by thread: " + Thread.currentThread().getName());
                      try {
                          Thread.sleep(500); // Simulate some work
                      } catch (InterruptedException e) {
                          Thread.currentThread().interrupt();
                      }
                  });
              }
      
              executor.shutdown();
              executor.awaitTermination(1, TimeUnit.MINUTES);
      
              // Simulate reloading and executing persisted tasks
              System.out.println("Simulating reloading and executing persisted tasks...");
              List<Runnable> persistedTasks = persistenceService.getTasks();
              persistedTasks.forEach(task -> {
                  new Thread(task).start(); // Execute in a new thread (or another ExecutorService)
              });
      
              persistenceService.clearTasks(); // Clean up persisted tasks
          }
      }
    • 记录日志并发送警报: 记录任务信息到日志,并发送警报通知相关人员。

      import java.util.concurrent.*;
      
      public class LoggingRejectedExecutionHandler implements RejectedExecutionHandler {
      
          private final String systemName;
      
          public LoggingRejectedExecutionHandler(String systemName) {
              this.systemName = systemName;
          }
      
          @Override
          public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
              String taskDescription = r.toString();  // Or a more meaningful description based on your task
              String errorMessage = String.format("Task rejected by ThreadPoolExecutor in system %s.  Task: %s", systemName, taskDescription);
              System.err.println(errorMessage);  // Log the error
              sendAlert(errorMessage);  // Send an alert (implementation not shown)
          }
      
          private void sendAlert(String message) {
              // Implementation to send an alert via email, SMS, or other alerting mechanism
              System.out.println("Sending alert: " + message);
              // In a real system, you'd use a proper alerting library or service
          }
      
          public static void main(String[] args) throws InterruptedException {
              int corePoolSize = 2;
              int maxPoolSize = 4;
              long keepAliveTime = 60L;
              BlockingQueue<Runnable> workQueue = new ArrayBlockingQueue<>(2);
      
              LoggingRejectedExecutionHandler rejectedExecutionHandler = new LoggingRejectedExecutionHandler("MySystem");
      
              ThreadPoolExecutor executor = new ThreadPoolExecutor(
                      corePoolSize,
                      maxPoolSize,
                      keepAliveTime,
                      TimeUnit.SECONDS,
                      workQueue,
                      rejectedExecutionHandler
              );
      
              for (int i = 0; i < 10; i++) {
                  final int taskNumber = i;
                  executor.execute(() -> {
                      System.out.println("Executing task: " + taskNumber + " by thread: " + Thread.currentThread().getName());
                      try {
                          Thread.sleep(500); // Simulate some work
                      } catch (InterruptedException e) {
                          Thread.currentThread().interrupt();
                      }
                  });
              }
      
              executor.shutdown();
              executor.awaitTermination(1, TimeUnit.MINUTES);
          }
      }
  3. 合理配置线程池参数: 根据实际需求,合理配置 corePoolSizemaximumPoolSizeworkQueue 的大小。 如果任务队列经常满,可以考虑增加 maximumPoolSize 或使用更大的队列。

  4. 使用有界队列并监控队列长度: 如果使用无界队列 (LinkedBlockingQueue without capacity), 可能会导致内存溢出。 推荐使用有界队列 (ArrayBlockingQueueLinkedBlockingQueue with capacity),并监控队列的长度,当队列接近满时,采取相应的措施,例如:

    • 增加线程池的线程数: 动态调整 maximumPoolSize (需要谨慎,并做好监控)。
    • 降级服务: 暂时拒绝部分请求,保证核心业务的稳定。
    • 限流: 限制任务提交的速度。
  5. 自定义拒绝策略: 根据具体的业务场景,实现自定义的 RejectedExecutionHandler。 例如,可以结合重试、持久化和警报等多种策略。

  6. 选择合适的队列类型:

    • ArrayBlockingQueue: 一个由数组支持的有界阻塞队列。 必须指定容量。 公平性策略可选(影响等待线程的获取锁的顺序)。
    • LinkedBlockingQueue: 一个由链表支持的可选有界阻塞队列。 可以指定容量,也可以不指定(默认为 Integer.MAX_VALUE,相当于无界队列)。
    • PriorityBlockingQueue: 一个支持优先级排序的无界阻塞队列。 任务必须实现 Comparable 接口。
    • SynchronousQueue: 一个不存储元素的阻塞队列。 每个插入操作必须等待一个相应的移除操作,反之亦然。 适用于传递性场景。

代码示例:自定义拒绝策略

import java.util.concurrent.*;

public class CustomRejectedExecutionHandler implements RejectedExecutionHandler {

    @Override
    public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
        if (!executor.isShutdown()) {
            // 1. Log the rejection
            System.err.println("Task rejected: " + r.toString());

            // 2. Attempt to re-queue the task (with a limit on retries)
            if (r instanceof  ResubmittableTask){
                ResubmittableTask resubmittableTask = (ResubmittableTask) r;
                if (resubmittableTask.getRetryCount() < 3) { // Example: Limit to 3 retries
                    try {
                        Thread.sleep(100); // Add a small delay before retrying
                        resubmittableTask.incrementRetryCount();
                        executor.execute(resubmittableTask);
                        System.out.println("Task re-submitted (attempt " + resubmittableTask.getRetryCount() + ")");
                        return;
                    } catch (RejectedExecutionException e) {
                        System.err.println("Re-submission failed, task still rejected.");
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                } else {
                    System.err.println("Max retries reached for task, giving up.");
                    //Implement persistence logic or other final handling here
                }
            } else {
                 System.err.println("Non-Resubmittable task rejected and dropped.");
            }

            // 3. Optionally, persist the task to a database or message queue for later processing
            // persistTask(r);

            // 4. Optionally, send an alert to notify administrators
            // sendAlert("Task rejected by thread pool");
        } else {
            System.out.println("Executor is shutting down, discarding task.");
        }
    }

    public static void main(String[] args) throws InterruptedException {
        int corePoolSize = 2;
        int maxPoolSize = 4;
        long keepAliveTime = 60L;
        BlockingQueue<Runnable> workQueue = new ArrayBlockingQueue<>(2);

        CustomRejectedExecutionHandler rejectedExecutionHandler = new CustomRejectedExecutionHandler();

        ThreadPoolExecutor executor = new ThreadPoolExecutor(
                corePoolSize,
                maxPoolSize,
                keepAliveTime,
                TimeUnit.SECONDS,
                workQueue,
                rejectedExecutionHandler
        );

        for (int i = 0; i < 10; i++) {
            final int taskNumber = i;
            executor.execute(new ResubmittableTask(() -> {
                System.out.println("Executing task: " + taskNumber + " by thread: " + Thread.currentThread().getName());
                try {
                    Thread.sleep(500); // Simulate some work
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }));
        }

        executor.shutdown();
        executor.awaitTermination(1, TimeUnit.MINUTES);
    }

    static class ResubmittableTask implements Runnable {
        private final Runnable task;
        private int retryCount = 0;

        public ResubmittableTask(Runnable task) {
            this.task = task;
        }

        @Override
        public void run() {
            task.run();
        }

        public int getRetryCount() {
            return retryCount;
        }

        public void incrementRetryCount() {
            retryCount++;
        }

        @Override
        public String toString() {
            return "ResubmittableTask{" +
                    "task=" + task +
                    ", retryCount=" + retryCount +
                    '}';
        }
    }
}

监控线程池状态

为了更好地了解线程池的运行状况,可以使用 ThreadPoolExecutor 提供的方法进行监控:

  • getPoolSize(): 返回当前的线程池大小。
  • getActiveCount(): 返回正在执行任务的线程数。
  • getQueue().size(): 返回任务队列中的任务数。
  • getCompletedTaskCount(): 返回已完成的任务数。
  • getTaskCount(): 返回已提交的任务总数。

通过这些指标,可以及时发现线程池的瓶颈,并进行相应的调整。

如何选择正确的拒绝策略?

选择合适的拒绝策略取决于你的应用场景和对数据完整性的要求。 以下是一些建议:

  • 数据完整性至关重要: 避免使用 DiscardPolicyDiscardOldestPolicy。 考虑使用 AbortPolicy 并正确处理异常,或者使用自定义的策略进行重试或持久化。
  • 提交速度敏感,但任务处理时间不敏感: 可以使用 CallerRunsPolicy,但要注意资源竞争的问题。
  • 需要快速失败: 可以使用 AbortPolicy,并通知调用者任务失败。
  • 需要根据业务逻辑进行处理: 自定义 RejectedExecutionHandler,实现灵活的拒绝策略。

总结要点

正确选择 ThreadPoolExecutor 的拒绝策略至关重要,错误的选择可能导致数据丢失。 需要根据实际业务场景,权衡数据完整性和系统性能,选择合适的策略,并进行充分的测试和监控。 记住要避免无脑使用丢弃策略,一定要做好任务失败重试或持久化,并监控线程池运行状态,及时调整参数。

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注