Java 常用集合 ConcurrentHashMap(JDK 1.8)

ConcurrentHashMap 整体设计(1.8)

常用的 HashMap 是线程不安全的, Hashtable 是线程安全的。Hashtable 通过在方法上添加 synchronized 保证线程安全,相当于 Hashtable 实例只有一把锁,导致使用效率低(高并发场景下)。

JDK 1.7 ConcurrentHashMap 使用分段锁局部锁定的方式,ConcurrentHashMap 由多个 Segment(包含 Node 键值对)组成,每个 Segment 各自持有锁实现线程安全,当一个线程占用锁访问其中一个 Segment 数据的时候,其他 Segment 的数据能够被其他线程访问。对于一些需要跨多个 Segment 的方法(比如:size() 和 containsValue()),需要锁整个表,这时会按顺序锁住所有 Segment,使用后再按顺序释放,防止死锁。读操作不加锁。

JDK 1.8 ConcurrentHashMap 使用了大量的 CAS 操作实现无锁操作,在少数地方(扩容操作、对某个桶中的 Node 操作…)用到 synchronized 同步锁保证线程安全。

内部属性

ConcurrentHashMap 的实现基本上与 HashMap 类似,大部分属性可以参考 HashMap

RESIZE_STAMP_BITS   
MAX_RESIZERS        

构造函数

/**
 * Table initialization and resizing control.  When negative, the
 * table is being initialized or resized: -1 for initialization,
 * else -(1 + the number of active resizing threads).  Otherwise,
 * when table is null, holds the initial table size to use upon
 * creation, or 0 for default. After initialization, holds the
 * next element count value upon which to resize the table.
 */
transient volatile int sizeCtl;

public ConcurrentHashMap(int initialCapacity) {
    // 初始化表的容量,tableSizeFor 在 HashMap 中有介绍
    int cap = ((initialCapacity >= (MAXIMUM_CAPACITY >>> 1)) ?
               MAXIMUM_CAPACITY :
               tableSizeFor(initialCapacity + (initialCapacity >>> 1) + 1));
    this.sizeCtl = cap;
}

public ConcurrentHashMap(Map<? extends K, ? extends V> m) {
    this.sizeCtl = DEFAULT_CAPACITY;
    // 调用 putAll,与 HashMap 实现不同
    putAll(m);
}

sizeCtl 属性比较重要,上面贴出了源码中的解释。sizeCtl 作为 ConcurrentHashMap 的控制标识符(初始化和扩容):

sizeCtl=-1                            // 表示正在初始化,initTable(或 tryPresize)方法中设置
sizeCtl=-(1 + resizing_threads_num)   // 表示正在 resizing,tryPresize 方法中设置
sizeCtl=0 | capacity                  // 表示未初始化或当前容量的大小(用于下次扩容判断)

下面的构造函数指定了一个参数 concurrencyLevel,表示能够同时更新 ConccurentHashMap 且不产生锁竞争的最大线程数(默认值 16)。

public ConcurrentHashMap(int initialCapacity,
                             float loadFactor, int concurrencyLevel) {
    // ...

    if (initialCapacity < concurrencyLevel)   
        initialCapacity = concurrencyLevel;  
    long size = (long)(1.0 + (long)initialCapacity / loadFactor);
    int cap = (size >= (long)MAXIMUM_CAPACITY) ?
        MAXIMUM_CAPACITY : tableSizeFor((int)size);
    this.sizeCtl = cap;
}

Key-value 存储结构

// 哈希桶
transient volatile Node<K,V>[] table;

// Node 
static class Node<K,V> implements Map.Entry<K,V> {
    // volatile 保证可见性
    volatile V val;
    volatile Node<K,V> next;

    // 不允许修改,HashMap.Node 可以
    public final V setValue(V value) { 
        throw new UnsupportedOperationException();
    }
}

// TreeNode
static final class TreeNode<K,V> extends Node<K,V>

// TreeBin
static final class TreeBin<K,V> extends Node<K,V>

Node 对象的实现与 HashMap.Node 类似(有一些细微区别),存储 key-value,需要注意的是 val 和 next 属性添加了 volatile 修饰保证可见性。

TreeNode 对象比较简单,相比于 HashMap.TreeNode 没有实现 treeify 方法(链表转换为树),而对应的操作交给了 TreeBin 来实现。

