带着BAT大厂的面试问题去理解

  • 什么是CyclicBarrier?
  • CyclicBarrier底层实现原理?
  • CountDownLatch和CyclicBarrier对比?
  • CyclicBarrier的核心函数有哪些?
  • CyclicBarrier适用于什么场景?

CyclicBarrier简介

对于CountDownLatch,其他线程为游戏玩家,比如英雄联盟,主线程为控制游戏开始的线程。在所有的玩家都准备好之前,主线程是处于等待状态的,也就是游戏不能开始。当所有的玩家准备好之后,下一步的动作实施者为主线程,即开始游戏。

对于CyclicBarrier,假设有一家公司要全体员工进行团建活动,活动内容为翻越三个障碍物,每一个人翻越障碍物所用的时间是不一样的。但是公司要求所有人在翻越当前障碍物之后再开始翻越下一个障碍物,也就是所有人翻越第一个障碍物之后,才开始翻越第二个,以此类推。类比地,每一个员工都是一个“其他线程”。当所有人都翻越的所有的障碍物之后,程序才结束。而主线程可能早就结束了,这里我们不用管主线程。

和CountDownLatch对比

  • CountDownLatch减计数,CyclicBarrier加计数。
  • CountDownLatch是一次性的,CyclicBarrier可以重用。
  • CountDownLatch和CyclicBarrier都有让多个线程等待同步然后再开始下一步动作的意思
    • CountDownLatch的下一步的动作实施者是主线程,具有不可重复性;
    • CyclicBarrier的下一步动作实施者是最后一个进入屏障的线程,具有往复多次实施动作的特点。

源码分析

类的继承关系

CyclicBarrier没有显示继承哪个父类或者实现哪个父接口,它的AQS和重入锁不是通过继承实现的,而是通过组合实现的。

1
2
3
4
5
6
7
8
9
10
public class CyclicBarrier {
/**
* 类的内部类
* CyclicBarrier类存在一个内部类Generation,每一次使用的CycBarrier可以当成Generation的实例,其源代码如下
*/
private static class Generation {
// 属性broken,用来表示当前屏障是否被损坏。
boolean broken = false;
}
}

类的属性

1
2
3
4
5
6
7
8
9
10
11
12
13
14
public class CyclicBarrier {
// 可重入锁
private final ReentrantLock lock = new ReentrantLock();
// 条件队列
private final Condition trip = lock.newCondition();
// 参与的线程数量
private final int parties;
// 由最后一个进入 barrier 的线程执行的操作
private final Runnable barrierCommand;
// 当前代
private Generation generation = new Generation();
// 正在等待进入屏障的线程数量
private int count;
}

属性有一个为ReentrantLock对象,有一个为Condition对象,而Condition对象又是基于AQS的,所以,归根到底,底层还是由AQS提供支持。
CyclicBarrier组合ReentrantLock和Condition来实现。

类的构造函数

  • CyclicBarrier(int parties, Runnable barrierAction)

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    public CyclicBarrier(int parties, Runnable barrierAction) {
    // 参与的线程数量小于等于0,抛出异常
    if (parties <= 0) throw new IllegalArgumentException();
    // 设置parties
    this.parties = parties;
    // 设置count
    this.count = parties;
    // 设置barrierCommand
    this.barrierCommand = barrierAction;
    }

    该构造函数可以指定关联该CyclicBarrier的线程数量,并且可以指定在所有线程都进入屏障后的执行动作,该执行动作由最后一个进行屏障的线程执行。

  • CyclicBarrier(int parties)

    1
    2
    3
    4
    public CyclicBarrier(int parties) {
    // 调用含有两个参数的构造函数
    this(parties, null);
    }

    该构造函数仅仅执行了关联该CyclicBarrier的线程数量,没有设置执行动作。

核心函数 dowait

此函数为CyclicBarrier类的核心函数,CyclicBarrier类对外提供的await函数在底层都是调用该了doawait函数。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
// CyclicBarrier#await
public int await() throws InterruptedException, BrokenBarrierException {
try {
return dowait(false, 0L);
} catch (TimeoutException toe) {
throw new Error(toe); // cannot happen
}
}
public int await(long timeout, TimeUnit unit)
throws InterruptedException,
BrokenBarrierException,
TimeoutException {
return dowait(true, unit.toNanos(timeout));
}

// CyclicBarrier#dowait
private int dowait(boolean timed, long nanos)
throws InterruptedException, BrokenBarrierException,
TimeoutException {
// 保存当前锁
final ReentrantLock lock = this.lock;
// 锁定
lock.lock();
try {
// 保存当前代
final Generation g = generation;

if (g.broken) // 屏障被破坏,抛出异常
throw new BrokenBarrierException();

if (Thread.interrupted()) { // 线程被中断
// 损坏当前屏障,并且唤醒所有的线程,只有拥有锁的时候才会调用
breakBarrier();
// 抛出异常
throw new InterruptedException();
}

// 减少正在等待进入屏障的线程数量
int index = --count;
if (index == 0) { // 正在等待进入屏障的线程数量为0,所有线程都已经进入
// 运行的动作标识
boolean ranAction = false;
try {
// 保存运行动作
final Runnable command = barrierCommand;
if (command != null) // 动作不为空
// 运行
command.run();
// 设置ranAction状态
ranAction = true;
// 进入下一代
nextGeneration();
return 0;
} finally {
if (!ranAction) // 没有运行的动作
// 损坏当前屏障
breakBarrier();
}
}

// loop until tripped, broken, interrupted, or timed out
// 无限循环
for (;;) {
try {
if (!timed) // 没有设置等待时间
// 等待
trip.await();
else if (nanos > 0L) // 设置了等待时间,并且等待时间大于0
// 等待指定时长
nanos = trip.awaitNanos(nanos);
} catch (InterruptedException ie) {
if (g == generation && ! g.broken) { // 等于当前代并且屏障没有被损坏
// 损坏当前屏障
breakBarrier();
// 抛出异常
throw ie;
} else { // 不等于当前带后者是屏障被损坏
// We're about to finish waiting even if we had not
// been interrupted, so this interrupt is deemed to
// "belong" to subsequent execution.
// 中断当前线程
Thread.currentThread().interrupt();
}
}

if (g.broken) // 屏障被损坏,抛出异常
throw new BrokenBarrierException();

if (g != generation) // 不等于当前代
// 返回索引
return index;

if (timed && nanos <= 0L) { // 设置了等待时间,并且等待时间小于0
// 损坏屏障
breakBarrier();
// 抛出异常
throw new TimeoutException();
}
}
} finally {
// 释放锁
lock.unlock();
}
}

核心函数 nextGeneration

此函数在所有线程进入屏障后会被调用,即生成下一个版本,所有线程又可以重新进入到屏障中。
其源代码如下:

1
2
3
4
5
6
7
8
9
10
private void nextGeneration() {
// signal completion of last generation
// 唤醒所有线程
trip.signalAll();
// set up next generation
// 恢复正在等待进入屏障的线程数量
count = parties;
// 新生一代
generation = new Generation();
}

在此函数中会调用AQS的signalAll方法,即唤醒所有等待线程。如果所有的线程都在等待此条件,则唤醒所有线程。

在此函数中会调用AQS的signalAll方法,即唤醒所有等待线程。如果所有的线程都在等待此条件,则唤醒所有线程。
其源代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
public abstract class AbstractQueuedSynchronizer
extends AbstractOwnableSynchronizer
implements java.io.Serializable {

public class ConditionObject implements Condition, java.io.Serializable {

// 此函数判断头节点是否为空,即条件队列是否为空,不为空会调用doSignalAll函数
public final void signalAll() {
if (!isHeldExclusively()) // 不被当前线程独占,抛出异常
throw new IllegalMonitorStateException();
// 保存condition队列头节点
Node first = firstWaiter;
if (first != null) // 头节点不为空
// 唤醒所有等待线程
doSignalAll(first);
}

// 此函数会依次将条件队列中的节点转移到同步队列中,会调用到transferForSignal函数
private void doSignalAll(Node first) {
lastWaiter = firstWaiter = null;
do {
Node next = first.nextWaiter;
first.nextWaiter = null;
transferForSignal(first);
first = next;
} while (first != null);
}

// 此函数的作用就是将处于条件队列中的节点转移到同步队列中,并设置结点的状态信息,其中会调用到enq函数
final boolean transferForSignal(Node node) {
/*
* If cannot change waitStatus, the node has been cancelled.
*/
if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
return false;

/*
* Splice onto queue and try to set waitStatus of predecessor to
* indicate that thread is (probably) waiting. If cancelled or
* attempt to set waitStatus fails, wake up to resync (in which
* case the waitStatus can be transiently and harmlessly wrong).
*/
Node p = enq(node);
int ws = p.waitStatus;
if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
LockSupport.unpark(node.thread);
return true;
}

// 此函数完成了结点插入同步队列的过程
private Node enq(final Node node) {
for (;;) { // 无限循环,确保结点能够成功入队列
// 保存尾结点
Node t = tail;
if (t == null) { // 尾结点为空,即还没被初始化
if (compareAndSetHead(new Node())) // 头节点为空,并设置头节点为新生成的结点
tail = head; // 头节点与尾结点都指向同一个新生结点
} else { // 尾结点不为空,即已经被初始化过
// 将node结点的prev域连接到尾结点
node.prev = t;
if (compareAndSetTail(t, node)) { // 比较结点t是否为尾结点,若是则将尾结点设置为node
// 设置尾结点的next域为node
t.next = node;
return t; // 返回尾结点
}
}
}
}
}
}

breakBarrier函数

此函数的作用是损坏当前屏障,会唤醒所有在屏障中的线程。
源代码如下:

1
2
3
4
5
6
7
8
private void breakBarrier() {
// 设置状态
generation.broken = true;
// 恢复正在等待进入屏障的线程数量
count = parties;
// 唤醒所有线程
trip.signalAll();
}

可以看到,此函数也调用了AQS的signalAll函数,由signal函数提供支持。