相信大部分人在看AQS的时候都能看到注释上有这么一段话:

1
2
3
4
5
6
/**
* Wait queue node class.
*
* <p>The wait queue is a variant of a "CLH" (Craig, Landin, and
* Hagersten) lock queue.
*/

为了更好的理解AQS中使用锁的思想,所以有必要好好了解下CLH锁。

原理解释

自旋锁

在了解CLH Node FIFO队列之前,首先要了解这种队列的几个要素之一:自旋锁。

所谓自旋锁,就是某一线程去尝试获取某个锁时,如果该锁已经被其他线程占用的话,当前线程将不断循环检查该锁是否被释放,而不是让当前线程挂起或睡眠。

自旋锁属于为了保证共享资源而提出的一种锁机制,与互斥锁类似,保证了公共资源在任意时刻最多只能由一个线程获取使用。不同的是,互斥锁在获取锁失败后线程将进入睡眠或阻塞状态。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
/**
* 自旋锁
*/
public class SpinLock {

private static Unsafe unsafe = null;
private static final long valueOffset;
private volatile int value = 0;

static {
try {
unsafe = getUnsafeInstance();
valueOffset = unsafe.objectFieldOffset(SpinLock.class.getDeclaredField("value"));
} catch (Exception e) {
throw new RuntimeException(e);
}
}

@SneakyThrows
private static Unsafe getUnsafeInstance() {
Field theUnsafeInstance = Unsafe.class.getDeclaredField("theUnsafe");
ReflectionUtils.makeAccessible(theUnsafeInstance);
return (Unsafe) theUnsafeInstance.get(Unsafe.class);
}

// 加锁
public void lock() {
System.out.println(Thread.currentThread().getName() + " spinlock try");

for (;;) {
int newV = value + 1;
if (newV == 1 && (unsafe.compareAndSwapInt(this, valueOffset, 0, newV))) {
return;
}
}
}

// 解锁
public void unlock() {
unsafe.compareAndSwapInt(this, valueOffset, 1, 0);
}


public static void main(String[] args) {
final SpinLock spinLock = new SpinLock();

for (int i = 0; i < 10; i++) {
new Thread(() -> {
try {
spinLock.lock();
System.out.println(Thread.currentThread().getName() + " spinlock success");

Thread.sleep(500);
} catch (InterruptedException ignored) {
} finally {
spinLock.unlock();
System.out.println(Thread.currentThread().getName() + " unlock");
}
}).start();
}
}
}

这是一个很简单的自旋锁。
对于lock方法,假如有若干线程竞争,能成功通过CAS操作修改value值为newV的线程即是成功获取锁的线程,将直接通过,而其他的线程则不断循环检测value值是否又改回0。
而将value改为0的操作就是获取锁的线程执行完后对该锁进行释放,通过unlock方法释放锁,释放后若干线程又对该锁竞争。

CLH核心思想

CLH的作者:Craig, Landin, and Hagersten。

CLH lock is Craig, Landin, and Hagersten (CLH) locks,
CLH lock is a spin lock, can ensure no hunger, provide fairness first come first service.
The CLH lock is a scalable, high performance, fairness and spin lock based on the list,
the application thread spin only on a local variable, it constantly polling the precursor state,
if it is found that the pre release lock end spin.

我们能看到它是一个自旋锁,能确保无饥饿性,提供先来先服务的公平性。同时它也是一种基于链表的可扩展、高性能、公平的自旋锁,申请线程只在本地变量上自旋,它不断轮询前驱的状态,如果发现前驱释放了锁就结束自旋。

其核心思想是:通过一定手段将所有线程对某一共享变量轮询竞争转化为一个线程队列且队列中的线程各自轮询自己的本地变量

这个转化过程有两个要点:

  • 一是构建怎样的队列、如何构建队列。
    为了保证公平性,构建的是一个FIFO队列,构建的时候主要通过移动尾部节点tail实现队列的排队,每个想获取锁的线程创建一个新节点并通过CAS原子操作将新节点赋予tail,然后让当前线程轮询前一节点的某个状态位。
  • 二是如何释放锁。
    线程执行完后只需将当前线程对应的节点状态位更新为解锁状态,由于下一节点一直在轮询,下个线程便可获取到锁。

Java代码实现

  • 实现juc的Lock接口
  • 定义CLHLock
    节点之间是通过隐形的链表相连,之所以叫隐形的链表是因为这些节点之间没有明显的next指针,而是通过myPred所指向的节点的变化情况来影响myNode的行为。
    CLHLock上还有一个尾指针tail,始终指向队列的最后一个节点。
  • 定义QNode内部类
    CLH队列中的节点QNode中含有一个locked字段,该字段若为true表示该线程需要获取锁,且不释放锁,为false表示线程释放了锁。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
/**
* CLH锁
*/
public class CLHLock implements Lock {

/**
* Node节点内部类
*/
static final class Node {
volatile boolean locked;
}

// CLH队列尾节点
private AtomicReference<Node> tail;
// 前驱节点
private ThreadLocal<Node> myPrev;
// 当前节点
private ThreadLocal<Node> myNode;

public CLHLock() {
// 初始化一个释放锁的节点,用作当前隐式队列的尾节点
this.tail = new AtomicReference<>(new Node());
this.myNode = ThreadLocal.withInitial(Node::new);
this.myPrev = ThreadLocal.withInitial(() -> null);
}

/**
* 加锁
*/
@Override
public void lock() {
System.out.println(Thread.currentThread().getName() + " try lock");

Node node = myNode.get();
node.locked = true;

// 将tail中缓存的node拿出来赋值给prev,将current放入tail中
Node prev = tail.getAndSet(node);
this.myPrev.set(prev);

// 自旋判断上一个节点是否释放锁,直至获取到锁
// locked == true 表示当前节点期望获取锁
// locked == false 表示当前节点已经释放锁
// prev.locked == true 表示上一个节点未释放锁
// prev.locked == false 表示上一个节点释放锁了, 也就意味着自己竞争到锁了
while (prev.locked) {}

System.out.println(Thread.currentThread().getName() + " get lock");
}

/**
* 解锁
*/
@Override
public void unlock() {
Node node = myNode.get();
node.locked = false;
System.out.println(Thread.currentThread().getName() + " unlock");
this.myNode.remove();
this.myPrev.remove();
}

@Override
public void lockInterruptibly() throws InterruptedException {}

@Override
public boolean tryLock() {
return false;
}

@Override
public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
return false;
}

@Override
public Condition newCondition() {
return null;
}

public static void main(String[] args) {
final Lock lock = new CLHLock();
for (int i = 0; i < 10; i++) {
new Thread(() -> {
try {
lock.lock();
Thread.sleep(500);
} catch (InterruptedException ignored) {
} finally {
lock.unlock();
}
}).start();
}
}
}

CLHLock的获取锁、释放锁过程

  • 获取锁
    1. 当一个线程需要获取锁时,会创建一个新的QNode,
    2. 将其中的locked设置为true表示需要获取锁,
    3. 然后当前线程对tail域调用getAndSet方法,使自己(当前线程)成为队列的尾部,同时tail域返回旧的尾部节点作为当前线程的前驱引用myPred,
    4. 然后该线程就在前驱节点的locked字段上自旋,直到前驱节点释放锁。
  • 释放锁
    当一个线程需要释放锁时,将当前节点的locked域设置为false,同时回收前驱节点(把当前节点从链表的头部删除)。

如下图所示,线程A需要获取锁,其myNode域为true,此时tail指向线程A的节点,然后线程B也加入到线程A后面,tail指向线程B的节点。然后线程A和B都在它的myPred域上旋转,一旦它的myPred节点的locked字段变为false,它就可以获取到锁。明显线程A的myPred locked值为false,此时线程A获取到了锁。