Fork me on GitHub

【Java多线程】JUC集合 08. PriorityBlockingQueue

PriorityBlockingQueue

1. 前言

  • 线程安全的无界优先级队列

  • 基于数组实现的二叉堆,原理和结构根PriorityQueue基本一致

2. 源码分析

2.1 数据结构

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
public class PriorityBlockingQueue<E> extends AbstractQueue<E>
implements BlockingQueue<E>, java.io.Serializable {
//默认数组容量大小
private static final int DEFAULT_INITIAL_CAPACITY = 11;

//数组最大容量
private static final int MAX_ARRAY_SIZE = Integer.MAX_VALUE - 8;

//优先级队列数组,queue[n]的2个左右子元素为queue[2*n+1]和queue[2*(n+1)]
private transient Object[] queue;

//队列元素个数
private transient int size;

//比较器,构造时可以选择传入,若没有则使用元素的自然排序
private transient Comparator<? super E> comparator;

//重入锁
private final ReentrantLock lock;

//队列为空的时候条件队列
private final Condition notEmpty;

//自旋锁
private transient volatile int allocationSpinLock;

//序列化的时候使用PriorityQueue
private PriorityQueue q;
}

UML类图如下:

2.2 核心函数

  • 构造函数
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
/**
* 默认构造,使用默认容量,没有比较器
*/
public PriorityBlockingQueue() {
this(DEFAULT_INITIAL_CAPACITY, null);
}

public PriorityBlockingQueue(int initialCapacity) {
this(initialCapacity, null);
}

/**
* 最终调用的构造
*/
public PriorityBlockingQueue(int initialCapacity,
Comparator<? super E> comparator) {
if (initialCapacity < 1)
throw new IllegalArgumentException();
this.lock = new ReentrantLock();
this.notEmpty = lock.newCondition();
this.comparator = comparator;
this.queue = new Object[initialCapacity];
}
  • 添加元素: offer(E e)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
public boolean offer(E e) {
if (e == null)
throw new NullPointerException();
final ReentrantLock lock = this.lock;
lock.lock();//获取锁
int n, cap;
Object[] array;
while ((n = size) >= (cap = (array = queue).length))
tryGrow(array, cap);//如果元素数量大于数组大小,则进行扩容
try {
Comparator<? super E> cmp = comparator;
if (cmp == null)
siftUpComparable(n, e, array);
else
siftUpUsingComparator(n, e, array, cmp);//自底向上调整堆
size = n + 1;
notEmpty.signal();//唤醒在notEmpty等待的线程
} finally {
lock.unlock();//释放锁
}
return true;
}

//扩容
private void tryGrow(Object[] array, int oldCap) {
//数组扩容的时候使用自旋锁,不需要锁主锁,先释放
lock.unlock(); // must release and then re-acquire main lock
Object[] newArray = null;
if (allocationSpinLock == 0 &&
UNSAFE.compareAndSwapInt(this, allocationSpinLockOffset,
0, 1)) {
try {
int newCap = oldCap + ((oldCap < 64) ?
(oldCap + 2) : // grow faster if small 双倍扩容
(oldCap >> 1));// 扩容1.5倍
if (newCap - MAX_ARRAY_SIZE > 0) { // possible overflow
int minCap = oldCap + 1;
if (minCap < 0 || minCap > MAX_ARRAY_SIZE)
throw new OutOfMemoryError();
newCap = MAX_ARRAY_SIZE;
}
if (newCap > oldCap && queue == array)
newArray = new Object[newCap];
} finally {
allocationSpinLock = 0;//扩容后释放自旋锁
}
}
if (newArray == null) // back off if another thread is allocating
Thread.yield();
lock.lock();
if (newArray != null && queue == array) {
queue = newArray;
System.arraycopy(array, 0, newArray, 0, oldCap);//复制数组元素
}
}

