Semaphore底层是基于AbstractQueuedSynchronizer来实现的。
Semaphore称为计数信号量,它允许n个任务同时访问某个资源,可以将信号量看做是在向外分发使用资源的许可证,只有成功获取许可证,才能使用资源。

带着BAT大厂的面试问题去理解

  • 什么是Semaphore?
  • Semaphore内部原理?
  • Semaphore常用方法有哪些? 如何实现线程同步和互斥的?
  • Semaphore适合用在什么场景?
  • 单独使用Semaphore是不会使用到AQS的条件队列?
  • Semaphore中申请令牌(acquire)、释放令牌(release)的实现?
  • Semaphore初始化有10个令牌,11个线程同时各调用1次acquire方法,会发生什么?
  • Semaphore初始化有10个令牌,一个线程重复调用11次acquire方法,会发生什么?
  • Semaphore初始化有1个令牌,1个线程调用一次acquire方法,然后调用两次release方法,之后另外一个线程调用acquire(2)方法,此线程能够获取到足够的令牌并继续运行吗?
  • Semaphore初始化有2个令牌,一个线程调用1次release方法,然后一次性获取3个令牌,会获取到吗?

源码分析

类的继承关系

1
2
3
4
5
6
7
8
9
public class Semaphore implements java.io.Serializable {

abstract static class Sync extends AbstractQueuedSynchronizer {}

// 非公平锁
static final class NonfairSync extends Sync {}
// 公平锁
static final class FairSync extends Sync {}
}

Semaphore实现了Serializable接口,即可以进行序列化。

内部类

Semaphore与ReentrantLock的内部类的结构相同,类内部总共存在Sync、NonfairSync、FairSync三个类。
NonfairSync与FairSync类继承自Sync类,Sync类继承自AbstractQueuedSynchronizer抽象类。

Sync类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
// 内部类,继承自AQS
abstract static class Sync extends AbstractQueuedSynchronizer {
// 版本号
private static final long serialVersionUID = 1192457210091910933L;

// 构造函数
Sync(int permits) {
// 设置状态数
setState(permits);
}

// 获取许可
final int getPermits() {
return getState();
}

// 共享模式下非公平策略获取
final int nonfairTryAcquireShared(int acquires) {
for (;;) { // 无限循环
// 获取许可数
int available = getState();
// 剩余的许可
int remaining = available - acquires;
if (remaining < 0 ||
compareAndSetState(available, remaining)) // 许可小于0或者比较并且设置状态成功
return remaining;
}
}

// 共享模式下进行释放
protected final boolean tryReleaseShared(int releases) {
for (;;) { // 无限循环
// 获取许可
int current = getState();
// 可用的许可
int next = current + releases;
if (next < current) // overflow
throw new Error("Maximum permit count exceeded");
if (compareAndSetState(current, next)) // 比较并进行设置成功
return true;
}
}

// 根据指定的缩减量减小可用许可的数目
final void reducePermits(int reductions) {
for (;;) { // 无限循环
// 获取许可
int current = getState();
// 可用的许可
int next = current - reductions;
if (next > current) // underflow
throw new Error("Permit count underflow");
if (compareAndSetState(current, next)) // 比较并进行设置成功
return;
}
}

// 获取并返回立即可用的所有许可
final int drainPermits() {
for (;;) { // 无限循环
// 获取许可
int current = getState();
if (current == 0 || compareAndSetState(current, 0)) // 许可为0或者比较并设置成功
return current;
}
}
}

NonfairSync类

NonfairSync类继承了Sync类,表示采用非公平策略获取资源,其只有一个tryAcquireShared方法,重写了AQS的该方法,其源码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
static final class NonfairSync extends Sync {
// 版本号
private static final long serialVersionUID = -2694183684443567898L;

// 构造函数
NonfairSync(int permits) {
super(permits);
}
// 共享模式下获取
protected int tryAcquireShared(int acquires) {
return nonfairTryAcquireShared(acquires);
}
}

从tryAcquireShared方法的源码可知,其会调用父类Sync的nonfairTryAcquireShared方法,表示按照非公平策略进行资源的获取。

FairSync类

FairSync类继承了Sync类,表示采用公平策略获取资源,其只有一个tryAcquireShared方法,重写了AQS的该方法,其源码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
static final class FairSync extends Sync {
private static final long serialVersionUID = 2014338818796000944L;

FairSync(int permits) {
super(permits);
}

protected int tryAcquireShared(int acquires) {
for (;;) {
if (hasQueuedPredecessors())
return -1;
int available = getState();
int remaining = available - acquires;
if (remaining < 0 ||
compareAndSetState(available, remaining))
return remaining;
}
}
}

从tryAcquireShared方法的源码可知,它使用公平策略来获取资源,它会判断同步队列中是否存在其他的等待节点。

类的属性

1
2
3
4
5
6
public class Semaphore implements java.io.Serializable {
// 版本号
private static final long serialVersionUID = -3222578661600680210L;
// 属性
private final Sync sync;
}

Semaphore自身只有两个属性,最重要的是sync属性,基于Semaphore对象的操作绝大多数都转移到了对sync的操作。

类的构造函数

  • Semaphore(int permits)

    1
    2
    3
    public Semaphore(int permits) {
    sync = new NonfairSync(permits);
    }

    该构造函数会创建具有给定的许可数和非公平设置的Semaphore。

  • Semaphore(int permits, boolean fair)

    1
    2
    3
    public Semaphore(int permits, boolean fair) {
    sync = fair ? new FairSync(permits) : new NonfairSync(permits);
    }

    该构造函数会创建具有给定的许可数和给定的公平设置的Semaphore。

核心函数 acquire函数

方法从信号量获取一个(多个)许可,在提供一个许可前一直将线程阻塞,或者线程被中断,其源码如下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
public void acquire() throws InterruptedException {
sync.acquireSharedInterruptibly(1);
}

public boolean tryAcquire() {
return sync.nonfairTryAcquireShared(1) >= 0;
}

public boolean tryAcquire(long timeout, TimeUnit unit)
throws InterruptedException {
return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));
}

public void acquire(int permits) throws InterruptedException {
if (permits < 0) throw new IllegalArgumentException();
sync.acquireSharedInterruptibly(permits);
}

public void acquireUninterruptibly(int permits) {
if (permits < 0) throw new IllegalArgumentException();
sync.acquireShared(permits);
}

public boolean tryAcquire(int permits) {
if (permits < 0) throw new IllegalArgumentException();
return sync.nonfairTryAcquireShared(permits) >= 0;
}

public boolean tryAcquire(int permits, long timeout, TimeUnit unit)
throws InterruptedException {
if (permits < 0) throw new IllegalArgumentException();
return sync.tryAcquireSharedNanos(permits, unit.toNanos(timeout));
}

该方法中将会调用Sync对象的**acquireSharedInterruptibly(从AQS继承而来的方法)**方法。

假设使用非公平策略,acquire方法大致的调用链如下:

核心函数 release函数

此方法释放一个(多个)许可,将其返回给信号量,源码如下:

1
2
3
public void release() {
sync.releaseShared(1);
}

该方法中将会调用Sync对象的releaseShared(从AQS继承而来的方法)方法

深入理解

单独使用Semaphore是不会使用到AQS的条件队列的

不同于CyclicBarrier和ReentrantLock,单独使用Semaphore是不会使用到AQS的条件队列的。
其实,只有进行Condition.await操作才会进入条件队列;其他的都是在同步队列中,只是当前线程会被park。