Java高并发容器ConcurrentHashMap源码分析:线程安全与性能优化细节

好的,没问题。

Java高并发容器ConcurrentHashMap源码分析:线程安全与性能优化细节

大家好,今天我们来深入探讨Java并发包中的核心组件——ConcurrentHashMap。 作为高并发场景下的首选哈希表实现,ConcurrentHashMap 在JDK 1.5之后逐渐取代了HashTable等同步容器。 本次分享将从源码层面剖析其线程安全机制和性能优化策略,帮助大家更好地理解和使用这个强大的工具。

1. ConcurrentHashMap的演进

在分析具体实现之前,我们先回顾一下ConcurrentHashMap的演进历程,这有助于我们理解其设计思想。

  • JDK 1.5-1.7: 基于分段锁(Segment)机制。
  • JDK 1.8+: 基于CAS + synchronized + 红黑树。

分段锁机制虽然提升了并发度,但依然存在锁竞争的问题。 JDK 1.8 彻底摒弃了分段锁的设计,采用了更加细粒度的并发控制方式,大大提高了性能。

2. JDK 1.8 ConcurrentHashMap的核心数据结构

ConcurrentHashMap的核心数据结构由以下几个部分组成:

  • Node[] table: 存储键值对的哈希表,类似于HashMap中的table
  • sizeCtl: 控制表初始化和扩容的关键字段。
  • nextTable: 在扩容期间,用于指向新的table。
  • Node: 基本的数据单元,存储key-value对。
  • TreeNode: 当链表长度超过一定阈值时,链表会转换为红黑树,TreeNode用于表示红黑树节点。

我们用一个表格来总结:

字段名 类型 作用
table Node[] 存储键值对的哈希表
sizeCtl int 控制表初始化和扩容。负数表示初始化或扩容,正数表示下一次扩容的阈值。
nextTable Node[] 扩容期间,指向新的table
Node 内部类 基本的数据单元,存储key-value对。
TreeNode 内部类 红黑树节点,用于优化长链表的查找性能。

3. 重要字段sizeCtl详解

sizeCtl字段在ConcurrentHashMap中扮演着至关重要的角色,它控制着表的初始化和扩容。 sizeCtl 的取值有以下几种情况:

  • 小于0: 表示table正在初始化或者扩容。
    • -1: 表示table正在初始化。
    • -(1 + n): 表示有n个线程正在进行扩容操作。
  • 等于0: 表示table还没有被初始化。
  • 大于0:
    • 如果table已经初始化,则表示下一次扩容的阈值。
    • 如果table未初始化,则表示table的初始容量,或者默认容量(16)。

4. 线程安全的实现:CAS + synchronized

ConcurrentHashMap的线程安全主要依赖于以下两种机制:

  • CAS (Compare and Swap): 用于无锁地修改变量,例如初始化table、更新sizeCtl、添加新的Node等。
  • synchronized: 用于锁定NodeTreeNode,保证在更新链表或红黑树时的互斥性。 synchronized 只锁住一个桶,而不是整个哈希表。

结合CAS和synchronized,ConcurrentHashMap实现了细粒度的并发控制,在保证线程安全的同时,也提升了性能。

5. put() 方法源码分析

put() 方法是ConcurrentHashMap中最核心的方法之一,我们来详细分析一下它的源码。

public V put(K key, V value) {
    return putVal(key, value, false);
}

final V putVal(K key, V value, boolean onlyIfAbsent) {
    if (key == null || value == null) throw new NullPointerException();
    int hash = spread(key.hashCode()); // 计算hash值
    int binCount = 0; // 用于记录链表长度,达到阈值后转换为红黑树
    for (Node<K,V>[] tab = table;;) { // 自旋重试,直到成功
        Node<K,V> f; int n, i, fh;
        if (tab == null || (n = tab.length) == 0)
            tab = initTable(); // 初始化table
        else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) { // 如果该位置为空,直接插入
            if (casTabAt(tab, i, null,
                         new Node<K,V>(hash, key, value, null))) // 使用CAS插入
                break;                   // no lock when adding to empty bin
        }
        else if ((fh = f.hash) == MOVED) // 如果该位置的Node的hash值为MOVED,表示正在扩容
            tab = helpTransfer(tab, f); // 帮助扩容
        else { // 该位置已经有Node,需要加锁处理
            V oldVal = null;
            synchronized (f) { // 锁定该位置的Node
                if (tabAt(tab,i) == f) { // 再次检查,防止其他线程修改
                    if (fh >= 0) { // 该位置是链表
                        binCount = 1;
                        for (Node<K,V> e = f;; ++binCount) {
                            K ek;
                            if (e.hash == hash &&
                                ((ek = e.key) == key || (ek != null && key.equals(ek)))) { // 找到相同的key
                                oldVal = e.val;
                                if (!onlyIfAbsent)
                                    e.val = value; // 替换value
                                break;
                            }
                            Node<K,V> pred = e;
                            if ((e = e.next) == null) { // 遍历到链表末尾,插入新的Node
                                pred.next = new Node<K,V>(hash, key, value, null);
                                break;
                            }
                        }
                    }
                    else if (f instanceof TreeBin) { // 该位置是红黑树
                        Node<K,V> p;
                        binCount = 2;
                        if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key, value)) != null) { // 在红黑树中插入或替换
                            oldVal = p.val;
                            if (!onlyIfAbsent)
                                p.val = value;
                        }
                    }
                }
            }
            if (binCount != 0) {
                if (binCount >= TREEIFY_THRESHOLD) // 如果链表长度达到阈值,转换为红黑树
                    treeifyBin(tab, i);
                if (oldVal != null)
                    return oldVal;
                break;
            }
        }
    }
    addCount(1L, binCount); // 增加元素数量,并检查是否需要扩容
    return null;
}

// 辅助方法,计算hash值
static final int spread(int h) {
    return (h ^ (h >>> 16)) & HASH_BITS;
}

// 辅助方法,从table中获取Node
static final <K,V> Node<K,V> tabAt(Node<K,V>[] tab, int i) {
    return (Node<K,V>)U.getObjectVolatile(tab, ((long)i << ASHIFT) + ABASE);
}

// 辅助方法,使用CAS更新table中的Node
static final <K,V> boolean casTabAt(Node<K,V>[] tab, int i,
                                    Node<K,V> c, Node<K,V> v) {
    return U.compareAndSwapObject(tab, ((long)i << ASHIFT) + ABASE, c, v);
}

put() 方法的流程可以概括为以下几步:

  1. 检查key和value是否为null,如果为null,抛出NullPointerException
  2. 计算key的hash值,通过spread()方法进行扰动,使hash值更加分散。
  3. 自旋重试,直到插入成功。
  4. 如果table为空,调用initTable()方法初始化table。
  5. 根据hash值找到table中的位置,如果该位置为空,使用CAS插入新的Node。
  6. 如果该位置的Node的hash值为MOVED,表示正在扩容,帮助扩容。
  7. 如果该位置已经有Node,加锁处理。
    • 如果是链表,遍历链表,找到相同的key则替换value,否则在链表末尾插入新的Node。
    • 如果是红黑树,在红黑树中插入或替换。
  8. 如果链表长度达到阈值,转换为红黑树。
  9. 增加元素数量,并检查是否需要扩容。

我们可以用一个流程图来表示:

graph LR
    A[开始] --> B{key或value为null?}
    B -- 是 --> C[抛出NullPointerException]
    B -- 否 --> D[计算hash值]
    D --> E{table为空?}
    E -- 是 --> F[初始化table]
    E -- 否 --> G[根据hash值找到table中的位置]
    G --> H{该位置为空?}
    H -- 是 --> I[使用CAS插入新的Node]
    H -- 否 --> J{该位置的Node的hash值为MOVED?}
    J -- 是 --> K[帮助扩容]
    J -- 否 --> L[加锁处理]
    L --> M{该位置是链表?}
    M -- 是 --> N[遍历链表,找到相同的key则替换value,否则在链表末尾插入新的Node]
    M -- 否 --> O[在红黑树中插入或替换]
    N --> P{链表长度达到阈值?}
    O --> P
    P -- 是 --> Q[转换为红黑树]
    P -- 否 --> R[增加元素数量,并检查是否需要扩容]
    I --> R
    K --> R
    R --> S[结束]