//使用比较器,自底向上调整堆
private static <T> void siftUpUsingComparator(int k, T x, Object[] array,
Comparator<? super T> cmp) {
while (k > 0) {
int parent = (k - 1) >>> 1;
Object e = array[parent];
if (cmp.compare(x, (T) e) >= 0)//x比父节点“大”则中止循环
break;
array[k] = e;//向上调整父节点
k = parent;
}
array[k] = x;
}

流程如下:

  1. 加锁,检查是否需要扩容,扩容先释放主锁,使用cas自旋锁,容量最少翻倍,释放自旋锁;可能存在竞争,检查是否扩容,如果扩容则复制数组,再度加主锁;
  2. 看构造入参是否有comparator,没有就使用自然排序;从数组待插入位置和父节点进行比较,如果大于父节点,那就直接待插入位置插入,否则就跟父节点交换,然后循环向上查找;队列数量加1,唤醒非空条件队列上的线程,最后释放锁。
  • 取出元素: take()
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
public E take() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
E result;
try {
while ( (result = dequeue()) == null)
notEmpty.await();//数组没有元素则阻塞
} finally {
lock.unlock();
}
return result;
}


private E dequeue() {
int n = size - 1;
if (n < 0)
return null;
else {
Object[] array = queue;
E result = (E) array[0];//取出数组头节点
E x = (E) array[n];
array[n] = null;//清掉最后一个元素
Comparator<? super E> cmp = comparator;
if (cmp == null)//将数组尾节点自顶向下调整
siftDownComparable(0, x, array, n);
else
siftDownUsingComparator(0, x, array, n, cmp);
size = n;
return result;
}
}
//自顶向下调整
private static <T> void siftDownUsingComparator(int k, T x, Object[] array,
int n,
Comparator<? super T> cmp) {
if (n > 0) {
int half = n >>> 1; //最后一个叶子节点的父节点位置
while (k < half) {
int child = (k << 1) + 1;//左节点
Object c = array[child];
int right = child + 1;//右节点
if (right < n && cmp.compare((T) c, (T) array[right]) > 0)
c = array[child = right];//取左右节点间较小的
if (cmp.compare(x, (T) c) <= 0)//父节点和左右节点较小值比较
break;
array[k] = c;
k = child;
}
array[k] = x;
}
}

流程如下:

  1. 加锁,获取queue[0],清掉堆的最后一个叶子节点,并将其作为比较节点
  2. 调用从顶向下调整的方法:待调整位置节点左右子节点和之前的叶子节点比较,如果之前叶子节点最小,那就直接放入待调整位置;如果是子节点小,那就取小的那个放入待调整位置,并从子节点位置重新循环查找,循环次数根据2分查找,基本是元素数量的一半就到找到位置
  • 删除元素:remove(Object o)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
public boolean remove(Object o) {
final ReentrantLock lock = this.lock;
lock.lock();
try {
int i = indexOf(o);//查找o对于的位置
if (i == -1)//查找不到该元素
return false;
removeAt(i);//删除i处元素
return true;
} finally {
lock.unlock();
}
}

private int indexOf(Object o) {
if (o != null) {
Object[] array = queue;
int n = size;
for (int i = 0; i < n; i++)//数组遍历
if (o.equals(array[i]))
return i;
}
return -1;
}

private void removeAt(int i) {
Object[] array = queue;
int n = size - 1;
if (n == i) // removed last element
array[i] = null;//直接删除数组最后元素
else {
E moved = (E) array[n];
array[n] = null;//调整最后一个元素
Comparator<? super E> cmp = comparator;
if (cmp == null)
siftDownComparable(i, moved, array, n);//从删除的位置处自顶向下调整元素
else
siftDownUsingComparator(i, moved, array, n, cmp);
//经过从上向下调整后,如果是直接将比较节点放在待调整位置,那只能说明这个节点在以它为堆顶的堆里面最小,但不能说明从这个节点就向上查找就最大,所以这里需要自底向上再来一次调整
if (array[i] == moved) {
if (cmp == null)
siftUpComparable(i, moved, array);
else
siftUpUsingComparator(i, moved, array, cmp);
}
}
size = n;
}

3. 参考

https://blog.csdn.net/xiaoxufox/article/details/51860543