Fork me on GitHub

【Java多线程】JUC集合 05. LinkedBlockingQueue

LinkedBlockingQueue

1. 前言

  • 单向链表实现的FIFO阻塞队列
  • 链接队列的吞吐量通常要高于基于数组的队列,但是在大多数并发应用程序中,其可预知的性能要低
  • 默认容量大小等于Integer.MAX_VALUE ,可设置队列容量大小

2. 源码分析

2.1 数据结构

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
public class LinkedBlockingQueue<E> extends AbstractQueue<E>
implements BlockingQueue<E>, java.io.Serializable {

private final int capacity;//链表容量
/** Current number of elements */
private final AtomicInteger count = new AtomicInteger();//链表实际大小
/**
* Head of linked list.
* Invariant: head.item == null
*/
transient Node<E> head;//从head取数据
/**
* Tail of linked list.
* Invariant: last.next == null
*/
private transient Node<E> last;//从last插入数据

/** Lock held by take, poll, etc */
private final ReentrantLock takeLock = new ReentrantLock();//取出锁

/** Wait queue for waiting takes */
private final Condition notEmpty = takeLock.newCondition();//非空 条件

/** Lock held by put, offer, etc */
private final ReentrantLock putLock = new ReentrantLock();//插入锁

/** Wait queue for waiting puts */
private final Condition notFull = putLock.newCondition();//未满 条件
}

UML类图:

  • 若某线程(线程A)要取出数据时,队列正好为空,则该线程会执行notEmpty.await()进行等待;当其它线程向队列中插入了数据之后,会调用notEmpty.signal()唤醒“notEmpty上的等待线程”。此时,线程A会被唤醒从而得以继续运行。 线程A在执行取操作前,会获取takeLock,在取操作执行完毕再释放takeLock。
  • 若某线程(线程H)要插入数据时,队列已满,则该线程会它执行notFull.await()进行等待;当其它线程取出数据之后,会调用notFull.signal()唤醒“notFull上的等待线程”。此时,线程H就会被唤醒从而得以继续运行。 线程H在执行插入操作前,会获取putLock,在插入操作执行完毕才释放putLock。

2.2 核心函数

  • 构造函数
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
public LinkedBlockingQueue() {
this(Integer.MAX_VALUE);//默认大小
}

public LinkedBlockingQueue(int capacity) {
if (capacity <= 0) throw new IllegalArgumentException();
this.capacity = capacity;
last = head = new Node<E>(null);
}

public LinkedBlockingQueue(Collection<? extends E> c) {
this(Integer.MAX_VALUE);
final ReentrantLock putLock = this.putLock;
putLock.lock(); // Never contended, but necessary for visibility
try {
int n = 0;
for (E e : c) {
if (e == null)
throw new NullPointerException();
if (n == capacity)
throw new IllegalStateException("Queue full");
enqueue(new Node<E>(e));
++n;
}
count.set(n);
} finally {
putLock.unlock();
}
}
  • 添加元素:offer(E e)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
public boolean offer(E e) {
if (e == null) throw new NullPointerException();
final AtomicInteger count = this.count;
if (count.get() == capacity)//队列已满
return false;
int c = -1;
Node<E> node = new Node<E>(e);
final ReentrantLock putLock = this.putLock;
putLock.lock();//插入锁 加锁
try {
if (count.get() < capacity) {
enqueue(node);//入队
c = count.getAndIncrement();
if (c + 1 < capacity)
notFull.signal();//c+1后队列未满,则唤醒notFull上的等待线程
}
} finally {
putLock.unlock();
}
if (c == 0)
signalNotEmpty();//入队前队列为空,则唤醒notEmpty上的等待线程
return c >= 0;
}

//添加元素到队列尾部
private void enqueue(Node<E> node) {
// assert putLock.isHeldByCurrentThread();
// assert last.next == null;
last = last.next = node;
}

//唤醒notEmpty上的等待线程
private void signalNotEmpty() {
final ReentrantLock takeLock = this.takeLock;
takeLock.lock();
try {
notEmpty.signal();
} finally {
takeLock.unlock();
}
}
  • 取出元素: take()
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
public E take() throws InterruptedException {
E x;
int c = -1;
final AtomicInteger count = this.count;
final ReentrantLock takeLock = this.takeLock;
takeLock.lockInterruptibly();//获取“取出锁”,若当前线程是中断状态,则抛出InterruptedException异常
try {
while (count.get() == 0) {//队列为空
notEmpty.await();
}
x = dequeue();//出队
c = count.getAndDecrement();//取出元素后,将count-1,返回原始count
if (c > 1)
notEmpty.signal();//唤醒notEmpty上等待的线程
} finally {
takeLock.unlock();
}
if (c == capacity)//取出元素前,队列是满的,则唤醒在notFull上等待的线程
signalNotFull();
return x;
}
//删除队列头节点,并把head设置为h.next
private E dequeue() {
// assert takeLock.isHeldByCurrentThread();
// assert head.item == null;
Node<E> h = head;
Node<E> first = h.next;
h.next = h; // help GC
head = first;
E x = first.item;
first.item = null;
return x;
}

//唤醒在notFull上等待的线程
private void signalNotFull() {
final ReentrantLock putLock = this.putLock;
putLock.lock();
try {
notFull.signal();
} finally {
putLock.unlock();
}
}
  • iterator()
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
public Iterator<E> iterator() {
return new Itr();
}

private class Itr implements Iterator<E> {
/*
* Basic weakly-consistent iterator. At all times hold the next
* item to hand out so that if hasNext() reports true, we will
* still have it to return even if lost race with a take etc.
*/

private Node<E> current;
private Node<E> lastRet;
private E currentElement;

Itr() {
fullyLock();//同时获取插入锁和取出锁
try {
current = head.next;
if (current != null)
currentElement = current.item;
} finally {
fullyUnlock();
}
}

public boolean hasNext() {
return current != null;
}
public E next() {
fullyLock();
try {
if (current == null)
throw new NoSuchElementException();
E x = currentElement;
lastRet = current;
current = nextNode(current);//下一个节点
currentElement = (current == null) ? null : current.item;
return x;
} finally {
fullyUnlock();
}
}
}

3. 参考

http://www.cnblogs.com/skywang12345/p/3503458.html