yield-bytes

沉淀、分享与无限进步

Java高级主题:jdk1.8的ConcurrentHashMap源码深入分析(一)

前言

虽然jdk1.7的CHM能够解决非阻塞高并发的读写情况,但它仍然不够理想,例如写并发度受限于Segment数组初始大小,一旦创建就无法再扩容并发度;如果key非常多且分布不均匀,例如都落在同一个Segment位置上,相当于多个线程竞争同一把“全局锁”,CHM“仿佛”退化为HashTable。此外,因为CHM的Segment上使用jdk1.7的HashMap,它的性能显然没有jdk1.8的HashMap强,jdk1.7resize扩容处理时只能单线程完成扩容操作,jdk1.7计算size方法可能要对每个Segment加阻塞锁,基于以上jdk1.7的CHM缺点,Doug Lea重新设计新版本的CHM,其源代码行数总共6312行,性能确实高了,但代价是代码逻辑的复杂度要高出很多。

理解本文所有源码分析以及相关技术都需要读者已经掌握1.8/1.7HashMap、1.7 CHM、Unsafe、CAS这些进阶知识,否则难以消化相关知识,可以提前阅读本博客相关文章:

(以下“CHM”表示ConcurrentHashMap简写)

数据结构图示

分为普通状态下(非扩容期间)和扩容期间

(1)CHM非扩容时,也即普通正常状态下的内部数据结构图:

可以看到跟jdk1.8的HashMap数据结构不同的地方:CHM用了一个称为TreeBin的节点作为桶位头节点,不是HashMap的TreeNode:
在这里插入图片描述
(2)CHM正在扩容时的内部数据结构图:

可以看到正在扩容时的内部节点结构,数组尾部的一些桶位头节点放入了ForwardingNode节点。

在这里插入图片描述

其实还有一个情况,以上两张图也没有展示出来:当CHM调用computeIfAbsent或者compute方法来插入系节点时,会在桶位上放置一个ReservationNode用于占位操作,本博客也有关于相关深度分析的文章

重要成员变量说明

CHM重要常量
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
  /* ---------------- Constants -------------- */
// CHM数组长度最大值
private static final int MAXIMUM_CAPACITY = 1 << 30;

// CHM数组初始化默认长度
private static final int DEFAULT_CAPACITY = 16;

/**
* 非最大2的整次幂数组长度:最大值-8。适用于toArray及其相关方法的场合
* The largest possible (non-power of two) array size.
* Needed by toArray and related methods.
*/
static final int MAX_ARRAY_SIZE = Integer.MAX_VALUE - 8;

/**
* 为了兼容jdk1.7版本的CHM,在这里也给出相关成员变量的
* The default concurrency level for this table. Unused but
* defined for compatibility with previous versions of this class.
*/
private static final int DEFAULT_CONCURRENCY_LEVEL = 16;

// 与1.8 HashMap一致,此处不再说明
private static final float LOAD_FACTOR = 0.75f;

// 与1.8 HashMap一致,此处不再说明
static final int TREEIFY_THRESHOLD = 8;

// 与1.8 HashMap一致,此处不再说明
static final int UNTREEIFY_THRESHOLD = 6;

// 与1.8 HashMap一致,此处不再说明
static final int MIN_TREEIFY_CAPACITY = 64;

// 由于本文重点讨论fullAddCount,与扩容相关的常量在扩容文章给出。
重要成员变量
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
  /* ---------------- Fields -------------- */

/**
HashMap的底层数组,注意它被volatile语义修饰:让table数组符合happen-before的规则,第一次put才会被初始化,长度为2的整数幂。
* The array of bins. Lazily initialized upon first insertion.
* Size is always a power of two. Accessed directly by iterators.
*/
transient volatile Node<K,V>[] table;


// 以下4个字段需要结合每小节内容去理解才能掌握其设计含义

/**
* Base counter value, used mainly when there is no contention,
* but also as a fallback during table initialization
* races. Updated via CAS.
*/
private transient volatile long baseCount;

/**
设计最精妙的变量之一,对它实施位运算可以实现对CHM多种操作的控制
*/
private transient volatile int sizeCtl;

/**
* 理解fullAddCount方法实现后才能明白此变量的意义。Spinlock (locked via CAS) used when resizing and/or creating CounterCells.
*/
private transient volatile int cellsBusy;

/**
* 理解fullAddCount方法实现后才能明白此变量的意义。Table of counter cells. When non-null, size is a power of 2.
*/
private transient volatile CounterCell[] counterCells;

put方法

关于spread方法、tableSizeFor方法、comparableClassFor方法、compareComparables方法可以参考jdk1.8的HashMap,本文不再赘述。

put方法最适合作为源码分析入口,如下

1
2
ConcurrentHashMap<Integer, String> map = new ConcurrentHashMap<>();
map.put(3, "foo3");

put方法内部调用的是putVal方法

1
2
3
public V put(K key, V value) {
return putVal(key, value, false);
}

对于CHM来说,put一个key到桶桶上会有以下几种情况:
1、table为空 2、桶位为空 3、桶位是一条冲突链 4、桶位是一个treeBin节点 5、桶位是一个ForwardingNode节点(表示有其他线程正在扩容) 6、桶位是一个ReservationNode节点:表示有其他线程正在调用computeIfAbsent或者compute方法来插入在该桶位插入节点,注意同一桶位的同一时刻,5和6是不会同时发生在,两个情况都是表达插入节点的逻辑。

因此put主体逻辑设计主要分为以下7个步骤:

1、如果tab为空,那么进行table初始化,该方法里面使用自旋+CAS让线程自己竞争初始化权

2、如果key所定位的桶位头节点f为空节点,线程使用CAS竞争将key节点插入到该桶位中

3、如果key所定位的桶位头节点hash为-1,也即表示该桶位的节点是个fwd节点,说明当前CHM正在扩容,那么当前put线程遇到这个节点会让去帮助扩容线程。

4、若不是以上1~3三种情况,那么先使用synchronized锁在该桶位,如果该桶位上一条冲突链,遍历该链插入key,如果该桶位上是一个treeBin,说明桶位已经是一棵红黑树,则按红黑树插入节点方法来插入key

5、判断key插入的链表长度是否大于等于8,来决定是否需要树化

6、将CHM的节点总数size加1(采用并发计数器累加)

7、判断节点总数是否达到扩容阈值而进行扩容

本文重点分析1-6的设计,第7点是扩容逻辑,由于扩容逻辑相对复杂、内容丰富,需要单独起一篇文章研究。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77

/** Implementation for put and putIfAbsent */
final V putVal(K key, V value, boolean onlyIfAbsent) {
if (key == null || value == null) throw new NullPointerException();
int hash = spread(key.hashCode());
int binCount = 0;
for (Node<K,V>[] tab = table;;) {
// f是key所定位的first头节点,fh:first头节点的Hash值,n:当前table数组长度,i:key定位到的桶位下标
Node<K,V> f; int n, i, fh;
// 1、如果tab为空,那么进行table初始化,initTable方法里面使用自旋+CAS让线程去竞争初始化权
if (tab == null || (n = tab.length) == 0)
tab = initTable();
// 2、如果key所定位的桶位头节点f为空节点,线程使用CAS竞争将key节点插入到该桶位中,如果成功插入,则退出循环
else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
if (casTabAt(tab, i, null,new Node<K,V>(hash, key, value, null)))
break; // no lock when adding to empty bin
}
// 3、如果key所定位的桶位头节点hash为-1,说明当前CHM正在扩容,也即表示该桶位已被其他扩容线程迁移完数据,那么当前线程要进入扩容去帮助CHM扩容,注意不是帮助此桶位扩容!
else if ((fh = f.hash) == MOVED)
tab = helpTransfer(tab, f);
else {
V oldVal = null;
// 4、若不是以上1~3三种情况,那么先使用synchronized锁在该桶位first头节点,从这里开始看出jdk1.8的CHM锁粒度比1.7要小,锁对象就是桶位本身
synchronized (f) {
// 再次确认当前first头节点有没变更过,如果f已被改变,则回到for循环,重新按1-4流程再来一次
if (tabAt(tab, i) == f) {
// 4.1 如果当前桶位是一条冲突链表,遍历冲突链,并对binCount进行计数,如果存在key则覆盖,否则使用尾插法将key节点插入到链表尾部,此逻辑跟jdk1.8的类似
if (fh >= 0) {
// 对冲突链表长度进行计数,用于树化判断
binCount = 1;
for (Node<K,V> e = f;; ++binCount) {
K ek;
if (e.hash == hash &&
((ek = e.key) == key ||
(ek != null && key.equals(ek)))) {
oldVal = e.val;
if (!onlyIfAbsent)
e.val = value;
break;
}
Node<K,V> pred = e;
//来到链表尾部,尾插法
if ((e = e.next) == null) {
pred.next = new Node<K,V>(hash, key,
value, null);
break;
}
}
}
// 4.2 如果当前桶位头节点是一个TreeBin节点
else if (f instanceof TreeBin) {
Node<K,V> p;
// 这里binCount是从2开始,解释参考本博客的ThreeBin读写锁机制分析的文章
binCount = 2;
if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,
value)) != null) {
oldVal = p.val;
if (!onlyIfAbsent)
p.val = value;
}
}
}
}
if (binCount != 0) {
// 判断冲突链表长度是否需要树化
if (binCount >= TREEIFY_THRESHOLD)
treeifyBin(tab, i);
if (oldVal != null)
return oldVal;
break;
}
}
}
// 5、将CHM的节数总数size加1(采用并发计数器累加)
addCount(1L, binCount);
return null;
}

