Fork me on GitHub

【Java多线程】JUC锁 09. CyclicBarrier

CyclicBarrier

1. 前言

  • 同步辅助类,允许一组线程互相等待,直到到达某个公共屏障点
  • 计数器可重置后使用
  • CyclicBarrier 和 CountDownLatch的区别:
CountDownLatch CyclicBarrier
减计数方式 加计数方式
计算为0时释放所有等待的线程 计数达到指定值时释放所有等待线程
计数为0时,无法重置 计数达到指定值时,计数置为0重新开始
调用countDown()方法计数减一,调用await()方法只进行阻塞,对计数没任何影响 调用await()方法计数加1,若加1后的值不等于构造方法的值,则线程阻塞
不可重复利用 可重复利用

2. 源码解析

2.1 数据结构

CyclicBarrier 包含 ReentrantLock对象Lock和Condition对象trip,由独占锁ReentrantLock实现。

2.2 成员变量

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
private final ReentrantLock lock = new ReentrantLock();
/** Condition to wait on until tripped */
private final Condition trip = lock.newCondition();
/** The number of parties */
private final int parties; //必须同时到达barrier的线程数
/* The command to run when tripped */
private final Runnable barrierCommand; //到达屏障所执行的操作
/** The current generation */
private Generation generation = new Generation(); //内部类,同一批线程属于同一代(到达屏障后重置)
//处于等待的parties数目
private int count;

private static class Generation {
boolean broken = false;
}

2.3 核心方法

  • CyclicBarrier()构造方法
1
2
3
4
5
6
7
8
9
10
public CyclicBarrier(int parties) {
this(parties, null);
}
//有屏障操作的构造方法
public CyclicBarrier(int parties, Runnable barrierAction) {
if (parties <= 0) throw new IllegalArgumentException();
this.parties = parties;
this.count = parties;
this.barrierCommand = barrierAction;
}
  • await()
1
2
3
4
5
6
7
public int await() throws InterruptedException, BrokenBarrierException {
try {
return dowait(false, 0L);
} catch (TimeoutException toe) {
throw new Error(toe); // cannot happen
}
}

await()在dowait()中实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
//timed - 是否设置超时时间
//让当前线程阻塞,直到“有parties个线程到达barrier” 或 “当前线程被中断” 或 “超时”这3者之一发生,当前线程才继续执行
private int dowait(boolean timed, long nanos)
throws InterruptedException, BrokenBarrierException,
TimeoutException {
final ReentrantLock lock = this.lock;
// 获取“独占锁(lock)”
lock.lock();
try {
// 保存“当前的generation”
final Generation g = generation;

// 若“当前generation已损坏”,则抛出异常。
if (g.broken)
throw new BrokenBarrierException();

// 如果当前线程被中断,则通过breakBarrier()终止CyclicBarrier,唤醒CyclicBarrier中所有等待线程。
if (Thread.interrupted()) {
breakBarrier();
throw new InterruptedException();
}

// 将“count计数器”-1
int index = --count;
// 如果index=0,则意味着“有parties个线程到达barrier”。
if (index == 0) { // tripped
boolean ranAction = false;
try {
// 如果barrierCommand不为null,则执行该动作。
final Runnable command = barrierCommand;
if (command != null)
command.run();
ranAction = true;
// 唤醒所有等待线程,并更新generation。
nextGeneration();
return 0;
} finally {
if (!ranAction)
breakBarrier();
}
}

// 当前线程一直阻塞,直到“有parties个线程到达barrier” 或 “当前线程被中断” 或 “超时”这3者之一发生, 当前线程才继续执行。
for (;;) {
try {
// 如果不是“超时等待”,则调用awati()进行等待;否则,调用awaitNanos()进行等待。
if (!timed)
trip.await(); //放入AQS等待队列
else if (nanos > 0L)
nanos = trip.awaitNanos(nanos);
} catch (InterruptedException ie) {
// 如果等待过程中,线程被中断,则执行下面的函数。
if (g == generation && ! g.broken) {
breakBarrier();
throw ie;
} else {
Thread.currentThread().interrupt();
}
}

// 如果“当前generation已经损坏”,则抛出异常。
if (g.broken)
throw new BrokenBarrierException();

// 如果“generation已经换代”,则返回index。
if (g != generation)
return index;

// 如果是“超时等待”,并且时间已到,则通过breakBarrier()终止CyclicBarrier,唤醒CyclicBarrier中所有等待线程。
if (timed && nanos <= 0L) {
breakBarrier();
throw new TimeoutException();
}
}
} finally {
// 释放“独占锁(lock)”
lock.unlock();
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
//更新换代,唤醒所有等待的线程
private void nextGeneration() {
// signal completion of last generation
trip.signalAll();
// set up next generation
count = parties;
generation = new Generation();
}

//中断Barrier
private void breakBarrier() {
generation.broken = true; //将该Generation中断
count = parties; //重置count
trip.signalAll(); //唤醒所有等待的线程
}

dowait() 流程:

  • 如果当前线程被中断,即Thread.interrupted()为true;则通过breakBarrier()终止CyclicBarrier
  • 将count -1,判断是否有parties个线程到达Barrier;如果没到达,线程进行等待,直到“有parties个线程到达barrier” 或 “当前线程被中断” 或 “超时”这3者之一发生, 当前线程才继续执行
  • 如果所有线程都达到Barrier,有设置barrierCommand,则执行barrierCommand,然后唤醒所有等待的线程,更新generation

3. 参考

http://www.cnblogs.com/skywang12345/p/3533995.html