相信大部分人在看AQS的时候都能看到注释上有这么一段话:
为了更好的理解AQS中使用锁的思想,所以有必要好好了解下CLH锁。
原理解释 自旋锁 在了解CLH Node FIFO队列之前,首先要了解这种队列的几个要素之一:自旋锁。
所谓自旋锁 ,就是某一线程去尝试获取某个锁时,如果该锁已经被其他线程占用的话,当前线程将不断循环检查该锁是否被释放,而不是让当前线程挂起或睡眠。
自旋锁属于为了保证共享资源而提出的一种锁机制,与互斥锁类似,保证了公共资源在任意时刻最多只能由一个线程获取使用。不同的是,互斥锁在获取锁失败后线程将进入睡眠或阻塞状态。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 public class SpinLock { private static Unsafe unsafe = null ; private static final long valueOffset; private volatile int value = 0 ; static { try { unsafe = getUnsafeInstance(); valueOffset = unsafe.objectFieldOffset(SpinLock.class.getDeclaredField("value" )); } catch (Exception e) { throw new RuntimeException(e); } } @SneakyThrows private static Unsafe getUnsafeInstance () { Field theUnsafeInstance = Unsafe.class.getDeclaredField("theUnsafe" ); ReflectionUtils.makeAccessible(theUnsafeInstance); return (Unsafe) theUnsafeInstance.get(Unsafe.class); } public void lock () { System.out.println(Thread.currentThread().getName() + " spinlock try" ); for (;;) { int newV = value + 1 ; if (newV == 1 && (unsafe.compareAndSwapInt(this , valueOffset, 0 , newV))) { return ; } } } public void unlock () { unsafe.compareAndSwapInt(this , valueOffset, 1 , 0 ); } public static void main (String[] args) { final SpinLock spinLock = new SpinLock(); for (int i = 0 ; i < 10 ; i++) { new Thread(() -> { try { spinLock.lock(); System.out.println(Thread.currentThread().getName() + " spinlock success" ); Thread.sleep(500 ); } catch (InterruptedException ignored) { } finally { spinLock.unlock(); System.out.println(Thread.currentThread().getName() + " unlock" ); } }).start(); } } }
这是一个很简单的自旋锁。 对于lock方法 ,假如有若干线程竞争,能成功通过CAS操作修改value值为newV的线程即是成功获取锁的线程,将直接通过,而其他的线程则不断循环 检测value值是否又改回0。 而将value改为0的操作就是获取锁的线程执行完后对该锁进行释放,通过unlock方法 释放锁,释放后若干线程又对该锁竞争。
CLH核心思想 CLH的作者:Craig, Landin, and Hagersten。
CLH lock is Craig, Landin, and Hagersten (CLH) locks, CLH lock is a spin lock, can ensure no hunger, provide fairness first come first service. The CLH lock is a scalable, high performance, fairness and spin lock based on the list, the application thread spin only on a local variable, it constantly polling the precursor state, if it is found that the pre release lock end spin.
我们能看到它是一个自旋锁,能确保无饥饿性,提供先来先服务的公平性。同时它也是一种基于链表的可扩展、高性能、公平的自旋锁,申请线程只在本地变量上自旋,它不断轮询前驱的状态,如果发现前驱释放了锁就结束自旋。
其核心思想是:通过一定手段将所有线程对某一共享变量轮询竞争 转化为一个线程队列且队列中的线程各自轮询自己的本地变量 。
这个转化过程有两个要点:
一是构建怎样的队列、如何构建队列。 为了保证公平性,构建的是一个FIFO队列,构建的时候主要通过移动尾部节点tail实现队列的排队,每个想获取锁的线程创建一个新节点并通过CAS原子操作将新节点赋予tail,然后让当前线程轮询前一节点的某个状态位。
二是如何释放锁。 线程执行完后只需将当前线程对应的节点状态位更新为解锁状态,由于下一节点一直在轮询,下个线程便可获取到锁。
Java代码实现
实现juc的Lock接口
定义CLHLock 节点之间是通过隐形的链表相连,之所以叫隐形的链表是因为这些节点之间没有明显的next指针,而是通过myPred所指向的节点的变化情况来影响myNode的行为。 CLHLock上还有一个尾指针tail,始终指向队列的最后一个节点。
定义QNode内部类 CLH队列中的节点QNode中含有一个locked字段,该字段若为true表示该线程需要获取锁,且不释放锁,为false表示线程释放了锁。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 public class CLHLock implements Lock { static final class Node { volatile boolean locked; } private AtomicReference<Node> tail; private ThreadLocal<Node> myPrev; private ThreadLocal<Node> myNode; public CLHLock () { this .tail = new AtomicReference<>(new Node()); this .myNode = ThreadLocal.withInitial(Node::new ); this .myPrev = ThreadLocal.withInitial(() -> null ); } @Override public void lock () { System.out.println(Thread.currentThread().getName() + " try lock" ); Node node = myNode.get(); node.locked = true ; Node prev = tail.getAndSet(node); this .myPrev.set(prev); while (prev.locked) {} System.out.println(Thread.currentThread().getName() + " get lock" ); } @Override public void unlock () { Node node = myNode.get(); node.locked = false ; System.out.println(Thread.currentThread().getName() + " unlock" ); this .myNode.remove(); this .myPrev.remove(); } @Override public void lockInterruptibly () throws InterruptedException {} @Override public boolean tryLock () { return false ; } @Override public boolean tryLock (long time, TimeUnit unit) throws InterruptedException { return false ; } @Override public Condition newCondition () { return null ; } public static void main (String[] args) { final Lock lock = new CLHLock(); for (int i = 0 ; i < 10 ; i++) { new Thread(() -> { try { lock.lock(); Thread.sleep(500 ); } catch (InterruptedException ignored) { } finally { lock.unlock(); } }).start(); } } }
CLHLock的获取锁、释放锁过程
获取锁
当一个线程需要获取锁时,会创建一个新的QNode,
将其中的locked设置为true表示需要获取锁,
然后当前线程对tail域调用getAndSet方法,使自己(当前线程)成为队列的尾部,同时tail域返回旧的尾部节点作为当前线程的前驱引用myPred,
然后该线程就在前驱节点的locked字段上自旋,直到前驱节点释放锁。
释放锁 当一个线程需要释放锁时,将当前节点的locked域设置为false,同时回收前驱节点(把当前节点从链表的头部删除)。
如下图所示,线程A需要获取锁,其myNode域为true,此时tail指向线程A的节点,然后线程B也加入到线程A后面,tail指向线程B的节点。然后线程A和B都在它的myPred域上旋转,一旦它的myPred节点的locked字段变为false,它就可以获取到锁。明显线程A的myPred locked值为false,此时线程A获取到了锁。