如果对jdk1.8 HashMap有深入研究的话,其实CHM的put本身设计逻辑并不难理解,而最复杂且精妙的设计反而是put方面里面的辅助方法:initTable方法、helpTransfer方法、transfer方法、addCount方法、fullAddCount方法,每一个方法无不体现JUC并发代码的精巧设计。

在前面的“CHM重要常量”,特意将以下三个变量放在此来说明它们的作用,分别代表三种节点的哈希值:

1
2
3
4
5
static final int MOVED = -1;// ForwardingNode(扩容期间需要用到)的哈希值

static final int TREEBIN = -2;// TreeBin节点(树化需要用到)的哈希值

static final int RESERVED = -3;// ReservationNode节点(在computeIfAbsent、compute方法需要到)

在put一个节点需要判断当前桶位节点类型,其中用到以下逻辑,其中只要头节点的哈希值fh>=0,我们就可以认为它是一条链表(或者单节点),所以可以使用遍历方式去put入key

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
synchronized (f) {
if (tabAt(tab, i) == f) {
// 4.1 如果当前桶位是一条冲突链表,遍历冲突链,并对binCount进行计数,如果存在key则覆盖,否则使用尾插法将key节点插入到链表尾部,此逻辑跟jdk1.8的类似
if (fh >= 0) {
// 对冲突链表长度进行计数,用于树化判断
binCount = 1;
// 4.2 若头节点哈希值小于0,要么是fwd节点要么是ThreeBin节点
else if (f instanceof TreeBin) {
Node<K,V> p;
binCount = 2;
if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,
value)) != null) {
oldVal = p.val;
if (!onlyIfAbsent)
p.val = value;
}
}
}
}

如果头节点哈希值小于0也即:-1,-2,-3,它们代表第三种类型节点,因此还需要进一步判断f节点是否为TreeBin节点,因此有

if (f instanceof TreeBin), 为何不设计为: if(fh==TREEBIN),个人认为:

纯粹是为了源代码的可读性,因为我们put一个key到桶位前,更关心关心的是key所在桶位的“节点”是什么类型,当开发者看到if (f instanceof TreeBin),自然就会理解为:如果当前桶位头节点是一个TreeBin节点。

而如果写成if(fh==TREEBIN), 显然就没上面的理解来得更加“人性化、可读性高”

initTable方法

在重要成员变量里面,有一个叫sizeCtl的东西,在这里才能很好理解的它的作用:

initTable总体设计逻辑:例如线程A使用自旋+CAS去竞争table初始化权,CAS成功,则sizeCtl值会被改为-1表示“已成功抢到锁”,其他线程看到sizeCtl=-1就不再争抢初始化权而是让出cpu时间片。线程A接着完成new table初始化操作,并将sizeCtl设为当前table的扩容阈值:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
private final Node<K,V>[] initTable() {
Node<K,V>[] tab; int sc;
// 1、如果当前线程遇到table不为空,说明已经被其他线程完成了table初始化,直接返回
while ((tab = table) == null || tab.length == 0) {
// 2、如果当前线程遇到sizeCtl=-1,说明其他线程早于当前线程将sc设为-1并正在table初始化,那么当前线程只能放弃初始化权
if ((sc = sizeCtl) < 0)
Thread.yield(); // lost initialization race; just spin
// 3、当前线程成功将sizeCtl用cas改为-1,表是:“我在对table正在初始化中”,注意SIZECTL是offset内存偏移地址,区别于sizeCtl
else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {
try {
// 当前线程CAS成功后,再次判断tab是否为空,保证“以下操作仅由当前线程来初始化”的语义
if ((tab = table) == null || tab.length == 0) {
// 4、创建新table
// sc已经是-1,因此这里的n将采用DEFAULT_CAPACITY,值为16
int n = (sc > 0) ? sc : DEFAULT_CAPACITY;
@SuppressWarnings("unchecked")
// 创建新的节点数组
Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];
table = tab = nt;
// 计算新表扩容阈值,这里使用位计算,原理不变,还是newTable.length*0.75,
sc = n - (n >>> 2);//=> n-n/4=>n(1-0.25)=>n*0.75
}
} finally {
// 保证初始化后,sizeCtl一定是新数组的扩容阈值
sizeCtl = sc;
}
break;
}
}
return tab;
}

addCount方法

首先回想jdk1.8 HashMap的put方法:插入key后,对size自增1,如果size超过阈值则对HashMap底层table进行2倍扩容:

1
2
3
4
5
6
7
8
final V putVal(int hash, K key, V value, boolean onlyIfAbsent,boolean evict) {
// .......
++modCount;
if (++size > threshold)
resize();
afterNodeInsertion(evict);
return null;
}

那么addCount方法也是类似的设计思想:

先对size加1,然后根据size节点总数判断是否需要扩容。

但这里最复杂的地方在于:多线程并发情况下,如何正确实现对size原子加1?addCount及其内部的fullAddCount方法可以完成此任务,代价是:程序设计相对复杂(设计思路十分精巧,值得在日常项目引入),设计原理如下图所示:
在这里插入图片描述

当线程竞争不激烈情况下,通过自旋+cas对baseCount进行加1计数

当线程竞争十分激烈的情况下,有一部分线程很幸运能够抢到cas权力成功对baseCount加1,而剩下对baseCount加1cas失败的线程,它们就会创建一个CounterCells计数的数组,然后线程给对应自己的桶位Cell对象进行cas加1操作,这样一来就实现了“线程分流”,减少竞争。

最后统计总数时,将baseCount和CounterCells数组每个桶位的计数值累加起来,就是size的大小。该原理对应的源码如下

1
2
3
4
5
6
7
8
9
10
11
12
final long sumCount() {
CounterCell[] as = counterCells; CounterCell a;
long sum = baseCount;
if (as != null) {
for (int i = 0; i < as.length; ++i) {
if ((a = as[i]) != null)
// baseCount加上每一个桶位的value,最后就是总数sum
sum += a.value;
}
}
return sum;
}

另外一个有类似功能的类,专门设计给高并发计数的场景:LongAdder类(java.util.concurrent.atomic.LongAdder),它比AtomicLong原子计数器性能更强,本博客也给出相关性能测试。

addCount骨架代码:

1
2
3
4
5
6
7
private final void addCount(long x, int check) {
CounterCell[] as; long b, s;
// 1、使用baseCount以及 CounterCell数组完成并发环境下的加1计数
if ((as = counterCells) != null ||
!U.compareAndSwapLong(this, BASECOUNT, b = baseCount, s = b + x)){...}
// 2、判断CHM是否需要扩容
if (check >= 0){...}

完整源码分析:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
		// 调用addCount(1L, binCount),因此以下的入参x是1,check是binCount,
private final void addCount(long x, int check) {
/*
CounterCell是一个简单的内部类,它有一个value属性,用于被线程CAS加值操作
static final class CounterCell {
volatile long value;
CounterCell(long x) { value = x; }
}
*/
// as:存放如上图所示的CounterCell数组的临时变量,不同线程会对其不同段进行cas加1操作
// b:baseCount变量
CounterCell[] as; long b, s;
// 判断条件1:若counterCells数组不是空,说明已经有其他线程创建了它,也说明此刻出现线程竞争
// 判断条件2: 当前线程对baseCount进行CAS加1操作失败,说明有其他线程竞争着baseCount,也说明此刻出现线程竞争
if ((as = counterCells) != null ||
!U.compareAndSwapLong(this, BASECOUNT, b = baseCount, s = b + x)) {
// a变量:CounterCell数组某个桶位(也称某个段),v变量:CounterCell计数器里面value变量,m变量:CounterCell数组长度
CounterCell a; long v; int m;
// uncontended线程之间有无激烈竞争标志,true:表示线程之间竞争还不激烈,false:表示线程之间竞争很激烈
boolean uncontended = true;
// 注意这里4个判断条件
// 判断条件1:该条件执行是因为第1个if走的是判断条件2,说明多个线程在竞争cas操作baseCount,但是还没线程去创建CounterCell数组,那么当前线程可用fullAddCount完成加1操作。
// 判断条件2:该条件执行是因为第1个if走的是判断条件1,你想想,能调用length属性说明as是非空对象,也即as = counterCells) != null,但还未放入cell对象,此时as.length=0或者as.length-1<0
// 判断条件3:该条件执行是因为第1个if走的是判断条件1,当前线程通过hash定位到counterCells数组对应桶位为空,说明当前线程接下来将可以在桶位上对CounterCell的value进行加1操作,将交由fullAddCount完成。
// 判断条件4: 当线程对它所在的桶位cas加1操作失败,说明已经有多个线程正在激烈竞争counterCells数组,接下来需要对counterCells数组扩容来分散(降低)线程竞争,将交由fullAddCount完成
if (as == null || (m = as.length - 1) < 0 ||
// 线程定位到CounterCell数组的hash方法:ThreadLocalRandom.getProbe() & m
(a = as[ThreadLocalRandom.getProbe() & m]) == null ||
!(uncontended =
U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x))) {
// 多个线程同时竞争修改baseCount时,竞争失败的线程会执行fullAddCount,把x的值插入到counterCell数组对应的单元value里面。从这里也可以立即看出uncontended=false真实含义:表示当前多线程并发计数竞争激烈
// fullAddCount(1,uncontended=false),该方法能保证完成加1操作
fullAddCount(x, uncontended);
// 如果执行流执行到这里,说明fullAddCount已经完成加1操作,可以返回。
return;
}
// 如果check(binCount)<=1,显然不需要扩容,可直接return结束
if (check <= 1)
return;
s = sumCount(); // 计算CHM的总节点数量,以便之后用s >= (long)(sc = sizeCtl) 判断是否需要扩容
}

fullAddCount方法

在源码中,第2500行,有一片代码段称为“Counter support”,作者指出它是用于分布式计数,fullAddCount方法改编自LongAdder and Striped64

1
2
3
4
5
/* ---------------- Counter support -------------- */
/**
* A padded cell for distributing counts. Adapted from LongAdder
* and Striped64. See their internal docs for explanation.
*/

fullAddCount方法的设计和实现是复杂且有精妙的,主要分为三个逻辑:

1、如果CounterCell数组不为空且数组里面已经有Cell对象,说明CounterCell数组已经被其他线程完成了创建,那么当前线程自然无需再创建它,而是尝试在当前下线程对应的空桶位放入Cell(1),相当于对value加1操作(如果桶位已有Cell对象,就使用cas对其加1计数)

2、在条件1不满足情况下,此时CounterCell数组为空表示还未被创建,那么当前线程把cellsBusy状态改为1(相当于加锁),并把CounterCell数组创建好,默认长度2,创建好后顺便在对应的桶位放入Cell(1)

3、桶位Cell加1操作失败、又没抢到CounterCell数组的创建权,总不能白跑一趟,因此到了这一步当前线程顺便再尝试对baseCount加1,若cas成功就可以直接break。

以上三个分支逻辑对应以下骨架代码:主分支1、主分支2、主分支3,其中主分支1的逻辑设计是最复杂的,它包含6个次分支:1.1到1.6。

这里一定要有一个基本常识:首先,线程的每次循环,只能执行3个主分支中的其中1个。另外一个基本常识:如果线程进入到主分支1,那么在1.1到1.6总共6个次分支中,线程也只能进入其中一个分支

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
for(;;)
{ // 主分支1,对应以上第1点
if ((as = counterCells) != null && (n = as.length) > 0) {
// 1.1
if ((a = as[(n - 1) & h]) == null) {...}
// 1.2
else if (!wasUncontended)
wasUncontended = true;
// 1.3
else if (U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x))
break;
// 1.4
else if (counterCells != as || n >= NCPU)
collide = false;
// 1.5
else if (!collide)
collide = true;
// 1.6
else if (cellsBusy == 0 &&
U.compareAndSwapInt(this, CELLSBUSY, 0, 1)) {
// 完成CounterCell数组容量2倍扩容
collide = false;
continue; }
// 为当前线程生成新的hash值,用于下一次遍历,(希望它可以定位到另外一个桶位)
h = ThreadLocalRandom.advanceProbe(h);
}

// 主分支2,对应以上第2点
else if(cellsBusy==0 && counterCells==as && U.compareAndSwapInt(this, CELLSBUSY, 0, 1)){}
// 主分支3,对应以上第3点
else if(U.compareAndSwapLong(this, BASECOUNT, v = baseCount, v + x))
}

讲解完整源码之前,先了解cellsBusy“锁”的设计理论,cellsBusy本身只是一个volatile变量,而cellsBusy+CAS就可以用无锁方式实现“加锁功能”,线程就是用这种“轻锁机制”去完成CounterCell数组创建、将Cell对象放入桶位、CounterCell数组扩容,用法如下:

1
2
3
4
5
6
7
8
9
// ① 尝试“加锁”:若为true就相当于加锁成功,但该锁机制很轻量!
if (cellsBusy == 0 && U.compareAndSwapInt(this, CELLSBUSY, 0, 1)){
try{
// ② 临界区代码:完成CounterCell数组创建或者将Cell对象放入桶位或者CounterCell数组扩容
}finally{
// ③ 释放“锁”
cellsBusy == 0
}
}

完整源码解析,以下for(;;)循环内部逻辑按前面提到的三个主分支进行分段解析,这是因为太多if和else if条件,需要分段解析才比较清晰其内部设计逻辑

fullAddCount主分支1

线程从addCount首次进入到fullAddCount时,假设先进入到分支1

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
 // fullAddCount(x=1, uncontended=false)
private final void fullAddCount(long x, boolean wasUncontended) {
int h;
// 如果当前线程hash值为0,则强制进行线程hash初始化处理,这里为何直接用Random方法呢?因为从ThreadLocalRandom字面也可以看出,这里是每个线程生成自己用的随机数,可以理解高并发下每个线程都有自己的随机数。
if ((h = ThreadLocalRandom.getProbe()) == 0) {
ThreadLocalRandom.localInit(); // force initialization
h = ThreadLocalRandom.getProbe();
wasUncontended = true;
}

// collide:表示线程定位到的Cell桶位是否有冲突,显然当CounterCell数组最后一个桶位都不空,说明已经出现线程操作数组冲突
boolean collide = false; // True if last slot nonempty
for (;;) {
// as:CounterCell数组的临时变量,a:线程定位到桶位上的Cell对象,n: CounterCell数组的长度,v:桶位上的Cell对象的value属性
CounterCell[] as; CounterCell a; int n; long v;

// 分支1、如果CounterCell数组已经被其他线程创建,接下来当前线程尝试占据桶位来完成加1的任务
if ((as = counterCells) != null && (n = as.length) > 0) {
// 1.1 线程所在的CounterCell桶位为空,有机会将Cell对象放入到桶位上
if ((a = as[(n - 1) & h]) == null) {
// 1.1.2 若cellsBusy未被加锁,则当前线程先创建一个Cell对象,准备放入桶位
if (cellsBusy == 0) { // Try to attach new Cell
CounterCell r = new CounterCell(x); // 这里的x也就入参1,表示加1
// 1.1.3 线程再次检查cellsBusy未被其他线程加锁且当前线程对cellsBusy加锁成功,那么接下来当前线程就可以尝试将Cell放入桶位
if (cellsBusy == 0 &&
U.compareAndSwapInt(this, CELLSBUSY, 0, 1)) {
// 1.1.4 Cell是否创建的标识,显然程序执行到这,Cell对象还未放入桶位
boolean created = false;
try { // Recheck under lock
// cellsBusy标识锁并不是用于锁CounterCell数组本身,而是数组的某个桶位,CounterCell数组此时可能被线程扩容更改过,因此需要重新检查
CounterCell[] rs; int m, j;
// 如果counterCells数组还是一开始的那个数组且已被创建,这时当前线程可以放心将Cell对象放入线程hash定位到对应桶位,并将创建标识created置为true
if ((rs = counterCells) != null &&
(m = rs.length) > 0 &&
rs[j = (m - 1) & h] == null) {
rs[j] = r;
created = true;
}
} finally {
// 当前线程操作完桶位放入cell对象后,释放cellsBusy锁
cellsBusy = 0;
}
// 上面的CounterCell(1)能放入桶位,当然线程完成加1计数任务,可直接返回
if (created)
break;
// 源代码注释:Slot is now non-empty,说明线程未能成功将Cell(1)对象放在桶位,则再次回到for循环
continue;
}
}
// 代码执行到这里时,说明分支1.1.2 的cellsBusy被其他线程加锁,自己没抢到更改权,此时先不认为是冲突,使用新hash值 h = ThreadLocalRandom.advanceProbe(h) 回到for循环重试
collide = false;
// 执行流下一条语句是执行h = ThreadLocalRandom.advanceProbe(h) 然后回到for循环,而不继续执行1.2次分支,这里很容易误解。
}

次分支1.2的设计目的:
1
2
3
        // 1.2  // fullAddCount(long x=1, boolean wasUncontended=false)  
else if (!wasUncontended) // CAS already known to fail
wasUncontended = true; // Continue after rehash

当前线程第1次遇到CounterCell数组不为空的情况下,且线程定位到的桶位不是空,并不急着安排线程对桶位上Cell对象进行cas加1计数。由于addCount方法调用fullAddCount入参wasUncontended时被设为false,因此线程会进入分支1.2,此时wasUncontended设为true表示当前线程暂时还没有遇到竞争,更新hash值(h = ThreadLocalRandom.advanceProbe(h)),再直接回到for进行第2次遍历重试。

如果第2次遍历时,线程所定位的桶位又不为空,由于wasUncontended是true,因此会跳过次分支1.2,来到分支1.3:既然第2次定位到桶位又是不空,那么可以尝试去对桶位上Cell对象直接cas加1操作

这里为何设计2次遍历? 因为CounterCell首次被创建时容量为2,也即有rs[0],rs[1]两个空桶位,假设线程第1次定位到rs[0]不为空(此时不能认为存在线程竞争导致的,只是rs[0]里面已经有Cell对象,这也是wasUncontende设为true的原因)接下来当然是再换线程的hash看看能不能定位到rs[1]桶位,如果恰好定位到rsp[1]是空桶位,那么线程就可以放入Cell对象。如果2次重试线程定位到的桶位都不为空,线程第3次遍历重试就会进入下文的次分支1.3(当然换完hash值也有可能又定位到rs[0]桶位)

次分支1.3的设计目的:

如果第2次遍历时,线程所定位的桶位又不为空,由于wasUncontended是true,因此会跳过次分支1.2,尝试去对桶位上Cell对象直接cas加1操作,如果加1成功,线程就可以退出for循环了,如果失败就来到次分支1.4

1
2
3
		// 1.3
else if (U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x))
break;
次分支1.5以及1.6的设计目的:

不妨先考察次分支1.4两个条件都不成立时的执行流:

这时线程就会跳过分支1.4去执行次分支1.5(因为collide默认为false,此时线程会进入分支1.5),将collide设为true,更新hash值,再直接回到for进行第3次遍历:

**实际含义为:我(当前线程)第1次循环定位桶位不为空,第2次循环定位桶位也不为空,且对桶位上Cell对象cas加1失败,说明出现了明显线程竞争,那么第3次循环重试我就选择去扩容CounterCell数组** 此时线程进行第3次循环时,因为collide已经在上一轮设为true,所以线程在第3次重试时会跳过次分支1.5,来到次分支1.6代码区,此时线程会将CounterCell数组做2倍扩容,然后将collide设为false,表示:既然都扩容了,说明有新空桶位可供操作,线程间的冲突得以解决,当前线程只需直接continue回到for重试,也即源码注释提到的: Retry with expanded table
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
        // 1.4
else if (counterCells != as || n >= NCPU)
collide = false; // At max size or stale
// 1.5
else if (!collide)
collide = true;
// 1.6 占有cellsBusy锁
else if (cellsBusy == 0 &&
U.compareAndSwapInt(this, CELLSBUSY, 0, 1)) {
try {
if (counterCells == as) {// Expand table unless stale
// 两倍扩容
CounterCell[] rs = new CounterCell[n << 1];
for (int i = 0; i < n; ++i)
rs[i] = as[i];
counterCells = rs;
}
} finally {
cellsBusy = 0;
}
// 冲突得以解决
collide = false;
// 扩容后肯定有空桶位,当然可直接回到for循环重试
continue; // Retry with expanded table
}
次分支1.4的设计目的(最难理解的次分支)

该分支必须结合分支1.5和分支1.6的设计意义来理解:

分支1.6说明:线程一旦遇到明显的冲突,也即多次for循环定位到桶位不为空且对非空桶位的Cell对象cas加1失败(collide=true),那它会执行分支1.6逻辑:对CounterCell数组进行2倍扩容。那么会不会出现这样的情况:如果不断有后续更多线程也遭遇“冲突”,那么就有线程会无休止对CounterCell数组进行2倍扩容,有无停止扩容的条件

当然是有的,这就是次分支1.4的设计目的:防止CounterCell数组无限被扩容,如何实现?

次分支1.4有2个判断条件:

先看条件2:n >= NCPU,也即CounterCell数组长度达到cpu个数时,就将collide 改为false,更新hash值,再直接回到for重试

而分支1.5、1.6逻辑都被会跳过不执行,下一次循环如果又来到分支1.4,因为n >= NCPU成立,所以分支1.5、1.6逻辑再次被跳过不执行,之后的循环也类似处理。

所以:CounterCell数组停止的条件是,只要它的长度达到cpu个数,当前线程就不会再执行次分支1.6扩容的代码,而是从分支1.4将collide改为false,然后直接回到for循环、

当CounterCell数组不再扩容时,这些线程该如何执行多个分支代码呢

显然,当CounterCell数组不再扩容时,那么空桶位很快会被线程们放满Cell对象,也即CounterCell数组数组没有空桶位,之后线程是这么执行的:

次分支1.1 if ((a = as[(n - 1) & h]) == null)不再成立,所以会跳到次分支1.2,由于wasUncontended早已被设为true(请回去看次分支1.2的设计目的),次分支1.2也会被跳过,线程继续来到次分支1.3,此时线程会使用cas对Cell对象加1操作,若加成功就可以break,否则就来到次分支1.4,因为n=>NCPU一直为true,将collide改为false,更新hash值,再直接回到for重试。

简单总结:当CounterCell数组不再扩容时,且没有空桶位时,这些更新线程其实就会简化成以下的执行逻辑

1
2
3
4
5
6
7
8
9
for(;;)
{
// 当前线程使用cas对Cell对象加1操作,加成功就结束循环!,否则更新hash值重试
if (U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x)){
break;
}
// 更新当前线程hash值,以便下一次重试能够定位不同于本次的桶位
h = ThreadLocalRandom.advanceProbe(h);
}

从以上简化代码可以看出,线程一定能在某个时刻在对应桶位的Cell对象上加1成功,然后退出循环,这也是fullAddCOunt设计真实目的:保证每个线程进入到fullAddCount到离开fullAddCount之前一定能够cas加1成功

此时,我们再来考察次分支1.4的条件1:

有了条件2的铺垫,其他条件1很好理解:counterCells != as,表明恰好有其他线程正在扩容CounterCell数组(导致as指向新的CounterCells数组对象),接下来可能在扩容后的CounterCell数组有更多的空桶位,那么当前线程就没必要去竞争扩容了,只需将collide 改为false,更新自己hash值,然后直接回到for循环重试看看能否运气好拿到新的空桶位。

fullAddCount主分支2:

若CounterCell数组为空时(还未被创建),也即fullAddCount的主分支1会被跳过,当前线程会进入主分支2代码区,那么当前线程把cellsBusy状态改为1(相当于加锁),创建好一个默认长度为2的CounterCell数组,并顺便在对应的桶位放入Cell(1)完成加1计数

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22

else if (cellsBusy == 0 && counterCells == as &&
U.compareAndSwapInt(this, CELLSBUSY, 0, 1)) {
boolean init = false;
try { // Initialize table
// 再次确认counterCells数组没有被其他线程改动过,如果被改动过,那么就跳过初始化并释放cellsBusy“锁”
if (counterCells == as) {
// CounterCell数组就是在这里首次被创建
CounterCell[] rs = new CounterCell[2];
// h&1结果:要么0要么1,也即将Cell(1)对象放在两个空桶位rs[0]、rs[1]中的任一个
rs[h & 1] = new CounterCell(x);
counterCells = rs;
init = true;
}
} finally {
// 既然当前线程已经完成创建CounterCell数组,当然可以释放cellsBusy锁
cellsBusy = 0;
}
//当前线程完成CounterCell数组初始化后,因为在创建期间就完成Cell(1)加1计数,故可直接break退出。
if (init)
break;
}
fullAddCount主分支3:

主分支3:线程执行fullAddCount的主分支1桶位Cell加1操作失败,接着执行fullAddCount主分支2时又没抢到CounterCell数组的创建权,

那么当前线程总不能白跑一趟,因此到了这一步当前线程顺便尝试对baseCount做加加1计数(或者加x计数,x代表正负数都可以),若成功就退出,失败就回到for循环重试

1
2
3
4
5
   
else if (U.compareAndSwapLong(this, BASECOUNT, v = baseCount, v + x))
break; // Fall back on using base
}
}