TreeBin 封装了对 TreeNode 的各种操作。

Hash 方法

首先比较一下 ConcurrentHashMap 与 HashMap 对 key 取哈希值的方法,有一些改变。

// HashMap
static final int hash(Object key) {
    int h;
    return (key == null) ? 0 : (h = key.hashCode()) ^ (h >>> 16);
}

// ConcurrentHashMap
static final int HASH_BITS = 0x7fffffff;
static final int spread(int h) {
    return (h ^ (h >>> 16)) & HASH_BITS;
}

Get 方法

首先计算出 key 的 hashCode,找到 table 中的数组索引,即 Node<K, V> 对象,如果存在并且判断该结点是否等于要查询的节点,如果不是,按照链表或者红黑树的查询方式查找。

public V get(Object key) {
    Node<K,V>[] tab; Node<K,V> e, p; int n, eh; K ek;
    int h = spread(key.hashCode());

    // 检查 table 是否存在,hashCode 所在的索引是有为空
    if ((tab = table) != null && (n = tab.length) > 0 &&
        (e = tabAt(tab, (n - 1) & h)) != null) {

        if ((eh = e.hash) == h) {
            if ((ek = e.key) == key || (ek != null && key.equals(ek)))
                return e.val;
        }
        // 按照树的方式遍历 Node 查找
        else if (eh < 0)
            return (p = e.find(h, key)) != null ? p.val : null;

        // 按照链表的方式遍历 Node 查找    
        while ((e = e.next) != null) {
            if (e.hash == h &&
                ((ek = e.key) == key || (ek != null && key.equals(ek))))
                return e.val;
        }
    }
    return null;
}

Put 方法

put 操作应用到 CAS 算法,调用了形如 compareAndSwapXXX 的 native 方法实现。

public V put(K key, V value) {
    return putVal(key, value, false);
}

final V putVal(K key, V value, boolean onlyIfAbsent) {
    int hash = spread(key.hashCode());
    int binCount = 0;

    // 循环直到插入成功,
    for (Node<K,V>[] tab = table;;) {
        Node<K,V> f; int n, i, fh;
        if (tab == null || (n = tab.length) == 0)
            // 初始化 table,应用 CAS
            tab = initTable();

        // tabAt 调用 getObjectVolatile 
        // 当前位置为空可以直接插入的情况    
        else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {

            // 通过 CAS 操作插入,不需要加锁
            if (casTabAt(tab, i, null,
                         new Node<K,V>(hash, key, value, null)))
                break;              
        }

        // 下面是位置已经有值的情况

        // MOVED 表示当前 Map 正在进行扩容
        else if ((fh = f.hash) == MOVED)
            // 帮助进行扩容,然后进入下一次循环尝试插入
            tab = helpTransfer(tab, f);
        else {
            // 未在扩容的情况
            V oldVal = null;

            // 对f加锁,f 是存储在当前位置的 Node 的头节点
            synchronized (f) {
                // 双重检查,保证 Node 头节点没有改变
                if (tabAt(tab, i) == f) {
                    if (fh >= 0) {
                        // 对链表进行操作
                        binCount = 1;
                        for (Node<K,V> e = f;; ++binCount) {
                            // 更新值(判断 onlyIfAbsent)或插入链表尾部 ...
                            break;
                        }
                    }
                    else if (f instanceof TreeBin) {
                        // 对树进行操作 ...
                    }
                }
            }

            // 判断是否需要 treeify
            if (binCount != 0) {
                if (binCount >= TREEIFY_THRESHOLD)
                    treeifyBin(tab, i);
                if (oldVal != null)
                    return oldVal;
                break;
            }
        }
    }


    addCount(1L, binCount);
    return null;
}

扩容

在 treeifyBin 的操作中会判断是否真的需要转换树,还是对 table 进行扩容。当一个 table 的某个位置上元素比较多时,很大原因并不是因为这些 key 的 hashCode 相同,而是因为 table 比较小,hashCode 对 table 大小取摸后相同的概率比较大,所以选择对 table 进行扩容。

扩容代码如下:

private final void tryPresize(int size) {
    // 计算新的容量
    int c = (size >= (MAXIMUM_CAPACITY >>> 1)) ? MAXIMUM_CAPACITY :
        tableSizeFor(size + (size >>> 1) + 1);
    // sizeCtl
    int sc;

    // sizeCtl < 0 表示正在初始化或扩容
    // 直到不需要扩容或扩容完成跳出循环
    while ((sc = sizeCtl) >= 0) {
        Node<K,V>[] tab = table; int n;
        if (tab == null || (n = tab.length) == 0) {
            // 初始化 table ...
        }
        else if (c <= sc || n >= MAXIMUM_CAPACITY)
            // 无序扩容
            break;
        else if (tab == table) {
            // table 没有变化时进行操作

            // 生成一个 resize 的标识,更新 sizeCtl(CAS)
            int rs = resizeStamp(n);

            // 执行扩容操作
            transfer(tab, null);
        }
    }
}

在 put 操作中,有一个判断调用了 helpTransfer 方法,进行帮助扩容,含义是有其它线程正在扩容时,当前线程一起转移元素。

// f.hash = MOVE 表示 f 是 ForwardingNode,用于扩容操作的节点
final Node<K,V>[] helpTransfer(Node<K,V>[] tab, Node<K,V> f) {
    Node<K,V>[] nextTab; int sc;
    if (tab != null && (f instanceof ForwardingNode) &&
        (nextTab = ((ForwardingNode<K,V>)f).nextTable) != null) {

        // 生成一个 resize 的标识,更新 sizeCtl(CAS)
        int rs = resizeStamp(tab.length);

        while (nextTab == nextTable && table == tab &&
               (sc = sizeCtl) < 0) {

            // 当扩容的线程过多时,跳出循环
            if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
                sc == rs + MAX_RESIZERS || transferIndex <= 0)
                break;
            if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1)) {
                // 进入扩容方法,并更新 sizeCtl
                transfer(tab, nextTab);
                break;
            }
        }
        return nextTab;
    }
    return table;
}

扩容操作 transfer,相当复杂,核心部分代码(即添加锁的部分):

synchronized (f) {
    if (tabAt(tab, i) == f) {
        // 这里与 HashMap 重新离散的思路相同:
        // 当前桶的节点在扩容后只能在新桶的idx(当前位置)或 idx+oldSize(原桶大小)
        // 就是说原链表 Node,会生成两个新的 Node:ln和hn
        Node<K,V> ln, hn;
        if (fh >= 0) {
            // 处理链表节点 ...

            // 调用 putObjectVolatile 方法更新 newTab
            setTabAt(nextTab, i, ln);
            setTabAt(nextTab, i + n, hn);
            setTabAt(tab, i, fwd);
        }
        else if (f instanceof TreeBin) {
            // 处理树节点,同样的更新逻辑 ... 
        }
    }
}

遍历方式

ConcurrentHashMap 是支持在遍历的时候,进行修改。但是频繁的修改和遍历 ConcurrentHashMap 还是会出问题,有可能遍历不到最新修改的数据。

已知问题

JDK7 HashMap 在多线程环境下可能造成 CPU 100% 的现象,由于在扩容的时候 put 时产生了环状链表,会在 get 时造成了 CPU 100%,在 JDK8 通过修改重新离散的方法得到解决。

JDK8 ConcurrentHashMap 也有一种情况会造成 CPU 100% 中,在 JDK9 中已经得到修复(https://bugs.openjdk.java.net/browse/JDK-8062841)

什么情况下 JDK8 ConcurrentHashMap 会出现这个问题?

Map<String, String> map = new ConcurrentHashMap<>();
map.computeIfAbsent("AaAa",
        key -> map.computeIfAbsent("BBBB", key2 -> "value"));

问题的关键在于递归使用了 computeIfAbsent 方法,不要在递归中使用,具体的原因这里不再分析,避免使用。

与 Collections.synchronizedMap() 的区别

Collections.synchronizedMap() 方法来获取一个线程安全的集合(实现原理是 Collections 定义了一个 SynchronizedMap 的内部类,这个类实现了 Map 接口,在调用方法时使用 synchronized 来保证线程同步,其它 Collections.synchronizedXX 方法也是类似原理)

所以 Collections.synchronizedMap() 更类似与 Hashtable,对 Map 进行同步,

还有一个区别是,ConcurrentHashMap 必然是个 HashMap。而 Collections.synchronizedMap() 可以接收任意 Map 实例,实现同步。


Add a Comment

电子邮件地址不会被公开。 必填项已用*标注

10 + 18 =