Java 常用集合 ConcurrentHashMap(JDK 1.8)

ConcurrentHashMap 整体设计(1.8)

常用的 HashMap 是线程不安全的, Hashtable 是线程安全的。Hashtable 通过在方法上添加 synchronized 保证线程安全,相当于 Hashtable 实例只有一把锁,导致高并发场景下使用效率低。

JDK 1.7 ConcurrentHashMap 使用分段锁局部锁定的方式,ConcurrentHashMap 由多个 Segment(包含 Node 键值对)组成,每个 Segment 各自持有锁实现线程安全,当一个线程占用锁访问其中一个 Segment 数据的时候,其他 Segment 的数据能够被其他线程访问。对于一些需要跨多个 Segment 的方法(比如:size()containsValue()),需要锁整个表,这时会按顺序锁住所有 Segment,使用后再按顺序释放,防止死锁。读操作不加锁。

JDK 1.8 ConcurrentHashMap 使用了大量的 CAS 操作实现无锁操作,在少数地方(扩容操作、对某个桶中的 Node 操作…)用到 synchronized 同步锁保证线程安全。

ConcurrentHashMap 的实现基本上与 HashMap 类似,大部分属性可以参考 HashMap

public class ConcurrentHashMap<K,V> extends AbstractMap<K,V>
    implements ConcurrentMap<K,V>, Serializable {

  // 作为 ConcurrentHashMap 的控制标识符(初始化和扩容)
  // sizeCtl = -1                                                           表示正在初始化,initTable(或 tryPresize)方法中设置
    // sizeCtl = -(1 + resizing_threads_num)        表示正在 resizing,tryPresize 方法中设置
    // sizeCtl = 0 | capacity                   表示未初始化或当前容量的大小(用于下次扩容判断)
  transient volatile int sizeCtl;

  // 要使用的下一个表;仅在 resize 时为非空。
  private transient volatile Node<K,V>[] nextTable;
}

构造函数

public ConcurrentHashMap() {
}

public ConcurrentHashMap(int initialCapacity) {
    // 初始化表的容量,tableSizeFor 计算大于等于初始容量的的最小的二次幂数值
    int cap = ((initialCapacity >= (MAXIMUM_CAPACITY >>> 1)) ?
               MAXIMUM_CAPACITY :
               tableSizeFor(initialCapacity + (initialCapacity >>> 1) + 1));
    // 设置 sizeCtl
    this.sizeCtl = cap;
}

public ConcurrentHashMap(Map<? extends K, ? extends V> m) {
      // 设置 sizeCtl
    this.sizeCtl = DEFAULT_CAPACITY;
    // 调用 putAll,将所有元素存入当前实例中
    putAll(m);
}

下面的构造函数指定了一个参数 concurrencyLevel,表示能够同时更新 ConcurrentHashMap 且不产生锁竞争的最大线程数。

public ConcurrentHashMap(int initialCapacity,
                             float loadFactor, int concurrencyLevel) {
    // ...
    if (initialCapacity < concurrencyLevel)   
        initialCapacity = concurrencyLevel;  
    long size = (long)(1.0 + (long)initialCapacity / loadFactor);
    int cap = (size >= (long)MAXIMUM_CAPACITY) ?
        MAXIMUM_CAPACITY : tableSizeFor((int)size);
    this.sizeCtl = cap;
}

public ConcurrentHashMap(int initialCapacity, float loadFactor) {
    this(initialCapacity, loadFactor, 1);
}

存储结构

Node 对象的实现与 HashMap.Node 类似(有细微区别),存储 key-value,需要注意的是 val 和 next 属性添加了 volatile 修饰保证可见性。并且 val 不允许修改(HashMap 中可以)。

// 哈希桶
transient volatile Node<K,V>[] table;

static class Node<K,V> implements Map.Entry<K,V> {
    // volatile 保证可见性
    volatile V val;
    volatile Node<K,V> next;

    // 不允许修改,调用会抛出异常
    public final V setValue(V value) { 
        throw new UnsupportedOperationException();
    }
}

当 Node 数量超过 TREEIFY_THRESHOLD 时,会由链表结构转换为红黑树 TreeNode。在 HashMap.TreeNode 中实现了 treeify 方法(链表转换为树),在 ConcurrentHashMap 中,转换的操作交给了 TreeBin 来实现。TreeBin 封装了对 TreeNode 的各种操作。

// TreeNode
static final class TreeNode<K,V> extends Node<K,V> {
    TreeNode<K,V> parent; 
    TreeNode<K,V> left;
    TreeNode<K,V> right;
    TreeNode<K,V> prev;   
    boolean red;

    Node<K,V> find(int h, Object k) {
      return findTreeNode(h, k, null);
    }

    final TreeNode<K,V> findTreeNode(int h, Object k, Class<?> kc) { /****/ }
}

// TreeBin
static final class TreeBin<K,V> extends Node<K,V> {
    TreeNode<K,V> root;
    volatile TreeNode<K,V> first;
    volatile Thread waiter;
    volatile int lockState;
    // 锁状态标识
    static final int WRITER = 1; // set while holding write lock
    static final int WAITER = 2; // set when waiting for write lock
    static final int READER = 4; // increment value for setting read lock

    TreeBin(TreeNode<K,V> b) { /****/ }

    // 获取写锁
    private final void lockRoot() {
      if (!U.compareAndSwapInt(this, LOCKSTATE, 0, WRITER))
        contendedLock(); // offload to separate method
    }

    // 释放写锁
    private final void unlockRoot() {
      lockState = 0;
    }

    final Node<K,V> find(int h, Object k) { /****/ }

    final TreeNode<K,V> putTreeVal(int h, K k, V v) { /****/ }

    final boolean removeTreeNode(TreeNode<K,V> p) { /****/ }

    // Red-black tree methods, all adapted from CLR

    static <K,V> TreeNode<K,V> rotateLeft(TreeNode<K,V> root,
                                          TreeNode<K,V> p) {
    }

    static <K,V> TreeNode<K,V> rotateRight(TreeNode<K,V> root,
                                           TreeNode<K,V> p) {
    }

    static <K,V> TreeNode<K,V> balanceInsertion(TreeNode<K,V> root,
                                                TreeNode<K,V> x) {
    }

    static <K,V> TreeNode<K,V> balanceDeletion(TreeNode<K,V> root,
                                               TreeNode<K,V> x) {
    }
}

Hash 方法

首先比较一下 ConcurrentHashMap 与 HashMap 对 key 取哈希值的方法,有一些不同。

// HashMap
static final int hash(Object key) {
    int h;
    return (key == null) ? 0 : (h = key.hashCode()) ^ (h >>> 16);
}

// ConcurrentHashMap
static final int HASH_BITS = 0x7fffffff;
static final int spread(int h) {
    return (h ^ (h >>> 16)) & HASH_BITS;
}

Get 方法

首先计算出 key 的 hashCode,找到 table 中的数组索引,即 Node<K, V> 对象,如果存在并且判断该结点是否等于要查询的节点,如果不是,按照链表或者红黑树的查询方式查找。

public V get(Object key) {
    Node<K,V>[] tab; Node<K,V> e, p; int n, eh; K ek;
    int h = spread(key.hashCode());

    // 检查 table 是否存在,hashCode 所在的索引是有为空
    if ((tab = table) != null && (n = tab.length) > 0 &&
        (e = tabAt(tab, (n - 1) & h)) != null) {

        if ((eh = e.hash) == h) {
            if ((ek = e.key) == key || (ek != null && key.equals(ek)))
                return e.val;
        }

        // 按照树的方式遍历 Node 查找
        else if (eh < 0)
            return (p = e.find(h, key)) != null ? p.val : null;

        // 按照链表的方式遍历 Node 查找    
        while ((e = e.next) != null) {
            if (e.hash == h &&
                ((ek = e.key) == key || (ek != null && key.equals(ek))))
                return e.val;
        }
    }
    return null;
}

Put 方法

Put 操作应用到 CAS 算法,调用了形如 compareAndSwapXXX 的 native 方法实现。

public V put(K key, V value) {
    return putVal(key, value, false);
}

final V putVal(K key, V value, boolean onlyIfAbsent) {
    // 计算 key 的 hashCode
    int hash = spread(key.hashCode());
    int binCount = 0;

    // 循环直到插入成功,
    for (Node<K,V>[] tab = table;;) {
        Node<K,V> f; int n, i, fh;
        if (tab == null || (n = tab.length) == 0)
            // 初始化 table
            tab = initTable();

        // tabAt 调用 getObjectVolatile 
        // 当前位置为空可以直接插入的情况    
        else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {

            // 通过 CAS 操作插入,不需要加锁
            if (casTabAt(tab, i, null,
                         new Node<K,V>(hash, key, value, null)))
                break;              
        }

        // 下面是位置已经有值的情况

        // MOVED 表示当前 Map 正在进行扩容
        else if ((fh = f.hash) == MOVED)
            // 帮助进行扩容,然后进入下一次循环尝试插入
            tab = helpTransfer(tab, f);

          // 未在扩容的情况
        else {
            V oldVal = null;

            // 对f加锁,f 是存储在当前位置的 Node 的头节点
            synchronized (f) {
                // 双重检查,保证 Node 头节点没有改变
                if (tabAt(tab, i) == f) {
                    if (fh >= 0) {
                        // 对链表进行操作
                        binCount = 1;
                        for (Node<K,V> e = f;; ++binCount) {
                            // 更新值(判断 onlyIfAbsent)或插入链表尾部 ...
                            break;
                        }
                    }
                    else if (f instanceof TreeBin) {
                        // 对树进行操作 ...
                    }
                }
            }

            // 判断是否需要 treeify
            if (binCount != 0) {
                if (binCount >= TREEIFY_THRESHOLD)
                    treeifyBin(tab, i);
                if (oldVal != null)
                    return oldVal;
                break;
            }
        }
    }

    addCount(1L, binCount);
    return null;
}

初始化 table

private final Node<K,V>[] initTable() {
    Node<K,V>[] tab; int sc;
    // 初始化成功退出循环
    while ((tab = table) == null || tab.length == 0) {
      if ((sc = sizeCtl) < 0)
        Thread.yield(); // 有其他线程在初始化,自旋等待
      else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {
        // 进入初始化 sizeCtl = -1
        try {
          if ((tab = table) == null || tab.length == 0) {
            int n = (sc > 0) ? sc : DEFAULT_CAPACITY;
            Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];
            table = tab = nt;
            // sc = n-n/4 ???
            sc = n - (n >>> 2);
          }
        } finally {
          // 初始化成功设置 sizeCtl
          sizeCtl = sc;
        }
        break;
      }
    }
    return tab;
}

扩容

在 treeifyBin 的操作中会判断是否真的需要转换树,还是对 table 进行扩容。当一个 table 的某个位置上元素比较多时,很大原因并不是因为这些 key 的 hashCode 相同,而是因为 table 比较小(跟 HashMap 的逻辑相同),hashCode 对 table 大小取摸后相同的概率比较大,所以选择对 table 进行扩容。扩容代码如下:

private final void treeifyBin(Node<K,V>[] tab, int index) {
    Node<K,V> b; int n, sc;
    if (tab != null) {
          if ((n = tab.length) < MIN_TREEIFY_CAPACITY)
            tryPresize(n << 1);
          else if ((b = tabAt(tab, index)) != null && b.hash >= 0) {
                // ....
        }
    }
}

private final void tryPresize(int size) {
    // 计算新的容量
    int c = (size >= (MAXIMUM_CAPACITY >>> 1)) ? MAXIMUM_CAPACITY :
        tableSizeFor(size + (size >>> 1) + 1);
    // sizeCtl
    int sc;

    // sizeCtl < 0 表示正在初始化或扩容
    // 直到不需要扩容或扩容完成跳出循环
    while ((sc = sizeCtl) >= 0) {
        Node<K,V>[] tab = table; int n;
        if (tab == null || (n = tab.length) == 0) {
            // 初始化 table ...
        }
        else if (c <= sc || n >= MAXIMUM_CAPACITY)
            // 不需要要扩容
            break;
        else if (tab == table) {
            // 生成一个 resize 的标识,更新 sizeCtl(CAS)
            int rs = resizeStamp(n);
            // 此时 sc 应该是 >= 0 
            if (sc < 0) {
                // ...
            }
            else if (U.compareAndSwapInt(this, SIZECTL, sc,
                                         (rs << RESIZE_STAMP_SHIFT) + 2))
                            // 执行扩容操作               
                  transfer(tab, null);
        }
    }
}