小结:从三个主分支的设计来看,主分支1无疑是最复杂的,因为判断多,条件多,理解起来象比较绕,需要用全局、逆向思维去分析,最好是先找到一个容易理解的分支,然后根据这个分支的实际含义来顺藤摸瓜再找到其他分支的实际含义

从for循环退出的条件也可理解fullAddCount的内在设计逻辑

有以上详细的源码解释后,考察线程for自旋退出的条件(成功计数加1的条件),其实就是对应前面提到的3个主分支。

1、第2552行,主分支1-次分支1.1,当前线程对应的桶位为null然后成功放入Cell对象时可退出for自旋

1
2
if (created)
break;

2、第2561行,主分支1-次分支1.3,进入CounterCell数组的操作逻辑,当前线程对非空桶位的Cell对象,使用CAS加1成功时可退出for自旋

1
2
else if (U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x))
break;

3、第2597行,主分支2:当前线程创建默认长度2的CounterCell数组并选其中一个桶位放入Cell对象后可退出for自旋

1
2
if (init)
break;

4、第2600行,主分支3:当前线程以上尝试三种逻辑都失败时,总不能白跑一趟,再尝试去CAS对baseCount加1,成功时可退出for循环

1
2
else if (U.compareAndSwapLong(this, BASECOUNT, v = baseCount, v + x))
break; // Fall back on using base

关于addcount完成计数加1后的CHM扩容逻辑分析,则单独在另外一篇博客给出,因为它的设计相对复杂,需要分析的内容较多。