GVKun编程网logo

ConcurrentHashMap1.8源码分析(concurrenthashmap源码1.7)

27

对于ConcurrentHashMap1.8源码分析感兴趣的读者,本文将提供您所需要的所有信息,我们将详细讲解concurrenthashmap源码1.7,并且为您提供关于ConcurrentHash

对于ConcurrentHashMap1.8源码分析感兴趣的读者,本文将提供您所需要的所有信息,我们将详细讲解concurrenthashmap源码1.7,并且为您提供关于ConcurrentHashMap 1.8 源码分析、ConcurrentHashMap JDK 1.6 源码分析、ConcurrentHashMap 与同步 HashMap - ConcurrentHashMap vs Synchronized HashMap、ConcurrentHashMap 源码分析的宝贵知识。

本文目录一览:

ConcurrentHashMap1.8源码分析(concurrenthashmap源码1.7)

ConcurrentHashMap1.8源码分析(concurrenthashmap源码1.7)

文章简介

想必大家对HashMap数据结构并不陌生,JDK1.7采用的是数组+链表的方式,JDK1.8采用的是数组+链表+红黑树的方式。虽然JDK1.8对于HashMap有了很大的改进,提高了存取效率,但是线程安全的问题不可忽视,所以就有了线程安全的解决方案,比如在方法上加synchronized同步锁的HashTable,或者并发包中的ConcurrentHashMap线程安全类,本文就来和大家一起探讨一下关于ConcurrentHashMap的源码,版本是JDK1.8,下面让我们正式开始吧。

备注:大家需要对HashMap1.8源码有一些了解,在原来HashMap1.8源码中比较常见的知识点本文不会具体展开。


 

内容导航

  • 数组初始化线程安全实现
  • put(key,value)线程安全实现
  • transfer扩容及不同的扩容场景

 

01 put(key,value)方法

不妨先以一段大家熟悉的代码开始本文的旅程

ConcurrentHashMap<Integer,String> map=new ConcurrentHashMap<Integer, String>();
map.put(1,"Zhang");

当我们在put元素时,点开put方法的源码会发现,这里调用了一个putVal()的方法,同时将key和value作为参数传入

public V put(K key, V value) { return putVal(key, value, false); }

继续点击putVal()方法,然后我们看看这里到底实现了什么


//key或者value都不能为空
if (key == null || value == null) throw new NullPointerException(); //计算hash值,实际上就是得到一个int类型的数,只是需要对这个数进行处理,目的是为了确定key,value组成的Node节点在数组下标中的位置 int hash = spread(key.hashCode());

不妨先看下spread(key.hashCode())的实现

key.hashCode()实际上调用的是native的方法,目的是得到一个×××数,为了使得这个×××数尽可能不一样,所以要对高16位和低16位进行异或运算,尽可能利用好每一位的值
static final int spread(int h) { //对key.hashCode的结果进行高16位和低16位的运算 return (h ^ (h >>> 16)) & HASH_BITS; }

接下来就是要初始化这个数组的大小,因为数组不初始化,代表key,value的每个Node类也不能放到对应的位置


if (tab == null || (n = tab.length) == 0)
    //初始化数组的大小
     tab = initTable();
private final Node<K,V>[] initTable() {
    Node<K,V>[] tab; int sc; //只有当数组为空或者大小为0的时候才对数组进行初始化 while ((tab = table) == null || tab.length == 0) { //这里其实就是用一个sizeCtl记录是否已经有线程在进行初始化操作,如果有,则让出CPU的资源,也就是保证只有一个线程对数组进行初始化操作,从而保证线程安全。 if ((sc = sizeCtl) < 0) Thread.yield(); // lost initialization race; just spin //使用CAS乐观锁机制比较SIZECTL和sc是否相等,只有当前值和内存中最新值相等的时候,才会将当前值赋值为-1,一旦被赋值为-1,上面有其他线程进来,就直接执行了Thread.yeild()方法了 else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) { try { if ((tab = table) == null || tab.length == 0) { //三元运算符得到数组默认大小,点击DEFAULT_CAPACITY发现是16,这点和HashMap是一样的 int n = (sc > 0) ? sc : DEFAULT_CAPACITY; @SuppressWarnings("unchecked") //创建Node类型的数组,真正初始化的地方 Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n]; table = tab = nt; //计算扩容的标准,采用的是位移运算,因为效率更高,sc最终结果为12 sc = n - (n >>> 2); } } finally { //不管无论最终将sc赋值为sizeCtl,这时候sizeCtl结果为12 sizeCtl = sc; } break; } } return tab; }

当数组初始化完成之后,就需要将key,value创建出来的Node节点放到数组中对应的位置了,分为几种情况,下面这种是原来某个位置就没有元素值,但是为了保证线程安全,放到多个线程同时添加,也使用CAS乐观锁的机制进行添加。

//根据(n-1)&hash的结果确认当前节点所在的位置是否有元素,效果和hash%n是一样的,只是&运算效率更高,这里hashmap也是这样做的,就不做更多赘述了
else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) { //使用CAS乐观锁机制向对应的下标中添加对应的Node if (casTabAt(tab, i, null, new Node<K,V>(hash, key, value, null))) break; // no lock when adding to empty bin }
//f实际上是当前数组下标的Node节点,这里判断它的hash值是否为MOVED,也就是-1,如果是-1,就调用helpTransfer(tab,f)方法帮助其他线程完成扩容操作,然后再添加元素。 
else if ((fh = f.hash) == MOVED)
     tab = helpTransfer(tab, f);

接下来就要考虑数组具体下标位置有元素的情况,这时候就需要把Node节点向当前节点下进行顺延,形成链表或者红黑树的结构,还有一种情况就是key值相同,value值不能,这时候只需要进行一个value值的替换即可。

V oldVal = null;
//数组初始化和在数组下标中插入Node时,为了保证线程安全使用的是CAS无锁化机制
//那元素继续往下插入时,线程安全的问题怎么保证呢?可以使用synchronized关键字
//发现同步代码块中锁的对象是f,也就是当前数组下标的元素,这样不同的数组下标之间彼此互相不影响。
synchronized (f) {
    //再次确认当前头结点是否为f if (tabAt(tab, i) == f) { if (fh >= 0) { binCount = 1; for (Node<K,V> e = f;; ++binCount) { K ek; //第一种情况,发现是key值相同,只需要替换掉oldValue即可 if (e.hash == hash && ((ek = e.key) == key || (ek != null && key.equals(ek)))) { oldVal = e.val; if (!onlyIfAbsent) e.val = value; break; } //第二种情况,按照链表的方式进行插入 Node<K,V> pred = e; if ((e = e.next) == null) { pred.next = new Node<K,V>(hash, key, value, null); break; } } } //第三种情况,按照红黑树的方式进行插入 else if (f instanceof TreeBin) { Node<K,V> p; binCount = 2; if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key, value)) != null) { oldVal = p.val; if (!onlyIfAbsent) p.val = value; } } } } if (binCount != 0) { //一般链表转红黑树是节点数>8的时候,但不是一旦某个数组下标的节点数大于8就转成红黑树,也可以通过调整数组的容量来解决,比如treeifyBin中进行的 if (binCount >= TREEIFY_THRESHOLD) treeifyBin(tab, i); //说明上面有需要替换掉旧值的节点 if (oldVal != null) return oldVal; break; }

当添加完一个key,value方式的Node之后,就需要检查是否整个数据结构中的节点数超过扩容标准比如12,如果超过了就需要进行数组大小的扩容,先调用addCount()方法,因为第二个参数check大于0,所以直接看里面这段代码。

if (check >= 0) {
    Node<K,V>[] tab, nt; int n, sc;
    while (s >= (long)(sc = sizeCtl) && (tab = table) != null && (n = tab.length) < MAXIMUM_CAPACITY) { //通过resizeStamp(n),n是数组大小,得到一个int的结果,赋值给rs保存 int rs = resizeStamp(n); if (sc < 0) { if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 || sc == rs + MAX_RESIZERS || (nt = nextTable) == null || transferIndex <= 0) break; if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1)) transfer(tab, nt); } //因为sc<0不成立,所以会来到这段代码 //这里通过CAS的方式比较SIZECTL和sc的值,当两者相等时,会执行rs<<RESIZE<STAMP_SHIFT+2赋值操作,这个结果值是一个负数,表示当前正在执行扩容操作的线程数量 else if (U.compareAndSwapInt(this, SIZECTL, sc, (rs << RESIZE_STAMP_SHIFT) + 2)) //调用transfer方法进行真正的扩容操作 transfer(tab, null); s = sumCount(); } }



02 扩容操作tranfer()

在concurrenthashmap中的扩容操作可能不止一个线程,所以每个线程就需要分工合作完成扩容,也就是每个线程需要领取自己负责的task,当然前提是得要有一个新的数组,这样才能将老数组中的Node节点搬移到新数组中。

int n = tab.length, stride;
//确定线程负责数组大小的范围
if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE) stride = MIN_TRANSFER_STRIDE; // subdivide range //判断新的数组是否为null,为空则进行创建,比如数组原来的大小是16,2的N次幂,扩容也需要双倍扩容 if (nextTab == null) { // initiating try { @SuppressWarnings("unchecked") //采用位移运算进行双倍扩容 Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n << 1]; nextTab = nt; } catch (Throwable ex) { // try to cope with OOME sizeCtl = Integer.MAX_VALUE; return; } nextTable = nextTab; //使用transferIndex变量记录数组的大小,表示线程进行扩容的时候,是从后往前进行的 transferIndex = n; }

接下来就要进行搬移工作了,我们需要用一些标识记录一下搬移的完成状态,同时线程将某个数组下标的节点搬移完成之后也要让别人知道,同时也能知道有线程正在进行扩容操作。


int nextn = nextTab.length;
//某个下标节点完成之后的节点类型,实际上就是继承了Node节点,只不过点进去发现它的hash值为MOVED也就是-1
ForwardingNode<K,V> fwd = new ForwardingNode<K,V>(nextTab);
boolean advance = true; boolean finishing = false; // to ensure sweep before committing nextTab
Node<K,V> f; int fh;
//i指向当前数组的下标,通过while循环遍历--i,从而知道当前线程拿到的一个区间范围
while (advance) {
    int nextIndex, nextBound;
    //一个数组下标一个数组下标的处理 if (--i >= bound || finishing) advance = false; //表示已经没有需要搬运的节点了,将advance赋值为false else if ((nextIndex = transferIndex) <= 0) { i = -1; advance = false; } //不同的线程搬运的内容,不断地将transferindex的值变小 else if (U.compareAndSwapInt (this, TRANSFERINDEX, nextIndex, nextBound = (nextIndex > stride ? nextIndex - stride : 0))) { bound = nextBound; i = nextIndex - 1; advance = false; } }
 if (i < 0 || i >= n || i + n >= nextn) {
     int sc;
     //finishing等于true就表示所有的线程都搬运完了,做最后的收尾工作
     //比如将新数组的内容赋值到table,扩容标准由原来的12变成24 if (finishing) { nextTable = null; table = nextTab; sizeCtl = (n << 1) - (n >>> 1); return; } //这里是每次有一个线程完成搬运工作,就将线程总数量-1 if (U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1)) { if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT) return; finishing = advance = true; i = n; // recheck before commit } }
//如果某个线程的某个数组下标搬运完成,则将该头节点赋值为fwd类型的,其实就是hash值为MOVED
else if ((f = tabAt(tab, i)) == null) advance = casTabAt(tab, i, null, fwd); //表示已经搬运完成 else if ((fh = f.hash) == MOVED) advance = true; // already processed

接下来就是每个线程真正在搬运代码的过程,其实这块和hashmap1.8中的resize后面的过程很类似

 synchronized (f) {
     //再次检查当前数组下标的节点是否为f
     if (tabAt(tab, i) == f) {
         Node<K,V> ln, hn;
         if (fh >= 0) { int runBit = fh & n; Node<K,V> lastRun = f; for (Node<K,V> p = f.next; p != null; p = p.next) { //新节点的位置要么在原来的位置,要么在原来的位置+原来数组的大小,这点和hashmap中一样 //p.hash&n 也就是判断这个结果是否等于0 int b = p.hash & n; if (b != runBit) { runBit = b; lastRun = p; } } //等于0会走这边 if (runBit == 0) { ln = lastRun; hn = null; } //不等于0会走这边 else { hn = lastRun; ln = null; } for (Node<K,V> p = f; p != lastRun; p = p.next) { int ph = p.hash; K pk = p.key; V pv = p.val; if ((ph & n) == 0) ln = new Node<K,V>(ph, pk, pv, ln); else hn = new Node<K,V>(ph, pk, pv, hn); } //将链表整体迁移到nextTable中 setTabAt(nextTab, i, ln); setTabAt(nextTab, i + n, hn); //标识原桶标识位已经处理,头节点标记为fw,hash值为-1 setTabAt(tab, i, fwd); advance = true; } //这里是红黑树迁移的情况 else if (f instanceof TreeBin) { TreeBin<K,V> t = (TreeBin<K,V>)f; TreeNode<K,V> lo = null, loTail = null; TreeNode<K,V> hi = null, hiTail = null; int lc = 0, hc = 0; for (Node<K,V> e = t.first; e != null; e = e.next) { int h = e.hash; TreeNode<K,V> p = new TreeNode<K,V> (h, e.key, e.val, null, null); if ((h & n) == 0) { if ((p.prev = loTail) == null) lo = p; else loTail.next = p; loTail = p; ++lc; } else { if ((p.prev = hiTail) == null) hi = p; else hiTail.next = p; hiTail = p; ++hc; } } ln = (lc <= UNTREEIFY_THRESHOLD) ? untreeify(lo) : (hc != 0) ? new TreeBin<K,V>(lo) : t; hn = (hc <= UNTREEIFY_THRESHOLD) ? untreeify(hi) : (lc != 0) ? new TreeBin<K,V>(hi) : t; setTabAt(nextTab, i, ln); setTabAt(nextTab, i + n, hn); setTabAt(tab, i, fwd); advance = true; } } }



03 其他方式引起的扩容

链表转红黑树

前面说到,当链表长度超过8会转成红黑树,但是节点总数如果小于64,会用扩容的方式代替转红黑树,代码如下

private final void treeifyBin(Node<K,V>[] tab, int index) { Node<K,V> b; int n, sc; if (tab != null) { if ((n = tab.length) < MIN_TREEIFY_CAPACITY) //tryPresize进行扩容 tryPresize(n << 1); else if ((b = tabAt(tab, index)) != null && b.hash >= 0) { synchronized (b) { if (tabAt(tab, index) == b) { TreeNode<K,V> hd = null, tl = null; for (Node<K,V> e = b; e != null; e = e.next) { TreeNode<K,V> p = new TreeNode<K,V>(e.hash, e.key, e.val, null, null); if ((p.prev = tl) == null) hd = p; else tl.next = p; tl = p; } setTabAt(tab, index, new TreeBin<K,V>(hd)); } } } } }

点击tryPresize方法,最终也会来到下面这段代码,和前面addCount中的一样

else if (U.compareAndSwapInt(this, SIZECTL, sc,
                             (rs << RESIZE_STAMP_SHIFT) + 2))
    //注意这里的第二个参数为null,表示新的数组还没有创建,之前也是null transfer(tab, null);


 

当前线程协助其他线程

在之前put的时候,中间跳过了这段话,这段话是当前线程发现有其他线程正在进行扩容操作,协助其他线程扩容完成之后再继续put元素。

else if ((fh = f.hash) == MOVED)
              tab = helpTransfer(tab, f);
/**
  * Helps transfer if a resize is in progress.
  */
final Node<K,V>[] helpTransfer(Node<K,V>[] tab, Node<K,V> f) {
    Node<K,V>[] nextTab; int sc; if (tab != null && (f instanceof ForwardingNode) && (nextTab = ((ForwardingNode<K,V>)f).nextTable) != null) { int rs = resizeStamp(tab.length); while (nextTab == nextTable && table == tab && (sc = sizeCtl) < 0) { if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 || sc == rs + MAX_RESIZERS || transferIndex <= 0) break; //每当来一个线程帮助扩容,此时就会sc+1,表示多了一个线程 //其实这块也能和transfer方法中的sc-1对应上,一个线程完成之后就数量-1 if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1)) { //扩容的方法,注意第二个参数有传入nextTab,原因是当前线程只是协助其他线程扩容 //既然其他线程正在扩容,说明这个新数组已经创建好了 transfer(tab, nextTab); break; } } return nextTab; } return table; }


总结

到目前为止,我们分析了put过程中会遇到线程安全的点,比如数组初始化,数组头元素添加,put完成过程等。同时还分析了transfer扩容每个线程领取的任务,搬运结果的方式,协助扩容等方面的内容。如果对大家有帮助,请帮忙转发。

ConcurrentHashMap 1.8 源码分析

ConcurrentHashMap 1.8 源码分析

         前两已经分析过几篇关于CHM的源码,本篇分析下1.8中的实现,已经弃用之前 segment 双桶的机制。但是本质还是将锁细化达到性能的提升,但是不是之前版本中定义的segment 上锁,而是用了synchronized关键字锁住 table (1.8中只维护了和hashmap中类似的数据结构。一个数据,数组内的结构是链表或者红黑树)中本次所操作对象的链表头。同样大量使用了UnSafe的 本地方法。如CAS,putOr并且最大的改进在于实现了并发的扩容。同时内部数据结构与Java8中HashMap一样,增加了红黑树。当链表中的长度大于一定值后,转化为红黑树(红黑树结构的同时也仍然保持了链表的结构,下面会详细介绍)。

其实对于1.8的源码分析 我所引用 的两篇分章已经分析的十分详细了,本篇本章就不对每个方法或参数在详细说明了,只对其中我认为难以理解的对方加以概括,重点对如何实现并发扩容进行了分析。如有不对的地方,欢迎大家一起来交流。

 

       先简单分析下put操作。我们可以猜想,在之前的实现中,首先定位segment,上锁。而后操作在对segment中的map进一步处理。现在的实现中并没有用segment,而是延用 hashMap中单桶,定位到链表后,直接 上锁,而后对链表就行操作,同样是将锁细化。简化了很多的计算操作。 从put的源码可以看出总体与hashmap的put操作相差不大,除了加锁,另外就是增加了判断当前结点是否是ForwardingNode。表示当前正在扩容,且该结点已经扩容完毕。而后通过helpTransfer 判断是否参与扩容的过程。

 

红黑树

与1.8中的HashMap一样,当链表长度超过一定长度时,会转换成红黑树。但是 两者之间有一点细微的区别。如下代码,表示HashMap的树节点的数据结构。

HashMap
/**
 * Entry for Tree bins. Extends LinkedHashMap.Entry (which in turn
 * extends Node) so can be used as extension of either regular or
 * linked node.
 */   也是继承于HashMap中的Node,可以将该节点做为数组中的链表节点。
static final class TreeNode<K,V> extends LinkedHashMap.Entry<K,V> {
    TreeNode<K,V> parent;  // red-black tree links
    TreeNode<K,V> left;
    TreeNode<K,V> right;
    TreeNode<K,V> prev;    // needed to unlink next upon deletion
    boolean red;   。。。}

HashMap中TreeNode的实现了红黑树功能的方法。

ConcurrentHashMap中的TreeNode相对来说很简单,只定义了基本的数据结构和一定查找的方法。如下

/**
 * Nodes for use in TreeBins
 */
ConcurrentHashMap  树的成员与上面类似,但是
static final class TreeNode<K,V> extends Node<K,V> {
    TreeNode<K,V> parent;  // red-black tree links
    TreeNode<K,V> left;
    TreeNode<K,V> right;
    TreeNode<K,V> prev;    // needed to unlink next upon deletion
    boolean red;
  }

并没有实现红黑树应实现的方法。但是两者有一个共通点,就是与单纯的红黑树的数据结构多了一个变量,就是prev,增加这个变量的意义在于删除红黑树的node时,需要找到被删node的上一个结点,因为如果只是单向链表与红黑树的结构。删除红黑树的结点时,单向链表就不能维护了,因为找不到被册结点的上一个结点。因此这里增加了一个prev的结构,形成双向链表。

而这样做是其增加了一个TreeBin来包装TreeNode,而这个容器不直接保存用户的key,value信息。hash值为定值-2,在遍历时可通过hash值判断是当前Node是哪种结构。-1表示正在ForwardingNode,-2为TreeBin,大于0为链表。

ConcurrentHashMap  
/**
 * TreeNodes used at the heads of bins. TreeBins do not hold user
 * keys or values, but instead point to list of TreeNodes and
 * their root. They also maintain a parasitic read-write lock
 * forcing writers (who hold bin lock) to wait for readers (who do
 * not) to complete before tree restructuring operations.
 */
static final class TreeBin<K,V> extends Node<K,V> {
    TreeNode<K,V> root;     //红黑树的根结点。
    volatile TreeNode<K,V> first;   // 指向链表的头部,虽然为红黑树,但保持了链表的结构。在unTree化时简化。
    volatile Thread waiter;
    volatile int lockState; 。。。}

 

 

接下来介绍下由链表是怎么转化为红黑树的。首先通过如下的方法将原来的Node转换成TreeNode,从且从单身链表转成双向链表。此时还没有发生红黑树的操作。

/* ---------------- Conversion from/to TreeBins -------------- */

/**
 * Replaces all linked nodes in bin at given index unless table is
 * too small, in which case resizes instead.
 */
private final void treeifyBin(Node<K,V>[] tab, int index) {
    Node<K,V> b; int n, sc;
    if (tab != null) {
        if ((n = tab.length) < MIN_TREEIFY_CAPACITY)
            tryPresize(n << 1);
        else if ((b = tabAt(tab, index)) != null && b.hash >= 0) {
            synchronized (b) {
                if (tabAt(tab, index) == b) {
                    TreeNode<K,V> hd = null, tl = null;
                    for (Node<K,V> e = b; e != null; e = e.next) {
                        TreeNode<K,V> p =
                            new TreeNode<K,V>(e.hash, e.key, e.val,
                                              null, null);
                        if ((p.prev = tl) == null)
                            hd = p;
                        else
                            tl.next = p;
                        tl = p;
                    }
                    setTabAt(tab, index, new TreeBin<K,V>(hd));
                }
            }
        }
    }
}
setTabAt(tab, index, new TreeBin<K,V>(hd));

然后new封装转换的TreeNode,在构造 方法里转换成红黑树。hd为原链表的头部。现在做为红黑树的first。在构造方法里执行的过程就和hashMap中转红黑树的过程类似 ,根据key的 systemCode的大小,决定在树中的位置,形成一个颗二叉树。最后通过r = balanceInsertion(r, x);  由二叉树转为红黑树。r为root指针。

而在HashMap中就没有first指针的概念,虽然其内部同样还是个双向链表(通过prev实现)。它是在转换成红黑树之后 ,通过将红黑树的root结点做为first node。

//   HashMap 中treeify后执行的方法,将root做为链表的first node
/**
 * Ensures that the given root is the first node of its bin.
 */
static <K,V> void moveRootToFront(Node<K,V>[] tab, TreeNode<K,V> root) {
    int n;
    if (root != null && tab != null && (n = tab.length) > 0) {
        int index = (n - 1) & root.hash;
        TreeNode<K,V> first = (TreeNode<K,V>)tab[index];
        if (root != first) {
            Node<K,V> rn;
            tab[index] = root;
            TreeNode<K,V> rp = root.prev;
            if ((rn = root.next) != null)
                ((TreeNode<K,V>)rn).prev = rp;
            if (rp != null)
                rp.next = rn;
            if (first != null)
                first.prev = root;
            root.next = first;
            root.prev = null;
        }
        assert checkInvariants(root);
    }
}

UNSAFE   

如下,这三个操作是实现并发访问的关键。通过UNSAFE的本地方法,1.7中就已经引入过,而在1.8中继续扮演着重要的角色。并且大量使用了CAS的操作。

关于  UNSAFE方法  在之前1.7就已经分析过
@SuppressWarnings("unchecked")  //定位tab中index为i的 Node
static final <K,V> Node<K,V> tabAt(Node<K,V>[] tab, int i) {
    return (Node<K,V>)U.getObjectVolatile(tab, ((long)i << ASHIFT) + ABASE);
}
//CAS设置tab中index为i的结点为node
static final <K,V> boolean casTabAt(Node<K,V>[] tab, int i,
                                    Node<K,V> c, Node<K,V> v) {
    return U.compareAndSwapObject(tab, ((long)i << ASHIFT) + ABASE, c, v);
}
//Volatile  写 tab中的 Node。在加锁的情况下调用 
static final <K,V> void setTabAt(Node<K,V>[] tab, int i, Node<K,V> v) {
    U.putObjectVolatile(tab, ((long)i << ASHIFT) + ABASE, v);
}

并发扩容

扩容这一操作相比于之前的实现,1.8中最大的不同在于实现了并发扩容。其核心 是通过了一个特殊的Node。其定义如下 如果遍历时发现node的 hash值是 -1,表示当前正在扩容。且当前table中的该node已经容完成。遍历下一个node进行扩容。

/**
 * A node inserted at head of bins during transfer operations.
 */
static final class ForwardingNode<K,V> extends Node<K,V> {
    final Node<K,V>[] nextTable;
    ForwardingNode(Node<K,V>[] tab) {
        super(MOVED, null, null, null);
        this.nextTable = tab;
    }
    Node<K,V> find(int h, Object k) {
       。。。。
    }
}

而其并发到底又是怎么实现的呢 ? 接下来看,详细代码就不贴了,这里分析关键地方,

private final void transfer(Node<K,V>[] tab, Node<K,V>[] nextTab) {
    int n = tab.length, stride;
    if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE)
        stride = MIN_TRANSFER_STRIDE; // subdivide range   
。。。。。。。
}

stride 非常关键,大致的意思 是每个处理器处理的node个数不小于16。而这一步的实现在下面的方法 。

i=0开始,第一步CAS  设置  transferIndex,该值初始为原表的大小,假设n为64,第一个线程将其CAS设为64-16等于48.则第一个线程就先执行64到48之前的node的扩容操作,如果在此期间第二个线程要执行到transfer方法,则transferIndex为 32.其执行48到32之间。但是如果没有其它线程进来执行,则它们就接着往下,争取下一个stride数量的扩容操作。但是当最后transferIndex的值小于等于0时。表示此时已经不需要参与扩容了。如此通过CAS设置transferIndex的值,解决并发的冲突。同时每个node扩容时,要上锁。以防其它操作改变该链表的结构。这样就可以有多个线程并发扩容,且不会产生冲突。

Node<K,V> f; int fh;
while (advance) {
    int nextIndex, nextBound;
    if (--i >= bound || finishing)
        advance = false;
    else if ((nextIndex = transferIndex) <= 0) {
        i = -1;
        advance = false;
    }
    else if (U.compareAndSwapInt
             (this, TRANSFERINDEX, nextIndex,
              nextBound = (nextIndex > stride ?
                           nextIndex - stride : 0))) {
        bound = nextBound;
        i = nextIndex - 1;
        advance = false;
    }
}
if (i < 0 || i >= n || i + n >= nextn) {
    int sc;
    if (finishing) {
        nextTable = null;
        table = nextTab;
        sizeCtl = (n << 1) - (n >>> 1);
        return;
    }
    if (U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1)) {
        if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT)
            return;
        finishing = advance = true;
        i = n; // recheck before commit
    }
}

如下是在transfer过程中,原node是链表的情况下扩容过程。

if (tabAt(tab, i) == f) {
    Node<K,V> ln, hn;
    if (fh >= 0) {
        int runBit = fh & n;
        Node<K,V> lastRun = f;
        for (Node<K,V> p = f.next; p != null; p = p.next) {
            int b = p.hash & n;
            if (b != runBit) {
                runBit = b;
                lastRun = p;
            }
        }
        if (runBit == 0) {
            ln = lastRun;
            hn = null;
        }
        else {
            hn = lastRun;
            ln = null;
        }
        for (Node<K,V> p = f; p != lastRun; p = p.next) {
            int ph = p.hash; K pk = p.key; V pv = p.val;
            if ((ph & n) == 0)
                ln = new Node<K,V>(ph, pk, pv, ln);
            else
                hn = new Node<K,V>(ph, pk, pv, hn);
        }
        setTabAt(nextTab, i, ln);
        setTabAt(nextTab, i + n, hn);
        setTabAt(tab, i, fwd);
        advance = true;
    }

下图表示遍历到第i个结点,上锁后,判断是链表后,首先是找到lastRun,其表示尾部  hash&n  相同的排在最前面的node。1表示扩容后在高位hn,0表示在低位ln(也就是原来的位置)。而之所以 用hash&n 的 表达式来决定原node在扩容后的位置,而没有通过hash计算新数组的大小 来决定 位置 。这个实现十分的巧妙,直接将hash值与原表的大小& 一下。 这样的做的原因是因为, put操作时决定结点所在位置时通过 hash& n-1。而n我们这里定义的是2的x次方。n-1显然就是 11..11的结点,而n就是 1..00。举个例子n为16,则n-1为 1111。而n为10000。对应扩容后的大小为100000,原结点在新数组的位置就是hash& 11111。现在我们发现 原之前的差别就在于 原数组的&操作。也就是 11111中第一个1。所以扩容时,不需要重新计算。而只需要将原来的hash值与n进行&操作(这样的操作简化了计算的复杂度)。就可以确定在新table的位置。0表示原索引,而1表示 i+n的位置。且不需要考虑并发的问题,nextTable的  i和 i+n的位置 只会由原table的i中的node给重新占据,而我们一开始就对table的 node  i已经上锁了。所以是安全的。

上面分析了链表的反序处理。红黑树类似,但是需要判读是否unTree。

总结:进一步将锁细化,不在是设置的并发级别,随着扩容之后 ,锁粒度进一步细化,并且提供了并发扩容,且大量使用了UNSAFE的本地方法,性能也进一步提升。
 

本文没有对CHM的所有操作进行分析 ,如get,size,remove等,在下面引用的两篇文章已经很详细的分析,上文只是对部门关键点表明了我的一点点的看法。如有不正之处希望大家指出来。本文后面接着在完善。

 

 

 

 

http://www.importnew.com/22007.html

http://blog.csdn.net/u010723709/article/details/48007881

ConcurrentHashMap JDK 1.6 源码分析

ConcurrentHashMap JDK 1.6 源码分析

前言

前段时间把 JDK 1.6 中的 HashMap 主要的一些操作源码分析了一次。既然把 HashMap 源码分析了, 就顺便把 JDK 1.6 中 ConcurrentHashMap 的主要一些操作源码分析一下。因为其中有很多思想是值得我们去借鉴的。 ConcurrentHashMap 中的分段锁。这个思想在 JDK 1.8 中 为了优化 JUC 下的原子锁 CAS 高并发情况下导致自旋次数太多效率低下。引用 Adder 。其中就是借鉴了分段锁的思想。AtomicLong 对比 LongAdder。 有兴趣可以查看。

准备

如果有人问你了解 ConcurrentHashMap 吗? 你可以这样回答,了解。 ConcurrentHashMap 是为了 取代 HashMap非线程安全的,一种线程安全实现类。它有一个 Segment 数组,Segment 本身就是相当于一个 HashMap 对象。里面是一个 HashEntry 数组,数组中的每一个 HashEntry 都是一个键值对,也是一个链表的表头。如果别人问你,那 ConcurrentHashMap get 或者 put 一个对象的时候是怎么操作的 ,你该怎么回答。emmm..... 继续往下看。会有你要的答案。

构造函数

分析源码,先从构造函数开始。直接研究带所有参数的构造方法,其他一些重载的构造方法,最里面还是调用了该构造方法。在看构造方法之前,需要 明白 sshift 是表示并发数的2的几次方 比如并发数是16 那么他的值就是 4 。ssize 是 segment 数组的大小。

 public ConcurrentHashMap(int initialCapacity,
                             float loadFactor, int concurrencyLevel) {
        if (!(loadFactor > 0) || initialCapacity < 0 || concurrencyLevel <= 0)
            throw new IllegalArgumentException();

        if (concurrencyLevel > MAX_SEGMENTS)
            concurrencyLevel = MAX_SEGMENTS;

        // Find power-of-two sizes best matching arguments
        int sshift = 0;
        int ssize = 1;
        while (ssize < concurrencyLevel) {
            ++sshift;
            ssize <<= 1;
        }
        // 用来与 key的hashCode >>> 运算 获取HashCode的高位
        segmentShift = 32 - sshift;
        // 高位与 它做与运算 eg 假如 默认的创建该对象 那么 segmentShift = 28  segmentMask=15(二进制为1111) 假如现在put一个值 他的key的HashCode值为2的32次方 那么 他在segment里面的定位时 2的32次方 无符号 高位补零 右移28个 那么就等于 10000(二进制) 等于 16  与 1111 做与运算 等于0 也就是定位在 segment[0]上 。
        segmentMask = ssize - 1;


        // segment数组大小为 16 
        this.segments = Segment.newArray(ssize);

        if (initialCapacity > MAXIMUM_CAPACITY)
            initialCapacity = MAXIMUM_CAPACITY;
        int c = initialCapacity / ssize;
        if (c * ssize < initialCapacity)
            ++c;
        // segment数组中 每个HashEntry数组的大小,
        int cap = 1;
        while (cap < c)
            cap <<= 1;
        // 为segment数组中的每个HashEntry数组初始化大小,每个semengt中只有一个HashEntry数组。如果你设置的 ConcurrentHashMap 初始化大小为16的话,则 segment数组中每个的HashEntry的大小为1,如果你初始化他的大小为28 的话。它会根据上面的运算,cap的大小为2,也就是segment数组中的每个HashEntry数组的大小为2 ,总的大小为32。
        for (int i = 0; i < this.segments.length; ++i)
            this.segments[i] = new Segment<K,V>(cap, loadFactor);
    }

上面的注释应该都挺清楚了,要注意的是 ConcurrentHashMap的大小 是所有 Segment 数组中每个HashEntry数组的大小相加的和。

put 方法

ConcurrentHashMap 每次 put 的时候都是需要加锁的,只不过会锁住他所在的那个Segment数组位置。其他的不锁,这也就是分段锁,默认支持16个并发。说起put,以数组的形式存储的数据,就会涉及到扩容。这样是接下来需要好好讨论的一个事情。

 	public V put(K key, V value) {
 	// key value 不能为null
        if (value == null)
            throw new NullPointerException();
        // 获取hash值
        int hash = hash(key.hashCode());
        // 先获取hash二进制数的高位与15的二进制做与运算,得到segment数组的位置。
        return segmentFor(hash).put(key, hash, value, false);
    }

    V put(K key, int hash, V value, boolean onlyIfAbsent) {
    	// 锁住
            lock();
            try {
                int c = count;
                if (c++ > threshold) // ensure capacity
                // 扩容操作
                    rehash();
                    // 获取 Segment数组中的其中的HashEntry数组
                HashEntry<K,V>[] tab = table;
                // 获取在在HashEntry数组中的位置。
                int index = hash & (tab.length - 1);
                HashEntry<K,V> first = tab[index];
                HashEntry<K,V> e = first;
                // 判断是否是该key。
                while (e != null && (e.hash != hash || !key.equals(e.key)))
                    e = e.next;

                V oldValue;
                // 如果存在该key的数据 ,那么更新该值 返回旧值
                if (e != null) {
                    oldValue = e.value;
                    if (!onlyIfAbsent)
                        e.value = value;
                }
                else {
                    oldValue = null;
                    ++modCount;
                    //头插法插入 tab[index] 
                    tab[index] = new HashEntry<K,V>(key, hash, first, value);
                    count = c; // write-volatile
                }
                return oldValue;
            } finally {
                unlock();
            }
        }

        // 看下扩容操作的细节
          void rehash() {
            HashEntry<K,V>[] oldTable = table;
            int oldCapacity = oldTable.length;
            if (oldCapacity >= MAXIMUM_CAPACITY)
                return;

            // HashEntry数组,新的数组为它的两倍
            HashEntry<K,V>[] newTable = HashEntry.newArray(oldCapacity<<1);
            // 阈值
            threshold = (int)(newTable.length * loadFactor);
            //他的二进制添加以为 原来他的大小为3 那么二进制就是11 现在就为 7 二进制为 111
            int sizeMask = newTable.length - 1;
            for (int i = 0; i < oldCapacity ; i++) {
               
               	// 旧的HashEntry。
                HashEntry<K,V> e = oldTable[i];

                if (e != null) {
                	// 下一个 该HashEntry数组上的 HashEntry是否为链表,有下一个值。
                    HashEntry<K,V> next = e.next;
                    // 该HashEntry的新位置 如果高位为1 那么他在HashEntry数组中的位置就是老的HashEntry数组中的加上这个倍数。举个例子
                    // 假如e.hash 原来的的二进制位...111 老的HashEntry数组的大小为 4 那么e.hash和 4-1 也就是3 做与运算 得到的值也就是二进制的11
                   	// 值位3 现在新的HashEntry数组的大小为 8 也就是 e.hash 和 8-1 做与运算 得到的值 也就是二进制位 111 位 7 。
                    int idx = e.hash & sizeMask;

                    //  没有的话就直接放入该位置了,如果有的话往下看:
                    if (next == null)
                        newTable[idx] = e;

                    else {
                        HashEntry<K,V> lastRun = e;
                        // 假如idx 等于 7
                        int lastIdx = idx;
                        // 找到最后一个 HashEntry中的位置,并且后面的HashEntry的位置都是一样的。举个例子
                        // 假如这个链表中的所有HashEntry的Hash值为 1-5-1-5-5-5 。那么最后lastIdx = 5 也就是1-5-1后面的这个5 。lastRun 为 1-5-1后面的这个5的HashEnrty。
                        for (HashEntry<K,V> last = next;
                             last != null;
                             last = last.next) {
                            int k = last.hash & sizeMask;
                            if (k != lastIdx) {
                                lastIdx = k;
                                //     
                                lastRun = last;
                            }
                        }
                        // 将 lastRun 复制给 这个新的Table 那么后面还有 5-5-5 这些的就不用移动了 直接带过来了。 这就是上面那个for循环做的事情
                        newTable[lastIdx] = lastRun;

                        // 对前面的 1-5-1做操作了 1就是在新HashEntry书中的1的位置 5的后就是头插法 ,查到新HashEntry的头部了
                        for (HashEntry<K,V> p = e; p != lastRun; p = p.next) {
                            int k = p.hash & sizeMask;
                            HashEntry<K,V> n = newTable[k];
                            newTable[k] = new HashEntry<K,V>(p.key, p.hash,
                                                             n, p.value);
                        }
                    }
                }
            }
            table = newTable;
        }

其实put 方法中有点难理解的就是 把查找到后面如果有所有相同的 HashEntry key 的位置是一样的话,就不用额外的进行Hash重新定位了。不知道我描述的清不清楚。如果还有不清楚的话,可以私信一下我。

get 方法

ConcurrentHashMapget 方法是不会加锁的,如果get的值为null的时候,这个时候会对这个HashEntry进行加锁。预防此时并发出现的问题。

    public V get(Object key) {
    	//定位Segment数组中的HashEntry数组为位置
        int hash = hash(key.hashCode());
        return segmentFor(hash).get(key, hash);
    }

     V get(Object key, int hash) {
     		// 曾经put进去过,也就是里面有值
            if (count != 0) { // read-volatile
            	// 定位HashEntry数组中的HashEntry。
                HashEntry<K,V> e = getFirst(hash);
                while (e != null) {
                    if (e.hash == hash && key.equals(e.key)) {
                        V v = e.value;
                        if (v != null)
                            return v;
                        return readValueUnderLock(e); // recheck
                    }
                    e = e.next;
                }
            }
            return null;
        }

ConcurrentHashMapget方法是比较简单的。看一看就知道了。

总结

这一遍ConcurrentHashMap源码分析,可以说是自己写了大半个月吧。好早之前就准备写了。总是写一点,然后就停笔了。加上自己换了公司的原因,又忙上了一段时间,导致一拖再落。哇,严重拖延症患者。上面自己也是全部透彻之后写下来的,如果有些表达不够清晰的还得多加包涵,如果有不同的可以下方浏览讨论一下。上面很多关键的代码我都写上了注释,可以配合着注释,然后自己对源码进行研究,查看,如果还有不是很透彻的话,自己多翻一翻其他人写的。最近一直在写LeetCode上的动态规划这些算法题。其实也就是抄一遍。等以后有了感悟再来写这一些吧。

原文出处:https://www.cnblogs.com/Krloypower/p/10795170.html

ConcurrentHashMap 与同步 HashMap - ConcurrentHashMap vs Synchronized HashMap

ConcurrentHashMap 与同步 HashMap - ConcurrentHashMap vs Synchronized HashMap

问题:

What is the difference between using the wrapper class, SynchronizedMap , on a HashMap and ConcurrentHashMap ?HashMapConcurrentHashMap 上使用包装类 SynchronizedMap 什么区别?

Is it just being able to modify the HashMap while iterating it ( ConcurrentHashMap )? 它是否只能在迭代时修改 HashMap ( ConcurrentHashMap )?


解决方案:

参考: https://stackoom.com/en/question/5Q44

ConcurrentHashMap 源码分析

ConcurrentHashMap 源码分析

1.ConcurrentHashMap 原理介绍 (JDK1.7)

我们知道 HashTable 在并发情况下效率低的原因是所有访问 HashTable 的线程都必须竞争同一把锁,这就导致一旦一个线程获取了锁,其余所有的线程都必须处于等待状态。而 ConcurrentHashMap 使用的是锁分段技术,就是将数据分为一段一段的存储,给每一段数据都配一把锁,当一个线程占用一把锁访问其中一段数据的时候,其他段的数据也能够被其他线程访问。ConcurrentHashMap 是由 segment 数组构成,segment 是一个可重入锁,它的结构和 HashMap 类似,底层是一个 HashEntry 数组,看如下图:

 

2. 构造函数

@SuppressWarnings("unchecked")
    public ConcurrentHashMap(int initialCapacity,
                             float loadFactor, int concurrencyLevel) {
        if (!(loadFactor > 0) || initialCapacity < 0 || concurrencyLevel <= 0)
            throw new IllegalArgumentException();
        if (concurrencyLevel > MAX_SEGMENTS)
            concurrencyLevel = MAX_SEGMENTS;
        // Find power-of-two sizes best matching arguments
        int sshift = 0;
        int ssize = 1;
        while (ssize < concurrencyLevel) {
            ++sshift;
            ssize <<= 1;
        }
        this.segmentShift = 32 - sshift;
        this.segmentMask = ssize - 1;
        if (initialCapacity > MAXIMUM_CAPACITY)
            initialCapacity = MAXIMUM_CAPACITY;
//initialCapacity是ConcurrentHashMap的初始化容量
int c = initialCapacity / ssize; if (c * ssize < initialCapacity) ++c;
// cap是每个segment的容量,也就是HashEntry数组的长度,它也是一个2的N次幂值
int cap = MIN_SEGMENT_TABLE_CAPACITY; while (cap < c) cap <<= 1; // create segments and segments[0]
// loadFactor是每个segment的负载因子,创建Segments数组,然后初始化每个Segment的值
Segment<K,V> s0 = new Segment<K,V>(loadFactor, (int)(cap * loadFactor), (HashEntry<K,V>[])new HashEntry[cap]); Segment<K,V>[] ss = (Segment<K,V>[])new Segment[ssize]; UNSAFE.putOrderedObject(ss, SBASE, s0); // ordered write of segments[0] this.segments = ss; }

通过以上代码可知:Segment 的数组长度 ssize 是由 concurrencyLevel 计算出来的,ssize 是一个大于等于 concurrencyLevel 的最小的 2 的 N 次幂值。sshift 等于 ssize 从 1 向左移位的次数,默认情况下会移动 4 次,即 sshift 等于 4。segmentShift 和 segmentMask 在定位 segment 的算法中有使用到。

3 方法

3.1 get 方法

public V get(Object key) {
        Segment<K,V> s; // manually integrate access methods to reduce overhead
        HashEntry<K,V>[] tab;
// 根据key的hash值h,以及之前得到的segmentShift,segmentMask等值得到u,从而定位到segment
int h = hash(key); long u = (((h >>> segmentShift) & segmentMask) << SSHIFT) + SBASE; if ((s = (Segment<K,V>)UNSAFE.getObjectVolatile(segments, u)) != null && (tab = s.table) != null) {
// 在定位到Segment后,然后定位到HashEntry数组中的位置,最后通过逐一比较,返回要找的值
for (HashEntry<K,V> e = (HashEntry<K,V>) UNSAFE.getObjectVolatile (tab, ((long)(((tab.length - 1) & h)) << TSHIFT) + TBASE); e != null; e = e.next) { K k; if ((k = e.key) == key || (e.hash == h && key.equals(k))) return e.value; } } return null; }

有两个问题值得思考:一是在上面代码中,使用了 UNSAFE.getObjectVolatile (segments, u) 来获取 segment,为什么不直接使用 segments [u] 来获取呢?我们知道每个线程都有自己的工作内存,里面存放着 segments 的副本,在并发环境下,并不能保证每次拿到的是 segments 中的最新元素。但是使用 UNSAFE.getObjectVolatile (segments, u) 可以直接获取指定内存的数据,保证拿到的数据是最新的。二是读操作为什么不需要加锁?get 方法的高效之处就是不加锁,我们知道 HashTable 读操作时需要加锁的,那么 ConcurrentHashMap 是怎么做到的呢?HashEntry 中的 value 值被定义为了 volatile 类型,这样 value 就能在多个线程间保持可见性,能够被多个线程读,而不会读到过期的值。一旦有线程修改 value 值,那么其他线程本地内存中的 value 值就会失效,从而保证去获取最新的 value 值,这是 volatile 代替锁的经典应用场景。

3.2 put 方法

@SuppressWarnings("unchecked")
    public V put(K key, V value) {
        Segment<K,V> s;
        if (value == null)
            throw new NullPointerException();
// 先根据key,segmentShift,segmentMask定位到segment
int hash = hash(key); int j = (hash >>> segmentShift) & segmentMask; if ((s = (Segment<K,V>)UNSAFE.getObject // nonvolatile; recheck (segments, (j << SSHIFT) + SBASE)) == null) // in ensureSegment s = ensureSegment(j);
// 然后,将key和value放到segment中 进入该方法
return s.put(key, hash, value, false); }
final V put(K key, int hash, V value, boolean onlyIfAbsent) {
// put操作需要加锁 HashEntry
<K,V> node = tryLock() ? null : scanAndLockForPut(key, hash, value); V oldValue; try { HashEntry<K,V>[] tab = table; int index = (tab.length - 1) & hash; HashEntry<K,V> first = entryAt(tab, index); for (HashEntry<K,V> e = first;;) { if (e != null) { K k; if ((k = e.key) == key || (e.hash == hash && key.equals(k))) { oldValue = e.value; if (!onlyIfAbsent) { e.value = value; ++modCount; } break; } e = e.next; } else { if (node != null) node.setNext(first); else node = new HashEntry<K,V>(hash, key, value, first); int c = count + 1;
// 满足条件,则进行扩容操作
if (c > threshold && tab.length < MAXIMUM_CAPACITY) rehash(node); else setEntryAt(tab, index, node); ++modCount; count = c; oldValue = null; break; } } } finally {
// 解锁 unlock(); }
return oldValue; }

在 put 方法中,需要注意的是扩容时,针对的是某个 segment,而不是对整个容器扩容。

3.3 size 方法

public int size() {
        // Try a few times to get accurate count. On failure due to
        // continuous async changes in table, resort to locking.
        final Segment<K,V>[] segments = this.segments;
        int size;
        boolean overflow; // true if size overflows 32 bits
        long sum;         // sum of modCounts
        long last = 0L;   // previous sum
        int retries = -1; // first iteration isn''t retry
        try {
            for (;;) {
// 在尝试三次后,则加锁
if (retries++ == RETRIES_BEFORE_LOCK) { for (int j = 0; j < segments.length; ++j) ensureSegment(j).lock(); // force creation } sum = 0L; size = 0; overflow = false; for (int j = 0; j < segments.length; ++j) { Segment<K,V> seg = segmentAt(segments, j); if (seg != null) { sum += seg.modCount; int c = seg.count; if (c < 0 || (size += c) < 0) overflow = true; } }
// 如果在累加count前后,modCount不变,则跳出循环
if (sum == last) break; last = sum; } } finally {
// 尝试三次后,需要解锁
if (retries > RETRIES_BEFORE_LOCK) { for (int j = 0; j < segments.length; ++j) segmentAt(segments, j).unlock(); } } return overflow ? Integer.MAX_VALUE : size; }

对于获取 ConcurrentHashMap 中元素的个数,就是把每个 segment 中 count 的个数相加即可,但是这里有个问题,在多线程环境下,在累加 count 的过程中,之前已经累加过的 count 可能会发生变化,使用加锁处理的话能解决这个问题,但是在累加 count 操作过程中,之前累加过的 count 发生变化的几率非常小,那么 ConcurrentHashMap 是这样解决的:先尝试三次不加锁的方式来累加 count,在累加结束前后比较 modCount 的值是否发生变化;如果发生变化,则再通过加锁的方式来统计累加 count。

最后关于 ConcurrentHashMap,还有个问题:它的迭代器是强一致性的迭代器还是弱一致性的迭代器?在 java.util 包中集合类都返回 fail-fast 迭代器,这就意味着,在集合迭代过程中,如果修改集合内容,则会抛出异常 ConcurrentModificationException,这是强一致性迭代器。但是在 java.util.concurrent 集合返回的迭代器,是弱一致性迭代器,在遍历过程中,如果已经遍历的集合上的内容变化了,迭代器不会抛出 ConcurrentModificationException 异常。如果未遍历的数组上的内容发生了变化,则有可能反映到迭代过程中。这就是 ConcurrentHashMap 迭代器弱一致的表现,这是为了提升效率,是一致性与效率之间的一种权衡。

今天关于ConcurrentHashMap1.8源码分析concurrenthashmap源码1.7的讲解已经结束,谢谢您的阅读,如果想了解更多关于ConcurrentHashMap 1.8 源码分析、ConcurrentHashMap JDK 1.6 源码分析、ConcurrentHashMap 与同步 HashMap - ConcurrentHashMap vs Synchronized HashMap、ConcurrentHashMap 源码分析的相关知识,请在本站搜索。

本文标签: