GVKun编程网logo

《Java并发编程的艺术》之ConcurrentLinkedQueue(java并发编程的艺术和并发编程实战)

18

在这篇文章中,我们将带领您了解《Java并发编程的艺术》之ConcurrentLinkedQueue的全貌,包括java并发编程的艺术和并发编程实战的相关情况。同时,我们还将为您介绍有关15.并发容器

在这篇文章中,我们将带领您了解《Java并发编程的艺术》之ConcurrentLinkedQueue的全貌,包括java并发编程的艺术和并发编程实战的相关情况。同时,我们还将为您介绍有关15. 并发容器之 ConcurrentLinkedQueue、<>有感 ConcurrentLinkedQueue解读、IDEA 调试 JAVA ConcurrentLinkedQueue、Java concurrency集合之ConcurrentLinkedQueue_动力节点Java学院整理的知识,以帮助您更好地理解这个主题。

本文目录一览:

《Java并发编程的艺术》之ConcurrentLinkedQueue(java并发编程的艺术和并发编程实战)

《Java并发编程的艺术》之ConcurrentLinkedQueue(java并发编程的艺术和并发编程实战)

队列这个数据结构已经很熟悉了,就不多介绍,主要还是根据代码理解Doug Lea大师的一些其他技巧。

入队

offer状态图 如图所示,很多人可能会很疑惑,为什么第一次入队后,TAIL没有指向Node2?答案是为了效率!Σ(っ °Д °;)っ 那这还能叫队列吗?当然,它依然符合先进先出(FIFO)的规则。只是TAIL变量不一定指向尾结点,那么来看看大师是怎么做的。

public boolean offer(E e) {
    checkNotNull(e);
    final Node<E> newNode = new Node<E>(e);
    for (Node<E> t = tail, p = t;;) {
        Node<E> q = p.next;
        if (q == null) {
            // p是tail结点
            if (p.casNext(null, newNode)) {
                // 判断p结点是否是尾结点
                // 这一步不执行的话,不会对整体流程造成影响,至多是多一次循
                // 环,相比CAS操作,更愿意多一次循环
                if (p != t) 
                    // 交换Tail结点,如果CAS更新失败表示已经有其他线程对其进行更新
                    casTail(t, newNode);
                return true;
            }
            // CAS竞争失败,重新循环,竞争失败后q一般不会为null,除非又发生了出队
        }
        // HEAD和TAIL都指向同一个结点
        // 一个线程执行入队,一个线程执行出队,假设前面都没有更新tail和head
        // 执行出队的线程更新HEAD并设置其为自引用
        // 那么就会发生这个条件想要的现象
        else if (p == q)
            // 如果tail发生了改变,那么就为p设置t,并重新寻找
            // 如果tail未发生改变,head发生了改变,保
            // 险方法就是重新从新head开始遍历
            // 注意: -----只要在读取前完成tail发生更新就行了-----
            p = (t != (t = tail)) ? t : head;
        else
            // p != t 表示p不是尾结点,发生的原因是 入队时没有更新尾结点
            // t != (t = tail) 更新tail,如果tail被其他线程修改,则返回true
            // 如果为true,重新将p设置为尾结点(此时尾结点已经更新了)
            // 如果为false,p = q,继续循环下去
            p = (p != t && t != (t = tail)) ? t : q;
    }
}

第一次入队:

  1. 初始状态下head和tail都指向同一个item为null的结点
  2. tail没有后继结点
  3. 尝试CAS为tail设置next结点
  4. p == t,没有更新tail变量,直接返回true 注意offer永远都返回true

第二次入队:

  1. 初始状态下head和tail都指向同一个item为null的结点,但是next指向Node2
  2. tail有后继结点
  3. p != q,进入下个if语句
  4. p == t 返回false,整个三目运算符返回false,p = t
  5. 此时p没有后继结点
  6. 尝试CAS为p设置next
  7. p != t,更新tail结点,直接返回true

第二次入队多线程版 线程A执行入队,线程B执行出队

初始状态图如下所示:

操作步骤如下表所示:

次序 线程A 线程B
1
2 node1.item == null,执行下个if语句
3 (q = node1.next) != null,执行下个if语句
4 p != q,执行后面的else语句
5 p = q
6 node2.item != null
7 p != h (p是node2)
8 (q = p.next) == null,故将p设置为头结点,即将node2作为头结点
9 将node1设置为自引用
10 q = p.next,即node1.next,因为自引用q == p 返回item
11 q != null
12 p == q
13 t != (t = tail),即此时tail是否发生改变:true -> p =tail;false -> p = head

在步骤13,如果有个线程C已经执行了入队且tail发生改变,那么p就直接紧跟着更新后的tail就行了;如果tail没更新,就要设置p = head,然后重新循环遍历。

出队

从图中可以看出每次出队会有两种可能:将首结点的item置空,不移除;或是将移除首结点。总结一下,就是每次出队更新HEAD结点时,当HEAD结点里有元素时,直接弹出HEAD结点内的元素,而不会直接更新HEAD结点。只有当HEAD结点里没有元素时,出队操作才会更新HEAD结点。这种做法是为了减少CAS更新HEAD结点的消耗,从而提高出队效率。

public E poll() {
    restartFromHead:
    for (;;) {
        // p变量可以理解为游标、当前处理的结点,用于遍历的
        // q变量可以理解为p变量的next
        for (Node<E> h = head, p = h, q;;) {
            E item = p.item;
            // ① 先判断当前结点的值不为null 且 CAS更新p变量的item值
            if (item != null && p.casItem(item, null)) {
                // ②更新成功后,判断当前结点是否是头结点
                // 这一步主要是为了节省CAS操作,因为少更新一次HEAD结点没什么影响
                if (p != h) 
                    // ③更新头结点
                    updateHead(h, ((q = p.next) != null) ? q : p);
                return item;
            }
            // ④ 获取当前结点的下一个结点,判断是否为null
            // 一般发生于只有一个结点的情况
            else if ((q = p.next) == null) {
                // ⑤ 将当前结点设置为自引用
                updateHead(h, p);
                return null;
            }
            // ⑥ 如果当前结点出现自引用
            // 一般发生在一个线程更新了头结点,让p结点自引用时,p才会等于q
            else if (p == q)
                // 重新获取一个头结点
                continue restartFromHead;
            else
                // p = p.next
                p = q;
        }
    }
}

poll的流程图

第一次出队:

  1. 初始状态所有item不为null,尝试更新结点的item(一般情况下都是成功的),更新成功后结点item为null
  2. 判断 p == h,不满足条件,直接返回item值

第二次出队:

  1. Node1.item为null,进入下个条件
  2. (q = p.next) != null,进入下个条件
  3. p != q,进入下个条件
  4. p = q,重新循环
  5. Node2.item不为null,尝试更新结点的item,更新成功后结点item为null
  6. 判断 p != h,满足条件
  7. 判断p结点(Node2)是否有后继结点,如果有就将后继结点作为HEAD,否则将P作为HEAD
  8. 返回item值

