Semaphore底层是基于AbstractQueuedSynchronizer来实现的。 Semaphore称为计数信号量,它允许n个任务同时访问某个资源,可以将信号量看做是在向外分发使用资源的许可证,只有成功获取许可证,才能使用资源。
带着BAT大厂的面试问题去理解
什么是Semaphore?
Semaphore内部原理?
Semaphore常用方法有哪些? 如何实现线程同步和互斥的?
Semaphore适合用在什么场景?
单独使用Semaphore是不会使用到AQS的条件队列?
Semaphore中申请令牌(acquire)、释放令牌(release)的实现?
Semaphore初始化有10个令牌,11个线程同时各调用1次acquire方法,会发生什么?
Semaphore初始化有10个令牌,一个线程重复调用11次acquire方法,会发生什么?
Semaphore初始化有1个令牌,1个线程调用一次acquire方法,然后调用两次release方法,之后另外一个线程调用acquire(2)方法,此线程能够获取到足够的令牌并继续运行吗?
Semaphore初始化有2个令牌,一个线程调用1次release方法,然后一次性获取3个令牌,会获取到吗?
源码分析 类的继承关系 1 2 3 4 5 6 7 8 9 public class Semaphore implements java .io .Serializable { abstract static class Sync extends AbstractQueuedSynchronizer {} static final class NonfairSync extends Sync {} static final class FairSync extends Sync {} }
Semaphore实现了Serializable接口,即可以进行序列化。
内部类 Semaphore与ReentrantLock的内部类的结构相同,类内部总共存在Sync、NonfairSync、FairSync三个类。 NonfairSync与FairSync类继承自Sync类,Sync类继承自AbstractQueuedSynchronizer抽象类。
Sync类 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 abstract static class Sync extends AbstractQueuedSynchronizer { private static final long serialVersionUID = 1192457210091910933L ; Sync(int permits) { setState(permits); } final int getPermits () { return getState(); } final int nonfairTryAcquireShared (int acquires) { for (;;) { int available = getState(); int remaining = available - acquires; if (remaining < 0 || compareAndSetState(available, remaining)) return remaining; } } protected final boolean tryReleaseShared (int releases) { for (;;) { int current = getState(); int next = current + releases; if (next < current) throw new Error("Maximum permit count exceeded" ); if (compareAndSetState(current, next)) return true ; } } final void reducePermits (int reductions) { for (;;) { int current = getState(); int next = current - reductions; if (next > current) throw new Error("Permit count underflow" ); if (compareAndSetState(current, next)) return ; } } final int drainPermits () { for (;;) { int current = getState(); if (current == 0 || compareAndSetState(current, 0 )) return current; } } }
NonfairSync类 NonfairSync类继承了Sync类,表示采用非公平策略获取资源,其只有一个tryAcquireShared方法,重写了AQS的该方法,其源码如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 static final class NonfairSync extends Sync { private static final long serialVersionUID = -2694183684443567898L ; NonfairSync(int permits) { super (permits); } protected int tryAcquireShared (int acquires) { return nonfairTryAcquireShared(acquires); } }
从tryAcquireShared方法的源码可知,其会调用父类Sync的nonfairTryAcquireShared方法,表示按照非公平策略进行资源的获取。
FairSync类 FairSync类继承了Sync类,表示采用公平策略获取资源,其只有一个tryAcquireShared方法,重写了AQS的该方法,其源码如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 static final class FairSync extends Sync { private static final long serialVersionUID = 2014338818796000944L ; FairSync(int permits) { super (permits); } protected int tryAcquireShared (int acquires) { for (;;) { if (hasQueuedPredecessors()) return -1 ; int available = getState(); int remaining = available - acquires; if (remaining < 0 || compareAndSetState(available, remaining)) return remaining; } } }
从tryAcquireShared方法的源码可知,它使用公平策略来获取资源,它会判断同步队列中是否存在其他的等待节点。
类的属性 1 2 3 4 5 6 public class Semaphore implements java .io .Serializable { private static final long serialVersionUID = -3222578661600680210L ; private final Sync sync; }
Semaphore自身只有两个属性,最重要的是sync属性,基于Semaphore对象的操作绝大多数都转移到了对sync的操作。
类的构造函数
Semaphore(int permits)
1 2 3 public Semaphore (int permits) {sync = new NonfairSync(permits); }
该构造函数会创建具有给定的许可数和非公平设置的Semaphore。
Semaphore(int permits, boolean fair)
1 2 3 public Semaphore (int permits, boolean fair) { sync = fair ? new FairSync(permits) : new NonfairSync(permits); }
该构造函数会创建具有给定的许可数和给定的公平设置的Semaphore。
核心函数 acquire函数 方法从信号量获取一个(多个)许可,在提供一个许可前一直将线程阻塞,或者线程被中断,其源码如下
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 public void acquire () throws InterruptedException { sync.acquireSharedInterruptibly(1 ); } public boolean tryAcquire () { return sync.nonfairTryAcquireShared(1 ) >= 0 ; } public boolean tryAcquire (long timeout, TimeUnit unit) throws InterruptedException { return sync.tryAcquireSharedNanos(1 , unit.toNanos(timeout)); } public void acquire (int permits) throws InterruptedException { if (permits < 0 ) throw new IllegalArgumentException(); sync.acquireSharedInterruptibly(permits); } public void acquireUninterruptibly (int permits) { if (permits < 0 ) throw new IllegalArgumentException(); sync.acquireShared(permits); } public boolean tryAcquire (int permits) { if (permits < 0 ) throw new IllegalArgumentException(); return sync.nonfairTryAcquireShared(permits) >= 0 ; } public boolean tryAcquire (int permits, long timeout, TimeUnit unit) throws InterruptedException { if (permits < 0 ) throw new IllegalArgumentException(); return sync.tryAcquireSharedNanos(permits, unit.toNanos(timeout)); }
该方法中将会调用Sync对象的**acquireSharedInterruptibly(从AQS继承而来的方法)**方法。
假设使用非公平策略,acquire方法大致的调用链如下:
核心函数 release函数 此方法释放一个(多个)许可,将其返回给信号量,源码如下:
1 2 3 public void release () { sync.releaseShared(1 ); }
该方法中将会调用Sync对象的releaseShared(从AQS继承而来的方法)方法
深入理解 单独使用Semaphore是不会使用到AQS的条件队列的 不同于CyclicBarrier和ReentrantLock,单独使用Semaphore是不会使用到AQS的条件队列的。 其实,只有进行Condition.await
操作才会进入条件队列;其他的都是在同步队列中,只是当前线程会被park。