6. get() 方法源码分析

get() 方法相对简单,我们来看一下它的源码。

public V get(Object key) {
    Node<K,V>[] tab; Node<K,V> e, p; int n, eh; K ek;
    int h = spread(key.hashCode());
    if ((tab = table) != null && (n = tab.length) > 0 &&
        (e = tabAt(tab, (n - 1) & h)) != null) { // 如果table不为空,且该位置有Node
        if ((eh = e.hash) == h) { // 如果第一个Node的hash值相等
            if ((ek = e.key) == key || (ek != null && key.equals(ek))) // 如果key相等
                return e.val; // 返回value
        }
        else if (eh < 0) // 如果hash值小于0,表示该位置是TreeBin或ForwardingNode
            return (p = e.find(h, key)) != null ? p.val : null; // 在TreeBin或ForwardingNode中查找
        while ((e = e.next) != null) { // 遍历链表
            if (e.hash == h &&
                ((ek = e.key) == key || (ek != null && key.equals(ek))))
                return e.val; // 找到相同的key,返回value
        }
    }
    return null; // 没有找到,返回null
}

get() 方法的流程可以概括为以下几步:

  1. 计算key的hash值。
  2. 如果table为空,或者该位置没有Node,返回null。
  3. 如果第一个Node的hash值相等,且key也相等,返回value。
  4. 如果hash值小于0,表示该位置是TreeBin或ForwardingNode,在TreeBin或ForwardingNode中查找。
  5. 遍历链表,找到相同的key,返回value。
  6. 没有找到,返回null。

get() 方法没有加锁,因为Node的val是volatile的,保证了可见性。

7. initTable() 方法源码分析

initTable() 方法用于初始化table

private final Node<K,V>[] initTable() {
    Node<K,V>[] tab; int sc;
    while ((tab = table) == null || tab.length == 0) { // 自旋重试,直到初始化成功
        if ((sc = sizeCtl) < 0) // 如果sizeCtl小于0,表示有其他线程正在初始化或扩容
            Thread.yield(); // lost initialization race; just spin
        else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) { // 使用CAS将sizeCtl设置为-1,表示当前线程正在初始化
            try {
                if ((tab = table) == null || tab.length == 0) { // 再次检查,防止其他线程已经初始化
                    int n = (sc > 0) ? sc : DEFAULT_CAPACITY; // 如果sizeCtl大于0,则使用sizeCtl作为初始容量,否则使用默认容量
                    @SuppressWarnings("unchecked")
                    Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n]; // 创建新的table
                    table = tab = nt; // 将新的table赋值给table
                    sc = n - (n >>> 2); // 计算下一次扩容的阈值
                }
            } finally {
                sizeCtl = sc; // 设置sizeCtl为下一次扩容的阈值
            }
            break;
        }
    }
    return tab;
}

initTable() 方法的流程可以概括为以下几步:

  1. 自旋重试,直到初始化成功。
  2. 如果sizeCtl小于0,表示有其他线程正在初始化或扩容,放弃CPU时间片。
  3. 使用CAS将sizeCtl设置为-1,表示当前线程正在初始化。
  4. 再次检查,防止其他线程已经初始化。
  5. 创建新的table,容量为sizeCtl或默认容量。
  6. 将新的table赋值给table。
  7. 计算下一次扩容的阈值。
  8. 设置sizeCtl为下一次扩容的阈值。

8. 扩容机制

ConcurrentHashMap中的元素数量超过阈值时,会触发扩容。 扩容是一个比较复杂的过程,需要保证数据的一致性和并发性。 ConcurrentHashMap的扩容机制采用了分段迁移的方式,允许多个线程同时参与扩容,提高了扩容的效率。

8.1 扩容触发条件

当调用addCount()方法增加元素数量时,会检查是否需要扩容。 扩容的触发条件是:

if (s >= (long)(sc = sizeCtl) && (tab = table) != null &&
    (n = tab.length) < MAXIMUM_CAPACITY) {
    resizeStamp = Integer.numberOfLeadingZeros(n) | (1 << (RESIZE_STAMP_BITS - 1));
    if (sc < 0) { // 有其他线程正在扩容
        if ((rs = (sc >>> RESIZE_STAMP_SHIFT)) != resizeStamp ||
            rs == MAX_RESIZERS - 1 ||
            (nt = nextTable) == null ||
            transferIndex <= 0) {
            return; // 不需要参与扩容
        }
        if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1)) // 增加扩容线程数量
            transfer(tab, nt); // 参与扩容
    } else if (U.compareAndSwapInt(this, SIZECTL, sc, (resizeStamp << RESIZE_STAMP_SHIFT) + 2)) // 发起扩容
        transfer(tab, null); // 发起扩容
}

其中,s表示当前元素数量,sc表示sizeCtltab表示tablen表示table的长度,MAXIMUM_CAPACITY表示最大容量。

8.2 transfer() 方法源码分析

transfer() 方法用于将table中的元素迁移到新的table中。

private final void transfer(Node<K,V>[] tab, Node<K,V>[] nextTab) {
    int n = tab.length, stride;
    if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE)
        stride = MIN_TRANSFER_STRIDE; // subdivide range
    if (nextTab == null) {            // initiating
        try {
            @SuppressWarnings("unchecked")
            Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n << 1]; // 创建新的table,容量是原来的两倍
            nextTab = nt;
        } catch (Throwable ex) {      // try to cope with OOME
            sizeCtl = Integer.MAX_VALUE;
            return;
        }
        nextTable = nextTab;
        transferIndex = n;
    }
    int nextn = nextTab.length;
    ForwardingNode<K,V> fwd = new ForwardingNode<K,V>(nextTab); // 创建ForwardingNode,用于标记该位置已经迁移
    boolean advance = true;
    boolean finishing = false; // to ensure that we only do one finish
    for (int i = 0, bound = 0;;) {
        Node<K,V> f; int fh;
        while (advance) {
            int nextIndex, nextBound;
            if (--i >= bound || finishing)
                advance = false;
            else if ((nextIndex = transferIndex) <= 0) {
                i = -1;
                advance = false;
            }
            else if (U.compareAndSwapInt
                     (this, TRANSFERINDEX, nextIndex,
                      nextBound = (nextIndex > stride ?
                                   nextIndex - stride : 0))) {
                bound = nextBound;
                i = nextIndex - 1;
                advance = false;
            }
        }
        if (i < 0 || i >= n || i + n >= nextn) {
            int sc;
            if (finishing) {
                nextTable = null;
                table = nextTab;
                sizeCtl = (n << 1) - (n >>> 1);
                return;
            }
            if (U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1)) {
                if ((sc - 2) != resizeStamp << RESIZE_STAMP_SHIFT)
                    return;
                finishing = advance = true;
                i = n; // recheck before commit
            }
        }
        else if ((f = tabAt(tab, i)) == null)
            advance = casTabAt(tab, i, null, fwd);
        else if ((fh = f.hash) == MOVED)
            advance = true; // already processed
        else {
            synchronized (f) {
                if (tabAt(tab, i) == f) {
                    Node<K,V> ln, hn;
                    if (fh >= 0) {
                        int runBit = fh & n;
                        Node<K,V> lastRun = f;
                        for (Node<K,V> p = f.next; p != null; p = p.next) {
                            int b = p.hash & n;
                            if (b != runBit) {
                                runBit = b;
                                lastRun = p;
                            }
                        }
                        if (runBit == 0) {
                            ln = lastRun;
                            hn = null;
                        }
                        else {
                            hn = lastRun;
                            ln = null;
                        }
                        for (Node<K,V> p = f; p != lastRun; p = p.next) {
                            int ph = p.hash;
                            K pk = p.key;
                            V pv = p.val;
                            if ((ph & n) == 0)
                                ln = new Node<K,V>(ph, pk, pv, ln);
                            else
                                hn = new Node<K,V>(ph, pk, pv, hn);
                        }
                        setTabAt(nextTab, i, ln);
                        setTabAt(nextTab, i + n, hn);
                        setTabAt(tab, i, fwd);
                        advance = true;
                    }
                    else if (f instanceof TreeBin) {
                        TreeBin<K,V> t = (TreeBin<K,V>)f;
                        TreeNode<K,V> lo = null, loTail = null;
                        TreeNode<K,V> hi = null, hiTail = null;
                        int lc = 0, hc = 0;
                        for (Node<K,V> e = t.first; e != null; e = e.next) {
                            int h = e.hash;
                            TreeNode<K,V> p =
                                new TreeNode<K,V>(h, e.key, e.val, null, null);
                            if ((h & n) == 0) {
                                if ((p.prev = loTail) == null)
                                    lo = p;
                                else
                                    loTail.next = p;
                                loTail = p;
                                ++lc;
                            }
                            else {
                                if ((p.prev = hiTail) == null)
                                    hi = p;
                                else
                                    hiTail.next = p;
                                hiTail = p;
                                ++hc;
                            }
                        }
                        ln = (lc <= UNTREEIFY_THRESHOLD) ? untreeify(lo) :
                            (hc != 0) ? new TreeBin<K,V>(lo) : t;
                        hn = (hc <= UNTREEIFY_THRESHOLD) ? untreeify(hi) :
                            (lc != 0) ? new TreeBin<K,V>(hi) : t;
                        setTabAt(nextTab, i, ln);
                        setTabAt(nextTab, i + n, hn);
                        setTabAt(tab, i, fwd);
                        advance = true;
                    }
                }
            }
        }
    }
}

