好的,没问题。
Java高并发容器ConcurrentHashMap源码分析:线程安全与性能优化细节
大家好,今天我们来深入探讨Java并发包中的核心组件——ConcurrentHashMap。 作为高并发场景下的首选哈希表实现,ConcurrentHashMap 在JDK 1.5之后逐渐取代了HashTable等同步容器。 本次分享将从源码层面剖析其线程安全机制和性能优化策略,帮助大家更好地理解和使用这个强大的工具。
1. ConcurrentHashMap的演进
在分析具体实现之前,我们先回顾一下ConcurrentHashMap的演进历程,这有助于我们理解其设计思想。
- JDK 1.5-1.7: 基于分段锁(Segment)机制。
- JDK 1.8+: 基于CAS + synchronized + 红黑树。
分段锁机制虽然提升了并发度,但依然存在锁竞争的问题。 JDK 1.8 彻底摒弃了分段锁的设计,采用了更加细粒度的并发控制方式,大大提高了性能。
2. JDK 1.8 ConcurrentHashMap的核心数据结构
ConcurrentHashMap的核心数据结构由以下几个部分组成:
- Node[] table: 存储键值对的哈希表,类似于
HashMap中的table。 - sizeCtl: 控制表初始化和扩容的关键字段。
- nextTable: 在扩容期间,用于指向新的table。
- Node: 基本的数据单元,存储key-value对。
- TreeNode: 当链表长度超过一定阈值时,链表会转换为红黑树,TreeNode用于表示红黑树节点。
我们用一个表格来总结:
| 字段名 | 类型 | 作用 |
|---|---|---|
| table | Node[] | 存储键值对的哈希表 |
| sizeCtl | int | 控制表初始化和扩容。负数表示初始化或扩容,正数表示下一次扩容的阈值。 |
| nextTable | Node[] | 扩容期间,指向新的table |
| Node | 内部类 | 基本的数据单元,存储key-value对。 |
| TreeNode | 内部类 | 红黑树节点,用于优化长链表的查找性能。 |
3. 重要字段sizeCtl详解
sizeCtl字段在ConcurrentHashMap中扮演着至关重要的角色,它控制着表的初始化和扩容。 sizeCtl 的取值有以下几种情况:
- 小于0: 表示
table正在初始化或者扩容。- -1: 表示
table正在初始化。 - -(1 + n): 表示有n个线程正在进行扩容操作。
- -1: 表示
- 等于0: 表示
table还没有被初始化。 - 大于0:
- 如果
table已经初始化,则表示下一次扩容的阈值。 - 如果
table未初始化,则表示table的初始容量,或者默认容量(16)。
- 如果
4. 线程安全的实现:CAS + synchronized
ConcurrentHashMap的线程安全主要依赖于以下两种机制:
- CAS (Compare and Swap): 用于无锁地修改变量,例如初始化
table、更新sizeCtl、添加新的Node等。 - synchronized: 用于锁定
Node或TreeNode,保证在更新链表或红黑树时的互斥性。 synchronized 只锁住一个桶,而不是整个哈希表。
结合CAS和synchronized,ConcurrentHashMap实现了细粒度的并发控制,在保证线程安全的同时,也提升了性能。
5. put() 方法源码分析
put() 方法是ConcurrentHashMap中最核心的方法之一,我们来详细分析一下它的源码。
public V put(K key, V value) {
return putVal(key, value, false);
}
final V putVal(K key, V value, boolean onlyIfAbsent) {
if (key == null || value == null) throw new NullPointerException();
int hash = spread(key.hashCode()); // 计算hash值
int binCount = 0; // 用于记录链表长度,达到阈值后转换为红黑树
for (Node<K,V>[] tab = table;;) { // 自旋重试,直到成功
Node<K,V> f; int n, i, fh;
if (tab == null || (n = tab.length) == 0)
tab = initTable(); // 初始化table
else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) { // 如果该位置为空,直接插入
if (casTabAt(tab, i, null,
new Node<K,V>(hash, key, value, null))) // 使用CAS插入
break; // no lock when adding to empty bin
}
else if ((fh = f.hash) == MOVED) // 如果该位置的Node的hash值为MOVED,表示正在扩容
tab = helpTransfer(tab, f); // 帮助扩容
else { // 该位置已经有Node,需要加锁处理
V oldVal = null;
synchronized (f) { // 锁定该位置的Node
if (tabAt(tab,i) == f) { // 再次检查,防止其他线程修改
if (fh >= 0) { // 该位置是链表
binCount = 1;
for (Node<K,V> e = f;; ++binCount) {
K ek;
if (e.hash == hash &&
((ek = e.key) == key || (ek != null && key.equals(ek)))) { // 找到相同的key
oldVal = e.val;
if (!onlyIfAbsent)
e.val = value; // 替换value
break;
}
Node<K,V> pred = e;
if ((e = e.next) == null) { // 遍历到链表末尾,插入新的Node
pred.next = new Node<K,V>(hash, key, value, null);
break;
}
}
}
else if (f instanceof TreeBin) { // 该位置是红黑树
Node<K,V> p;
binCount = 2;
if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key, value)) != null) { // 在红黑树中插入或替换
oldVal = p.val;
if (!onlyIfAbsent)
p.val = value;
}
}
}
}
if (binCount != 0) {
if (binCount >= TREEIFY_THRESHOLD) // 如果链表长度达到阈值,转换为红黑树
treeifyBin(tab, i);
if (oldVal != null)
return oldVal;
break;
}
}
}
addCount(1L, binCount); // 增加元素数量,并检查是否需要扩容
return null;
}
// 辅助方法,计算hash值
static final int spread(int h) {
return (h ^ (h >>> 16)) & HASH_BITS;
}
// 辅助方法,从table中获取Node
static final <K,V> Node<K,V> tabAt(Node<K,V>[] tab, int i) {
return (Node<K,V>)U.getObjectVolatile(tab, ((long)i << ASHIFT) + ABASE);
}
// 辅助方法,使用CAS更新table中的Node
static final <K,V> boolean casTabAt(Node<K,V>[] tab, int i,
Node<K,V> c, Node<K,V> v) {
return U.compareAndSwapObject(tab, ((long)i << ASHIFT) + ABASE, c, v);
}
put() 方法的流程可以概括为以下几步:
- 检查key和value是否为null,如果为null,抛出
NullPointerException。 - 计算key的hash值,通过
spread()方法进行扰动,使hash值更加分散。 - 自旋重试,直到插入成功。
- 如果table为空,调用
initTable()方法初始化table。 - 根据hash值找到table中的位置,如果该位置为空,使用CAS插入新的Node。
- 如果该位置的Node的hash值为MOVED,表示正在扩容,帮助扩容。
- 如果该位置已经有Node,加锁处理。
- 如果是链表,遍历链表,找到相同的key则替换value,否则在链表末尾插入新的Node。
- 如果是红黑树,在红黑树中插入或替换。
- 如果链表长度达到阈值,转换为红黑树。
- 增加元素数量,并检查是否需要扩容。
我们可以用一个流程图来表示:
graph LR
A[开始] --> B{key或value为null?}
B -- 是 --> C[抛出NullPointerException]
B -- 否 --> D[计算hash值]
D --> E{table为空?}
E -- 是 --> F[初始化table]
E -- 否 --> G[根据hash值找到table中的位置]
G --> H{该位置为空?}
H -- 是 --> I[使用CAS插入新的Node]
H -- 否 --> J{该位置的Node的hash值为MOVED?}
J -- 是 --> K[帮助扩容]
J -- 否 --> L[加锁处理]
L --> M{该位置是链表?}
M -- 是 --> N[遍历链表,找到相同的key则替换value,否则在链表末尾插入新的Node]
M -- 否 --> O[在红黑树中插入或替换]
N --> P{链表长度达到阈值?}
O --> P
P -- 是 --> Q[转换为红黑树]
P -- 否 --> R[增加元素数量,并检查是否需要扩容]
I --> R
K --> R
R --> S[结束]
6. get() 方法源码分析
get() 方法相对简单,我们来看一下它的源码。
public V get(Object key) {
Node<K,V>[] tab; Node<K,V> e, p; int n, eh; K ek;
int h = spread(key.hashCode());
if ((tab = table) != null && (n = tab.length) > 0 &&
(e = tabAt(tab, (n - 1) & h)) != null) { // 如果table不为空,且该位置有Node
if ((eh = e.hash) == h) { // 如果第一个Node的hash值相等
if ((ek = e.key) == key || (ek != null && key.equals(ek))) // 如果key相等
return e.val; // 返回value
}
else if (eh < 0) // 如果hash值小于0,表示该位置是TreeBin或ForwardingNode
return (p = e.find(h, key)) != null ? p.val : null; // 在TreeBin或ForwardingNode中查找
while ((e = e.next) != null) { // 遍历链表
if (e.hash == h &&
((ek = e.key) == key || (ek != null && key.equals(ek))))
return e.val; // 找到相同的key,返回value
}
}
return null; // 没有找到,返回null
}
get() 方法的流程可以概括为以下几步:
- 计算key的hash值。
- 如果table为空,或者该位置没有Node,返回null。
- 如果第一个Node的hash值相等,且key也相等,返回value。
- 如果hash值小于0,表示该位置是TreeBin或ForwardingNode,在TreeBin或ForwardingNode中查找。
- 遍历链表,找到相同的key,返回value。
- 没有找到,返回null。
get() 方法没有加锁,因为Node的val是volatile的,保证了可见性。
7. initTable() 方法源码分析
initTable() 方法用于初始化table。
private final Node<K,V>[] initTable() {
Node<K,V>[] tab; int sc;
while ((tab = table) == null || tab.length == 0) { // 自旋重试,直到初始化成功
if ((sc = sizeCtl) < 0) // 如果sizeCtl小于0,表示有其他线程正在初始化或扩容
Thread.yield(); // lost initialization race; just spin
else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) { // 使用CAS将sizeCtl设置为-1,表示当前线程正在初始化
try {
if ((tab = table) == null || tab.length == 0) { // 再次检查,防止其他线程已经初始化
int n = (sc > 0) ? sc : DEFAULT_CAPACITY; // 如果sizeCtl大于0,则使用sizeCtl作为初始容量,否则使用默认容量
@SuppressWarnings("unchecked")
Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n]; // 创建新的table
table = tab = nt; // 将新的table赋值给table
sc = n - (n >>> 2); // 计算下一次扩容的阈值
}
} finally {
sizeCtl = sc; // 设置sizeCtl为下一次扩容的阈值
}
break;
}
}
return tab;
}
initTable() 方法的流程可以概括为以下几步:
- 自旋重试,直到初始化成功。
- 如果sizeCtl小于0,表示有其他线程正在初始化或扩容,放弃CPU时间片。
- 使用CAS将sizeCtl设置为-1,表示当前线程正在初始化。
- 再次检查,防止其他线程已经初始化。
- 创建新的table,容量为sizeCtl或默认容量。
- 将新的table赋值给table。
- 计算下一次扩容的阈值。
- 设置sizeCtl为下一次扩容的阈值。
8. 扩容机制
当ConcurrentHashMap中的元素数量超过阈值时,会触发扩容。 扩容是一个比较复杂的过程,需要保证数据的一致性和并发性。 ConcurrentHashMap的扩容机制采用了分段迁移的方式,允许多个线程同时参与扩容,提高了扩容的效率。
8.1 扩容触发条件
当调用addCount()方法增加元素数量时,会检查是否需要扩容。 扩容的触发条件是:
if (s >= (long)(sc = sizeCtl) && (tab = table) != null &&
(n = tab.length) < MAXIMUM_CAPACITY) {
resizeStamp = Integer.numberOfLeadingZeros(n) | (1 << (RESIZE_STAMP_BITS - 1));
if (sc < 0) { // 有其他线程正在扩容
if ((rs = (sc >>> RESIZE_STAMP_SHIFT)) != resizeStamp ||
rs == MAX_RESIZERS - 1 ||
(nt = nextTable) == null ||
transferIndex <= 0) {
return; // 不需要参与扩容
}
if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1)) // 增加扩容线程数量
transfer(tab, nt); // 参与扩容
} else if (U.compareAndSwapInt(this, SIZECTL, sc, (resizeStamp << RESIZE_STAMP_SHIFT) + 2)) // 发起扩容
transfer(tab, null); // 发起扩容
}
其中,s表示当前元素数量,sc表示sizeCtl,tab表示table,n表示table的长度,MAXIMUM_CAPACITY表示最大容量。
8.2 transfer() 方法源码分析
transfer() 方法用于将table中的元素迁移到新的table中。
private final void transfer(Node<K,V>[] tab, Node<K,V>[] nextTab) {
int n = tab.length, stride;
if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE)
stride = MIN_TRANSFER_STRIDE; // subdivide range
if (nextTab == null) { // initiating
try {
@SuppressWarnings("unchecked")
Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n << 1]; // 创建新的table,容量是原来的两倍
nextTab = nt;
} catch (Throwable ex) { // try to cope with OOME
sizeCtl = Integer.MAX_VALUE;
return;
}
nextTable = nextTab;
transferIndex = n;
}
int nextn = nextTab.length;
ForwardingNode<K,V> fwd = new ForwardingNode<K,V>(nextTab); // 创建ForwardingNode,用于标记该位置已经迁移
boolean advance = true;
boolean finishing = false; // to ensure that we only do one finish
for (int i = 0, bound = 0;;) {
Node<K,V> f; int fh;
while (advance) {
int nextIndex, nextBound;
if (--i >= bound || finishing)
advance = false;
else if ((nextIndex = transferIndex) <= 0) {
i = -1;
advance = false;
}
else if (U.compareAndSwapInt
(this, TRANSFERINDEX, nextIndex,
nextBound = (nextIndex > stride ?
nextIndex - stride : 0))) {
bound = nextBound;
i = nextIndex - 1;
advance = false;
}
}
if (i < 0 || i >= n || i + n >= nextn) {
int sc;
if (finishing) {
nextTable = null;
table = nextTab;
sizeCtl = (n << 1) - (n >>> 1);
return;
}
if (U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1)) {
if ((sc - 2) != resizeStamp << RESIZE_STAMP_SHIFT)
return;
finishing = advance = true;
i = n; // recheck before commit
}
}
else if ((f = tabAt(tab, i)) == null)
advance = casTabAt(tab, i, null, fwd);
else if ((fh = f.hash) == MOVED)
advance = true; // already processed
else {
synchronized (f) {
if (tabAt(tab, i) == f) {
Node<K,V> ln, hn;
if (fh >= 0) {
int runBit = fh & n;
Node<K,V> lastRun = f;
for (Node<K,V> p = f.next; p != null; p = p.next) {
int b = p.hash & n;
if (b != runBit) {
runBit = b;
lastRun = p;
}
}
if (runBit == 0) {
ln = lastRun;
hn = null;
}
else {
hn = lastRun;
ln = null;
}
for (Node<K,V> p = f; p != lastRun; p = p.next) {
int ph = p.hash;
K pk = p.key;
V pv = p.val;
if ((ph & n) == 0)
ln = new Node<K,V>(ph, pk, pv, ln);
else
hn = new Node<K,V>(ph, pk, pv, hn);
}
setTabAt(nextTab, i, ln);
setTabAt(nextTab, i + n, hn);
setTabAt(tab, i, fwd);
advance = true;
}
else if (f instanceof TreeBin) {
TreeBin<K,V> t = (TreeBin<K,V>)f;
TreeNode<K,V> lo = null, loTail = null;
TreeNode<K,V> hi = null, hiTail = null;
int lc = 0, hc = 0;
for (Node<K,V> e = t.first; e != null; e = e.next) {
int h = e.hash;
TreeNode<K,V> p =
new TreeNode<K,V>(h, e.key, e.val, null, null);
if ((h & n) == 0) {
if ((p.prev = loTail) == null)
lo = p;
else
loTail.next = p;
loTail = p;
++lc;
}
else {
if ((p.prev = hiTail) == null)
hi = p;
else
hiTail.next = p;
hiTail = p;
++hc;
}
}
ln = (lc <= UNTREEIFY_THRESHOLD) ? untreeify(lo) :
(hc != 0) ? new TreeBin<K,V>(lo) : t;
hn = (hc <= UNTREEIFY_THRESHOLD) ? untreeify(hi) :
(lc != 0) ? new TreeBin<K,V>(hi) : t;
setTabAt(nextTab, i, ln);
setTabAt(nextTab, i + n, hn);
setTabAt(tab, i, fwd);
advance = true;
}
}
}
}
}
}
transfer() 方法的流程可以概括为以下几步:
- 计算步长(stride),用于将
table分成多个部分,每个线程负责迁移一部分。 - 如果
nextTab为空,创建新的table,容量是原来的两倍。 - 创建
ForwardingNode,用于标记该位置已经迁移。 - 循环遍历
table,迁移元素到nextTab。 - 如果该位置为空,使用CAS将
ForwardingNode设置到该位置。 - 如果该位置的Node的hash值为MOVED,表示该位置已经被其他线程迁移,跳过。
- 加锁处理,迁移该位置的链表或红黑树。
- 如果是链表,将链表分成两个链表,一个链表的元素的hash值与n与运算的结果为0,另一个链表的元素的hash值与n与运算的结果为1。
- 如果是红黑树,将红黑树分成两个红黑树,一个红黑树的元素的hash值与n与运算的结果为0,另一个红黑树的元素的hash值与n与运算的结果为1。
- 将迁移后的链表或红黑树设置到
nextTab的相应位置。 - 将
ForwardingNode设置到table的该位置,表示该位置已经迁移。 - 完成迁移后,将
nextTable设置为null,将table指向nextTab,并重新计算sizeCtl。
9. ConcurrentHashMap的性能优化细节
ConcurrentHashMap在性能优化方面做了很多工作,主要体现在以下几个方面:
- 减少锁的粒度: 从JDK 1.5-1.7的分段锁,到JDK 1.8的CAS + synchronized,锁的粒度越来越细,并发度越来越高。
- 使用红黑树: 当链表长度超过一定阈值时,链表会转换为红黑树,提高查找效率。
- 分段迁移: 扩容时,允许多个线程同时参与迁移,提高扩容效率。
- volatile变量: 使用volatile变量保证可见性,减少锁的使用。
- 延迟初始化: table在第一次使用时才初始化,避免了不必要的开销。
10. 总结
ConcurrentHashMap 是一个非常复杂和精妙的并发容器,它通过CAS、synchronized、红黑树、分段迁移等多种技术,实现了高并发、线程安全和高性能。 深入理解ConcurrentHashMap的源码,有助于我们更好地使用它,并在高并发场景下编写出更高效、更稳定的程序。
11. 核心机制与设计思路
ConcurrentHashMap的线程安全构建于CAS、synchronized及volatile变量的有机结合,分段迁移的扩容策略与红黑树的引入进一步提升了性能。其设计充分考虑了并发场景下的各种问题,是Java并发编程的经典案例。