private final void transfer(Node<K,V>[] tab, Node<K,V>[] nextTab) {
    int n = tab.length, stride;
    if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE)
        stride = MIN_TRANSFER_STRIDE; 
    if (nextTab == null) {        
        // 第一个扩容线程会进入 if
        // 新的 table 容量是老的 table 的两倍,如果溢出则失败退出
        Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n << 1];
        nextTab = nt;
        // 设置 nextTable
        nextTable = nextTab;
        transferIndex = n;
    }

      int nextn = nextTab.length;
    ForwardingNode<K,V> fwd = new ForwardingNode<K,V>(nextTab);
    // 省略复杂的扩容过程 ...
    // 通过 CAS 将老的 table 中的元素插入 nextTable 中
}

在 put 操作中,有一个判断调用了 helpTransfer 方法,进行帮助扩容,含义是有其它线程正在扩容时,当前线程一起转移元素。

// put 方法
{       
    if ((fh = f.hash) == MOVED)
        // 帮助进行扩容,然后进入下一次循环尝试插入
        tab = helpTransfer(tab, f);
}

// f.hash = MOVE 表示 f 是 ForwardingNode,用于扩容操作的节点
final Node<K,V>[] helpTransfer(Node<K,V>[] tab, Node<K,V> f) {
    Node<K,V>[] nextTab; int sc;
    if (tab != null && (f instanceof ForwardingNode) &&
        (nextTab = ((ForwardingNode<K,V>)f).nextTable) != null) {
        // 生成一个 resize 的标识,更新 sizeCtl(CAS)
        int rs = resizeStamp(tab.length);

        while (nextTab == nextTable && table == tab &&
               (sc = sizeCtl) < 0) {

            // 当扩容的线程过多时,跳出循环
            if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
                sc == rs + MAX_RESIZERS || transferIndex <= 0)
                break;
            if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1)) {
                // 进入扩容方法,并更新 sizeCtl
                transfer(tab, nextTab);
                break;
            }
        }
        return nextTab;
    }
    return table;
}

扩容操作 transfer 相当复杂,抽取部分核心代码(添加锁的部分):

synchronized (f) {
    if (tabAt(tab, i) == f) {
        // 这里与 HashMap 重新离散的思路相同:
        // 当前桶的节点在扩容后只能在新桶的idx(当前位置)或 idx+oldSize(原桶大小)
        // 就是说原链表 Node,会生成两个新的 Node:ln和hn
        Node<K,V> ln, hn;
        if (fh >= 0) {
            // 处理链表节点 ...

            // 调用 putObjectVolatile 方法更新 newTab
            setTabAt(nextTab, i, ln);
            setTabAt(nextTab, i + n, hn);
            setTabAt(tab, i, fwd);
        }
        else if (f instanceof TreeBin) {
            // 处理树节点,同样的更新逻辑 ... 
        }
    }
}

遍历方式

ConcurrentHashMap 是支持在遍历的时候,进行修改。但是频繁的修改和遍历 ConcurrentHashMap 还是会出问题,有可能遍历不到最新修改的数据。

已知问题

JDK7 HashMap 在多线程环境下可能造成 CPU 100% 的现象,由于在扩容的时候 put 时产生了环状链表,会在 get 时造成了 CPU 100%,在 JDK8 通过修改重新离散的方法得到解决。

JDK8 ConcurrentHashMap 也有一种情况会造成 CPU 100% 中,在 JDK9 中已经得到修复(https://bugs.openjdk.java.net/browse/JDK-8062841)

什么情况下 JDK8 ConcurrentHashMap 会出现这个问题?

Map<String, String> map = new ConcurrentHashMap<>();
map.computeIfAbsent("AaAa",
        key -> map.computeIfAbsent("BBBB", key2 -> "value"));

问题的关键在于递归使用了 computeIfAbsent 方法,不要在递归中使用,具体的原因这里不再分析,避免使用。

与 Collections.synchronizedMap() 的区别

Collections.synchronizedMap() 方法来获取一个线程安全的集合(实现原理是 Collections 定义了一个 SynchronizedMap 的内部类,这个类实现了 Map 接口,在调用方法时使用 synchronized 来保证线程同步,其它 Collections.synchronizedXX 方法也是类似原理),所以 Collections.synchronizedMap() 更类似与 Hashtable,对 Map 进行同步。

还有一个区别是,ConcurrentHashMap 必然是个 HashMap。而 Collections.synchronizedMap() 可以接收任意 Map 实例,实现同步。


Add a Comment

电子邮件地址不会被公开。 必填项已用*标注