在本文中,我们将为您详细介绍深入理解JUC:ConcurrentLinkedQueue的相关知识,并且为您解答关于深入理解linux网络豆瓣的疑问,此外,我们还会提供一些关于15.并发容器之Concu
在本文中,我们将为您详细介绍深入理解 JUC:ConcurrentLinkedQueue的相关知识,并且为您解答关于深入理解linux网络 豆瓣的疑问,此外,我们还会提供一些关于15. 并发容器之 ConcurrentLinkedQueue、ConcurrentLinkedQueue、ConcurrentLinkedQueue vs BlockingQueue比较、ConcurrentLinkedQueue 介绍的有用信息。
本文目录一览:- 深入理解 JUC:ConcurrentLinkedQueue(深入理解linux网络 豆瓣)
- 15. 并发容器之 ConcurrentLinkedQueue
- ConcurrentLinkedQueue
- ConcurrentLinkedQueue vs BlockingQueue比较
- ConcurrentLinkedQueue 介绍
深入理解 JUC:ConcurrentLinkedQueue(深入理解linux网络 豆瓣)
ConcurrentLinkedQueue 是线程安全的无界非阻塞队列。在 JUC 包中,线程安全的队列按照实现方式可以分为阻塞队列和非阻塞队列两大类,前者基于锁来保证线程安全,而后者则基于 CAS 机制保证线程安全,阻塞队列一般在类名中都带有 Blocking 的字样。
由 Linked 关键字我们可以推断出 ConcurrentLinkedQueue 底层依赖于链表实现,在 ConcurrentLinkedQueue 的内部实现了一个单链表,用以存放队列元素。其中,结点 Node 类定义如下:
private static class Node<E> {
/** 结点元素值 */
volatile E item;
/** 指针,指向下一个结点 */
volatile Node<E> next;
// ... 省略方法定义
}
Node 类是一个典型的链表结点定义,此外,Node 类还定义了一些方法用于修改结点的元素值和 next 指针,这些方法均基于 Unsafe 实现。ConcurrentLinkedQueue 的 head 和 tail 字段分别指向队列的头结点和尾结点,如下:
public class ConcurrentLinkedQueue<E> extends AbstractQueue<E> implements Queue<E>, Serializable {
private transient volatile Node<E> head;
private transient volatile Node<E> tail;
public ConcurrentLinkedQueue() {
head = tail = new Node<>(null);
}
// ... 省略方法实现
}
当我们构造一个空的 ConcurrentLinkedQueue 对象时,链表的 head 和 tail 均指向一个元素值为 null 的标记结点。在 ConcurrentLinkedQueue 中不允许添加 null 元素,因为值为 null 的结点在 ConcurrentLinkedQueue 中扮演着特殊的角色。
Queue 接口
Queue 接口继承自 Collection 接口,增加了队列相关的操作,定义如下:
public interface Queue<E> extends Collection<E> {
boolean add(E e);
boolean offer(E e);
E poll();
E peek();
E element();
E remove();
}
针对各方法的含义说明如下:
add
:往队列中添加元素,如果成功则返回 true,对于有界队列来说,如果队列已满则抛出 IllegalStateException 异常。offer
:往队列中添加元素,如果成功则返回 true,对于有界队列来说,如果队列已满则返回 false,而不是抛出异常。poll
:移除队列头结点,并返回结点元素值,如果队列为空则返回 null。peek
:仅获取头结点元素值而不删除结点,如果队列为空则返回 null。element
:仅获取头结点元素值而不删除结点,如果队列为空则抛出 NoSuchElementException 异常。remove
:移除队列头结点,并返回结点元素值,如果队列为空则抛出 NoSuchElementException 异常。
核心方法实现
ConcurrentLinkedQueue 实现自 Queue 接口,下面来分析一下其针对 Queue 中声明的核心操作方法的实现。
添加元素:add & offer
ConcurrentLinkedQueue 实现了 Queue 接口中声明的往队列中添加元素方法,即 Queue#add
和 Queue#offer
。这两个方法都是往队列末端追加元素,因为 ConcurrentLinkedQueue 没有容量上的限制,所以这两个方法也就不存在队列已满的问题。所以,对于 ConcurrentLinkedQueue 而言,这两个方法在实现上并没有区别。
下面来看一下 ConcurrentLinkedQueue 针对 Queue#offer
的实现,如下:
public boolean offer(E e) {
// 待添加元素不能为 null
checkNotNull(e);
// 创建待添加元素对应的 Node 结点
final Node<E> newNode = new Node<>(e);
// 添加到尾结点
for (Node<E> t = tail, p = t; ; ) {
Node<E> q = p.next;
if (q == null) { // 1
// p 已经是尾结点,基于 CAS 设置结点 newNode 为 p 的 next 结点
if (p.casNext(null, newNode)) {
/*
* Successful CAS is the linearization point for e to become an element of this queue,
* and for newNode to become "live".
*/
if (p != t) { // hop two nodes at a time
// 更新 tail 结点
this.casTail(t, newNode); // Failure is OK.
}
return true;
}
// Lost CAS race to another thread; re-read next
} else if (p == q) { // 2
/*
* We have fallen off list. If tail is unchanged, it will also be off-list,
* in which case we need to jump to head, from which all live nodes are always reachable.
* Else the new tail is a better bet.
*/
p = (t != (t = tail)) ? t : head;
} else { // 3
// Check for tail updates after two hops.
p = (p != t && t != (t = tail)) ? t : q;
}
}
}
对于待添加的元素,上述方法首先会判断是否为 null,前面已经提及过 ConcurrentLinkedQueue 不允许向其中添加 null 元素,这主要是因为元素值为 null 的结点在 ConcurrentLinkedQueue 中是一个特殊的标记结点。如果待添加元素不为 null,则上述方法会将元素包装成 Node 结点(令该结点为 N)添加到队列的末端。
下面通过图示演示各种不同的元素添加场景,本小节中均使用青色表示 head 结点,使用橘黄色表示 tail 结点。假设当前队列的元素构成如下图 (1) 所示,此时 q 结点为 null(说明:本文中所有虚线表示的结点均指代结点本身为 null,而非结点元素值为 null),即运行到上述代码 1 位置。需要基于 CAS 操作将 p 的 next 结点由 null 修改为 N 结点,成功则返回 true。此时 p 不等于 t,操作完成之后队列的元素构成如下图 (2) 所示。
考虑上述过程存在多个线程竞争,假设现在有两个线程 A 和 B,其中 A 在执行代码 1 中的 Node#casNext
时成功将 p 的 next 结点由 null 更新为 node1 结点,如下图 (1) 所示。此时线程 B 再执行 Node#casNext
企图将 p 的 next 结点由 null 更新为 node2 结点时将失败,因为 p 的 next 结点此时已经不为 null,所以线程 B 将进入下一轮 for 循环,但此时 q 已经不为 null,且不等于 p,所以进入代码 3,这一步的运行结果就是将 q 赋值给 p,如下图 (2) 所示。接着线程 B 继续进入下一轮 for 循环,执行 Node<E> q = p.next;
,如下图 (3) 所示。因为此时 q 等于 null,所以继续执行代码 1 将 p 的 next 结点由 null 修改为 node2,如下图 (4) 所示。但此时的 p 不等于 t,所以需要执行 ConcurrentLinkedQueue#casTail
更新 tail 结点,如下图 (5) 所示。
最后再来分析一下什么情况下会执行到代码 2。假设当前队列的元素构成如下图 (1) 所示,此种结构一般由其它线程执行 poll 方法所造成,下一小节会进行分析。此时 tail 结点形成了自引用,开始执行 for 循环时 p 和 t 均指向 tail 结点,当将 p 的 next 结点赋值给 q 时,因为 p 的 next 结点即为 tail 结点自己,所以 q 也指向 tail 结点。此时,q 结点不为 null,且 p 等于 q,所以执行代码 2 将 head 结点赋值给 p,如下图 (2) 所示。所以这一步的目的在于跳出自引用,后续的执行流程参考下图 (3)、(4) 和 (5),读者可以自行梳理。
除了上面介绍的 ConcurrentLinkedQueue#offer
方法,ConcurrentLinkedQueue 还实现了 ConcurrentLinkedQueue#add
方法同样用于往队列末端追加元素,不过因为 ConcurrentLinkedQueue 是无界队列,所以该方法也只是简单的将请求委托给 ConcurrentLinkedQueue#offer
方法执行。
获取元素:poll & peek & element
针对获取元素的操作,Queue 接口声明了 3 个方法,包括 Queue#poll
、Queue#peek
,以及 Queue#element
。其中 Queue#poll
和 Queue#peek
的区别一般都比较熟悉,而 Queue#element
方法在功能上与 Queue#peek
方法类似,都是获取队列的头结点元素值而不删除结点,区别在于当队列为空时,方法 Queue#peek
返回 null,而 Queue#element
则抛出异常。ConcurrentLinkedQueue 针对 Queue#element
方法的实现实际上也是委托给 ConcurrentLinkedQueue#peek
方法执行的,只是对该方法的返回值进行了处理,如果返回值为 null 则抛出 NoSuchElementException 异常。
下面首先来看一下 ConcurrentLinkedQueue 针对方法 Queue#poll
的实现,如下:
public E poll() {
restartFromHead:
for (; ; ) {
// 从头结点获取元素
for (Node<E> h = head, p = h, q; ; ) {
E item = p.item;
// 如果当前头结点不为 null,则尝试基于 CAS 将其修改为 null
if (item != null && p.casItem(item, null)) { // 1
// CAS 操作成功
// Successful CAS is the linearization point for item to be removed from this queue.
if (p != h) { // hop two nodes at a time
this.updateHead(h, ((q = p.next) != null) ? q : p);
}
return item;
}
// 当前队列为空
else if ((q = p.next) == null) { // 2
this.updateHead(h, p);
return null;
} else if (p == q) { // 3
continue restartFromHead;
} else { // 4
p = q;
}
}
}
}
final void updateHead(Node<E> h, Node<E> p) {
// 将头结点由 h 更新为 p
if (h != p && this.casHead(h, p)) {
// 更新 h 的 next 结点为 h 自己
h.lazySetNext(h);
}
}
上述方法使用了 continue 标签语法以控制代码的执行逻辑,其中标签名为 restartFromHead,此外,break 关键字同样支持标签语法,与 continue 一起实现类似 goto 关键字的功能。
下面同样通过图示演示各种不同的获取元素场景,本小节中均使用青色表示 head 结点,使用橘黄色表示 tail 结点。假设当前队列的元素构成如下图 (1) 所示,此时 p 和 h 均指向 head 结点,而 q 结点未赋值,所以暂未标出。此时 p 所指向的结点元素值不为 null,所以尝试执行 Node#casItem
方法基于 CAS 修改结点元素值为 null,即运行上述代码 1。假设当前线程 CAS 操作成功,如下图 (2) 所示,因为此时 p 等于 h,所以直接返回结点元素值,即出队列成功。
继续演示一些其它情况,假设现在队列的头结点元素值为 null,所以直接跳转到代码 2 执行,将 q 赋值为 p 的 next 结点,如下图 (1) 所示,但是因为结点不为 null,所以继续往下执行。此时 p 不等于 q,所以执行代码 4 将 q 赋值给 p,如下图 (2) 所示,然后进入下一轮循环。
此时结点 p 的元素值不为 null,所以进入代码 1。考虑存在多个线程竞争的场景,假设现在有两个线程 A 和 B,其中 A 在执行代码 1 中的 Node#casItem
时成功将 p 的元素值更新为 null,如下图 (3-1) 所示。因为此时 p 不等于 h,所以执行 ConcurrentLinkedQueue#updateHead
方法将头结点由 h 更新为 p,并重定向 h 的 next 指针指向自己,如下图 (4-1) 所示。最后返回结点元素值,即出队列成功。
因为线程 A 已经操作成功,所以线程 B 在执行 Node#casItem
方法时必然失败,于是继续向下执行代码 2,将 q 指向 p 的 next 结点,如上图 (3-2) 所示。因为此时 q 结点为 null,所以执行 ConcurrentLinkedQueue#updateHead
方法将头结点由 h 更新为 p,并重定向 h 的 next 指针指向自己,如上图 (4-2) 所示。最后返回 null 值,即出队列成功。
最后再来分析一下什么情况下会执行到代码 3。假设当前队列的元素构成如下图 (1) 所示,并且当前有两个线程(A 和 B)竞争执行出队列操作,线程 A 首先执行代码 1 基于 CAS 将结点 p 元素值修改为 null,如下图 (2) 所示。但在线程 A 继续执行代码 1 中的 ConcurrentLinkedQueue#updateHead
方法尝试更新 head 结点之前,B 线程进入了 for 循环,如下图 (3) 所示,此时 B 线程的 h 和 p 指针均指向 head 结点,但是在 B 继续向下执行之前,A 线程执行了 ConcurrentLinkedQueue#updateHead
方法,将 head 结点由 h 更新为 p,并修改 h 的 next 指针指向自己,最后返回元素值,如下图 (4) 所示。
此时,如上图 (5) 所示,B 线程再继续执行时发现 p 结点元素值为 null,所以跳转执行代码 2 将 p 的 next 结点赋值给 q,如上图 (6) 所示。因为此时 p 结点不为 null,且是自引用,所以 p 也就等于 q,继续执行代码 3 跳出本次 for 循环从头再来。再次进入 for 循环时,B 线程看到的队列结构就变成了如上图 (7) 所示。
本小节的最后一起来看一下 ConcurrentLinkedQueue#peek
方法的实现,相对于 ConcurrentLinkedQueue#poll
方法,该方法的区别在于仅获取队头元素,而不移除头结点。方法实现如下:
public E peek() {
restartFromHead:
for (; ; ) {
for (Node<E> h = head, p = h, q; ; ) {
E item = p.item;
if (item != null || (q = p.next) == null) { // 1
this.updateHead(h, p);
return item;
} else if (p == q) { // 2
continue restartFromHead;
} else { // 3
p = q;
}
}
}
}
上述实现初看起来似乎不太好理解,但是如果像上面一样结合图示去分析就会一目了然,这里就不再继续演示各个步骤的执行逻辑,感兴趣的读者可以自己动手画一下。
移除元素:remove
针对删除元素操作,Queue 仅声明了 Queue#remove
方法,用于删除队列头结点并返回结点元素值,区别于 Queue#poll
方法返回 null 值,当队列为空时该方法将抛出异常。ConcurrentLinkedQueue 还实现了带参数的 remove 方法(继承自 Collection 接口),该方法用于从当前队列中删除目标元素值对应的结点,如果存在多个则删除第 1 个。下面主要来看一下带参数版本的 remove 方法实现,如下:
public boolean remove(Object o) {
if (o != null) {
Node<E> next, pred = null;
// 遍历队列中的元素
for (Node<E> p = this.first(); p != null; pred = p, p = next) {
boolean removed = false;
E item = p.item;
if (item != null) {
// 如果当前遍历元素不是期望删除的元素,则继续获取后继结点
if (!o.equals(item)) {
next = this.succ(p);
continue;
}
// 当前遍历元素是期望删除的元素,基于 CAS 将该结点置为 null
removed = p.casItem(item, null);
}
/*
* 指向到这里分为两种情况:
* 1. 当前结点为 null。
* 2. 当前结点为待删除结点。
*/
// 获取当前结点的后继结点
next = this.succ(p);
// 更新前驱结点的 next 指针指向当前结点的后继结点
if (pred != null && next != null) { // unlink
pred.casNext(p, next);
}
if (removed) {
return true;
}
}
}
return false;
}
删除的过程从队列头部开始遍历,并在遇到待删除元素时基于 CAS 将对应结点元素值更新为 null,在遍历过程中会剔除掉所有元素值为 null 的结点。
其它操作:size & contains
ConcurrentLinkedQueue 提供了 ConcurrentLinkedQueue#size
方法用于获取队列的长度,该方法在实现上会从头开始对队列进行遍历,并计数元素值不为 null 的结点,并以 Integer.MAX_VALUE
作为计数值上界。需要注意的一点是不同于一般的集合,ConcurrentLinkedQueue 整个计算队列大小的过程时间复杂度为 O(n)
,并且结果是不准确的。如果期间有其它线程对队列执行增删操作,将不会在 ConcurrentLinkedQueue#size
方法的返回值中体现。
方法 ConcurrentLinkedQueue#contains
用于判断队列中是否包含参数指定的元素值,在实现上与 ConcurrentLinkedQueue#size
方法思想想通,都需要从头开始遍历队列并对元素值进行比对。
总结
本文分析了 ConcurrentLinkedQueue 的设计与实现,并运用图示梳理了队列元素添加、获取,以及删除的过程。ConcurrentLinkedQueue 底层依赖于单链表作为存储结构,并基于 CAS 对链表的结点进行修改,从而实现在不阻塞的前提下保证线程安全,避免线程阻塞和唤醒所带来的性能开销。ConcurrentLinkedQueue 的设计思路还是比较简单明了的,难点在于访问结点过程中对链表的操作,并不是特别直观,所以本文引入了大量的图示演示相关的操作过程,希望能够简化理解。
更多精彩内容,欢迎访问个人主页 https://www.zhenchao.org/
参考
- JDK 1.8 源码
15. 并发容器之 ConcurrentLinkedQueue

1.ConcurrentLinkedQueue 简介
在单线程编程中我们会经常用到一些集合类,比如 ArrayList,HashMap 等,但是这些类都不是线程安全的类。在面试中也经常会有一些考点,比如 ArrayList 不是线程安全的,Vector 是线程安全。而保障 Vector 线程安全的方式,是非常粗暴的在方法上用 synchronized 独占锁,将多线程执行变成串行化。要想将 ArrayList 变成线程安全的也可以使用 Collections.synchronizedList(List<T> list)
方法 ArrayList 转换成线程安全的,但这种转换方式依然是通过 synchronized 修饰方法实现的,很显然这不是一种高效的方式,同时,队列也是我们常用的一种数据结构,为了解决线程安全的问题,Doug Lea 大师为我们准备了 ConcurrentLinkedQueue 这个线程安全的队列。从类名就可以看的出来实现队列的数据结构是链式。
1.1 Node
要想先学习 ConcurrentLinkedQueue 自然而然得先从它的节点类看起,明白它的底层数据结构。Node 类的源码为:
private static class Node<E> {
volatile E item;
volatile Node<E> next;
.......
}
Node 节点主要包含了两个域:一个是数据域 item,另一个是 next 指针,用于指向下一个节点从而构成链式队列。并且都是用 volatile 进行修饰的,以保证内存可见性(关于 volatile 可以看这篇文章)。另外 ConcurrentLinkedQueue 含有这样两个成员变量:
private transient volatile Node<E> head;
private transient volatile Node<E> tail;
说明 ConcurrentLinkedQueue 通过持有头尾指针进行管理队列。当我们调用无参构造器时,其源码为:
public ConcurrentLinkedQueue() {
head = tail = new Node<E>(null);
}
head 和 tail 指针会指向一个 item 域为 null 的节点,此时 ConcurrentLinkedQueue 状态如下图所示:
如图,head 和 tail 指向同一个节点 Node0,该节点 item 域为 null,next 域为 null。
1.2 操作 Node 的几个 CAS 操作
在队列进行出队入队的时候免不了对节点需要进行操作,在多线程就很容易出现线程安全的问题。可以看出在处理器指令集能够支持 CMPXCHG 指令后,在 java 源码中涉及到并发处理都会使用 CAS 操作 (关于 CAS 操作可以看这篇文章的第 3.1 节),那么在 ConcurrentLinkedQueue 对 Node 的 CAS 操作有这样几个:
//更改Node中的数据域item
boolean casItem(E cmp, E val) {
return UNSAFE.compareAndSwapObject(this, itemOffset, cmp, val);
}
//更改Node中的指针域next
void lazySetNext(Node<E> val) {
UNSAFE.putOrderedObject(this, nextOffset, val);
}
//更改Node中的指针域next
boolean casNext(Node<E> cmp, Node<E> val) {
return UNSAFE.compareAndSwapObject(this, nextOffset, cmp, val);
}
可以看出这些方法实际上是通过调用 UNSAFE 实例的方法,UNSAFE 为 sun.misc.Unsafe 类,该类是 hotspot 底层方法,目前为止了解即可,知道 CAS 的操作归根结底是由该类提供就好。
2.offer 方法
对一个队列来说,插入满足 FIFO 特性,插入元素总是在队列最末尾的地方进行插入,而取(移除)元素总是从队列的队头。所有要想能够彻底弄懂 ConcurrentLinkedQueue 自然而然是从 offer 方法和 poll 方法开始。那么为了能够理解 offer 方法,采用 debug 的方式来一行一行的看代码走。另外,在看多线程的代码时,可采用这样的思维方式:
单个线程 offer 多个线程 offer 部分线程 offer,部分线程 poll ----offer 的速度快于 poll -------- 队列长度会越来越长,由于 offer 节点总是在对队列队尾,而 poll 节点总是在队列对头,也就是说 offer 线程和 poll 线程两者并无 “交集”,也就是说两类线程间并不会相互影响,这种情况站在相对速率的角度来看,也就是一个 "单线程 offer" ----offer 的速度慢于 poll --------poll 的相对速率快于 offer,也就是队头删的速度要快于队尾添加节点的速度,导致的结果就是队列长度会越来越短,而 offer 线程和 poll 线程就会出现 “交集”,即那一时刻就可以称之为 offer 线程和 poll 线程同时操作的节点为 临界点 ,且在该节点 offer 线程和 poll 线程必定相互影响。根据在临界点时 offer 和 poll 发生的相对顺序又可从两个角度去思考:1. 执行顺序为 offer-->poll-->offer,即表现为当 offer 线程在 Node1 后插入 Node2 时,此时 poll 线程已经将 Node1 删除,这种情况很显然需要在 offer 方法中考虑; 2. 执行顺序可能为:poll-->offer-->poll,即表现为当 poll 线程准备删除的节点为 null 时(队列为空队列),此时 offer 线程插入一个节点使得队列变为非空队列
先看这么一段代码:
1. ConcurrentLinkedQueue<Integer> queue = new ConcurrentLinkedQueue<>();
2. queue.offer(1);
3. queue.offer(2);
创建一个 ConcurrentLinkedQueue 实例,先 offer 1,然后再 offer 2。offer 的源码为:
public boolean offer(E e) {
1. checkNotNull(e);
2. final Node<E> newNode = new Node<E>(e);
3. for (Node<E> t = tail, p = t;;) {
4. Node<E> q = p.next;
5. if (q == null) {
6. // p is last node
7. if (p.casNext(null, newNode)) {
// Successful CAS is the linearization point
// for e to become an element of this queue,
// and for newNode to become "live".
8. if (p != t) // hop two nodes at a time
9. casTail(t, newNode); // Failure is OK.
10. return true;
}
// Lost CAS race to another thread; re-read next
}
11. else if (p == q)
// We have fallen off list. If tail is unchanged, it
// will also be off-list, in which case we need to
// jump to head, from which all live nodes are always
// reachable. Else the new tail is a better bet.
12. p = (t != (t = tail)) ? t : head;
else
// Check for tail updates after two hops.
13. p = (p != t && t != (t = tail)) ? t : q;
}
}
单线程执行角度分析:
先从单线程执行的角度看起,分析 offer 1 的过程。第 1 行代码会对是否为 null 进行判断,为 null 的话就直接抛出空指针异常,第 2 行代码将 e 包装成一个 Node 类,第 3 行为 for 循环,只有初始化条件没有循环结束条件,这很符合 CAS 的 “套路”,在循环体 CAS 操作成功会直接 return 返回,如果 CAS 操作失败的话就在 for 循环中不断重试直至成功。这里实例变量 t 被初始化为 tail,p 被初始化为 t 即 tail。为了方便下面的理解,p 被认为队列真正的尾节点,tail 不一定指向对象真正的尾节点,因为在 ConcurrentLinkedQueue 中 tail 是被延迟更新的,具体原因我们慢慢来看。代码走到第 3 行的时候,t 和 p 都分别指向初始化时创建的 item 域为 null,next 域为 null 的 Node0。第 4 行变量 q 被赋值为 null,第 5 行 if 判断为 true,在第 7 行使用 casNext 将插入的 Node 设置成当前队列尾节点 p 的 next 节点,如果 CAS 操作失败,此次循环结束在下次循环中进行重试。CAS 操作成功走到第 8 行,此时 p==t,if 判断为 false, 直接 return true 返回。如果成功插入 1 的话,此时 ConcurrentLinkedQueue 的状态如下图所示:
如图,此时队列的尾节点应该为 Node1, 而 tail 指向的节点依然还是 Node0, 因此可以说明 tail 是延迟更新的。那么我们继续来看 offer 2 的时候的情况,很显然此时第 4 行 q 指向的节点不为 null 了,而是指向 Node1, 第 5 行 if 判断为 false, 第 11 行 if 判断为 false, 代码会走到第 13 行。好了,再插入节点的时候我们会问自己这样一个问题?上面已经解释了 tail 并不是指向队列真正的尾节点,那么在插入节点的时候,我们是不是应该最开始做的就是找到队列当前的尾节点在哪里才能插入?那么第 13 行代码就是找出队列真正的尾节点。
定位队列真正的对尾节点
p = (p != t && t != (t = tail)) ? t : q;
我们来分析一下这行代码,如果这段代码在单线程环境执行时,很显然由于 p==t, 此时 p 会被赋值为 q, 而 q 等于 Node<E> q = p.next
,即 Node1。在第一次循环中指针 p 指向了队列真正的队尾节点 Node1,那么在下一次循环中第 4 行 q 指向的节点为 null,那么在第 5 行中 if 判断为 true, 那么在第 7 行依然通过 casNext 方法设置 p 节点的 next 为当前新增的 Node, 接下来走到第 8 行,这个时候 p!=t,第 8 行 if 判断为 true, 会通过 casTail(t, newNode)
将当前节点 Node 设置为队列的队尾节点,此时的队列状态示意图如下图所示:
tail 指向的节点由 Node0 改变为 Node2, 这里的 casTail 失败不需要重试的原因是,offer 代码中主要是通过 p 的 next 节点 q (Node<E> q = p.next
) 决定后面的逻辑走向的,当 casTail 失败时状态示意图如下:
如图,如果这里 casTail 设置 tail 失败即 tail 还是指向 Node0 节点的话,无非就是多循环几次通过 13 行代码定位到队尾节点。
通过对单线程执行角度进行分析,我们可以了解到 poll 的执行逻辑为:
-
如果 tail 指向的节点的下一个节点(next 域)为 null 的话,说明 tail 指向的节点即为队列真正的队尾节点,因此可以通过 casNext 插入当前待插入的节点,但此时 tail 并未变化,如图 2;
-
如果 tail 指向的节点的下一个节点(next 域)不为 null 的话,说明 tail 指向的节点不是队列的真正队尾节点。通过
q(Node<E> q = p.next)
指针往前递进去找到队尾节点,然后通过 casNext 插入当前待插入的节点,并通过 casTail 方式更改 tail,如图 3。
我们回过头再来看 p = (p != t && t != (t = tail)) ? t : q;
这行代码在单线程中,这段代码永远不会将 p 赋值为 t, 那么这么写就不会有任何作用,那我们试着在多线程的情况下进行分析。
多线程执行角度分析
多个线程 offer
很显然这么写另有深意,其实在多线程环境下这行代码很有意思的。 t != (t = tail)
这个操作并非一个原子操作,有这样一种情况:
如图,假设线程 A 此时读取了变量 t,线程 B 刚好在这个时候 offer 一个 Node 后,此时会修改 tail 指针,那么这个时候线程 A 再次执行 t=tail 时 t 会指向另外一个节点,很显然线程 A 前后两次读取的变量 t 指向的节点不相同,即 t != (t = tail)
为 true, 并且由于 t 指向节点的变化 p != t
也为 true,此时该行代码的执行结果为 p 和 t 最新的 t 指针指向了同一个节点,并且此时 t 也是队列真正的对尾节点。那么,现在已经定位到队列真正的队尾节点,就可以执行 offer 操作了。
offer->poll->offer
那么还剩下第 11 行的代码我们没有分析,大致可以猜想到应该就是回答一部分线程 offer,一部分 poll 的这种情况。当 if (p == q)
为 true 时,说明 p 指向的节点的 next 也指向它自己,这种节点称之为哨兵节点,这种节点在队列中存在的价值不大,一般表示为要删除的节点或者是空节点。为了能够很好的理解这种情况,我们先看看 poll 方法的执行过程后,再回过头来看,总之这是一个很有意思的事情 :)。
3.poll 方法
poll 方法源码如下:
public E poll() {
restartFromHead:
1. for (;;) {
2. for (Node<E> h = head, p = h, q;;) {
3. E item = p.item;
4. if (item != null && p.casItem(item, null)) {
// Successful CAS is the linearization point
// for item to be removed from this queue.
5. if (p != h) // hop two nodes at a time
6. updateHead(h, ((q = p.next) != null) ? q : p);
7. return item;
}
8. else if ((q = p.next) == null) {
9. updateHead(h, p);
10. return null;
}
11. else if (p == q)
12. continue restartFromHead;
else
13. p = q;
}
}
}
我们还是先站在单线程的角度去理清该方法的基本逻辑。假设 ConcurrentLinkedQueue 初始状态如下图所示:
参数 offer 时的定义,我们还是先将变量 p 作为队列要删除真正的队头节点,h(head)指向的节点并不一定是队列的队头节点。先来看 poll 出 Node1 时的情况,由于 p=h=head
,参照上图,很显然此时 p 指向的 Node1 的数据域不为 null, 在第 4 行代码中 item!=null
判断为 true 后接下来通过 casItem
将 Node1 的数据域设置为 null。如果 CAS 设置失败则此次循环结束等待下一次循环进行重试。若第 4 行执行成功进入到第 5 行代码,此时 p 和 h 都指向 Node1, 第 5 行 if 判断为 false, 然后直接到第 7 行 return 回 Node1 的数据域 1,方法运行结束,此时的队列状态如下图。
下面继续从队列中 poll,很显然当前 h 和 p 指向的 Node1 的数据域为 null,那么第一件事就是要定位准备删除的队头节点 (找到数据域不为 null 的节点)。
定位删除的队头节点
继续看,第三行代码 item 为 null, 第 4 行代码 if 判断为 false, 走到第 8 行代码(q = p.next
)if 也为 false,由于 q 指向了 Node2, 在第 11 行的 if 判断也为 false,因此代码走到了第 13 行,这个时候 p 和 q 共同指向了 Node2, 也就找到了要删除的真正的队头节点。可以总结出,定位待删除的队头节点的过程为:如果当前节点的数据域为 null,很显然该节点不是待删除的节点,就用当前节点的下一个节点去试探。在经过第一次循环后,此时状态图为下图:
进行下一次循环,第 4 行的操作同上述,当前假设第 4 行中 casItem 设置成功,由于 p 已经指向了 Node2, 而 h 还依旧指向 Node1, 此时第 5 行的 if 判断为 true,然后执行 updateHead(h, ((q = p.next) != null) ? q : p)
,此时 q 指向的 Node3,所有传入 updateHead 方法的分别是指向 Node1 的 h 引用和指向 Node3 的 q 引用。updateHead 方法的源码为:
final void updateHead(Node<E> h, Node<E> p) {
if (h != p && casHead(h, p))
h.lazySetNext(h);
}
该方法主要是通过 casHead
将队列的 head 指向 Node3, 并且通过 h.lazySetNext
将 Node1 的 next 域指向它自己。最后在第 7 行代码中返回 Node2 的值。此时队列的状态如下图所示:
Node1 的 next 域指向它自己,head 指向了 Node3。如果队列为空队列的话,就会执行到代码的第 8 行 (q = p.next) == null
,if 判断为 true, 因此在第 10 行中直接返回 null。以上的分析是从单线程执行的角度去看,也可以让我们了解 poll 的整体思路,现在来做一个总结:
-
如果当前 head,h 和 p 指向的节点的 Item 不为 null 的话,说明该节点即为真正的队头节点(待删除节点),只需要通过 casItem 方法将 item 域设置为 null, 然后将原来的 item 直接返回即可。
-
如果当前 head,h 和 p 指向的节点的 item 为 null 的话,则说明该节点不是真正的待删除节点,那么应该做的就是寻找 item 不为 null 的节点。通过让 q 指向 p 的下一个节点(q = p.next)进行试探,若找到则通过 updateHead 方法更新 head 指向的节点以及构造哨兵节点(
通过updateHead方法的h.lazySetNext(h)
)。
接下来,按照上面分析 offer 的思维方式,下面来分析一下多线程的情况,第一种情况是;
多线程执行情况分析:
多个线程 poll
现在回过头来看 poll 方法的源码,有这样一部分:
else if (p == q)
continue restartFromHead;
这一部分就是处理多个线程 poll 的情况,q = p.next
也就是说 q 永远指向的是 p 的下一个节点,那么什么情况下会使得 p,q 指向同一个节点呢?根据上面我们的分析,只有 p 指向的节点在 poll 的时候转变成了哨兵节点(通过 updateHead 方法中的 h.lazySetNext)。当线程 A 在判断 p==q
时,线程 B 已经将执行完 poll 方法将 p 指向的节点转换为哨兵节点并且 head 指向的节点已经发生了改变,所以就需要从 restartFromHead 处执行,保证用到的是最新的 head。
poll->offer->poll
试想,还有这样一种情况,如果当前队列为空队列,线程 A 进行 poll 操作,同时线程 B 执行 offer,然后线程 A 在执行 poll,那么此时线程 A 返回的是 null 还是线程 B 刚插入的最新的那个节点呢?我们来写一代 demo:
public static void main(String[] args) {
Thread thread1 = new Thread(() -> {
Integer value = queue.poll();
System.out.println(Thread.currentThread().getName() + " poll 的值为:" + value);
System.out.println("queue当前是否为空队列:" + queue.isEmpty());
});
thread1.start();
Thread thread2 = new Thread(() -> {
queue.offer(1);
});
thread2.start();
}
输出结果为:
Thread-0 poll 的值为:null queue 当前是否为空队列:false
通过 debug 控制线程 thread1 和线程 thread2 的执行顺序,thread1 先执行到第 8 行代码 if ((q = p.next) == null)
,由于此时队列为空队列 if 判断为 true,进入 if 块,此时先让 thread1 暂停,然后 thread2 进行 offer 插入值为 1 的节点后,thread2 执行结束。再让 thread1 执行,这时 thread1 并没有进行重试,而是代码继续往下走,返回 null,尽管此时队列由于 thread2 已经插入了值为 1 的新的节点。所以输出结果为 thread0 poll 的为 null, 然队列不为空队列。因此,在判断队列是否为空队列的时候是不能通过线程在 poll 的时候返回为 null 进行判断的,可以通过 isEmpty 方法进行判断。
4. offer 方法中部分线程 offer 部分线程 poll
在分析 offer 方法的时候我们还留下了一个问题,即对 offer 方法中第 11 行代码的理解。
offer->poll->offer
在 offer 方法的第 11 行代码 if (p == q)
,能够让 if 判断为 true 的情况为 p 指向的节点为哨兵节点,而什么时候会构造哨兵节点呢?在对 poll 方法的讨论中,我们已经找到了答案,即 ** 当 head 指向的节点的 item 域为 null 时会寻找真正的队头节点,等到待插入的节点插入之后,会更新 head,并且将原来 head 指向的节点设置为哨兵节点。** 假设队列初始状态如下图所示: 因此在线程 A 执行 offer 时,线程 B 执行 poll 就会存在如下一种情况:
如图,线程 A 的 tail 节点存在 next 节点 Node1, 因此会通过引用 q 往前寻找队列真正的队尾节点,当执行到判断 if (p == q)
时,此时线程 B 执行 poll 操作,在对线程 B 来说,head 和 p 指向 Node0, 由于 Node0 的 item 域为 null, 同样会往前递进找到队列真正的队头节点 Node1, 在线程 B 执行完 poll 之后,Node0 就会转换为哨兵节点,也就意味着队列的 head 发生了改变,此时队列状态为下图。
此时线程 A 在执行判断 if (p == q)
时就为 true, 会继续执行 p = (t != (t = tail)) ? t : head;
,由于 tail 指针没有发生改变所以 p 被赋值为 head, 重新从 head 开始完成插入操作。
5. HOPS 的设计
通过上面对 offer 和 poll 方法的分析,我们发现 tail 和 head 是延迟更新的,两者更新触发时机为:
tail 更新触发时机:当 tail 指向的节点的下一个节点不为 null 的时候,会执行定位队列真正的队尾节点的操作,找到队尾节点后完成插入之后才会通过 casTail 进行 tail 更新;当 tail 指向的节点的下一个节点为 null 的时候,只插入节点不更新 tail。
**head 更新触发时机:** 当 head 指向的节点的 item 域为 null 的时候,会执行定位队列真正的队头节点的操作,找到队头节点后完成删除之后才会通过 updateHead 进行 head 更新;当 head 指向的节点的 item 域不为 null 的时候,只删除节点不更新 head。
并且在更新操作时,源码中会有注释为:hop two nodes at a time。所以这种延迟更新的策略就被叫做 HOPS 的大概原因是这个(猜的 :)),从上面更新时的状态图可以看出,head 和 tail 的更新是 “跳着的” 即中间总是间隔了一个。那么这样设计的意图是什么呢?
如果让 tail 永远作为队列的队尾节点,实现的代码量会更少,而且逻辑更易懂。但是,这样做有一个缺点,** 如果大量的入队操作,每次都要执行 CAS 进行 tail 的更新,汇总起来对性能也会是大大的损耗。如果能减少 CAS 更新的操作,无疑可以大大提升入队的操作效率,所以 doug lea 大师每间隔 1 次(tail 和队尾节点的距离为 1)进行才利用 CAS 更新 tail。** 对 head 的更新也是同样的道理,虽然,这样设计会多出在循环中定位队尾节点,但总体来说读的操作效率要远远高于写的性能,因此,多出来的在循环中定位尾节点的操作的性能损耗相对而言是很小的。
参考资料
《java 并发编程的艺术》 《Java 高并发程序设计》 ConcurrentLinkedQueue 博文:https://www.cnblogs.com/sunshine-2015/p/6067709.html
ConcurrentLinkedQueue
ConcurrentLinkedQueue 博客分类: javaCAS操作
CAS是单词compare and set的缩写,意思是指在set之前先比较该值有没有变化,只有在没变的情况下才对其赋值。
我们常常做这样的操作
- if(a==b) {
- a++;
- }
试想一下如果在做a++之前a的值被改变了怎么办?a++还执行吗?出现该问题的原因是在多线程环境下,a的值处于一种不定的状态。采用锁可以解决此类问题,但CAS也可以解决,而且可以不加锁。
- int expect = a;
- if(a.compareAndSet(expect,a+1)) {
- doSomeThing1();
- } else {
- doSomeThing2();
- }
这样如果a的值被改变了a++就不会被执行。
按照上面的写法,a!=expect之后,a++就不会被执行,如果我们还是想执行a++操作怎么办,没关系,可以采用while循环
- while(true) {
- int expect = a;
- if (a.compareAndSet(expect, a + 1)) {
- doSomeThing1();
- return;
- } else {
- doSomeThing2();
- }
- }
采用上面的写法,在没有锁的情况下实现了a++操作,这实际上是一种非阻塞算法。
应用
java.util.concurrent.atomic包中几乎大部分类都采用了CAS操作,以AtomicInteger为例,看看它几个主要方法的实现:
- public final int getAndSet(int newValue) {
- for (;;) {
- int current = get();
- if (compareAndSet(current, newValue))
- return current;
- }
- }
getAndSet方法JDK文档中的解释是:以原子方式设置为给定值,并返回旧值。原子方式体现在何处,就体现在compareAndSet上,看看compareAndSet是如何实现的:
- public final boolean compareAndSet(int expect, int update) {
- return unsafe.compareAndSwapInt(this, valueOffset, expect, update);
- }
不出所料,它就是采用的Unsafe类的CAS操作完成的。 再来看看a++操作是如何实现的:
- public final int getAndIncrement() {
- for (;;) {
- int current = get();
- int next = current + 1;
- if (compareAndSet(current, next))
- return current;
- }
- }
几乎和最开始的实例一模一样,也是采用CAS操作来实现自增操作的。 ++a操作和a++操作类似,只不过返回结果不同罢了
- public final int incrementAndGet() {
- for (;;) {
- int current = get();
- int next = current + 1;
- if (compareAndSet(current, next))
- return next;
- }
- }
此外,java.util.concurrent.ConcurrentLinkedQueue类全是采用的非阻塞算法,里面没有使用任何锁,全是基于CAS操作实现的。CAS操作可以说是JAVA并发框架的基础,整个框架的设计都是基于CAS操作的。
缺点:
1、ABA问题
CAS操作容易导致ABA问题,也就是在做a++之间,a可能被多个线程修改过了,只不过回到了最初的值,这时CAS会认为a的值没有变。a在外面 逛了一圈回来,你能保证它没有做任何坏事,不能!!也许它讨闲,把b的值减了一下,把c的值加了一下等等,更有甚者如果a是一个对象,这个对象有可能是新 创建出来的,a是一个引用呢情况又如何,所以这里面还是存在着很多问题的,解决ABA问题的方法有很多,可以考虑增加一个修改计数,只有修改计数不变的且 a值不变的情况下才做a++,也可以考虑引入版本号,当版本号相同时才做a++操作等,这和事务原子性处理有点类似!
2、比较花费CPU资源,即使没有任何争用也会做一些无用功。
3、会增加程序测试的复杂度,稍不注意就会出现问题。
总结:
可以用CAS在无锁的情况下实现原子操作,但要明确应用场合,非常简单的操作且又不想引入锁可以考虑使用CAS操作,当想要非阻塞地完成某一操作也可以考虑CAS。不推荐在复杂操作中引入CAS,会使程序可读性变差,且难以测试,同时会出现ABA问题。
参考资料:
ConcurrentLinkedQueue vs BlockingQueue比较
纯demo代码
package com.honey;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.*;
public class ConsumerQueueDemo {
public static void main(String[] args) {
//making hot spot code
// for (int i = 0; i < 100; i++) {
// blockingVersion(false);
// lockFreeVersion(false);
// }
System.err.println("///=============///");
blockingVersion(true);
lockFreeVersion(true);
}
private static void blockingVersion(boolean print) {
List<Integer> array = g();
BlockingQueue blockingQueue = new ArrayBlockingQueue(array.size(), false, array);
long startMills = System.currentTimeMillis();
int threadNum = 10;
CountDownLatch countDownLatch = new CountDownLatch(threadNum);
ExecutorService pool = Executors.newFixedThreadPool(threadNum);
for (int i = 0; i < threadNum; i++) {
pool.execute(new BlockingConsumer(blockingQueue, countDownLatch));
}
pool.shutdown();
try {
countDownLatch.await();
} catch (InterruptedException e) {
//
}
if (print)
System.out.println("blocking version cost ms:" + (System.currentTimeMillis() - startMills));
}
private static void lockFreeVersion(boolean print) {
List<Integer> array = g();
ConcurrentLinkedQueue linkedQueue = new ConcurrentLinkedQueue(Arrays.asList(array));
long startMills = System.currentTimeMillis();
int threadNum = 10;
CountDownLatch countDownLatch = new CountDownLatch(threadNum);
ExecutorService pool = Executors.newFixedThreadPool(threadNum);
for (int i = 0; i < threadNum; i++) {
pool.execute(new LockFreeConsumer(linkedQueue, countDownLatch));
}
pool.shutdown();
try {
countDownLatch.await();
} catch (InterruptedException e) {
//
}
if (print)
System.out.println("lock free cost ms:" + (System.currentTimeMillis() - startMills));
}
static class BlockingConsumer implements Runnable {
final BlockingQueue blockingQueue;
final CountDownLatch countDownLatch;
BlockingConsumer(BlockingQueue blockingQueue, CountDownLatch countDownLatch) {
this.blockingQueue = blockingQueue;
this.countDownLatch = countDownLatch;
}
@Override
public void run() {
try {
for (; ; ) {
Object object = blockingQueue.poll();
if (object == null) {
return;
}
//doing works
Thread.sleep(20L);
}
} catch (InterruptedException e) {
//
} finally {
countDownLatch.countDown();
}
}
}
static class LockFreeConsumer implements Runnable {
final ConcurrentLinkedQueue linkedQueue;
final CountDownLatch countDownLatch;
LockFreeConsumer(ConcurrentLinkedQueue linkedQueue, CountDownLatch countDownLatch) {
this.linkedQueue = linkedQueue;
this.countDownLatch = countDownLatch;
}
@Override
public void run() {
try {
for (; ; ) {
Object object = linkedQueue.poll();
if (object == null) {
return;
}
//doing works
Thread.sleep(20L);
}
} catch (InterruptedException e) {
//
} finally {
countDownLatch.countDown();
}
}
}
static List<Integer> g() {
Integer[] array = new Integer[10_000];
Arrays.fill(array, 0);
return Arrays.asList(array);
}
}
结果
///
///
///
///=============///
blocking version cost ms:22052
lock free cost ms:22
ConcurrentLinkedQueue 介绍
在多线程编程环境下并发安全队列是不可或缺的一个重要工具类,为了实现并发安全可以有两种方式:一种是阻塞式的,例如:LinkedBlockingQueue;另一种即是我们将要探讨的非阻塞式,例如:ConcurrentLinkedQueue。相比较于阻塞式,非阻塞的最显著的优点就是性能,非阻塞式算法使用CAS来原子性的更新数据,避免了加锁的时间,同时也保证了数据的一致性。
简单介绍
ConcurrentLinkedQueue是一个基于链接节点的无界无锁线程安全队列,它采用先进先出的规则对节点进行排序,当我们添加一个元素的时候,它会添加到队列的尾部,当我们获取一个元素时,它会返回队列头部的元素。它采用了“wait-free”算法来实现,该算法在Michael & Scott算法上进行了一些修改。
ConcurrentLinkedQueue中的方法不多,其中最主要的两个方法是:offer(E)和poll(),分别实现队列的两个重要的操作:入队和出队。
方法 |
含义 |
offer(E) |
插入一个元素到队列尾部 |
poll() |
从队列头部取出一个元素 |
add(E) |
同offer(E) |
peek() |
获取头部元素,但不删除 |
isEmpty() |
判断队列是否为空 |
size() |
获取队列长度(元素个数) |
contains(Object) |
判断队列是否包含指定元素 |
remove(Object) |
删除队列中指定元素 |
toArray(T[]) |
将队列的元素复制到一个数组 |
iterator() |
返回一个可遍历该队列的迭代器 |
为什么针对ConcurrentLinkedQueue的整个入队/出队/删除都是不需要锁的。
- 如果多个线程同时访问其中任一个方法(offer/poll/remove)都是无需加锁而且线程安全的
- 由于remove方法不修改ConcurrentLinkedQueue的结构,所以跟其他两个方法都不会有冲突
- 如果同时两个线程,一个入队,一个出队,在队列不为NULL的情况下是不会有任何问题的,因为一个操作tail,一个操作head,完全不相关。但是如果队列为NULL时还是会发生冲突的,因为tail==head。
- 如果出队线程发现tail的next不为NULL,那么就会感知到当前有一个线程在执行入队操作,所以出队线程就会帮助入队线程完成入队操作,而且每个操作都是通过CAS保证原子性更新,所以就算同时两个线程,一个入队,一个出队也不会发生冲突。
综上,ConcurrentLinkedQueue最终实现了无锁队列。
使用场景
ConcurrentLinkedQueue适合在对性能要求相对较高,同时对队列的读写存在多个线程同时进行的场景,即如果对队列加锁的成本较高则适合使用无锁的ConcurrentLinkedQueue来替代。下面我们来简单对比下ConcurrentLinkedQueue与我们常用的阻塞队列LinkedBlockingQueue的性能。
表1:入队性能对比
线程数 |
ConcurrentLinkedQueue耗时(ms) |
LinkedBlockingQueue耗时(ms) |
5 |
22 |
29 |
10 |
50 |
59 |
20 |
99 |
112 |
30 |
139 |
171 |
测试数据:N个线程,每个线程入队10000个元素。
我们今天的关于深入理解 JUC:ConcurrentLinkedQueue和深入理解linux网络 豆瓣的分享就到这里,谢谢您的阅读,如果想了解更多关于15. 并发容器之 ConcurrentLinkedQueue、ConcurrentLinkedQueue、ConcurrentLinkedQueue vs BlockingQueue比较、ConcurrentLinkedQueue 介绍的相关信息,可以在本站进行搜索。
本文标签: