ConcurrentLinkedQueue详解

如果要实现一个线程安全的队列有两种方式:一种是使用阻塞算法,即队列用一个锁(入队和出队用同一把锁)或两个锁(入队和出队分别用一把锁)来实现;另一种是使用非阻塞算法,即使用循环CAS的方式实现。而ConcurrentLinkedQueue是使用非阻塞的方式来实现的基于链表的线程安全队列,采用先进先出(FIFO)规则

Node

要学习ConcurrentLinkedQueue就先从它的节点类看起,Node的源码为:

1
2
3
4
5
private static class Node<E> {
volatile E item;
volatile Node<E> next;
.......
}

Node节点主要包含了两个域:一个是数据域item,另一个是next指针,用于指向下一个节点从而构成链式队列。并且都是用volatile进行修饰的,以保证内存可见性。

另外ConcurrentLinkedQueue含有这样两个成员变量:

1
2
private transient volatile Node<E> head;
private transient volatile Node<E> tail;

默认情况下,head节点等于tail节点等于null,即数据域和next域都为空:

1
2
3
public ConcurrentLinkedQueue() {
head = tail = new Node<E>(null);
}

几个CAS操作

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31

/**
*获取p节点的下一个节点
*/
final Node<E> succ(Node<E> p){
Node<E> next = p.getNext();
// 如果p节点和next节点相等时,表示队列刚初始化,此时下一个节点就是head节点
return (p == next) ? head : next;
}

/**
*更改Node中的数据域item
*/
boolean casItem(E cmp, E val) {
return UNSAFE.compareAndSwapObject(this, itemOffset, cmp, val);
}

/**
* casNext(null, n)
* 将入队节点设置为当前队列尾结点的next节点
*/
boolean casNext(Node<E> cmp, Node<E> val) {
return UNSAFE.compareAndSwapObject(this, nextOffset, cmp, val);
}

/**
*更新tail节点,允许失败
*/
boolean casTail(Node<E> cmp, Node<E> val){

}

入队列

将入队节点添加到队列的尾部

添加4个节点的快照图:
image.png

单线程入队过程

  • 第一是将入队节点设置成当前队列尾结点的下一个节点,这里的尾结点不一定是tail节点,可能是tail节点的next节点
  • 第二是更新tail节点,如果tail节点的next节点不为空,则将入队节点设置成tail节点;如果tail节点的next节点为空,则将入队节点设置成tail节点的next节点,即tail指针不动

入队操作

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
public boolean offer(E e) {
if (e == null) throw new NullPointerException();
// 入队前,创建一个入队节点
Node<E> n = new Node<E>(e);
retry:
// 死循环,入队不成功反复入队。
for (; ; ) {
// 创建一个指向tail节点的引用
Node<E> t = tail;
// p用来表示队列的尾节点,默认情况下等于tail节点。
Node<E> p = t;
for (int hops = 0; ; hops++) {
// 获得p节点的下一个节点。
Node<E> next = succ(p);
//next节点不为空,说明p不是尾节点,需要更新p后在将它指向next节点
if (next != null) {
// 循环了两次及其以上,并且当前节点还是不等于尾节点
if (hops > HOPS && t != tail)
continue retry;
p = next;
}
// 如果p是尾节点,则设置p节点的next节点为入队节点。
else if (p.casNext(null, n)) {
//如果tail节点有大于等于1个next节点,则将入队节点设置成tail节点,
//更新失败了也没关系,因为失败了表示有其他线程成功更新了tail节点
if (hops >= HOPS)
// 更新tail节点,允许失败
casTail(t, n);
return true;
}
// p有next节点,表示p的next节点是尾节点,则重新设置p节点
else {
p = succ(p);
}
}
}
}

HOPS的设计意图

如果让tail节点永远作为队列的尾结点,则每次都需要使用循环CAS更新tail节点,这样效率不高;因此使用hops变量来控制并减少tail节点的更新频率,当tail节点和尾结点的距离大于等于HOPS的值(默认为1)时,才更新tail节点,tail和尾结点的距离越长,使用CAS更新tail节点的次数就越少,但距离越长则每次入队时定位尾结点的时间越长,但这样仍能提高入队效率,因为通过增加对volatile变量的读操作来减少volatile变量的写操作,其写操作开销远远大于读操作的开销。

出队列

出队列的快照图:
image.png

从图可知:
并不是每次出队时都更新head节点,当head节点里有元素时,直接弹出head节点里面的元素,而不会更新head节点;只有当head节点里没有元素时,出队操作才会更新head节点。其目的也是通过hops变量来减少使用CAS更新head节点的频率。

出队操作

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
public E poll() {
Node<E> h = head;
// p表示头节点,需要出队的节点
Node<E> p = h;
for (int hops = 0; ; hops++) {
// 获取p节点的元素
E item = p.getItem();
// 如果p节点的元素不为空,使用CAS设置p节点引用的元素为null,
// 如果成功则返回p节点的元素。
if (item != null && p.casItem(item, null)) {
if (hops >= HOPS) {
// 将p节点下一个节点设置成head节点
Node<E> q = p.getNext();
updateHead(h, (q != null)q :p);
}
return item;
}
// 如果头节点的元素为空或头节点发生了变化,这说明头节点已经被另外
// 一个线程修改了。那么获取p节点的下一个节点
Node<E> next = succ(p);
// 如果p的下一个节点也为空,说明这个队列已经空了
if (next == null) {
// 更新头节点。
updateHead(h, p);
break;
}
// 如果下一个元素不为空,则将头节点的下一个节点设置成头节点
p = next;
}
return null;
}
  • 首先获取头结点的元素,然后判断头结点元素是否为空;
  • 如果为空,表示另外一个线程已经进行了一次出队操作将该节点的元素取走;
  • 如果不为空,则使用CAS将头结点的引用设置为null;
  • 如果CAS成功,则直接返回头结点的元素;
  • 如果不成功,表示另外一个线程已经进行了一次出队操作更新了head节点,导致元素发生变化,需要重新获取头结点。

参考来源:

-------- ♥感谢阅读♥ --------