QA

  • 为什么会有ConcurrentHashMap
    HashMap线程不安全,HashTable线程安全但效率低下。
  • 为什么HashMap线程不安全
    1.7中transfer()链表使用头插法,多线程情况下,会成环;
    1.8中putVal(),多线程操作,值会出现覆盖情况。
  • 为什么HashTable效率低下
    HashTable之所以效率低下主要是因为它的实现使用了synchronized关键字对put等方法进行加锁,锁的是HashTable实例对象。
  • ConcurrentHashMap在JDK1.7和JDK1.8中实现有什么差别? JDK1.8解決了JDK1.7中什么问题
  • ConcurrentHashMap JDK1.7实现的原理是什么? 分段锁机制
  • ConcurrentHashMap JDK1.8实现的原理是什么? 数组+链表+红黑树,CAS
  • ConcurrentHashMap JDK1.7中Segment数(concurrencyLevel)默认值是多少? 为何一旦初始化就不可再扩容?
  • ConcurrentHashMap JDK1.7说说其put的机制?
  • ConcurrentHashMap JDK1.7是如何扩容的? rehash(注:segment 数组不能扩容,扩容是 segment 数组某个位置内部的数组 HashEntry<K,V>[] 进行扩容)
  • ConcurrentHashMap JDK1.8是如何扩容的? tryPresize
  • ConcurrentHashMap JDK1.8链表转红黑树的时机是什么? 临界值为什么是8?
  • ConcurrentHashMap JDK1.8是如何进行数据迁移的? transfe

ConcurrentHashMap加锁原理

JDK1.8

1.8只有1个table(Map.Entry数组),同步机制为CAS + synchronized保证并发更新。
锁的粒度更细,减少了冲突,提高了并发度。

1.8放弃了Segment,直接用Node数组+链表+红黑树的数据结构来实现,并发控制使用Synchronized + CAS来实现更细粒度的锁保护,整个看起来就像是优化过且线程安全的HashMap。
这里的更细粒度的锁是指锁table的首个Node节点。
在添加数据putVal的时候,根据 key 的 hash 值 定位到 Node节点,如果Node节点为空的话,则会使用CAS去赋值;不为空并且发生hash冲突则进入锁代码块,用 synchronized 去锁住首节点,执行插入链表或红黑树或转红黑树操作。

取消segments字段,采用table数组元素作为锁(使用synchronized锁住),从而实现了对每一行数据进行加锁,进一步减少并发冲突的概率。
可以说将segment和数组合二为一

JDK1.7

1.7是使用segements(16个segement),每个segement都有一个table(Map.Entry数组),相当于16个HashMap,同步机制为分段锁,每个segment继承ReentrantLock。
默认并发是16,一旦初始化,Segment 数组大小就固定,后面不能扩容。

ConcurrentHashMap 是由Segment 数组结构 + HashEntry 数组结构 + 链表组成。
Segment 是一种可重入锁 ReentrantLock(Segment 继承了ReentrantLock),在 ConcurrentHashMap 里扮演锁的角色。
HashEntry 则用于存储键值对数据。

put操作时,先获取锁 tryLock(),根据 key 的 hash 值 定位到 Segment ,再根据 key 的 hash 值 找到具体的 HashEntry ,再进行插入或覆盖,最后释放锁。

Segment数组的意义就是将一个大的table分割成多个小的table来进行加锁,也就是上面的提到的锁分离技术,而每一个Segment元素存储的是HashEntry数组+链表,这个和HashMap的数据存储结构一样。

ConcurrentHashMap - JDK 1.8

在JDK1.7之前,ConcurrentHashMap是通过分段锁机制来实现的,所以其最大并发度受Segment的个数限制。
因此,在JDK1.8中,ConcurrentHashMap的实现原理摒弃了这种设计,而是选择了与HashMap类似的数组+链表+红黑树的方式实现,而加锁则采用CAS和synchronized实现

数据结构

结构上和Java 8的HashMap基本上一样,不过它要保证线程安全,所以在源码上要复杂一些。

默认值

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
// 散列表最大容量
private static final int MAXIMUM_CAPACITY = 1 << 30;
// 散列表默认容量
private static final int DEFAULT_CAPACITY = 16;
// 最大数组长度
static final int MAX_ARRAY_SIZE = Integer.MAX_VALUE - 8;
// 默认并发级别 jdk1.7 之前遗留的 1.8只用于初始化
private static final int DEFAULT_CONCURRENCY_LEVEL = 16;
// 负载因子
private static final float LOAD_FACTOR = 0.75f;
// 链表树化条件
static final int TREEIFY_THRESHOLD = 8;
// 取消树化条件
static final int UNTREEIFY_THRESHOLD = 6;
// 结点树化条件
static final int MIN_TREEIFY_CAPACITY = 64;
// 线程迁移数据最小步长 控制线程迁移任务最小区间的一个值
private static final int MIN_TRANSFER_STRIDE = 16;
// 扩容用 计算扩容生成一个标识戳
private static final int RESIZE_STAMP_BITS = 16;
// 65535 标识并发扩容最大线程数量
private static final int MAX_RESIZERS = (1 << (32 - RESIZE_STAMP_BITS)) - 1;
// 扩容相关
private static final int RESIZE_STAMP_SHIFT = 32 - RESIZE_STAMP_BITS;

// node 结点的hash 是-1 表示 当前结点是forwardingNode结点
static final int MOVED = -1; // hash for forwarding nodes
// 红黑树的代理结点
static final int TREEBIN = -2; // hash for roots of trees
// 临时保留的散列表
static final int RESERVED = -3; // hash for transient reservations
// 0x7fffffff = 31个1 用于将一个负数变成一个正数 但是不是取绝对值
static final int HASH_BITS = 0x7fffffff; // usable bits of normal node hash

// 系统CPu数量
static final int NCPU = Runtime.getRuntime().availableProcessors();
// 散列表
transient volatile Node<K,V>[] table;
// 扩容用的临时散列表
private transient volatile Node<K,V>[] nextTable;
// LongAdder 的baseCount
private transient volatile long baseCount;

/**
sizeCtl <0
1. -1 的时候 表示table正在初始化(有线程正在初始化 , 当前线程应该自旋等待)
2. 其他情况 表示当前map正在进行扩容 高16位表示 扩容的标识戳 , 低16位表示 扩容线程数量
sizeCtl = 0
表示创建数组 使用默认容量 16
sizeCtl >0
1. 如果table 未初始化 表示 初始化大小
2. 如果table 已经初始化 表示下次扩容的阈值
*/
private transient volatile int sizeCtl;

//扩容过程中,记录当前进度,所有线程都需要从transferIndex中分配区间任务,去执行自己的任务
private transient volatile int transferIndex;
// 0 表示 无锁 1 表示加锁
private transient volatile int cellsBusy;
// LongAdder 中的cells 数组 当baseCount发生竞争后 会创建cells 数组
// 线程会通过计算hash值 取到自己的cell中
private transient volatile CounterCell[] counterCells;

初始化

1
2
3
4
5
6
7
8
9
10
11
12
// 这构造函数里,什么都不干
public ConcurrentHashMap() {
}
public ConcurrentHashMap(int initialCapacity) {
if (initialCapacity < 0)
throw new IllegalArgumentException();
int cap = ((initialCapacity >= (MAXIMUM_CAPACITY >>> 1)) ?
MAXIMUM_CAPACITY :
tableSizeFor(initialCapacity + (initialCapacity >>> 1) + 1));
// >>> 1 无符号右移1位,相当于除2
this.sizeCtl = cap;
}

这个初始化方法有点意思,通过提供初始容量,计算了 sizeCtl,sizeCtl = 【 (1.5 * initialCapacity + 1),然后向上取最近的 2 的 n 次方】。
如 initialCapacity 为 10,那么得到 sizeCtl 为 16,如果 initialCapacity 为 11,得到 sizeCtl 为 32。

putVal put过程分析

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
public V put(K key, V value) {
return putVal(key, value, false);
}

// onlyIfAbsent --> ture 表示如果遇到相同的key 进行不进行置换;false 表示置换
final V putVal(K key, V value, boolean onlyIfAbsent) {
//无论key还是value,不允许空
if (key == null || value == null) throw new NullPointerException();
//此处获取hash值的方法与HashTable类似,通过扰动函数 计算出hash 高16位也参与运算
int hash = spread(key.hashCode());
// binCount 表示当前k-v封装成node后插入到指定桶位后,在桶位中所属链表的下标位置
int binCount = 0;
// 循环
for (Node<K,V>[] tab = table;;) {
/*
f -> 头结点
n -> 代表table的长度
i -> 索引
fh -> 头结点hash
*/
Node<K,V> f; int n, i, fh;
//1. 如果节点数组为null,或者长度为0,初始化节点数组
if (tab == null || (n = tab.length) == 0)
tab = initTable();

//2. 找该 hash 值对应的数组下标,得到第一个节点 f
// 如果数组该位置为空
else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
// 以cas方式进行替换,替换成功就中断循环
// 如果CAS替换失败,那就是有并发操作,则进行下一次循环
if (casTabAt(tab, i, null,
new Node<K,V>(hash, key, value, null)))
break; // no lock when adding to empty bin
}

//3. 表已经创建 && 头结点不是空
// 判断头结点的hash 是不是等于-1 看是不是forwardingNode结点 如果是 说明哈希表正在处于扩容的情况
else if ((fh = f.hash) == MOVED)
// 帮助进行扩容
tab = helpTransfer(tab, f);

//4. 表已经创建 && 头结点不是空 && 不处于扩容中 && 头结点不是key相同的结点
else {
V oldVal = null;
// 锁定节点数组的头节点
synchronized (f) {
// 再次查询头结点是不是等于f
// 防止你加锁的过程中 别人已经修改了头结点的值,导致操作出现问题
if (tabAt(tab, i) == f) {
// 如果当前该节点为链表形态
// 头节点的 hash 值大于 0,说明是链表
if (fh >= 0) {
// 用于累加,记录链表的长度
binCount = 1;
// 遍历链表
for (Node<K,V> e = f;; ++binCount) {
K ek;
//1. 链表中存在相同的key,判断是否要进行值覆盖
if (e.hash == hash &&
((ek = e.key) == key ||
(ek != null && key.equals(ek)))) {
oldVal = e.val;
if (!onlyIfAbsent)
e.val = value;
break;
}
Node<K,V> pred = e;
//2. 如果不存在相同的key,就添加到链表到末尾
if ((e = e.next) == null) {
pred.next = new Node<K,V>(hash, key,
value, null);
break;
}
}
}
// 如果当前为红黑树形态,进行红黑树到查找和替代(存在相同的key),或者放入红黑树到新叶节点上(key不存在)
else if (f instanceof TreeBin) {
Node<K,V> p;
binCount = 2;
// 调用红黑树的插值方法插入新节点
if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,
value)) != null) {
oldVal = p.val;
if (!onlyIfAbsent)
p.val = value;
}
}
}
}
//如果一个节点中的数量大于1(只有大于1的才会有binCount)
if (binCount != 0) {
//如果链表长度超过了8,链表转红黑树
if (binCount >= TREEIFY_THRESHOLD)
//进行转换成红黑树处理
treeifyBin(tab, i);
if (oldVal != null)
return oldVal;
break;
}
}
}
//统计节点个数,检查是否需要扩容
addCount(1L, binCount);
return null;
}

1.8 中 ConcurrentHashMap put操作为什么用synchronized?

有两个原因:

  1. 减少内存开销
    假设使用可重入锁来获得同步支持,那么每个节点都需要通过继承AQS来获得同步支持。但并不是每个节点都需要获得同步支持的,只有链表的头节点(红黑树的根节点)需要同步,这无疑带来了巨大内存浪费。
  2. 获得JVM的支持
    可重入锁毕竟是API这个级别的,后续的性能优化空间很小。
    synchronized则是JVM直接支持的,JVM能够在运行时作出相应的优化措施:锁粗化、锁消除、锁自旋等等。这就使得synchronized能够随着JDK版本的升级而不改动代码的前提下获得性能上的提升。

初始化数组 initTable

初始化一个合适大小的数组,然后会设置 sizeCtl。
初始化方法中的并发问题是通过对 sizeCtl 进行一个 CAS 操作来控制的。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
private final Node<K,V>[] initTable() {
Node<K,V>[] tab; int sc;
while ((tab = table) == null || tab.length == 0) {
// 初始化的"功劳"被其他线程"抢去"了
if ((sc = sizeCtl) < 0)
Thread.yield(); // lost initialization race; just spin
// CAS 一下,将 sizeCtl 设置为 -1,代表抢到了锁
else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {
try {
if ((tab = table) == null || tab.length == 0) {
// DEFAULT_CAPACITY 默认初始容量是 16
int n = (sc > 0) ? sc : DEFAULT_CAPACITY;
// 初始化数组,长度为 16 或初始化时提供的长度
Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];
// 将这个数组赋值给 table,table 是 volatile 的
table = tab = nt;
// 如果 n 为 16 的话,那么这里 sc = 12
// 其实就是 0.75 * n
sc = n - (n >>> 2);
}
} finally {
// 设置 sizeCtl 为 sc,我们就当是 12 吧
sizeCtl = sc;
}
break;
}
}
return tab;
}

链表转红黑树 treeifyBin

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
private final void treeifyBin(Node<K,V>[] tab, int index) {
Node<K,V> b; int n, sc;
if (tab != null) {
// MIN_TREEIFY_CAPACITY 为 64
// 所以,如果数组长度小于 64 的时候,其实也就是 32 或者 16 或者更小的时候,会进行数组扩容
if ((n = tab.length) < MIN_TREEIFY_CAPACITY)
// 后面我们再详细分析这个方法
tryPresize(n << 1);
// b 是头节点
else if ((b = tabAt(tab, index)) != null && b.hash >= 0) {
// 加锁
synchronized (b) {
if (tabAt(tab, index) == b) {
// 下面就是遍历链表,建立一颗红黑树
TreeNode<K,V> hd = null, tl = null;
for (Node<K,V> e = b; e != null; e = e.next) {
TreeNode<K,V> p =
new TreeNode<K,V>(e.hash, e.key, e.val,
null, null);
if ((p.prev = tl) == null)
hd = p;
else
tl.next = p;
tl = p;
}
// 将红黑树设置到数组相应位置中
setTabAt(tab, index, new TreeBin<K,V>(hd));
}
}
}
}
}

扩容 tryPresize

如果说 Java8 ConcurrentHashMap 的源码不简单,那么说的就是扩容操作迁移操作
这里的扩容也是做翻倍扩容的,扩容后数组容量为原来的 2 倍。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
// 首先要说明的是,方法参数 size 传进来的时候就已经翻了倍了
private final void tryPresize(int size) {
// c: size 的 1.5 倍,再加 1,再往上取最近的 2 的 n 次方。
int c = (size >= (MAXIMUM_CAPACITY >>> 1)) ? MAXIMUM_CAPACITY :
tableSizeFor(size + (size >>> 1) + 1);
int sc;
while ((sc = sizeCtl) >= 0) {
Node<K,V>[] tab = table; int n;

// 这个 if 分支和之前说的初始化数组的代码基本上是一样的,在这里,我们可以不用管这块代码
if (tab == null || (n = tab.length) == 0) {
n = (sc > c) ? sc : c;
if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {
try {
if (table == tab) {
@SuppressWarnings("unchecked")
Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];
table = nt;
sc = n - (n >>> 2); // 0.75 * n
}
} finally {
sizeCtl = sc;
}
}
}
else if (c <= sc || n >= MAXIMUM_CAPACITY)
break;
else if (tab == table) {
// 我没看懂 rs 的真正含义是什么,不过也关系不大
int rs = resizeStamp(n);

if (sc < 0) {
Node<K,V>[] nt;
if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
sc == rs + MAX_RESIZERS || (nt = nextTable) == null ||
transferIndex <= 0)
break;
// 2. 用 CAS 将 sizeCtl 加 1,然后执行 transfer 方法
// 此时 nextTab 不为 null
if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1))
transfer(tab, nt);
}
// 1. 将 sizeCtl 设置为 (rs << RESIZE_STAMP_SHIFT) + 2)
// 我是没看懂这个值真正的意义是什么? 不过可以计算出来的是,结果是一个比较大的负数
// 调用 transfer 方法,此时 nextTab 参数为 null
else if (U.compareAndSwapInt(this, SIZECTL, sc,
(rs << RESIZE_STAMP_SHIFT) + 2))
transfer(tab, null);
}
}
}

数据迁移 transfer

get过程分析

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
// java.util.concurrent.ConcurrentHashMap#get
public V get(Object key) {
Node<K,V>[] tab; Node<K,V> e, p; int n, eh; K ek;
int h = spread(key.hashCode());
if ((tab = table) != null && (n = tab.length) > 0 &&
(e = tabAt(tab, (n - 1) & h)) != null) {
// 判断头节点是否就是我们需要的节点
if ((eh = e.hash) == h) {
if ((ek = e.key) == key || (ek != null && key.equals(ek)))
return e.val;
}
else if (eh < 0) // 如果头节点的 hash 小于 0,说明 正在扩容,或者该位置是红黑树
// 参考 ForwardingNode.find(int h, Object k) 和 TreeBin.find(int h, Object k)
return (p = e.find(h, key)) != null ? p.val : null;

// 遍历链表
while ((e = e.next) != null) {
if (e.hash == h &&
((ek = e.key) == key || (ek != null && key.equals(ek))))
return e.val;
}
}
return null;
}

// java.util.concurrent.ConcurrentHashMap#tabAt
static final <K,V> Node<K,V> tabAt(Node<K,V>[] tab, int i) {
return (Node<K,V>)U.getObjectVolatile(tab, ((long)i << ASHIFT) + ABASE);
}

get过程:

  1. 计算 hash 值
  2. 根据 hash 值找到数组对应位置: (n - 1) & h
  3. 根据该位置处结点性质进行相应查找
    1. 如果该位置为 null,那么直接返回 null 就可以了
    2. 如果该位置处的节点刚好就是我们需要的,返回该节点的值即可
    3. 如果该位置节点的 hash 值小于 0,说明正在扩容,或者是红黑树,后面我们再介绍 find 方法
    4. 如果以上 3 条都不满足,那就是链表,进行遍历比对即可

JDK 1.8 ConcurrentHashMap 读操作为什么不用加锁?

读的时候如果不是恰好读到写线程写入相同Hash值的位置(可以认为我们的操作一般是读多写少,这种几率也比较低)
ConcurrentHashMap 的 get 方法会调用 tabAt 方法,这是一个Unsafe类volatile的操作,保证每次获取到的值都是最新的。(强制将修改的值立即写入主存)

ConcurrentHashMap - JDK 1.7

在JDK1.5~1.7版本,Java使用了分段锁机制实现ConcurrentHashMap。
简而言之,ConcurrentHashMap在对象中保存了一个Segment数组,即将整个Hash表划分为多个分段;而每个Segment元素,即每个分段则类似于一个Hashtable;这样,在执行put操作时首先根据hash算法定位到元素属于哪个Segment,然后对该Segment加锁即可。因此,ConcurrentHashMap在多线程并发编程中可实现多线程put操作。

数据结构

整个 ConcurrentHashMap 由一个个 Segment 组成,Segment 代表”部分“或”一段“的意思,所以很多地方都会将其描述为分段锁。注意,行文中,我很多地方用了来代表一个 segment。

简单理解就是,ConcurrentHashMap 是一个 Segment 数组,Segment 通过继承 ReentrantLock 来进行加锁,所以每次需要加锁的操作锁住的是一个 segment,这样只要保证每个 Segment 是线程安全的,也就实现了全局的线程安全。

concurrencyLevel: 并行级别、并发数、Segment 数,怎么翻译不重要,理解它。默认是 16,也就是说 ConcurrentHashMap 有 16 个 Segments。所以理论上,这个时候,最多可以同时支持 16 个线程并发写,只要它们的操作分别分布在不同的 Segment 上。
这个值可以在初始化的时候设置为其他值,但是一旦初始化以后,它是不可以扩容的

再具体到每个 Segment 内部,其实每个 Segment 很像之前介绍的 HashMap,不过它要保证线程安全,所以处理起来要麻烦些。