JAVA ParallelStream 并行流导致非幂等业务重复执行的规避方式
大家好,今天我们来探讨一个在使用 Java ParallelStream 时经常遇到的问题:非幂等业务重复执行。ParallelStream 固然能提高处理速度,但在处理非幂等操作时,如果稍有不慎,就会导致数据错误,甚至产生严重的业务逻辑问题。
幂等性与非幂等性
首先,我们来明确一下幂等性的概念。一个操作被称为是幂等的,意味着无论执行多少次,其结果都相同。比如,设置一个变量的值:x = 5; 无论执行多少次,x 的值最终都是 5。
而非幂等操作,执行多次会产生不同的结果。典型的例子包括:
- 增加账户余额:每次执行都会导致余额增加。
- 生成订单:每次执行都会创建一个新的订单。
- 发送短信:每次执行都会发送一条短信。
ParallelStream 的潜在问题
ParallelStream 利用多线程并行处理数据,这意味着元素的处理顺序是不确定的。如果我们的业务逻辑依赖于执行顺序,或者操作本身是非幂等的,那么 ParallelStream 可能会导致意想不到的结果。
考虑以下简单的例子:
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
public class NonIdempotentExample {
public static void main(String[] args) {
List<Integer> numbers = new ArrayList<>();
for (int i = 0; i < 10; i++) {
numbers.add(i);
}
AtomicInteger counter = new AtomicInteger(0);
// 并行流,非幂等操作:每次处理都增加计数器
numbers.parallelStream().forEach(number -> {
counter.incrementAndGet();
System.out.println("Thread: " + Thread.currentThread().getName() + ", Number: " + number + ", Counter: " + counter.get());
});
System.out.println("Final Counter Value: " + counter.get());
}
}
在这个例子中,我们使用一个 AtomicInteger 作为计数器,并在 forEach 操作中每次都增加它的值。由于 forEach 是并行执行的,因此每次运行程序,最终的计数器值都可能不同,而且极有可能大于 10。这是因为多个线程同时访问并修改了计数器,导致竞争条件。
规避非幂等操作重复执行的方法
现在,我们来讨论如何规避 ParallelStream 导致非幂等操作重复执行的问题。主要有以下几种策略:
1. 避免在 ParallelStream 中执行非幂等操作
这是最直接也是最推荐的方法。如果可能,尽量将非幂等操作移到 ParallelStream 之外执行。例如,可以先使用 ParallelStream 进行一些预处理,然后将结果收集起来,再在单线程环境下执行非幂等操作。
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;
public class AvoidNonIdempotent {
public static void main(String[] args) {
List<Integer> numbers = new ArrayList<>();
for (int i = 0; i < 10; i++) {
numbers.add(i);
}
// 并行流,只进行数据转换等幂等操作
List<Integer> transformedNumbers = numbers.parallelStream()
.map(number -> number * 2)
.collect(Collectors.toList());
// 单线程环境,执行非幂等操作
transformedNumbers.forEach(number -> {
// 模拟非幂等操作,例如打印日志
System.out.println("Processing number: " + number);
});
}
}
在这个例子中,我们首先使用 ParallelStream 将 numbers 列表中的每个元素乘以 2,并将结果收集到 transformedNumbers 列表中。然后,我们在单线程环境下遍历 transformedNumbers 列表,并执行打印日志的操作。这样就避免了在 ParallelStream 中执行非幂等操作。
2. 使用线程安全的数据结构和同步机制
如果必须在 ParallelStream 中执行非幂等操作,我们需要使用线程安全的数据结构和同步机制来保证数据的一致性。
- 线程安全的数据结构: 例如
ConcurrentHashMap,ConcurrentLinkedQueue,CopyOnWriteArrayList等。 - 同步机制: 例如
synchronized关键字,Lock接口,AtomicInteger等。
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentLinkedQueue;
public class ThreadSafeStructure {
public static void main(String[] args) {
List<Integer> numbers = new ArrayList<>();
for (int i = 0; i < 10; i++) {
numbers.add(i);
}
// 使用线程安全的队列
ConcurrentLinkedQueue<Integer> results = new ConcurrentLinkedQueue<>();
// 并行流,将结果添加到线程安全的队列中
numbers.parallelStream().forEach(number -> {
// 模拟非幂等操作,例如添加到队列
results.add(number * 2);
});
// 打印结果
results.forEach(System.out::println);
}
}
在这个例子中,我们使用 ConcurrentLinkedQueue 作为线程安全的队列,并在 ParallelStream 中将每个元素乘以 2 后添加到队列中。由于 ConcurrentLinkedQueue 是线程安全的,因此可以保证多个线程同时添加元素时不会出现数据竞争。
3. 使用 collect 操作进行累积
collect 操作可以将 ParallelStream 的结果累积到一个可变的结果容器中。我们可以利用 collect 操作来执行一些非幂等操作,并保证线程安全。
collect 操作接受三个参数:
- supplier: 创建结果容器的函数。
- accumulator: 将元素添加到结果容器的函数。
- combiner: 合并两个结果容器的函数(只在并行流中执行)。
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
public class CollectExample {
public static void main(String[] args) {
List<Integer> numbers = new ArrayList<>();
for (int i = 0; i < 10; i++) {
numbers.add(i);
}
AtomicInteger counter = numbers.parallelStream().collect(
AtomicInteger::new, // supplier: 创建 AtomicInteger
(atomicInt, number) -> atomicInt.addAndGet(number), // accumulator: 累加数值
AtomicInteger::addAndGet // combiner: 合并 AtomicInteger
);
System.out.println("Sum: " + counter.get());
}
}
在这个例子中,我们使用 collect 操作计算 numbers 列表中所有元素的总和。
AtomicInteger::new创建一个新的AtomicInteger对象作为结果容器。(atomicInt, number) -> atomicInt.addAndGet(number)将每个元素添加到AtomicInteger中。AtomicInteger::addAndGet合并两个AtomicInteger对象。
由于 AtomicInteger 是线程安全的,因此可以保证多个线程同时累加时不会出现数据竞争。
4. 使用 forEachOrdered 保证顺序
如果业务逻辑依赖于元素的处理顺序,可以使用 forEachOrdered 方法。forEachOrdered 会按照流中元素的顺序依次执行操作,即使在并行流中也是如此。但是,需要注意的是,forEachOrdered 会降低并行流的性能,因为它需要保证元素的处理顺序。通常不建议在高并发场景下使用。
import java.util.ArrayList;
import java.util.List;
public class ForEachOrderedExample {
public static void main(String[] args) {
List<Integer> numbers = new ArrayList<>();
for (int i = 0; i < 10; i++) {
numbers.add(i);
}
numbers.parallelStream().forEachOrdered(number -> {
// 模拟非幂等操作,例如打印日志
System.out.println("Processing number: " + number + " in thread: " + Thread.currentThread().getName());
});
}
}
在这个例子中,我们使用 forEachOrdered 方法按照 numbers 列表中元素的顺序依次打印日志。
5. 使用数据库事务
如果非幂等操作涉及到数据库操作,可以使用数据库事务来保证数据的一致性。将多个数据库操作放在一个事务中,要么全部成功,要么全部失败。
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.List;
public class DatabaseTransaction {
public static void main(String[] args) {
List<Integer> numbers = new ArrayList<>();
for (int i = 0; i < 10; i++) {
numbers.add(i);
}
String url = "jdbc:mysql://localhost:3306/testdb";
String user = "root";
String password = "password";
try (Connection connection = DriverManager.getConnection(url, user, password)) {
connection.setAutoCommit(false); // 禁用自动提交
String sql = "INSERT INTO numbers (value) VALUES (?)";
try (PreparedStatement statement = connection.prepareStatement(sql)) {
numbers.parallelStream().forEach(number -> {
try {
statement.setInt(1, number);
statement.executeUpdate();
System.out.println("Inserted: " + number + " in thread: " + Thread.currentThread().getName());
} catch (SQLException e) {
e.printStackTrace();
// 处理SQL异常
try {
connection.rollback(); // 回滚事务
System.err.println("Transaction rolled back due to error: " + e.getMessage());
} catch (SQLException rollbackException) {
System.err.println("Error rolling back transaction: " + rollbackException.getMessage());
}
}
});
connection.commit(); // 提交事务
System.out.println("Transaction committed successfully.");
} catch (SQLException e) {
try {
connection.rollback();
System.err.println("Transaction rolled back due to error: " + e.getMessage());
} catch (SQLException rollbackException) {
System.err.println("Error rolling back transaction: " + rollbackException.getMessage());
}
}
} catch (SQLException e) {
e.printStackTrace();
}
}
}
在这个例子中,我们将数据库插入操作放在一个事务中。如果插入过程中发生任何错误,都会回滚事务,保证数据的一致性。
6. 设计具有幂等性的操作
从根本上解决问题的方法是设计具有幂等性的操作。例如,可以使用乐观锁或悲观锁来避免并发更新导致的数据不一致。或者,可以为每个操作分配一个唯一的 ID,并在执行操作之前检查该 ID 是否已经存在。
例如,更新账户余额的操作,可以设计成如下的形式:
UPDATE account SET balance = new_balance WHERE id = account_id AND balance = old_balance;
只有当账户余额等于 old_balance 时,才会更新余额。这样就可以避免并发更新导致的数据不一致。
总结表格:规避 ParallelStream 非幂等操作重复执行的方法
| 方法 | 描述 | 适用场景 | 优点 | 缺点 |
|---|---|---|---|---|
| 避免在 ParallelStream 中执行 | 将非幂等操作移到 ParallelStream 之外执行。 | 大部分场景,只要能够将非幂等操作分离出来。 | 最简单,最安全。 | 需要重构代码,可能会降低代码的可读性。 |
| 使用线程安全的数据结构和同步机制 | 使用线程安全的数据结构(例如 ConcurrentHashMap)和同步机制(例如 synchronized 关键字)来保证数据的一致性。 |
必须在 ParallelStream 中执行非幂等操作,且可以接受一定的性能损失。 | 可以保证数据的一致性。 | 增加了代码的复杂性,可能会降低性能。 |
使用 collect 操作进行累积 |
使用 collect 操作将 ParallelStream 的结果累积到一个可变的结果容器中。 |
需要对结果进行累积,例如计算总和、平均值等。 | 可以保证数据的一致性,并且可以方便地进行累积操作。 | 代码相对复杂,需要理解 collect 操作的原理。 |
使用 forEachOrdered 保证顺序 |
使用 forEachOrdered 方法按照流中元素的顺序依次执行操作。 |
业务逻辑依赖于元素的处理顺序。 | 可以保证元素的处理顺序。 | 会降低并行流的性能,不适合在高并发场景下使用。 |
| 使用数据库事务 | 将多个数据库操作放在一个事务中,要么全部成功,要么全部失败。 | 非幂等操作涉及到数据库操作。 | 可以保证数据库数据的一致性。 | 增加了数据库的负担,可能会降低性能。 |
| 设计具有幂等性的操作 | 将非幂等操作设计成具有幂等性的操作。例如,可以使用乐观锁或悲观锁来避免并发更新导致的数据不一致。 | 从根本上解决问题,适用于所有场景。 | 最好的解决方案,可以避免并发问题。 | 需要重新设计业务逻辑,可能会比较困难。 |
案例分析
假设我们需要统计一个网站的访问量,每次访问都会向数据库中插入一条记录。这是一个非幂等操作。如果使用 ParallelStream 来处理访问日志,可能会导致重复插入记录。
我们可以采用以下几种方法来规避这个问题:
- 避免在 ParallelStream 中执行数据库插入操作。 我们可以先使用 ParallelStream 对访问日志进行预处理,例如过滤掉无效的访问记录,然后将处理后的访问记录收集起来,再在单线程环境下执行数据库插入操作。
- 使用数据库事务。 我们可以将数据库插入操作放在一个事务中,如果插入过程中发生任何错误,都会回滚事务,保证数据的一致性。
- 设计具有幂等性的操作。 我们可以为每个访问记录分配一个唯一的 ID,并在插入记录之前检查该 ID 是否已经存在。如果存在,则不插入记录。
选择合适的策略
选择哪种策略取决于具体的业务场景和需求。一般来说,我们应该尽量避免在 ParallelStream 中执行非幂等操作。如果必须执行,可以使用线程安全的数据结构和同步机制,或者使用 collect 操作进行累积。如果业务逻辑依赖于元素的处理顺序,可以使用 forEachOrdered 方法。如果非幂等操作涉及到数据库操作,可以使用数据库事务。最终的目标是保证数据的一致性和业务逻辑的正确性。
不要过度迷信 ParallelStream
最后,我们需要强调的是,ParallelStream 并不是万能的。虽然它可以提高处理速度,但也带来了额外的复杂性和风险。在使用 ParallelStream 之前,我们需要仔细评估是否真的需要使用它,以及是否能够正确地处理并发问题。有时候,使用简单的单线程代码可能更加安全和可靠。
好的,今天就分享到这里,希望对大家有所帮助!
总结
ParallelStream 虽快,非幂等操作要小心。
线程安全工具用,事务保证数据稳。
幂等设计是王道,谨慎使用是关键。