transfer() 方法的流程可以概括为以下几步:

  1. 计算步长(stride),用于将table分成多个部分,每个线程负责迁移一部分。
  2. 如果nextTab为空,创建新的table,容量是原来的两倍。
  3. 创建ForwardingNode用于标记该位置已经迁移。
  4. 循环遍历table迁移元素到nextTab
  5. 如果该位置为空,使用CAS将ForwardingNode设置到该位置。
  6. 如果该位置的Node的hash值为MOVED,表示该位置已经被其他线程迁移,跳过。
  7. 加锁处理,迁移该位置的链表或红黑树。
    • 如果是链表,将链表分成两个链表,一个链表的元素的hash值与n与运算的结果为0,另一个链表的元素的hash值与n与运算的结果为1。
    • 如果是红黑树,将红黑树分成两个红黑树,一个红黑树的元素的hash值与n与运算的结果为0,另一个红黑树的元素的hash值与n与运算的结果为1。
  8. 将迁移后的链表或红黑树设置到nextTab的相应位置。
  9. ForwardingNode设置到table的该位置,表示该位置已经迁移。
  10. 完成迁移后,nextTable设置为null,将table指向nextTab,并重新计算sizeCtl

9. ConcurrentHashMap的性能优化细节

ConcurrentHashMap在性能优化方面做了很多工作,主要体现在以下几个方面:

  • 减少锁的粒度: 从JDK 1.5-1.7的分段锁,到JDK 1.8的CAS + synchronized,锁的粒度越来越细,并发度越来越高。
  • 使用红黑树: 当链表长度超过一定阈值时,链表会转换为红黑树,提高查找效率。
  • 分段迁移: 扩容时,允许多个线程同时参与迁移,提高扩容效率。
  • volatile变量: 使用volatile变量保证可见性,减少锁的使用。
  • 延迟初始化: table在第一次使用时才初始化,避免了不必要的开销。

10. 总结

ConcurrentHashMap 是一个非常复杂和精妙的并发容器,它通过CAS、synchronized、红黑树、分段迁移等多种技术,实现了高并发、线程安全和高性能。 深入理解ConcurrentHashMap的源码,有助于我们更好地使用它,并在高并发场景下编写出更高效、更稳定的程序。

11. 核心机制与设计思路

ConcurrentHashMap的线程安全构建于CAS、synchronized及volatile变量的有机结合,分段迁移的扩容策略与红黑树的引入进一步提升了性能。其设计充分考虑了并发场景下的各种问题,是Java并发编程的经典案例。

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注