ConcurrentHashMap在高并发下的死循环与扩容问题深度剖析
大家好,今天我们来深入探讨一下Java高并发环境下ConcurrentHashMap可能出现的死循环问题,以及与扩容机制相关的细节。ConcurrentHashMap作为Java并发包中一个非常重要的组件,在应对高并发场景时表现出色,但如果对其内部实现机制理解不够深入,就可能遇到一些意想不到的问题。
1. ConcurrentHashMap的基础结构与分段锁
ConcurrentHashMap在JDK 1.7和JDK 1.8的实现方式有所不同。我们先从JDK 1.7开始,然后再过渡到JDK 1.8,这样更容易理解演进的过程。
-
JDK 1.7: ConcurrentHashMap采用分段锁(Segment)机制来实现并发控制。它将整个HashMap分成多个Segment,每个Segment都相当于一个独立的HashMap。每个Segment拥有自己的锁,因此并发访问不同Segment的数据时,不会发生锁竞争。
// JDK 1.7 ConcurrentHashMap 的基本结构 public class ConcurrentHashMap<K, V> extends AbstractMap<K, V> implements ConcurrentMap<K, V>, Serializable { final Segment<K,V>[] segments; static final class Segment<K,V> extends ReentrantLock implements Serializable { transient volatile HashEntry<K,V>[] table; // ... 其他属性和方法 } static final class HashEntry<K,V> { final K key; final int hash; volatile V value; final HashEntry<K,V> next; // ... } }segments: Segment数组,存储多个Segment对象。Segment: 继承自ReentrantLock,作为锁的单元,保护其内部的table。table: HashEntry数组,实际存储键值对数据。HashEntry: 链表节点,用于解决Hash冲突。
分段锁的优势在于提高了并发度,但缺点是当多个线程需要访问同一个Segment时,仍然会发生锁竞争。另外,分段锁的数量一旦确定,就难以动态调整。
-
JDK 1.8: JDK 1.8对ConcurrentHashMap进行了重大改进,放弃了分段锁,采用了CAS(Compare-And-Swap)+ synchronized来实现并发控制。它使用Node数组存储数据,并且引入了红黑树来优化Hash冲突严重的情况。
// JDK 1.8 ConcurrentHashMap 的基本结构 public class ConcurrentHashMap<K, V> extends AbstractMap<K, V> implements ConcurrentMap<K, V>, Serializable { transient volatile Node<K,V>[] table; static class Node<K,V> implements Map.Entry<K,V> { final int hash; final K key; volatile V val; volatile Node<K,V> next; // ... 其他属性和方法 } // ... 其他属性和方法 }table: Node数组,存储键值对数据。Node: 链表节点,也可能是红黑树节点。- CAS:用于无锁地修改Node中的数据,例如添加新的Node到链表头部。
- synchronized:用于在发生Hash冲突时,对链表或红黑树进行加锁操作。
JDK 1.8的ConcurrentHashMap的优势在于并发度更高,锁的粒度更细,而且可以动态扩容。但是,实现也更加复杂。
2. 死循环问题:JDK 1.7的transfer方法
JDK 1.7 ConcurrentHashMap的死循环问题主要出现在transfer方法中,该方法用于在扩容时将数据从旧的table迁移到新的table。
// JDK 1.7 Segment的transfer方法(简化版)
void transfer(HashEntry<K,V>[] oldTable, HashEntry<K,V>[] newTable) {
int size = oldTable.length;
for (int i = 0; i < size; i++) {
HashEntry<K,V> e = oldTable[i];
if (e != null) {
oldTable[i] = null; // Help GC
do {
HashEntry<K,V> next = e.next;
int idx = hash(e.key) & (newTable.length - 1);
e.next = newTable[idx]; // 关键代码
newTable[idx] = e; // 关键代码
e = next;
} while (e != null);
}
}
}
这段代码看起来很简单,就是遍历旧table的每个桶,然后将桶中的链表中的每个节点重新计算hash值,并插入到新的table中。但是,在高并发环境下,如果多个线程同时对同一个Segment进行扩容,就可能出现死循环。
死循环的成因:
假设有两个线程A和B同时对同一个Segment进行扩容,它们都执行transfer方法。
- 线程A和线程B都从旧table的同一个桶中取出一个节点,假设这个节点是
e1。 - 线程A先执行,将
e1插入到新table的某个桶中。假设e1的next节点是e2,那么e1.next指向e2。 - 线程B也执行,将
e1插入到新table的同一个桶中。由于线程A已经修改了e1.next,所以线程B看到的e1.next也是e2。 - 现在,两个线程都将
e1插入到新table的同一个桶中,并且e1.next都指向e2。 - 接下来,线程A和线程B分别处理
e2。由于e2.next指向e3,所以线程A和线程B都会将e2插入到新table的同一个桶中,并且e2.next都指向e3。 - 以此类推,最终,线程A和线程B可能会形成一个循环链表,例如
e1.next = e2; e2.next = e1;。
当线程访问这个循环链表时,就会陷入死循环。
一个更具体的例子:
假设旧table的size是2,新table的size是4。现在有三个节点e1,e2,e3,它们的hash值分别是3,7,11。
e1,e2,e3在旧table中的位置都是1 (hash % 2)。e1,e2,e3在新table中的位置分别是3,3,3 (hash % 4)。- 线程A先执行,将
e1,e2,e3依次插入到新table的3号桶中。此时,新table的3号桶中的链表是e3 -> e2 -> e1。 - 线程B也执行,将
e1,e2,e3依次插入到新table的3号桶中。由于线程A已经修改了链表结构,所以线程B看到的链表结构也是e3 -> e2 -> e1。 - 最终,新table的3号桶中的链表可能是
e1 -> e2 -> e3 -> e1,形成一个循环链表。
如何避免死循环:
JDK 1.7 ConcurrentHashMap并没有完美的解决死循环问题,只能通过一些手段来降低死循环的概率,例如:
- 减小初始化容量: 减小初始化容量可以降低Hash冲突的概率,从而降低死循环的概率。
- 使用合适的Hash函数: 使用合适的Hash函数可以使元素分布更均匀,从而降低Hash冲突的概率。
- 避免频繁扩容: 避免频繁扩容可以减少
transfer方法的执行次数,从而降低死循环的概率。
3. 扩容机制:JDK 1.8的改进
JDK 1.8的ConcurrentHashMap对扩容机制进行了重大改进,避免了JDK 1.7中的死循环问题。
- 不再使用分段锁: JDK 1.8使用CAS + synchronized来实现并发控制,锁的粒度更细。
- 引入红黑树: 当Hash冲突严重时,链表会转换成红黑树,提高了查找效率。
- 扩容时支持多线程协助: 多个线程可以同时参与扩容,提高了扩容效率。
JDK 1.8的扩容过程:
- 检测是否需要扩容: 当ConcurrentHashMap中的元素数量超过threshold时,就会触发扩容。
- 创建新的table: 创建一个新的table,容量是旧table的两倍。
- 迁移数据: 将旧table中的数据迁移到新的table中。这个过程是并发执行的,多个线程可以同时参与。
- 更新table: 当所有数据都迁移完成后,将table指向新的table。
多线程协助扩容:
在JDK 1.8中,扩容时可以有多个线程同时参与。每个线程负责迁移一部分数据。
- 确定每个线程负责的桶的范围: 将旧table分成多个部分,每个线程负责一部分。
- 迁移数据: 每个线程遍历自己负责的桶,将桶中的节点迁移到新的table中。
- 使用CAS保证线程安全: 在迁移节点时,使用CAS操作来保证线程安全。
JDK 1.8是如何避免死循环的?
JDK 1.8通过以下方式避免死循环:
- 迁移节点时,会先将节点复制一份: 这样,即使多个线程同时修改同一个节点,也不会影响其他线程。
- 使用CAS操作来保证线程安全: CAS操作可以保证只有一个线程能够成功修改节点,其他线程需要重试。
- 扩容完成后,会重新计算每个节点的hash值: 这样可以保证节点在新的table中的位置是正确的。
4. 关于size()方法的理解
ConcurrentHashMap的size()方法在JDK 1.7和1.8的实现也有所不同,理解这些差异对于在高并发环境下正确使用ConcurrentHashMap至关重要。
-
JDK 1.7: 由于分段锁的存在,获取精确的size是一个挑战。通常的做法是尝试多次累加所有Segment的count,如果连续两次的结果一致,则认为得到了准确的值。如果多次尝试仍然不一致,则需要锁定所有Segment,然后重新计算。这种方式在并发很高的情况下,性能会受到影响。
// JDK 1.7 size() 方法的简化版 public int size() { final Segment<K,V>[] segments = this.segments; int size; boolean overflow; // true if size is greater than MAX_VALUE long sum = 0; final int segmentsLength = segments.length; int[] mc = new int[segmentsLength]; // Try a few times to get accurate count. On failure due to // continuous alteration, resort to locking all segments. for (int k = 0; k < RETRIES_BEFORE_LOCK; ++k) { sum = 0; overflow = false; int mcsum = 0; for (int i = 0; i < segmentsLength; ++i) { int m = mc[i] = segments[i].modCount; if ((sum += segments[i].count) < 0) overflow = true; mcsum += m; } if (mcsum != 0) { if (overflow) return Integer.MAX_VALUE; else return (int)sum; } } // If fails to get accurate count, then lock all segments // to get accurate count lock(); try { sum = 0; for (int i = 0; i < segmentsLength; ++i) { sum += segments[i].count; } } finally { unlock(); } return (int)sum; }RETRIES_BEFORE_LOCK: 定义了重试的次数。modCount: 记录了Segment被修改的次数。- 在循环中,累加每个Segment的
count,并检查modCount是否发生变化。如果modCount发生变化,则说明在统计的过程中有线程修改了Segment,需要重新统计。 - 如果重试多次仍然无法得到一致的结果,则需要锁定所有Segment,然后重新计算。
-
JDK 1.8: JDK 1.8的
size()方法实现更加复杂,因为它涉及到对baseCount和CounterCell的累加。它也是一个近似值,在高并发下可能不准确。 为了避免阻塞,它采用了一种乐观的方式来估计大小。// JDK 1.8 size() 方法的简化版 public int size() { long n = sumCount(); return (n < 0L) ? 0 : (n > (long)Integer.MAX_VALUE) ? Integer.MAX_VALUE : (int)n; } final long sumCount() { CounterCell[] as = counterCells; CounterCell a; long sum = baseCount; if (as != null) { for (int i = 0; i < as.length; ++i) { if ((a = as[i]) != null) sum += a.value; } } return sum; }baseCount: 记录了ConcurrentHashMap中元素的总数。counterCells: 一个CounterCell数组,用于在并发环境下分散计数,避免单个计数器的竞争。sumCount(): 累加baseCount和所有CounterCell的值,得到一个近似的size。
总结: 无论JDK 1.7还是JDK 1.8,
size()方法都不能保证返回绝对精确的值。在高并发环境下,应该将size()方法的返回值视为一个近似值。 如果需要非常精确的大小,可能需要考虑其他同步机制来保证数据的一致性。
5. Key的选择:Hash冲突的影响
ConcurrentHashMap的性能与Key的hashcode分布密切相关。选择合适的Key,可以有效地减少Hash冲突,提高ConcurrentHashMap的性能。
- 避免使用容易产生Hash冲突的Key: 例如,使用递增的整数作为Key,可能会导致大量的Key被分配到同一个桶中,从而增加Hash冲突的概率。
- 使用分布均匀的Key: 例如,可以使用UUID或者MD5值作为Key,这些Key的hashcode分布比较均匀,可以减少Hash冲突的概率。
- 自定义Key的hashCode()方法: 如果需要使用自定义的Key,可以重写
hashCode()方法,使其返回一个分布更均匀的hashcode。
6. 扩容时并发put操作的影响
在ConcurrentHashMap扩容期间,如果同时有put操作,会发生什么? 这也是一个非常重要的考点。
- JDK 1.7: 如果put操作的key对应的Segment正在进行扩容,那么put操作会被阻塞,直到扩容完成。
- JDK 1.8: 如果put操作的key对应的桶正在进行扩容,那么put操作会尝试帮助扩容。如果put操作的key对应的桶已经完成扩容,那么put操作会直接将key-value添加到新的table中。
JDK 1.8 put操作协助扩容的逻辑:
- 检查table是否正在扩容: 如果table正在扩容,那么put操作会尝试帮助扩容。
- 计算key的hash值和桶的位置: 根据key的hash值计算出key应该被放到哪个桶中。
- 尝试将key-value添加到桶中: 如果桶是空的,那么put操作会使用CAS操作将新的Node添加到桶中。如果桶不是空的,那么put操作会尝试帮助扩容。
- 帮助扩容: put操作会调用
helpTransfer()方法来帮助扩容。 - 扩容完成后,将key-value添加到新的table中: 如果扩容已经完成,那么put操作会将key-value添加到新的table中。
7. JDK 1.8 ConcurrentHashMap put流程
为了将上面的知识点串联起来,我们从源码层面简要分析一下JDK 1.8 ConcurrentHashMap的put流程
// 简化后的put流程
public V put(K key, V value) {
return putVal(key, value, false);
}
final V putVal(K key, V value, boolean onlyIfAbsent) {
if (key == null || value == null) throw new NullPointerException();
int hash = spread(key.hashCode()); // 计算hash
int binCount = 0;
for (Node<K,V>[] tab = table;;) { // 自旋重试
Node<K,V> f; int n, i, fh;
if (tab == null || (n = tab.length) == 0)
tab = initTable(); // 初始化table
else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) { // 桶是空的
if (casTabAt(tab, i, null, new Node<K,V>(hash, key, value, null))) // CAS插入
break; // no lock when adding to empty bin
}
else if ((fh = f.hash) == MOVED) // 正在扩容
tab = helpTransfer(tab, f); // 协助扩容
else {
V oldVal = null;
synchronized (f) { // 锁定桶的第一个节点
if (tabAt(tab, i) == f) { // 再次检查
if (fh >= 0) { // 链表节点
binCount = 1;
for (Node<K,V> e = f;; ++binCount) {
K ek;
if (e.hash == hash &&
((ek = e.key) == key ||
(ek != null && key.equals(ek)))) { // key已存在
oldVal = e.val;
if (!onlyIfAbsent)
e.val = value;
break;
}
Node<K,V> pred = e;
if ((e = e.next) == null) { // 链表尾部
pred.next = new Node<K,V>(hash, key, value, null);
break;
}
}
}
else if (f instanceof TreeBin) { // 红黑树节点
Node<K,V> p;
binCount = 2;
if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key, value)) != null) { // 插入红黑树
oldVal = p.val;
if (!onlyIfAbsent)
p.val = value;
}
}
}
}
if (binCount != 0) {
if (binCount >= TREEIFY_THRESHOLD) // 链表长度超过阈值,转换为红黑树
treeifyBin(tab, i);
if (oldVal != null)
return oldVal;
break;
}
}
}
addCount(1L, binCount); // 增加计数
return null;
}
代码逻辑概要:
- 初始化检查: 如果table为null,则初始化table(
initTable())。 - 空桶插入: 如果key对应的桶是空的,则使用CAS操作尝试插入新的Node。
- 扩容协助: 如果table正在扩容,则协助扩容(
helpTransfer())。 - 锁定插入/更新: 如果桶不为空,并且没有在扩容,则锁定桶的第一个节点。
- 链表操作: 如果是链表,则遍历链表,如果key已存在,则更新value;如果key不存在,则在链表尾部添加新的Node。
- 红黑树操作: 如果是红黑树,则在红黑树中插入或更新节点。
- 链表转红黑树: 如果链表长度超过
TREEIFY_THRESHOLD,则将链表转换为红黑树(treeifyBin())。 - 增加计数: 增加ConcurrentHashMap的元素计数(
addCount())。
8. 总结一些建议
- 理解内部机制: 深入理解ConcurrentHashMap的内部实现机制,例如分段锁、CAS、synchronized、扩容等。
- 合理选择Key: 选择hashcode分布均匀的Key,避免Hash冲突。
- 谨慎使用size(): 将
size()方法的返回值视为一个近似值。 - 监控扩容: 监控ConcurrentHashMap的扩容情况,避免频繁扩容。
- 结合实际场景: 根据实际场景选择合适的并发容器。
希望今天的分享能够帮助大家更好地理解ConcurrentHashMap,并在实际工作中避免一些常见的坑。 ConcurrentHashMap是一个强大的工具,但只有理解它的原理,才能发挥它的最大价值。