JAVA 并发 Map 使用不当?分析 ConcurrentHashMap 扩容机制与性能陷阱
大家好,今天我们来深入探讨一下 Java 并发编程中一个非常重要的组件:ConcurrentHashMap。它在多线程环境下提供了高效且线程安全的 Map 实现,但如果使用不当,也可能遇到性能瓶颈。我们将详细分析 ConcurrentHashMap 的扩容机制,以及由此可能产生的性能陷阱,并提供一些建议来避免这些问题。
为什么要使用 ConcurrentHashMap?
在并发环境下,如果多个线程同时对 HashMap 进行修改,可能会导致数据结构损坏,进而引发程序崩溃。Hashtable 虽然是线程安全的,但它的同步机制非常粗粒度,直接锁住整个 Map,导致并发性能很差。ConcurrentHashMap 旨在解决这些问题,它通过更加精细的锁机制(分段锁,在JDK8之后是CAS+synchronized)和高效的并发算法,在保证线程安全的同时,尽可能地提高并发性能。
ConcurrentHashMap 的数据结构
让我们先回顾一下 ConcurrentHashMap 的基本数据结构(以 JDK 8 为例,也是目前最常用的版本)。
- Node 数组 (table):
ConcurrentHashMap的主干,存储实际的键值对数据。 - Node: 存储键值对的节点,类似
HashMap的节点。但ConcurrentHashMap的 Node 是不可变的,保证了并发读取的安全性。 - TreeNode: 当链表长度超过一定阈值(默认为 8)时,链表会转换为红黑树,提高查找效率。
- sizeCtl: 一个非常重要的控制变量,用于控制 Map 的状态和扩容。它可以是以下几种值:
- 小于 0:表示 Map 正在进行初始化或扩容。
- -1:表示 Map 正在初始化。
- -(1 + n):表示有 n 个线程正在进行扩容。
- 等于 0:表示 Map 还没有被初始化。
- 大于 0:
- 在初始化之后,表示下次扩容的阈值。
- 在初始化之前,表示初始容量,取值是初始化时的容量,或者默认容量16。
- 小于 0:表示 Map 正在进行初始化或扩容。
扩容机制:背后的原理
ConcurrentHashMap 的扩容机制是其复杂性和性能的关键所在。当 Map 中的元素数量达到扩容阈值时,就会触发扩容操作。
1. 扩容触发条件:
当 ConcurrentHashMap 中的元素数量超过 sizeCtl(扩容阈值)时,或者当某个 bin 中的链表长度超过 TREEIFY_THRESHOLD (默认是8) 并且table的容量小于 MIN_TREEIFY_CAPACITY (默认是64) 时,就会尝试触发扩容。
2. 扩容过程:
ConcurrentHashMap 的扩容是一个复杂的过程,它允许多个线程并发地进行扩容,从而减少扩容带来的性能影响。
- transferIndex: 一个原子变量,用于标记迁移的进度。初始值为 table 的长度
n。 - nextTable: 扩容后的新 table。
- ForwardingNode: 一个特殊的 Node,用于标记某个 bin 已经被迁移。
扩容的主要步骤如下:
- 初始化 nextTable: 创建一个新的 table,容量是原来的两倍。
- 分配任务: 将 table 分成多个 segment,每个线程负责迁移一部分 segment。
- 迁移数据: 每个线程负责将自己分配到的 segment 中的数据迁移到新的 table 中。
- 如果某个 bin 为空,直接将
ForwardingNode放到该 bin 中。 - 如果某个 bin 只有一个节点,直接将该节点迁移到新的 table 中。
- 如果某个 bin 是链表,需要遍历链表,将链表中的每个节点迁移到新的 table 中。
- 如果某个 bin 是红黑树,需要将红黑树转换为链表或重新构建红黑树,然后迁移到新的 table 中。
- 如果某个 bin 为空,直接将
- 更新 table: 当所有数据都迁移完成后,将
table指向nextTable,更新sizeCtl为新的扩容阈值。
3. 帮助扩容 (helpTransfer):
当一个线程在进行 put 操作时,如果发现 Map 正在进行扩容,它会帮助进行扩容,而不是等待扩容完成。这样可以加快扩容的速度,减少扩容带来的性能影响。
4. 扩容源码分析 (transfer 方法):
下面是 ConcurrentHashMap 中 transfer 方法的关键代码片段,展示了数据迁移的核心逻辑:
private final void transfer(Node<K,V>[] tab, Node<K,V>[] nextTab) {
int n = tab.length, stride;
if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE)
stride = MIN_TRANSFER_STRIDE; // subdivide range
if (nextTab == null) { // initiating
try {
@SuppressWarnings("unchecked")
Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n << 1];
nextTab = nt;
} catch (Throwable ex) { // try to recover if insufficient memory
sizeCtl = Integer.MAX_VALUE;
return;
}
nextTable = nextTab;
transferIndex = n;
}
int nextn = nextTab.length;
ForwardingNode<K,V> fwd = new ForwardingNode<K,V>(nextTab);
boolean advance = true;
boolean finishing = false; // to ensure that we only advance once
for (int i = 0, bound = 0;;) {
Node<K,V> f; int fh;
while (advance) {
int nextIndex, nextBound;
if (--i >= bound || finishing)
advance = false;
else if ((nextIndex = transferIndex) <= 0) {
i = -1;
advance = false;
}
else if (U.compareAndSwapInt
(this, TRANSFERINDEX, nextIndex,
nextBound = (nextIndex > stride ?
nextIndex - stride : 0))) {
bound = nextBound;
i = nextIndex - 1;
advance = false;
}
}
if (i < 0 || i >= n || nextn <= 0) {
int sc;
if (finishing) {
nextTable = null;
table = nextTab;
sizeCtl = (n << 1) - (n >>> 1);
return;
}
if (U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1)) {
if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT)
return;
finishing = advance = true;
i = n; // recheck before commit
}
}
else if ((f = tabAt(tab, i)) == null)
advance = casTabAt(tab, i, null, fwd);
else if ((fh = f.hash) == MOVED)
advance = true; // already processed
else {
synchronized (f) {
if (tabAt(tab, i) != f)
continue;
Node<K,V> ln, hn;
if (fh >= 0) {
int runBit = fh & n;
Node<K,V> lastRun = f;
for (Node<K,V> p = f.next; p != null; p = p.next) {
int b = p.hash & n;
if (b != runBit) {
runBit = b;
lastRun = p;
}
}
if (runBit == 0) {
ln = lastRun;
hn = null;
}
else {
hn = lastRun;
ln = null;
}
for (Node<K,V> p = f; p != lastRun; p = p.next) {
int ph = p.hash;
int b = ph & n;
if (b == 0)
ln = new Node<K,V>(ph, p.key, p.val, ln);
else
hn = new Node<K,V>(ph, p.key, p.val, hn);
}
setTabAt(nextTab, i, ln);
setTabAt(nextTab, i + n, hn);
setTabAt(tab, i, fwd);
advance = true;
}
else if (f instanceof TreeBin) {
TreeBin<K,V> t = (TreeBin<K,V>)f;
TreeNode<K,V> lo = null, loTail = null;
TreeNode<K,V> hi = null, hiTail = null;
int lc = 0, hc = 0;
for (TreeNode<K,V> p = t.first; p != null; p = p.next) {
int h = p.hash;
TreeNode<K,V> np = new TreeNode<K,V>(h, p.key, p.val, null, null);
if ((h & n) == 0) {
if ((np.prev = loTail) == null)
lo = np;
else
loTail.next = np;
loTail = np;
++lc;
}
else {
if ((np.prev = hiTail) == null)
hi = np;
else
hiTail.next = np;
hiTail = np;
++hc;
}
}
ln = (lc <= UNTREEIFY_THRESHOLD) ? untreeify(lo) :
(hc != 0) ? new TreeBin<K,V>(lo) : t;
hn = (hc <= UNTREEIFY_THRESHOLD) ? untreeify(hi) :
(lc != 0) ? new TreeBin<K,V>(hi) : t;
setTabAt(nextTab, i, ln);
setTabAt(nextTab, i + n, hn);
setTabAt(tab, i, fwd);
advance = true;
}
}
}
}
}
这段代码展示了 transfer 方法的核心逻辑,包括分配任务、迁移数据(链表和红黑树)以及更新 table。理解这段代码对于深入理解 ConcurrentHashMap 的扩容机制至关重要。
性能陷阱:扩容风暴
虽然 ConcurrentHashMap 的扩容机制旨在提高并发性能,但在某些情况下,不当的使用仍然可能导致性能问题,最常见的就是“扩容风暴”。
1. 什么是扩容风暴?
当 ConcurrentHashMap 中的元素数量迅速增长,导致频繁的扩容操作时,大量的线程被阻塞或参与到扩容过程中,从而降低了 Map 的性能。 在高并发场景下,频繁的扩容操作会消耗大量的 CPU 资源,导致应用程序响应变慢甚至崩溃。
2. 扩容风暴的成因:
- 初始容量设置过小: 如果初始容量设置过小,Map 会很快达到扩容阈值,从而触发频繁的扩容操作。
- Hash 冲突严重: 如果 Hash 函数设计不合理或者键的 Hash 值分布不均匀,会导致大量的元素集中在少数的 bin 中,从而增加了链表长度,加速了扩容的触发。
- 并发写入量过大: 在高并发写入场景下,Map 中的元素数量会迅速增加,从而导致频繁的扩容操作。
- 错误预估数据量: 没有准确预估数据量,导致预设容量和实际容量相差过大。
3. 如何避免扩容风暴:
-
合理设置初始容量: 根据预估的数据量,合理设置
ConcurrentHashMap的初始容量。可以使用以下公式计算初始容量:initialCapacity = (int) (expectedSize / loadFactor + 1)其中
expectedSize是预估的数据量,loadFactor是负载因子(默认为 0.75)。例如,如果预计 Map 中要存储 10000 个元素,那么可以设置初始容量为:
initialCapacity = (int) (10000 / 0.75 + 1) = 13334 -
选择合适的 Hash 函数: 选择一个能够将键均匀分布的 Hash 函数,减少 Hash 冲突。如果使用的是自定义对象作为键,需要重写
hashCode()方法,保证 Hash 值的均匀分布。 -
避免热点 Key: 尽量避免大量的请求集中访问少数的 Key,这会导致这些 Key 所在的 bin 成为热点,增加 Hash 冲突,从而加速扩容的触发。
-
控制并发写入量: 在某些情况下,可以通过限制并发写入量来避免扩容风暴。例如,可以使用线程池来控制并发写入的线程数量。
-
监控和调优: 监控
ConcurrentHashMap的性能指标,例如扩容次数、平均链表长度等。根据监控结果,调整初始容量、Hash 函数等参数,进行性能调优。 -
使用预分配容量的 Map 实现: 如果数据量可以预估,且写入模式是先写入大量数据,后续读取多于写入,可以考虑使用 Guava 的
ImmutableMap或者其他预分配容量的 Map 实现。 这些实现通常在初始化时分配足够的容量,避免后续的扩容操作。
代码示例:模拟扩容风暴
下面是一个简单的代码示例,模拟了 ConcurrentHashMap 的扩容风暴:
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
public class ConcurrentHashMapExpansionStorm {
public static void main(String[] args) throws InterruptedException {
int initialCapacity = 16; // 初始容量很小
int concurrencyLevel = 32;
int numThreads = 32;
int numElements = 100000;
ConcurrentHashMap<Integer, Integer> map = new ConcurrentHashMap<>(initialCapacity, 0.75f, concurrencyLevel);
ExecutorService executor = Executors.newFixedThreadPool(numThreads);
long startTime = System.nanoTime();
for (int i = 0; i < numThreads; i++) {
final int threadId = i;
executor.submit(() -> {
for (int j = threadId; j < numElements; j += numThreads) {
map.put(j, j);
}
});
}
executor.shutdown();
executor.awaitTermination(10, TimeUnit.MINUTES);
long endTime = System.nanoTime();
long duration = (endTime - startTime) / 1_000_000;
System.out.println("Time taken: " + duration + " ms");
System.out.println("Map size: " + map.size());
}
}
在这个示例中,我们创建了一个初始容量很小的 ConcurrentHashMap,然后使用多个线程并发地向 Map 中写入大量数据。由于初始容量很小,Map 会频繁地进行扩容操作,从而导致性能下降。
可以通过增大 initialCapacity 的值,观察性能的变化。
表格:总结性能优化建议
| 优化手段 | 描述 | 适用场景 |
|---|---|---|
| 合理设置初始容量 | 根据预估的数据量,合理设置 ConcurrentHashMap 的初始容量,避免频繁的扩容操作。 |
数据量可预估,且希望减少扩容带来的性能影响。 |
| 选择合适的 Hash 函数 | 选择一个能够将键均匀分布的 Hash 函数,减少 Hash 冲突。如果使用的是自定义对象作为键,需要重写 hashCode() 方法,保证 Hash 值的均匀分布。 |
希望提高 Hash 值的分布均匀性,减少 Hash 冲突。 |
| 避免热点 Key | 尽量避免大量的请求集中访问少数的 Key,这会导致这些 Key 所在的 bin 成为热点,增加 Hash 冲突,从而加速扩容的触发。 | 存在热点 Key 的场景,可以通过负载均衡、缓存等手段来缓解热点问题。 |
| 控制并发写入量 | 在某些情况下,可以通过限制并发写入量来避免扩容风暴。例如,可以使用线程池来控制并发写入的线程数量。 | 高并发写入场景,可以通过限制并发写入量来避免扩容风暴。 |
| 监控和调优 | 监控 ConcurrentHashMap 的性能指标,例如扩容次数、平均链表长度等。根据监控结果,调整初始容量、Hash 函数等参数,进行性能调优。 |
所有场景,通过监控和调优,持续优化 ConcurrentHashMap 的性能。 |
| 预分配容量的Map | 如果数据量可以预估,且写入模式是先写入大量数据,后续读取多于写入,可以考虑使用 Guava 的 ImmutableMap 或者其他预分配容量的 Map 实现。 这些实现通常在初始化时分配足够的容量,避免后续的扩容操作。 |
数据量可预估,写入模式是先写入大量数据,后续读取多于写入。 |
使用建议
- 理解
ConcurrentHashMap的工作原理: 深入理解ConcurrentHashMap的数据结构和扩容机制,才能更好地使用它,避免性能陷阱。 - 根据实际情况选择合适的参数: 根据预估的数据量、并发量等因素,合理设置
ConcurrentHashMap的初始容量、负载因子等参数。 - 进行性能测试和调优: 在生产环境中,对
ConcurrentHashMap进行性能测试和调优,确保其能够满足应用程序的需求。 - 关注 JDK 版本更新:
ConcurrentHashMap在不同的 JDK 版本中可能会有不同的实现和优化,关注 JDK 版本更新,及时了解最新的特性和改进。
性能优化的关键点
理解扩容机制,合理预估容量,避免热点 Key 是性能优化的关键。
实际场景中的应用
在缓存、会话管理、配置中心等高并发场景中,ConcurrentHashMap 都有着广泛的应用。
不仅仅是扩容
除了扩容,Hash 冲突、Key 的选择也会影响 ConcurrentHashMap 的性能。