第二次出队多线程版1: 线程A和线程B同时执行poll()方法,假设线程A稍快

线程A 线程B
A1. Node1.item为null B1. Node1.item为null
A2. (q = p.next) != null B2. (q = p.next) != null
A3. p != q B3. p != q
A4. p = q,循环 B4. p = q,循环
A5. Node2.item不为null,尝试更新结点item,更新成功,item值为null B5.
A6. 满足条件 p != h B6. Node2.item为null
A7. 判断p结点(Node2)是否有后继结点 B7. (q = p.next) != null
A8. B8. p != q
A9. B9. p = q 循环
A10. B10. Node3.item不为null,尝试更新结点item,更新成功,item值为null
A11. 将后继结点即Node3设置为HEAD B11.
A12. 返回item B12. 满足条件 p != h
A13. B13. 判断p结点(Node2)是否有后继结点
A14. B14. 由于HEAD已经被修改,所以CAS更新失败
A15. B15. 返回item

这里主要是想讲即便HEAD更新发生冲突,有一次没有更新,也不会影响整体的流程,大不了下次出队的时候多出队一个。

第二次出队多线程版2: 线程A和线程B同时执行poll()方法,假设线程A稍快

线程A 线程B
A1. Node1.item为null B1. Node1.item为null
A2. (q = p.next) != null B2. (q = p.next) != null
A3. p != q B3. p != q
A4. p = q,循环 B4. p = q,循环
A5. Node2.item不为null,尝试更新结点item,更新成功,item值为null B5.
A6. 满足条件 p != h B6. Node2.item为null
A7. 判断p结点(Node2)是否有后继结点 B7.
A8. 将后继结点即Node3设置为HEAD B8.
A9. 返回item B9. (q = p.next) != null
A10. B10. p == q
A11. B11. 重新获取HEAD并执行遍历

这个例子主要表达了当A线程先修改了首结点,并将原来的首结点设置为自引用时,B线程在循环过程中会执行到一条语句(q = p.next),然后在下一个条件语句中进入continue restartFromHead,重新获取HEAD变量并遍历

总结

ConcurrentLinkedQueue主要内容都已经学习过了,其中分析的过程花费了一个早上,吃完饭回来坐下才有了一些思路。学习的难点主要还是在它不同于普通的队列,它的tail和head变量不会时刻指向头结点和尾结点,这也造就了代码的复杂性,否则如下所示即可:

public boolean offer(E item){
    checkNotNull(item);
    for(;;){
        Node<T> node = new Node(item);
        if(tail.casNext(null, node) && casTail(tail, node)){
            return true;
        }
    }
}

但是这样和上面的例子比起来,就有性能的差距,差距主要体现在CAS写竞争方面:

最悲观的角度,ConcurrentLinkedQueue的offer方法需要执行两次 CAS (假设不发生竞争,其实我觉得不会有CAS竞争发生),上面的通用代码方法也需要执行两次,这里持平。 最乐观的角度,ConcurrentLinkedQueue只需要执行一次CAS,上面的通用方法仍需要两次。 原本是参考《Java并发编程的艺术》,但是里面的实现和现在不同了,所以根据现在的实际情况写了一份。当然,里面的主线思路仍然没有发生改变——尽量减少CAS操作。书上的代码是通过hops变量来控制多久需要更新一次值,大致思路如下所示:

遍历前,hops = 0
HEAD---
      |
    Node1 -> Node2 -> Node3 -> Node4
      |
TAIL---

假设现在要插入Node5,就要从TAIL变量位置(Node1位置)开始往后遍历,总共要循环三次才能找到最后一个尾结点,此时计数器hops就等于3,当Node5插入成功后,判断hops的值是否达到阈值,如果达到了,就更新tail变量;反之则不更新,直接返回。

遍历完后,hops = 3,达到阈值(假设达到了),将tail变量更新给Node5
HEAD---
      |
    Node1 -> Node2 -> Node3 -> Node4 -> Node5
                                          |
                                    TAIL---

ConcurrentLinkedQueue初看以为很简单,其实逻辑还是挺复杂的,拓展了对队列的看法。今天在写这篇博客时,感觉一头雾水,因为CAS操作不像锁那样简单,代码块锁住就能放心执行,CAS只对单个操作保证可见性和原子性,很担心后面的线程会对其进行什么修改,今天过后总结了一下写并发容器的思路:

  1. 在了解某个方法的实现时,需要分清局部变量和共享变量,在理清了局部变量的含义后,将重点放在共享变量上
  2. 如果方法里某个语句没看懂(一头雾水型,突然就来了这么一句),请往多线程方向思考。
  3. 多对方法进行多线程分析(有助于理清思路),用md表格的例子写出来,很清晰,如果感觉md表格难以阅读,可以看这个网站 MD表格生成器

15. 并发容器之 ConcurrentLinkedQueue

15. 并发容器之 ConcurrentLinkedQueue

OSC 请你来轰趴啦!1028 苏州源创会,一起寻宝 AI 时代

1.ConcurrentLinkedQueue 简介

在单线程编程中我们会经常用到一些集合类,比如 ArrayList,HashMap 等,但是这些类都不是线程安全的类。在面试中也经常会有一些考点,比如 ArrayList 不是线程安全的,Vector 是线程安全。而保障 Vector 线程安全的方式,是非常粗暴的在方法上用 synchronized 独占锁,将多线程执行变成串行化。要想将 ArrayList 变成线程安全的也可以使用 Collections.synchronizedList(List<T> list) 方法 ArrayList 转换成线程安全的,但这种转换方式依然是通过 synchronized 修饰方法实现的,很显然这不是一种高效的方式,同时,队列也是我们常用的一种数据结构,为了解决线程安全的问题,Doug Lea 大师为我们准备了 ConcurrentLinkedQueue 这个线程安全的队列。从类名就可以看的出来实现队列的数据结构是链式。

1.1 Node

要想先学习 ConcurrentLinkedQueue 自然而然得先从它的节点类看起,明白它的底层数据结构。Node 类的源码为:

private static class Node<E> {
        volatile E item;
        volatile Node<E> next;
		.......
}

Node 节点主要包含了两个域:一个是数据域 item,另一个是 next 指针,用于指向下一个节点从而构成链式队列。并且都是用 volatile 进行修饰的,以保证内存可见性(关于 volatile 可以看这篇文章)。另外 ConcurrentLinkedQueue 含有这样两个成员变量:

private transient volatile Node<E> head;
private transient volatile Node<E> tail;

说明 ConcurrentLinkedQueue 通过持有头尾指针进行管理队列。当我们调用无参构造器时,其源码为:

public ConcurrentLinkedQueue() {
    head = tail = new Node<E>(null);
}

head 和 tail 指针会指向一个 item 域为 null 的节点,此时 ConcurrentLinkedQueue 状态如下图所示:

如图,head 和 tail 指向同一个节点 Node0,该节点 item 域为 null,next 域为 null。

1.ConcurrentLinkedQueue初始化状态.png

1.2 操作 Node 的几个 CAS 操作

在队列进行出队入队的时候免不了对节点需要进行操作,在多线程就很容易出现线程安全的问题。可以看出在处理器指令集能够支持 CMPXCHG 指令后,在 java 源码中涉及到并发处理都会使用 CAS 操作 (关于 CAS 操作可以看这篇文章的第 3.1 节),那么在 ConcurrentLinkedQueue 对 Node 的 CAS 操作有这样几个:

//更改Node中的数据域item	
boolean casItem(E cmp, E val) {
    return UNSAFE.compareAndSwapObject(this, itemOffset, cmp, val);
}
//更改Node中的指针域next
void lazySetNext(Node<E> val) {
    UNSAFE.putOrderedObject(this, nextOffset, val);
}
//更改Node中的指针域next
boolean casNext(Node<E> cmp, Node<E> val) {
    return UNSAFE.compareAndSwapObject(this, nextOffset, cmp, val);
}

可以看出这些方法实际上是通过调用 UNSAFE 实例的方法,UNSAFE 为 sun.misc.Unsafe 类,该类是 hotspot 底层方法,目前为止了解即可,知道 CAS 的操作归根结底是由该类提供就好。

2.offer 方法

对一个队列来说,插入满足 FIFO 特性,插入元素总是在队列最末尾的地方进行插入,而取(移除)元素总是从队列的队头。所有要想能够彻底弄懂 ConcurrentLinkedQueue 自然而然是从 offer 方法和 poll 方法开始。那么为了能够理解 offer 方法,采用 debug 的方式来一行一行的看代码走。另外,在看多线程的代码时,可采用这样的思维方式:

单个线程 offer 多个线程 offer 部分线程 offer,部分线程 poll ----offer 的速度快于 poll -------- 队列长度会越来越长,由于 offer 节点总是在对队列队尾,而 poll 节点总是在队列对头,也就是说 offer 线程和 poll 线程两者并无 “交集”,也就是说两类线程间并不会相互影响,这种情况站在相对速率的角度来看,也就是一个 "单线程 offer" ----offer 的速度慢于 poll --------poll 的相对速率快于 offer,也就是队头删的速度要快于队尾添加节点的速度,导致的结果就是队列长度会越来越短,而 offer 线程和 poll 线程就会出现 “交集”,即那一时刻就可以称之为 offer 线程和 poll 线程同时操作的节点为 临界点 ,且在该节点 offer 线程和 poll 线程必定相互影响。根据在临界点时 offer 和 poll 发生的相对顺序又可从两个角度去思考:1. 执行顺序为 offer-->poll-->offer,即表现为当 offer 线程在 Node1 后插入 Node2 时,此时 poll 线程已经将 Node1 删除,这种情况很显然需要在 offer 方法中考虑; 2. 执行顺序可能为:poll-->offer-->poll,即表现为当 poll 线程准备删除的节点为 null 时(队列为空队列),此时 offer 线程插入一个节点使得队列变为非空队列

先看这么一段代码:

1. ConcurrentLinkedQueue<Integer> queue = new ConcurrentLinkedQueue<>();
2. queue.offer(1);
3. queue.offer(2);

创建一个 ConcurrentLinkedQueue 实例,先 offer 1,然后再 offer 2。offer 的源码为:

public boolean offer(E e) {
1.    checkNotNull(e);
2.    final Node<E> newNode = new Node<E>(e);

3.    for (Node<E> t = tail, p = t;;) {
4.        Node<E> q = p.next;
5.        if (q == null) {
6.            // p is last node
7.            if (p.casNext(null, newNode)) {
                // Successful CAS is the linearization point
                // for e to become an element of this queue,
               // and for newNode to become "live".
8.                if (p != t) // hop two nodes at a time
9.                    casTail(t, newNode);  // Failure is OK.
10.                return true;
            }
            // Lost CAS race to another thread; re-read next
        }
11.        else if (p == q)
            // We have fallen off list.  If tail is unchanged, it
            // will also be off-list, in which case we need to
            // jump to head, from which all live nodes are always
            // reachable.  Else the new tail is a better bet.
12.            p = (t != (t = tail)) ? t : head;
           else
            // Check for tail updates after two hops.
13.            p = (p != t && t != (t = tail)) ? t : q;
    }
}

单线程执行角度分析

先从单线程执行的角度看起,分析 offer 1 的过程。第 1 行代码会对是否为 null 进行判断,为 null 的话就直接抛出空指针异常,第 2 行代码将 e 包装成一个 Node 类,第 3 行为 for 循环,只有初始化条件没有循环结束条件,这很符合 CAS 的 “套路”,在循环体 CAS 操作成功会直接 return 返回,如果 CAS 操作失败的话就在 for 循环中不断重试直至成功。这里实例变量 t 被初始化为 tail,p 被初始化为 t 即 tail。为了方便下面的理解,p 被认为队列真正的尾节点,tail 不一定指向对象真正的尾节点,因为在 ConcurrentLinkedQueue 中 tail 是被延迟更新的,具体原因我们慢慢来看。代码走到第 3 行的时候,t 和 p 都分别指向初始化时创建的 item 域为 null,next 域为 null 的 Node0。第 4 行变量 q 被赋值为 null,第 5 行 if 判断为 true,在第 7 行使用 casNext 将插入的 Node 设置成当前队列尾节点 p 的 next 节点,如果 CAS 操作失败,此次循环结束在下次循环中进行重试。CAS 操作成功走到第 8 行,此时 p==t,if 判断为 false, 直接 return true 返回。如果成功插入 1 的话,此时 ConcurrentLinkedQueue 的状态如下图所示:

2.offer 1后队列的状态.png

如图,此时队列的尾节点应该为 Node1, 而 tail 指向的节点依然还是 Node0, 因此可以说明 tail 是延迟更新的。那么我们继续来看 offer 2 的时候的情况,很显然此时第 4 行 q 指向的节点不为 null 了,而是指向 Node1, 第 5 行 if 判断为 false, 第 11 行 if 判断为 false, 代码会走到第 13 行。好了,再插入节点的时候我们会问自己这样一个问题?上面已经解释了 tail 并不是指向队列真正的尾节点,那么在插入节点的时候,我们是不是应该最开始做的就是找到队列当前的尾节点在哪里才能插入?那么第 13 行代码就是找出队列真正的尾节点

定位队列真正的对尾节点

p = (p != t && t != (t = tail)) ? t : q;

我们来分析一下这行代码,如果这段代码在单线程环境执行时,很显然由于 p==t, 此时 p 会被赋值为 q, 而 q 等于 Node<E> q = p.next,即 Node1。在第一次循环中指针 p 指向了队列真正的队尾节点 Node1,那么在下一次循环中第 4 行 q 指向的节点为 null,那么在第 5 行中 if 判断为 true, 那么在第 7 行依然通过 casNext 方法设置 p 节点的 next 为当前新增的 Node, 接下来走到第 8 行,这个时候 p!=t,第 8 行 if 判断为 true, 会通过 casTail(t, newNode) 将当前节点 Node 设置为队列的队尾节点,此时的队列状态示意图如下图所示: 3.队列offer 2后的状态.png

tail 指向的节点由 Node0 改变为 Node2, 这里的 casTail 失败不需要重试的原因是,offer 代码中主要是通过 p 的 next 节点 q (Node<E> q = p.next) 决定后面的逻辑走向的,当 casTail 失败时状态示意图如下: 4.队列进行入队操作后casTail失败后的状态图.png

如图,如果这里 casTail 设置 tail 失败即 tail 还是指向 Node0 节点的话,无非就是多循环几次通过 13 行代码定位到队尾节点

通过对单线程执行角度进行分析,我们可以了解到 poll 的执行逻辑为:

  1. 如果 tail 指向的节点的下一个节点(next 域)为 null 的话,说明 tail 指向的节点即为队列真正的队尾节点,因此可以通过 casNext 插入当前待插入的节点,但此时 tail 并未变化,如图 2;

  2. 如果 tail 指向的节点的下一个节点(next 域)不为 null 的话,说明 tail 指向的节点不是队列的真正队尾节点。通过 q(Node<E> q = p.next)指针往前递进去找到队尾节点,然后通过 casNext 插入当前待插入的节点,并通过 casTail 方式更改 tail,如图 3

我们回过头再来看 p = (p != t && t != (t = tail)) ? t : q; 这行代码在单线程中,这段代码永远不会将 p 赋值为 t, 那么这么写就不会有任何作用,那我们试着在多线程的情况下进行分析。

多线程执行角度分析

多个线程 offer

很显然这么写另有深意,其实在多线程环境下这行代码很有意思的。 t != (t = tail) 这个操作并非一个原子操作,有这样一种情况:

5.线程A和线程B有可能的执行时序.png

如图,假设线程 A 此时读取了变量 t,线程 B 刚好在这个时候 offer 一个 Node 后,此时会修改 tail 指针,那么这个时候线程 A 再次执行 t=tail 时 t 会指向另外一个节点,很显然线程 A 前后两次读取的变量 t 指向的节点不相同,即 t != (t = tail) 为 true, 并且由于 t 指向节点的变化 p != t 也为 true,此时该行代码的执行结果为 p 和 t 最新的 t 指针指向了同一个节点,并且此时 t 也是队列真正的对尾节点。那么,现在已经定位到队列真正的队尾节点,就可以执行 offer 操作了。

offer->poll->offer

那么还剩下第 11 行的代码我们没有分析,大致可以猜想到应该就是回答一部分线程 offer,一部分 poll 的这种情况。当 if (p == q) 为 true 时,说明 p 指向的节点的 next 也指向它自己,这种节点称之为哨兵节点这种节点在队列中存在的价值不大,一般表示为要删除的节点或者是空节点。为了能够很好的理解这种情况,我们先看看 poll 方法的执行过程后,再回过头来看,总之这是一个很有意思的事情 :)。

3.poll 方法

poll 方法源码如下:

public E poll() {
    restartFromHead:
    1. for (;;) {
    2.    for (Node<E> h = head, p = h, q;;) {
    3.        E item = p.item;

    4.        if (item != null && p.casItem(item, null)) {
                // Successful CAS is the linearization point
                // for item to be removed from this queue.
    5.            if (p != h) // hop two nodes at a time
    6.                updateHead(h, ((q = p.next) != null) ? q : p);
    7.            return item;
            }
    8.        else if ((q = p.next) == null) {
    9.            updateHead(h, p);
    10.            return null;
            }
    11.        else if (p == q)
    12.            continue restartFromHead;
            else
    13.            p = q;
        }
    }
}

我们还是先站在单线程的角度去理清该方法的基本逻辑。假设 ConcurrentLinkedQueue 初始状态如下图所示:

6.队列初始状态.png

参数 offer 时的定义,我们还是先将变量 p 作为队列要删除真正的队头节点,h(head)指向的节点并不一定是队列的队头节点。先来看 poll 出 Node1 时的情况,由于 p=h=head,参照上图,很显然此时 p 指向的 Node1 的数据域不为 null, 在第 4 行代码中 item!=null 判断为 true 后接下来通过 casItem 将 Node1 的数据域设置为 null。如果 CAS 设置失败则此次循环结束等待下一次循环进行重试。若第 4 行执行成功进入到第 5 行代码,此时 p 和 h 都指向 Node1, 第 5 行 if 判断为 false, 然后直接到第 7 行 return 回 Node1 的数据域 1,方法运行结束,此时的队列状态如下图。

7.队列出队操作后的状态.png

下面继续从队列中 poll,很显然当前 h 和 p 指向的 Node1 的数据域为 null,那么第一件事就是要定位准备删除的队头节点 (找到数据域不为 null 的节点)

定位删除的队头节点

继续看,第三行代码 item 为 null, 第 4 行代码 if 判断为 false, 走到第 8 行代码(q = p.next)if 也为 false,由于 q 指向了 Node2, 在第 11 行的 if 判断也为 false,因此代码走到了第 13 行,这个时候 p 和 q 共同指向了 Node2, 也就找到了要删除的真正的队头节点。可以总结出,定位待删除的队头节点的过程为:如果当前节点的数据域为 null,很显然该节点不是待删除的节点,就用当前节点的下一个节点去试探。在经过第一次循环后,此时状态图为下图:

8.经过一次循环后的状态.png

进行下一次循环,第 4 行的操作同上述,当前假设第 4 行中 casItem 设置成功,由于 p 已经指向了 Node2, 而 h 还依旧指向 Node1, 此时第 5 行的 if 判断为 true,然后执行 updateHead(h, ((q = p.next) != null) ? q : p),此时 q 指向的 Node3,所有传入 updateHead 方法的分别是指向 Node1 的 h 引用和指向 Node3 的 q 引用。updateHead 方法的源码为:

final void updateHead(Node<E> h, Node<E> p) {
    if (h != p && casHead(h, p))
        h.lazySetNext(h);
}

该方法主要是通过 casHead 将队列的 head 指向 Node3, 并且通过 h.lazySetNext 将 Node1 的 next 域指向它自己。最后在第 7 行代码中返回 Node2 的值。此时队列的状态如下图所示:

9.Node2从队列中出队后的状态.png

