JAVA 并发 Map 使用不当?ConcurrentHashMap 扩容机制与性能陷阱
大家好,今天我们来聊聊 Java 并发编程中一个非常重要的工具:ConcurrentHashMap。它在并发环境下提供了线程安全的 Map 操作,但使用不当也会引发性能问题。我们将深入探讨 ConcurrentHashMap 的扩容机制,以及如何避免常见的性能陷阱。
1. 为什么需要 ConcurrentHashMap?
首先,我们需要理解为什么需要并发 Map。在单线程环境下,我们可以使用 HashMap,它提供了高效的键值对存储和检索。但在多线程环境下,HashMap 是非线程安全的。多个线程同时修改 HashMap 的结构(例如,插入、删除元素)可能导致数据不一致,甚至程序崩溃。
为了解决这个问题,Java 提供了 Hashtable 和 Collections.synchronizedMap(new HashMap<>())。Hashtable 使用 synchronized 关键字锁住整个 Map 对象,保证线程安全,但并发度很低。Collections.synchronizedMap 也是类似,本质上也是对整个 Map 进行同步。这意味着同一时刻只能有一个线程访问 Map,其他线程必须等待。这在高并发场景下会严重影响性能。
ConcurrentHashMap 的出现就是为了解决这个问题。它采用了更细粒度的锁机制,允许多个线程并发地访问 Map 的不同部分,从而提高了并发性能。
2. ConcurrentHashMap 的基本原理
ConcurrentHashMap 的核心思想是将整个 Map 分割成多个独立的段(Segment),每个段维护自己的哈希表。每个段都有自己的锁,线程访问不同的段时不需要竞争同一个锁,从而实现了并发访问。
在 JDK 1.8 之前,ConcurrentHashMap 的实现基于 Segment 数组。每个 Segment 相当于一个小的 HashMap,包含了哈希表和锁。线程首先需要获取 Segment 的锁才能访问该段的数据。
JDK 1.8 之后,ConcurrentHashMap 的实现发生了重大改变,不再使用 Segment 结构。它采用了 Node 数组 + CAS + synchronized 的方式来实现并发控制。
3. JDK 1.8+ ConcurrentHashMap 的数据结构
- Node 数组 (table): 存储键值对的数组,类似于
HashMap的 table 数组。 - TreeNode: 当链表长度超过一定阈值(
TREEIFY_THRESHOLD,默认为 8)时,链表会转换成红黑树,以提高查找效率。 - sizeCtl: 控制数组初始化和扩容的关键字段。
- 小于 0:表示正在进行初始化或扩容操作。
- -1:表示正在进行初始化。
- -(1 + n):表示有 n 个线程正在进行扩容操作。
- 等于 0:表示 table 还没有被初始化。
- 大于 0:
- 在初始化之后,表示 table 的容量阈值,用于触发扩容。通常为 table 容量的 0.75(负载因子)。
- 小于 0:表示正在进行初始化或扩容操作。
- transferIndex: 扩容时,标识下一个需要进行迁移的 table 数组的索引位置。
4. ConcurrentHashMap 的并发控制机制
- CAS (Compare and Swap): 用于原子性地更新 Node 数组中的元素。例如,在插入元素时,使用 CAS 尝试将新 Node 插入到指定位置。如果 CAS 失败,说明有其他线程已经修改了该位置,需要重新尝试。
- synchronized: 用于锁定链表或红黑树的头节点。当多个线程同时访问同一个链表或红黑树时,只有一个线程可以获得锁,其他线程必须等待。
5. ConcurrentHashMap 的扩容机制
ConcurrentHashMap 的扩容是一个复杂的过程,目的是保证在高并发情况下也能正确地扩展 Map 的容量,避免性能下降。扩容主要由 transfer 方法实现。
5.1 触发扩容的条件
以下情况会触发 ConcurrentHashMap 的扩容:
- 添加元素后,如果 table 数组的元素个数超过了容量阈值 (sizeCtl),并且当前 table 数组不处于扩容状态。 容量阈值通常为 table 容量的 0.75(负载因子)。
- 当调用
putAll方法添加大量元素时,可能会触发扩容。 - 在
treeifyBin方法中,如果 table 数组的长度小于MIN_TREEIFY_CAPACITY(默认为 64),则不会进行树化,而是尝试扩容。 这是为了避免在 table 数组过小的情况下,频繁地进行树化和反树化操作。
5.2 扩容流程
-
初始化扩容参数:
- 创建一个新的 table 数组 (nextTable),其容量是原 table 数组的两倍。
- 初始化 transferIndex,指向原 table 数组的最后一个索引。
- 设置 sizeCtl 为 -(1 + n),表示有 n 个线程正在进行扩容操作。
-
分配任务:
- 将原 table 数组划分成多个小的任务块,每个线程负责迁移一个或多个任务块。
- 每个任务块包含一段连续的 table 数组的索引范围。
-
迁移元素:
- 每个线程从 transferIndex 开始,从后向前遍历原 table 数组,迁移元素到新的 table 数组中。
- 对于每个 table 数组的元素 (Node),需要进行以下处理:
- 如果该元素为 null,则直接将该位置设置为 ForwardingNode,表示该位置已经被迁移。
- 如果该元素为 ForwardingNode,表示该位置已经被其他线程迁移,跳过。
- 如果该元素为链表的头节点,则需要对链表进行迁移。
- 如果该元素为红黑树的根节点,则需要对红黑树进行迁移。
-
迁移链表或红黑树:
- 将链表或红黑树中的每个节点重新计算哈希值,并根据哈希值将节点分配到新的 table 数组中的相应位置。
- 在迁移链表时,需要将链表拆分成两个链表:一个链表中的节点的哈希值与原 table 数组的长度进行与运算的结果为 0,另一个链表中的节点的哈希值与原 table 数组的长度进行与运算的结果不为 0。 这样可以保证链表中的节点能够均匀地分布到新的 table 数组中。
- 在迁移红黑树时,如果红黑树中的节点个数小于等于
UNTREEIFY_THRESHOLD(默认为 6),则将红黑树转换成链表。
-
完成迁移:
- 当所有线程都完成了迁移任务后,将 table 数组指向新的 table 数组 (nextTable)。
- 更新 sizeCtl 为新的容量阈值。
5.3 扩容代码示例 (简化)
final Node<K,V>[] transfer(Node<K,V>[] tab, Node<K,V>[] nextTab) {
int n = tab.length, stride;
if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE)
stride = MIN_TRANSFER_STRIDE; // 分配任务块大小
if (nextTab == null) { // 初始化 nextTable
try {
@SuppressWarnings("unchecked")
Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n << 1]; // 容量翻倍
nextTab = nt;
} catch (Throwable ex) { // try to cope with OOME
sizeCtl = Integer.MAX_VALUE;
return null;
}
nextTable = nextTab;
transferIndex = n;
}
int nextn = nextTab.length;
ForwardingNode<K,V> fwd = new ForwardingNode<K,V>(nextTab); // 迁移标记
boolean advance = true;
boolean finishing = false; // to ensure that we only advance once
for (int i = 0, bound = 0;;) {
Node<K,V> f; int fh;
while (advance) {
int nextIndex, nextBound;
if (--i >= bound || finishing)
advance = false;
else if ((nextIndex = transferIndex) <= 0) {
i = -1;
advance = false;
}
else if (U.compareAndSwapInt
(this, TRANSFERINDEX, nextIndex,
nextBound = (nextIndex > stride ?
nextIndex - stride : 0))) {
bound = nextBound;
i = nextIndex - 1;
advance = false;
}
}
if (i < 0 || i >= n || nextn <= 0) {
if (finishing) {
nextTable = null;
table = nextTab;
sizeCtl = (n << 1) - (n >>> 1); // 更新 sizeCtl
return nextTab;
}
if (U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1)) {
if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT)
return null;
finishing = advance = true;
i = n; // recheck before commit
}
}
else if ((f = tabAt(tab, i)) == null)
advance = casTabAt(tab, i, null, fwd); // 设置为 ForwardingNode
else if ((fh = f.hash) == MOVED)
advance = true; // already processed
else {
synchronized (f) { // 锁定节点
if (tabAt(tab, i) == f) {
Node<K,V> ln, hn;
if (fh >= 0) {
int runBit = fh & n;
Node<K,V> lastRun = f;
for (Node<K,V> p = f.next; p != null; p = p.next) {
int b = p.hash & n;
if (b != runBit) {
runBit = b;
lastRun = p;
}
}
if (runBit == 0) {
ln = lastRun;
hn = null;
}
else {
hn = lastRun;
ln = null;
}
for (Node<K,V> p = f; p != lastRun; p = p.next) {
int ph = p.hash;
K pk = p.key;
V pv = p.val;
if ((ph & n) == 0)
ln = new Node<K,V>(ph, pk, pv, ln);
else
hn = new Node<K,V>(ph, pk, pv, hn);
}
setTabAt(nextTab, i, ln);
setTabAt(nextTab, i + n, hn);
setTabAt(tab, i, fwd);
advance = true;
}
else if (f instanceof TreeBin) {
TreeBin<K,V> t = (TreeBin<K,V>)f;
TreeNode<K,V> lo = null, loTail = null;
TreeNode<K,V> hi = null, hiTail = null;
int lc = 0, hc = 0;
for (Node<K,V> e = t.first; e != null; e = e.next) {
int h = e.hash;
TreeNode<K,V> p =
new TreeNode<K,V>(h, e.key, e.val, null, null);
if ((h & n) == 0) {
if ((p.prev = loTail) == null)
lo = p;
else
loTail.next = p;
loTail = p;
++lc;
}
else {
if ((p.prev = hiTail) == null)
hi = p;
else
hiTail.next = p;
hiTail = p;
++hc;
}
}
ln = (lc <= UNTREEIFY_THRESHOLD) ? untreeify(lo) :
(hc != 0) ? new TreeBin<K,V>(lo) : t;
hn = (hc <= UNTREEIFY_THRESHOLD) ? untreeify(hi) :
(lc != 0) ? new TreeBin<K,V>(hi) : t;
setTabAt(nextTab, i, ln);
setTabAt(nextTab, i + n, hn);
setTabAt(tab, i, fwd);
advance = true;
}
}
}
}
}
}
6. ConcurrentHashMap 的性能陷阱
虽然 ConcurrentHashMap 提供了高并发性能,但使用不当仍然可能导致性能问题。以下是一些常见的性能陷阱:
- Key 的哈希值分布不均匀: 如果 Key 的哈希值分布不均匀,会导致大量的元素集中在少数的链表或红黑树中,从而降低查找效率。
- 频繁的扩容: 如果 Map 的初始容量设置过小,或者负载因子设置过大,会导致频繁的扩容,从而影响性能。
- 大量的读操作: 虽然
ConcurrentHashMap允许并发读操作,但如果读操作过于频繁,仍然可能导致性能瓶颈。 - 使用不当的 Key 类型: 使用可变对象作为 Key 可能会导致哈希值发生变化,从而导致数据丢失或查找失败。
7. 如何避免性能陷阱
- 选择合适的 Key 类型: 尽量使用不可变对象作为 Key,例如 String、Integer 等。如果必须使用可变对象作为 Key,需要确保 Key 的
hashCode()方法和equals()方法能够正确地处理对象的修改。 - 设置合适的初始容量和负载因子: 根据预期的 Map 大小和并发量,设置合适的初始容量和负载因子。通常建议将初始容量设置为预期大小的 2 倍,并将负载因子设置为 0.75。
- 避免频繁的扩容: 尽量避免频繁的扩容。可以通过设置合适的初始容量和负载因子来减少扩容的次数。
- 使用
get()方法时,尽量避免长时间持有锁: 虽然get()方法是线程安全的,但如果长时间持有锁,仍然可能导致其他线程阻塞。尽量避免在get()方法中执行耗时的操作。 - 监控 Map 的性能: 使用 JConsole、VisualVM 等工具监控 Map 的性能,及时发现和解决性能问题。
8. 代码示例:优化 Key 的哈希值分布
假设我们使用一个自定义的 User 对象作为 Key,如果 User 对象的哈希值分布不均匀,会导致 ConcurrentHashMap 的性能下降。我们可以通过自定义 hashCode() 方法来优化哈希值分布。
class User {
private int id;
private String name;
public User(int id, String name) {
this.id = id;
this.name = name;
}
public int getId() {
return id;
}
public String getName() {
return name;
}
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
User user = (User) o;
return id == user.id;
}
@Override
public int hashCode() {
// 优化哈希值分布
return Objects.hash(id);
}
}
public class ConcurrentHashMapExample {
public static void main(String[] args) {
ConcurrentHashMap<User, String> map = new ConcurrentHashMap<>();
for (int i = 0; i < 1000; i++) {
User user = new User(i, "User" + i);
map.put(user, "Value" + i);
}
}
}
在这个示例中,我们使用 Objects.hash(id) 方法来生成 User 对象的哈希值。Objects.hash() 方法可以有效地将多个字段组合成一个哈希值,从而提高哈希值分布的均匀性。
9. 总结
ConcurrentHashMap 是 Java 并发编程中一个强大的工具,它提供了线程安全的 Map 操作,并具有较高的并发性能。理解其内部机制,尤其是扩容流程,以及避免常见的性能陷阱,对于编写高性能的并发程序至关重要。 选择正确的 Key 类型,合理设置初始容量和负载因子,并监控 Map 的性能,可以帮助我们充分利用 ConcurrentHashMap 的优势。
理解数据结构和并发控制机制
ConcurrentHashMap 基于 Node 数组,利用 CAS 和 synchronized 实现细粒度的并发控制,避免了全局锁带来的性能瓶颈。
掌握扩容流程,规避性能陷阱
了解扩容的触发条件和具体流程,可以帮助我们合理设置初始容量和负载因子,避免频繁扩容,优化 Key 的哈希值分布,从而提高 ConcurrentHashMap 的性能。