Node1 的 next 域指向它自己,head 指向了 Node3。如果队列为空队列的话,就会执行到代码的第 8 行 (q = p.next) == null,if 判断为 true, 因此在第 10 行中直接返回 null。以上的分析是从单线程执行的角度去看,也可以让我们了解 poll 的整体思路,现在来做一个总结:

  1. 如果当前 head,h 和 p 指向的节点的 Item 不为 null 的话,说明该节点即为真正的队头节点(待删除节点),只需要通过 casItem 方法将 item 域设置为 null, 然后将原来的 item 直接返回即可。

  2. 如果当前 head,h 和 p 指向的节点的 item 为 null 的话,则说明该节点不是真正的待删除节点,那么应该做的就是寻找 item 不为 null 的节点。通过让 q 指向 p 的下一个节点(q = p.next)进行试探,若找到则通过 updateHead 方法更新 head 指向的节点以及构造哨兵节点(通过updateHead方法的h.lazySetNext(h)

接下来,按照上面分析 offer 的思维方式,下面来分析一下多线程的情况,第一种情况是;

多线程执行情况分析:

多个线程 poll

现在回过头来看 poll 方法的源码,有这样一部分:

else if (p == q)
    continue restartFromHead;

这一部分就是处理多个线程 poll 的情况,q = p.next 也就是说 q 永远指向的是 p 的下一个节点,那么什么情况下会使得 p,q 指向同一个节点呢?根据上面我们的分析,只有 p 指向的节点在 poll 的时候转变成了哨兵节点(通过 updateHead 方法中的 h.lazySetNext)。当线程 A 在判断 p==q 时,线程 B 已经将执行完 poll 方法将 p 指向的节点转换为哨兵节点并且 head 指向的节点已经发生了改变,所以就需要从 restartFromHead 处执行,保证用到的是最新的 head。

poll->offer->poll

试想,还有这样一种情况,如果当前队列为空队列,线程 A 进行 poll 操作,同时线程 B 执行 offer,然后线程 A 在执行 poll,那么此时线程 A 返回的是 null 还是线程 B 刚插入的最新的那个节点呢?我们来写一代 demo:

public static void main(String[] args) {
    Thread thread1 = new Thread(() -> {
        Integer value = queue.poll();
        System.out.println(Thread.currentThread().getName() + " poll 的值为:" + value);
        System.out.println("queue当前是否为空队列:" + queue.isEmpty());
    });
    thread1.start();
    Thread thread2 = new Thread(() -> {
        queue.offer(1);
    });
    thread2.start();
}

输出结果为:

Thread-0 poll 的值为:null queue 当前是否为空队列:false

通过 debug 控制线程 thread1 和线程 thread2 的执行顺序,thread1 先执行到第 8 行代码 if ((q = p.next) == null),由于此时队列为空队列 if 判断为 true,进入 if 块,此时先让 thread1 暂停,然后 thread2 进行 offer 插入值为 1 的节点后,thread2 执行结束。再让 thread1 执行,这时 thread1 并没有进行重试,而是代码继续往下走,返回 null,尽管此时队列由于 thread2 已经插入了值为 1 的新的节点。所以输出结果为 thread0 poll 的为 null, 然队列不为空队列。因此,在判断队列是否为空队列的时候是不能通过线程在 poll 的时候返回为 null 进行判断的,可以通过 isEmpty 方法进行判断

4. offer 方法中部分线程 offer 部分线程 poll

在分析 offer 方法的时候我们还留下了一个问题,即对 offer 方法中第 11 行代码的理解。

offer->poll->offer

在 offer 方法的第 11 行代码 if (p == q),能够让 if 判断为 true 的情况为 p 指向的节点为哨兵节点,而什么时候会构造哨兵节点呢?在对 poll 方法的讨论中,我们已经找到了答案,即 ** 当 head 指向的节点的 item 域为 null 时会寻找真正的队头节点,等到待插入的节点插入之后,会更新 head,并且将原来 head 指向的节点设置为哨兵节点。** 假设队列初始状态如下图所示: 10.offer和poll相互影响分析时队列初始状态.png 因此在线程 A 执行 offer 时,线程 B 执行 poll 就会存在如下一种情况: 11.线程A和线程B可能存在的执行时序.png

如图,线程 A 的 tail 节点存在 next 节点 Node1, 因此会通过引用 q 往前寻找队列真正的队尾节点,当执行到判断 if (p == q) 时,此时线程 B 执行 poll 操作,在对线程 B 来说,head 和 p 指向 Node0, 由于 Node0 的 item 域为 null, 同样会往前递进找到队列真正的队头节点 Node1, 在线程 B 执行完 poll 之后,Node0 就会转换为哨兵节点,也就意味着队列的 head 发生了改变,此时队列状态为下图。

12.线程B进行poll后队列的状态图.png

此时线程 A 在执行判断 if (p == q) 时就为 true, 会继续执行 p = (t != (t = tail)) ? t : head;,由于 tail 指针没有发生改变所以 p 被赋值为 head, 重新从 head 开始完成插入操作。

5. HOPS 的设计

通过上面对 offer 和 poll 方法的分析,我们发现 tail 和 head 是延迟更新的,两者更新触发时机为:

tail 更新触发时机:当 tail 指向的节点的下一个节点不为 null 的时候,会执行定位队列真正的队尾节点的操作,找到队尾节点后完成插入之后才会通过 casTail 进行 tail 更新;当 tail 指向的节点的下一个节点为 null 的时候,只插入节点不更新 tail。

**head 更新触发时机:** 当 head 指向的节点的 item 域为 null 的时候,会执行定位队列真正的队头节点的操作,找到队头节点后完成删除之后才会通过 updateHead 进行 head 更新;当 head 指向的节点的 item 域不为 null 的时候,只删除节点不更新 head。

并且在更新操作时,源码中会有注释为:hop two nodes at a time。所以这种延迟更新的策略就被叫做 HOPS 的大概原因是这个(猜的 :)),从上面更新时的状态图可以看出,head 和 tail 的更新是 “跳着的” 即中间总是间隔了一个。那么这样设计的意图是什么呢?

如果让 tail 永远作为队列的队尾节点,实现的代码量会更少,而且逻辑更易懂。但是,这样做有一个缺点,** 如果大量的入队操作,每次都要执行 CAS 进行 tail 的更新,汇总起来对性能也会是大大的损耗。如果能减少 CAS 更新的操作,无疑可以大大提升入队的操作效率,所以 doug lea 大师每间隔 1 次(tail 和队尾节点的距离为 1)进行才利用 CAS 更新 tail。** 对 head 的更新也是同样的道理,虽然,这样设计会多出在循环中定位队尾节点,但总体来说读的操作效率要远远高于写的性能,因此,多出来的在循环中定位尾节点的操作的性能损耗相对而言是很小的。

参考资料

《java 并发编程的艺术》 《Java 高并发程序设计》 ConcurrentLinkedQueue 博文:https://www.cnblogs.com/sunshine-2015/p/6067709.html

<<Java并发编程实践>>有感 ConcurrentLinkedQueue解读

<>有感 ConcurrentLinkedQueue解读

ConcurrentLinkedQueue(上集)

算法实现 CAS

  • CAS的优点

    当一个线程执行任务失败不影响其他线程的进行 最大限度的利用CPU资源 能提高程序的伸缩性 伸缩性:不修改任何代码 升级硬件就能带来性能上的提高 升级硬件带来的性能提高明显 就是伸缩性良好

  • CAS的缺点

    代码复杂 影响阅读性 刚开始看ConcurrentLinkedQueue的时候 没有正确的思路,理解起来会比较费劲 我推荐直接用多线程同时执行的方式去理解 这样会比较好

重要概念

  • 不变性

    • 所有item不为null的节点都能从head头节点开始通过succ()方法访问到
    • head!=null 只要队列有值 保证真实的head永不为null head哪怕会自引用 迟早也会解除这种假状态
  • 可变性

    • heatd.item 可能为null也可能不为null 因为cas活锁操作 每一行代码执行都不影响其他线程的访问相同的代码块
    • tail尾节点的更新是滞后于head的 个人理解 在offer中 尾节点掉队后 通过head节点 (不变性1的保证) 成功访问最后一个p.next=null的节点
  • 快照

    • snapshot是我自己的理解 因为对于多线程操作来说 当前引用对象 如offer()中 t=tail中的t; p=t中的p; q=p.next中的q都是一个快照 他获得一个对象的快照版本 然后在后续的操作中 使(t!=(t=tail))这样操作有意义

重要方法

  • offer()入队
  • poll() 出队

源码

public boolean offer(E e) {
        checkNotNull(e); //NullPointException检查   
        final Node<E> newNode = new Node<E>(e); //包装成一个Node对象

        for (Node<E> t = tail, p = t;;) {//获取当前尾节点 t=tail,p是真正的尾节点 p.next==null 
            Node<E> q = p.next;
            if (q == null) {
                // p is last node 
                if (p.casNext(null, newNode)) {//方法1 CAS更新 自己想3个线程同时进行这个操作
                    // Successful CAS is the linearization point
                    // for e to become an element of this queue,
                    // and for newNode to become "live".
                    if (p != t) // hop two nodes at a time //方法2 延迟更新尾节点 下面说为什么
                        casTail(t, newNode);  //方法3 成不成功无所谓 下面说
                    return true;
                }
                // Lost CAS race to another thread; re-read next
            }
            else if (p == q)// 方法4 学习offer方法时 可以暂时放弃这一步
                // We have fallen off list.  If tail is unchanged, it
                // will also be off-list, in which case we need to
                // jump to head, from which all live nodes are always
                // reachable.  Else the new tail is a better bet.
                p = (t != (t = tail)) ? t : head;
            else  //去找到真正的尾节点 此处和方法2 应是相互辉映的存在
                // Check for tail updates after two hops.
                p = (p != t && t != (t = tail)) ? t : q; //方法5
        }
    }

解读offer()方法

自顶向下 思考CAS中可能出现的情况 CAS是活锁 所谓活锁即是每一行代码运行时 允许其他线程访问相同的代码块 成功与失败并存 衍生了更多的条件判断 本人觉得CAS方法都应该从这个方法去理解 再自己画画时序图 (注意:理解offer()时,先把方法4排除,因为4方法出现自引用的情况 只有offer()poll()交替执行时会出现 本文只介绍第一种情况)

  • 多线程操作

    1. 第一种情况: 只有 offer()
    2. 第二种情况: offer()poll()方法交替执行
  • 同时执行offer()(假设我们现在有3个线程)

    • 不变性:永远只有一个线程CAS成功 并且总会成功一个
    • 循环次数分析:Thread1 成功 循环一次退出 Thread2失败 再循环一次成功 Thread3失败 再循环两次成功 如果有n个线程同时执行 offer() 执行次数 最大为n次 最少为1次
    • 方法5中三目表达式解析: p=condition?result1:result2 我先说一下这里的意义 满足result1的场景为 :获取尾节点tail的快照已经过时了(其他线程更新了新的尾节点tail) 直接跳转到当前获得的最新尾节点的地方 满足result2的场景为:多线程同时操作offer() 执行1方法CAS成功后 未更新尾节点(未执行3方法:两种原因 1是未满足前置条件if判断 2是CAS更新失败) 直接找next节点
    • 方法2与方法5 是整个offer() 操作的点睛之笔 下面解释
  1. 只有offer() 操作时

    假设:

    Thread 1执行完1方法成功 还未执行2方法 Thread2和Thread3进入5方法 ,也就是说Thread2和Thread3执行5方法发生在Thread1执行2方法之前 Thread2 and Thread3 invoke method5() before Thread1 invoke method2()

    此时 Thread2.p =q,Thread3.p=q, 因为p==t成立 时序图如下,然后Thread1执行方法2 p==t 不执行tail尾节点的更新操作 由此可知 尾节点是延迟更新 一切为了更高效~~~

    图1

                                  图1                  
    

Thread 2 与 Thread3 此时再次执行 1 方法 见图1 他们此时的q.next==null 我们规定Thread2 CAS成功 Thread3失败了 成功后的时序图如下 我们假设 Thread3 invoke method5() after Thread2 invoke method2() Thread2执行方法2 在 Thread3执行方法5之前

图2

                 
                                 图2                       

对于Thread2 进入2方法 p!=t 满足 执行 casTail(t, newNode) 更新尾节点的快照 如下图

图3

                                 图3
                                 

Thread2 工作完成 退出循环

对于Thread3 因为执行1方法失败 进入5方法 此时Thread3的tail快照t3

p = (p != t && t != (t = tail)) ? t : q;

按图3来翻译

p=(p!=t3&&t3!=(t3=t2))?t2:q;

p=t2;//直接去当前能获取到的尾节点!!!

到这里 offer() 方法解决完成

ConcurrentLinkedQueue核心总结

  • tail和head都是 延迟更新的 但是tail更新在head更新后面 因为方法4中 需要依赖head节点 去找每一个存活的节点
  • 前面的叙述中 可以看到 offer() 方法内 核心操作 就是 p=condition?result1:result2
  • 偶数次offer() 操作更新一次tail 单线程的环境下

与Michael-Scott 队列比较

  • Michael-Scott队列 每次操作 都需要判断是否需要推动尾节点 采取CAS的操作 优点也是缺点
  • Doug Lead老神仙的CAS 我这个菜鸟猜测 能不用CAS 就尽量不用 因为CAS存在竞争 提供以最少次数的更新达到最终正确的效果
  • 我们把offer()中的整个行为想象为跳台阶 result1的形式就像是 武侠小说中的越阶战斗!!!result2的形式就是一步一个脚印 每次平稳地去下一个台阶
  • 我们想象一下 offer()最优的情况 10个线程同时offer()

    每一个执行1方法成功的线程都没有(执行2方法或则执行3方法失败) 没关系 尾节点的更新终会成功

    每一个失败的线程都是去当前节点的next节点 p.next进行插入操作 在第9个线程(相当于我们上文中的线程2)

    当第10个线程操作时 虽然它很可怜 一直排到最后 但是尾节点更新一下就越过了9阶!!!(不太恰当的地方请大佬们指点) 

  • ConcurrrntLinkedQueue 优点

    1. 能跃过一整段因为多线程在极短时间内offer()插入的节点 直接去尾节点 直接跨过去
    2. 能抵达每一个相对于当前快照来说最新的next节点
    3. 高并发时 tail 和 p 相互配合 尽力去离当前尾节点 最近的地方
  • ConcurrentLinkedQueue 缺点

    1. CAS操作 虽然总会成功 但是竞争效率如果很低 不如用同步锁 采用CAS编写并发代码 都是大佬级别 难度高 不接地气(嘿嘿)
    2. 循环可能会带来额外的资源开销

IDEA 调试 JAVA ConcurrentLinkedQueue

IDEA 调试 JAVA ConcurrentLinkedQueue

调试ConcurrentLinkedQueue 源码 poll() 方法时 ,出现了比较奇怪的现象,当时队列里有两个元素,如下截图,

 

但执行完

 p.casItem(item, null)
后,出现了令人无法理解的现象,如下图。程序是在单线程的模式下跑,无论如何都理解不了这个结果,理论上只有item被置为null,为什么next的值也变了,还指向了自身

 

 

 最终在网上找到了答案,原来是IDEA的问题。使用debug时会默认开始 使用集合类的替代视图,这将会导致我们期待看到的队列的链表结构和实际的不相符。就 比如 idea会默认将 调用变量的toString方法一样。对于非并发集合还好,因为其读操作是幂等的,但对与并发集合则不然(如ConcurrentLinkedQueue的poll操作会改变内部数据结构),IDEA在转换视图时会调用集合的读操作方法,最终导致了显示问题。

更改IDEA设置解决问题。

 

 

Java concurrency集合之ConcurrentLinkedQueue_动力节点Java学院整理

Java concurrency集合之ConcurrentLinkedQueue_动力节点Java学院整理

ConcurrentLinkedQueue介绍

ConcurrentLinkedQueue是线程安全的队列,它适用于“高并发”的场景。

它是一个基于链接节点的无界线程安全队列,按照 FIFO(先进先出)原则对元素进行排序。队列元素中不可以放置null元素(内部实现的特殊节点除外)。 

ConcurrentLinkedQueue原理和数据结构

ConcurrentLinkedQueue的数据结构,如下图所示:

说明:

1. ConcurrentLinkedQueue继承于AbstractQueue。

2. ConcurrentLinkedQueue内部是通过链表来实现的。它同时包含链表的头节点head和尾节点tail。ConcurrentLinkedQueue按照 FIFO(先进先出)原则对元素进行排序。元素都是从尾部插入到链表,从头部开始返回。

3. ConcurrentLinkedQueue的链表Node中的next的类型是volatile,而且链表数据item的类型也是volatile。关于volatile,我们知道它的语义包含:“即对一个volatile变量的读,总是能看到(任意线程)对这个volatile变量最后的写入”。ConcurrentLinkedQueue就是通过volatile来实现多线程对竞争资源的互斥访问的。 

ConcurrentLinkedQueue函数列表

// 创建一个最初为空的 ConcurrentLinkedQueue。
ConcurrentLinkedQueue()
// 创建一个最初包含给定 collection 元素的 ConcurrentLinkedQueue,按照此 collection 迭代器的遍历顺序来添加元素。
ConcurrentLinkedQueue(Collection<? extends E> c)
// 将指定元素插入此队列的尾部。
boolean add(E e)
// 如果此队列包含指定元素,则返回 true。
boolean contains(Object o)
// 如果此队列不包含任何元素,则返回 true。
boolean isEmpty()
// 返回在此队列元素上以恰当顺序进行迭代的迭代器。
Iterator<E> iterator()
// 将指定元素插入此队列的尾部。
boolean offer(E e)
// 获取但不移除此队列的头;如果此队列为空,则返回 null。
E peek()
// 获取并移除此队列的头,如果此队列为空,则返回 null。
E poll()
// 从队列中移除指定元素的单个实例(如果存在)。
boolean remove(Object o)
// 返回此队列中的元素数量。
int size()
// 返回以恰当顺序包含此队列所有元素的数组。
Object[] toArray()
// 返回以恰当顺序包含此队列所有元素的数组;返回数组的运行时类型是指定数组的运行时类型。
<T> T[] toArray(T[] a)

下面从ConcurrentLinkedQueue的创建,添加,删除这几个方面对它进行分析。

1 创建

下面以ConcurrentLinkedQueue()来进行说明。

public ConcurrentLinkedQueue() {
 head = tail = new Node<E>(null);
}

说明:在构造函数中,新建了一个“内容为null的节点”,并设置表头head和表尾tail的值为新节点。

head和tail的定义如下:

private transient volatile Node<E> head;
private transient volatile Node<E> tail;

head和tail都是volatile类型,他们具有volatile赋予的含义:“即对一个volatile变量的读,总是能看到(任意线程)对这个volatile变量最后的写入”。

Node的声明如下:

private static class Node<E> {
 volatile E item;
 volatile Node<E> next;
 Node(E item) {
 UNSAFE.putObject(this,itemOffset,item);
 }
 boolean casItem(E cmp,E val) {
 return UNSAFE.compareAndSwapObject(this,cmp,val);
 }
 void lazySetNext(Node<E> val) {
 UNSAFE.putOrderedobject(this,nextOffset,val);
 }
 boolean casNext(Node<E> cmp,Node<E> val) {
 return UNSAFE.compareAndSwapObject(this,val);
 }
 // Unsafe mechanics
 private static final sun.misc.Unsafe UNSAFE;
 private static final long itemOffset;
 private static final long nextOffset;
 static {
 try {
  UNSAFE = sun.misc.Unsafe.getUnsafe();
  Class k = Node.class;
  itemOffset = UNSAFE.objectFieldOffset
  (k.getDeclaredField("item"));
  nextOffset = UNSAFE.objectFieldOffset
  (k.getDeclaredField("next"));
 } catch (Exception e) {
  throw new Error(e);
 }
 }
}

说明:

Node是个单向链表节点,next用于指向下一个Node,item用于存储数据。Node中操作节点数据的API,都是通过Unsafe机制的CAS函数实现的;例如casNext()是通过CAS函数“比较并设置节点的下一个节点”。

2. 添加

下面以add(E e)为例对ConcurrentLinkedQueue中的添加进行说明。

public boolean add(E e) {
 return offer(e);
}

说明:add()实际上是调用的offer()来完成添加操作的。

offer()的源码如下:

public boolean offer(E e) {
 // 检查e是不是null,是的话抛出NullPointerException异常。
 checkNotNull(e);
 // 创建新的节点
 final Node<E> newNode = new Node<E>(e);

 // 将“新的节点”添加到链表的末尾。
 for (Node<E> t = tail,p = t;;) {
 Node<E> q = p.next;
 // 情况1:q为空
 if (q == null) {
  // CAS操作:如果“p的下一个节点为null”(即p为尾节点),则设置p的下一个节点为newNode。
  // 如果该CAS操作成功的话,则比较“p和t”(若p不等于t,则设置newNode为新的尾节点),然后返回true。
  // 如果该CAS操作失败,这意味着“其它线程对尾节点进行了修改”,则重新循环。
  if (p.casNext(null,newNode)) {
  if (p != t) // hop two nodes at a time
   casTail(t,newNode); // Failure is OK.
  return true;
  }
 }
 // 情况2:p和q相等
 else if (p == q)
  p = (t != (t = tail)) ? t : head;
 // 情况3:其它
 else
  p = (p != t && t != (t = tail)) ? t : q;
 }
}

说明:offer(E e)的作用就是将元素e添加到链表的末尾。offer()比较的地方是理解for循环,下面区分3种情况对for进行分析。

情况1 -- q为空。这意味着q是尾节点的下一个节点。此时,通过p.casNext(null,newNode)将“p的下一个节点设为newNode”,若设置成功的话,则比较“p和t”(若p不等于t,则设置newNode为新的尾节点),然后返回true。否则的话(意味着“其它线程对尾节点进行了修改”),什么也不做,继续进行for循环。

p.casNext(null,newNode),是调用CAS对p进行操作。若“p的下一个节点等于null”,则设置“p的下一个节点等于newNode”;设置成功的话,返回true,失败的话返回false。

情况2 -- p和q相等。这种情况什么时候会发生呢?通过“情况3”,我们知道,经过“情况3”的处理后,p的值可能等于q。

此时,若尾节点没有发生变化的话,那么,应该是头节点发生了变化,则设置p为头节点,然后重新遍历链表;否则(尾节点变化的话),则设置p为尾节点。

情况3 -- 其它。

我们将p = (p != t && t != (t = tail)) ? t : q;转换成如下代码。

if (p==t) {
 p = q;
} else {
 Node<E> tmp=t;
 t = tail;
 if (tmp==t) {
 p=q;
 } else {
 p=t;
 }
}

如果p和t相等,则设置p为q。否则的话,判断“尾节点是否发生变化”,没有变化的话,则设置p为q;否则,设置p为尾节点。

checkNotNull()的源码如下:

private static void checkNotNull(Object v) {
 if (v == null)
 throw new NullPointerException();
} 

3. 删除

下面以poll()为例对ConcurrentLinkedQueue中的删除进行说明。

public E poll() {
 // 设置“标记”
 restartFromHead:
 for (;;) {
 for (Node<E> h = head,p = h,q;;) {
  E item = p.item;

  // 情况1
  // 表头的数据不为null,并且“设置表头的数据为null”这个操作成功的话;
  // 则比较“p和h”(若p!=h,即表头发生了变化,则更新表头,即设置表头为p),然后返回原表头的item值。
  if (item != null && p.casItem(item,null)) {
  if (p != h) // hop two nodes at a time
   updateHead(h,((q = p.next) != null) ? q : p);
  return item;
  }
  // 情况2
  // 表头的下一个节点为null,即链表只有一个“内容为null的表头节点”。则更新表头为p,并返回null。
  else if ((q = p.next) == null) {
  updateHead(h,p);
  return null;
  }
  // 情况3
  // 这可能到由于“情况4”的发生导致p=q,在该情况下跳转到restartFromHead标记重新操作。
  else if (p == q)
  continue restartFromHead;
  // 情况4
  // 设置p为q
  else
  p = q;
 }
 }
}

说明:poll()的作用就是删除链表的表头节点,并返回被删节点对应的值。poll()的实现原理和offer()比较类似,下面根将or循环划分为4种情况进行分析。

情况1:“表头节点的数据”不为null,并且“设置表头节点的数据为null”这个操作成功。

p.casItem(item,null) -- 调用CAS函数,比较“节点p的数据值”与item是否相等,是的话,设置节点p的数据值为null。

在情况1发生时,先比较“p和h”,若p!=h,即表头发生了变化,则调用updateHead()更新表头;然后返回删除节点的item值。

updateHead()的源码如下:

final void updateHead(Node<E> h,Node<E> p) {
 if (h != p && casHead(h,p))
 h.lazySetNext(h);
}

说明:updateHead()的最终目的是更新表头为p,并设置h的下一个节点为h本身。

casHead(h,p)是通过CAS函数设置表头,若表头等于h的话,则设置表头为p。

lazySetNext()的源码如下:

void lazySetNext(Node<E> val) {
 UNSAFE.putOrderedobject(this,val);
}

putOrderedobject()函数,我们在前面一章“Todo”中介绍过。h.lazySetNext(h)的作用是通过CAS函数设置h的下一个节点为h自身,该设置可能会延迟执行。

情况2:如果表头的下一个节点为null,即链表只有一个“内容为null的表头节点”。

则调用updateHead(h,p),将表头更新p;然后返回null。

情况3:p=q

在“情况4”的发生后,会导致p=q;此时,“情况3”就会发生。当“情况3”发生后,它会跳转到restartFromHead标记重新操作。

情况4:其它情况。

设置p=q。

ConcurrentLinkedQueue示例

import java.util.*;
 import java.util.concurrent.*;
 /*
 * ConcurrentLinkedQueue是“线程安全”的队列,而LinkedList是非线程安全的。
 *
 * 下面是“多个线程同时操作并且遍历queue”的示例
 * (01) 当queue是ConcurrentLinkedQueue对象时,程序能正常运行。
 * (02) 当queue是LinkedList对象时,程序会产生ConcurrentModificationException异常。
 *
 *
 */
 public class ConcurrentLinkedQueueDemo1 {
 // Todo: queue是LinkedList对象时,程序会出错。
 //private static Queue<String> queue = new LinkedList<String>();
 private static Queue<String> queue = new ConcurrentLinkedQueue<String>();
 public static void main(String[] args) {
  // 同时启动两个线程对queue进行操作!
  new MyThread("ta").start();
  new MyThread("tb").start();
 }
 private static void printAll() {
  String value;
  Iterator iter = queue.iterator();
  while(iter.hasNext()) {
  value = (String)iter.next();
  System.out.print(value+",");
  }
  System.out.println();
 }
 private static class MyThread extends Thread {
  MyThread(String name) {
  super(name);
  }
  @Override
  public void run() {
   int i = 0;
  while (i++ < 6) {
   // “线程名” + "-" + "序号"
   String val = Thread.currentThread().getName()+i;
   queue.add(val);
   // 通过“Iterator”遍历queue。
   printAll();
  }
  }
 }
 }

(某一次)运行结果:

ta1,ta1,tb1,ta2,tb2,ta3,tb3,ta4,tb4,ta5,tb5,ta6,tb6,

结果说明:如果将源码中的queue改成LinkedList对象时,程序会产生ConcurrentModificationException异常。

关于《Java并发编程的艺术》之ConcurrentLinkedQueuejava并发编程的艺术和并发编程实战的问题就给大家分享到这里,感谢你花时间阅读本站内容,更多关于15. 并发容器之 ConcurrentLinkedQueue、<>有感 ConcurrentLinkedQueue解读、IDEA 调试 JAVA ConcurrentLinkedQueue、Java concurrency集合之ConcurrentLinkedQueue_动力节点Java学院整理等相关知识的信息别忘了在本站进行查找喔。

本文标签: