yield-bytes

沉淀、分享与无限进步

Java高级主题:jdk1.8的ConcurrentHashMap源码深入分析(二)

在前面文章中,addCount方法分为两部分:

第一部分是put一个节点后,需要对size加1计数,这部分交由fullAddCount完成,它的设计和逻辑可谓精妙,非常值得在实际项目参考其代码实现。

第二部分是加1计数后,需要判断是否需要对table进行扩容,扩容思想设计及其源代码实现同样非常精妙,值得多次阅读和学以致用!

本文将重点深入分析CHM核心扩容逻辑:transfer、helpTransfer、以及resizeStamp。

CHM的sc设置基数图示

《gitee 博客文章封面》

1、addCount的扩容判断设计

第1个执行扩容线程

本章节最精彩的地方:分析Doug Lea 如何安排“每个加入扩容任务线程对sc进行cas加1计数”、“每个结束自己扩容任务线程对sc进行减1”、以及“最后一个结束扩容线程要干些什么收尾工作”。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
// putVal调用addCount(1L, binCount),因此这里x就是1,check即binCount,对于put入一个key(key已存在,则binCount=0,新key,binCount>0),那么binCount自然是>=0
private final void addCount(long x, int check) {
CounterCell[] as; long b, s;
// 主分支1:完成加1计数逻辑,之前文章fullAddCount已经详细分析,本文重点讲解addCount主分支2
// ......忽略部分


// 主分支2:当前线程检查是否需要扩容,若需要,则执行transfer扩容逻辑
// check即binCount,每次新增节点,当然要检查是否需要扩容
if (check >= 0) {
Node<K,V>[] tab, nt; int n, sc;
// 这里的s就是主分支1里面的 s = sumCount(),含义:完成加1计数后,统计当前CHM节点总数量s,看看s有无达到扩容阈值sizeCtl
while (s >= (long)(sc = sizeCtl) && (tab = table) != null &&
(n = tab.length) < MAXIMUM_CAPACITY) {
int rs = resizeStamp(n); // jdk 1.8这里的写法是一个bug,后面有指出原因。
//分支2.1:sc的值为负数时,表明CHM还在扩容期,原因参考后面小节的sc、resizeStamp方法的解析
if (sc < 0) {
// 分支2.1.1:while循环扩容结束点,扩容结束条件有5个,(nt = nextTable) == null 以及transferIndex <= 0条件再看完transfer源码解析后,可以很容易理解,但前面3个条件目前理解会很困难,需要理解后面小节的sc、resizeStamp方法解析后才能准确理解其含义,也即这里先跳过这个5个条件的解释。
if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
sc == rs + MAX_RESIZERS || (nt = nextTable) == null ||
transferIndex <= 0)
break;
// 分支2.1.2:第1个执行扩容线程在分支2.2将sc设置基数值后,以后每进来一个扩容线程都会对sc进行cas加1
if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1))
transfer(tab, nt);
}
//分支2.2:第1个执行扩容线程会执行此逻辑,将sc(sizeCtl)设为一个基础数(该数为负数),为什么设置一个负数呢?后面resizeStamp方法给出非常完整解答!没有充分积累,此处看起来将很难理解其设计意图。
else if (U.compareAndSwapInt(this, SIZECTL, sc,
(rs << RESIZE_STAMP_SHIFT) + 2))
transfer(tab, null);
s = sumCount();
}
}
}

对于分支2.1:因为表示有其他线程在扩容,说明是并发状态,这部分逻辑先放着,我们先考察分支2.2的逻辑。

分支2.2: 对于ConcurrentHashMap 第1个进入扩容的线程会执行此逻辑,会将sc(sizeCtl)设为一个基础负数:(rs << RESIZE_STAMP_SHIFT) + 2

“第1个执行扩容线程”:当ConcurrentHashMap首次扩容时,负责首次扩容任务的线程就是“第1个执行扩容线程”,这个概念很重要!

1
2
3
4
5
6
//分支2.2:当ConcurrentHashMap首次扩容时,第1个执行扩容的线程会先执行此分支
else if (U.compareAndSwapInt(this, SIZECTL, sc,
(rs << RESIZE_STAMP_SHIFT) + 2))
//第1个负责扩容的线程执行以下扩容逻辑
{transfer(tab, null);}

正是因为有了“第1个执行扩容线程”将sc(sizeCtl)设为一个基础负数,此时回看分支2.1即可明白其含义:

此时sc是个基础负数,因此sc<0必然成立,换句话说:只要有第2个以及后续更多的线程进入whlie,sc<0成立,表明当前CHM正在扩容状态(或者有线程正在对CHM进行扩容处理)

1
2
3
4
      //分支2.1    
if (sc < 0) {
//...省略
}

有了以上解析的铺垫,现在我们重新理顺以下执行流程:假设当前CHM已经达到扩容阈值时,“第1个执行扩容线程”以及“第2个以及以后更多线程加入到扩容”在进入addCount后的执行流,从序号①开始,按升序推进,如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
  
//新put一个节点后,addCount(x=1, check=binCount=1)
private final void addCount(long x, int check) {
CounterCell[] as; long b, s;
// 主分支1:完成fullAddCount加1计算后,计算
s = sumCount();
// 主分支2:当前线程检查是否需要扩容,若需要,就执行transfer扩容逻辑
if (check >= 0) {
Node<K,V>[] tab, nt; int n, sc;
//① 我们已假设需要扩容,也即整个CHM的节点数量s已经达到扩容阈值sc=sizeCtl,“第1个执行扩容线程”go to ②
//④ 由于我们已经假设:第2个以及以后更多线程加入到扩容,这些线程能进入while,go to ⑤
while (s >= (long)(sc = sizeCtl) && (tab = table) != null &&
(n = tab.length) < MAXIMUM_CAPACITY) {
int rs = resizeStamp(n);
//② “第1个执行扩容线程”进来后,此时sc还是正值(扩容阈值sc=sizeCtl),go to ③
//⑤ 在上一轮“第1个执行扩容线程”执行③后,sc就是一个基数负值,因此“第2个以及以后更多线程加入扩容”的线程们会go to ⑥
if (sc < 0) {
// 这里的条件先放着,等看完后面的transfer方法解析,这些条件自然能迎刃而解
if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
sc == rs + MAX_RESIZERS || (nt = nextTable) == null ||
transferIndex <= 0)
break;
//⑥ “第2个以及以后更多线程加入到扩容”的线程们:在首轮中第1个线程将sc设置基数值后,以后每进来一个扩容线程,该线程对sc进行cas加1,为什么这么安排,请看下一节内容:sc设置为一个基础负数的实际意义
if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1))
// “第2个以及以后更多线程执行扩容”真正进入扩容处理的逻辑
transfer(tab, nt);
}
//③ “第1个执行扩容线程”用cas将sc设为一个基数负值
else if (U.compareAndSwapInt(this, SIZECTL, sc,
(rs << RESIZE_STAMP_SHIFT) + 2))
// “第1个执行扩容线程”真正进入扩容处理的逻辑
transfer(tab, null);
s = sumCount();
}
}
}
sc设置为一个基础负数的实际意义

sizeCtl在CHM扩容期间的用处大有来头:如下图所示

CHM的sc设置基数图示

此图可以清晰解释Doug Lea设计sc=(rs << RESIZE_STAMP_SHIFT) + 2)的意图:

第1个线程将sc设置基数值后,以后每进来一个扩容线程,该线程对sc进行cas加1(结合上图),代码实现是在上面的addCount的分支2.1.2

1
2
3
4
   // 分支2.1.2:第1个扩容线程将sc设置基数值后,以后每进来一个扩容线程,那个线程就会对sc进行cas加1后,再执行transfer扩容
if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1))
transfer(tab, nt);
}

对sc加1有什么用?其实到这里应该可以理解了:Doug Lea用这种方式记录当前参与扩容线程的数量,为何要记录参与扩容线程的数量?

两个目的,而且都是非常“聪明”的目的!

  • 第一个目的:为了限制参与扩容的总线程数

因为一个JVM进程开启太过多的线程数量(例如10万个线程)参与到扩容,意味着当前OS大量线程在运行,高并发下的线程上下文切换、大量线程栈占用空间,一定程度上导致CHM性能不增加反降低,甚至影响到“系统层面的稳定性”,至于为何设计最大扩容线程数量为65535(对应二进制1111 1111 1111 1111),需从文章后面的resizeStemp解析中找到答案。

题外话:65535是不是很熟悉?windows 最大可开启的tcp端口号数量?想想为何是这个数2^16-1?或者思考:一个Java进程到底能创建多少线程,注意到最大线程理论值=进程的用户地址空间除以线程栈的大小,用户地址空间和线程栈又取决于操作系统、内存、jvm参数等,具体可参考这篇文章JVM源码分析之一个Java进程究竟能创建多少线程

  • JVM:XmxXssMaxPermSizeMaxDirectMemorySizeReservedCodeCacheSize
  • Kernel:max_user_processesmax_map_countmax_threadspid_max

如果参与扩容的线程数量达到了最大值,后面再来第65536、65537、65538个等更多线程,这些线程不会参与到扩容逻辑代码,控制实现在addCount的分支2.1.1的条件:如果sc == rs + MAX_RESIZERS,也即当前参与到扩容队伍的总线程数量达到设定的最大值,再进来的线程不再安排扩容,这些线程会直接break返回。

1
2
3
4
5
6
7
8
9
10
11
if (check >= 0) {
Node<K,V>[] tab, nt; int n, sc;
// 完成加1计数后,算下当前CHM节点总数量s,看看s有无达到扩容阈值sizeCtl
while (s >= (long)(sc = sizeCtl) && (tab = table) != null &&
(n = tab.length) < MAXIMUM_CAPACITY) {
int rs = resizeStamp(n);
//分支2.1:这里是sc=sizeCtl,如果sizeCtl是一个负数(因为第1个进来扩容的线程将sc设置为一个基数负值),说明有其他线程正在处理table扩容,那么当前线程自然要看看自己能否参与到扩容逻辑中
if (sc < 0) {
// 分支2.1.1:如果扩容线程达到最大值,那么后续进来再多扩容线程都会直接break掉,也即不参与扩容逻辑。
if ((条件1|| 条件2 ||sc == rs + MAX_RESIZERS || 条件4 || 条件5)
break;
  • 第二个目的:为了确定到底哪个线程是“最后一个完成扩容的线程”?

为了确定到底哪个线程是“最后一个完成扩容的线程”,并让它来告知外界整个CHM已经完成了扩容,具体如何实现?

Doug Lea这么设计:每结束一个扩容线程,那个线程就对sc进行cas减1(结合上图理解),直到有一个线程对sc进行cas减1时恰好使得sc就是一开始设置的基础值,那么这个线程就是要找的“最后一个完成扩容的线程”,于是可以将finishing置为true,表示整个CHM的扩容已经完成,对应的源码如下,在transfer方法里面
1
2
3
4
5
6
7
8
9
10
11
12
for (int i = 0, bound = 0;;) {
Node<K,V> f; int fh;
while (advance) {
//...省略部分
if (U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1)) {
//① 这里的逻辑被可改为以下的②、③写法
if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT)
return;
finishing = advance = true;
i = n; // recheck before commit
}
//...省略部分
可把`(sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT` 改写为`sc = (resizeStamp(n) << RESIZE_STAMP_SHIFT) +2`,所以以上①逻辑可以改写下面的②和③写法:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
for (int i = 0, bound = 0;;) {
Node<K,V> f; int fh;
while (advance) {
//...省略部分
// 线程完成自己管辖的桶位节点转移到新表后,那么这个线程就对sc进行cas减1
if (U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1)) {
//② 直到有一个线程对sc进行cas减1时恰好使得sc就是一开始设置的基础值,
if (sc==(resizeStamp(n) << RESIZE_STAMP_SHIFT) +2){
// 说明这个线程就是“最后一个完成扩容的线程”,由它来结束整个CHM的扩容流程
finishing = advance = true;
i = n; // recheck before commit
//③
}else{
// 注意这里在transfer解析中给出,请看完下面transfer方法解析再回来看这个return
return;
}
}
//...省略部分
这下逻辑清晰了: 执行`finishing = advance = true;i = n `后,“最后一个完成扩容的线程”在transfer内部就会按以下执行流程走(按序号升序来看执行流,从底下的①开始)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
for (int i = 0, bound = 0;;) {
Node<K,V> f; int fh;
//④ 因为③的advance=true,进入while,go to⑤
//⑥ 因为⑤进入if后,advance会改为false,故这里退出while,go to⑦
while (advance) {
int nextIndex, nextBound;
//⑤ 因为③的finishing=true,进入if
if (--i >= bound || finishing)
//回到while⑥
advance = false;
// 此分支与本次解析无关
else if ((nextIndex = transferIndex) <= 0) {
i = -1;
advance = false;
}
// 此分支与本次解析无关
else if (U.compareAndSwapInt
(this, TRANSFERINDEX, nextIndex,
nextBound = (nextIndex > stride ?
nextIndex - stride : 0))) {
bound = nextBound;
i = nextIndex - 1;
advance = false;
}
}
//⑦ 因为③的i=n,因此满足条件2:i>=n,进入if
if (i < 0 || i >= n || i + n >= nextn) {
int sc;
//⑧ 因为③的finishing=true,进入if
if (finishing) {
// 回收nextTable引用
nextTable = null;
// 将迁移好的新表赋给原来的table引用
table = nextTab;
//设置下次扩容阈值
sizeCtl = (n << 1) - (n >>> 1);
//⑨ 退出for循环,代表:整个CHM已完成所有节点的迁移
return;
}
//① 线程完成自己管辖的桶位节点转移到新表后,那么这个线程就对sc进行cas减1
if (U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1)) {
//② 直到有一个线程对sc进行cas减1时恰好使得sc就是一开始设置的基础值,也即addCount分支2里面的:sc==(rs << RESIZE_STAMP_SHIFT) +2),go to ③
if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT)
return;
//③ go to ④
finishing = advance = true;
i = n; // recheck before commit
}
}

当线程进入了transfer方法后,这些线程们是怎么知道自己负责第几号到第几号的桶位迁移呢、以及如何将自己管辖的桶位节点转移到新表中呢?这就是tranfer方法里面第二层要深入解析的内容。 #### 2、transfer内部精密的并发设计 ##### 2.1 为每个扩容线程分配“桶位区间” 假设现有4个线程并发同一时刻进入transfer逻辑,那么每个线程如何协同转移节点呢? 首先:为每个线程准确划分线程自己要管辖的“桶位区间”,既然要确定桶位区间`[left,right]`,就必然要在并发环境下计算每个线程自己区间的左边界下标和右边界下标,而右区间边界-左区间边界就是步长:stride,也即线程要完成转移的桶位个数(或者领取任务的个数/步长),如下:
1
2
3
4
5
private final void transfer(Node<K,V>[] tab, Node<K,V>[] nextTab) {
int n = tab.length, stride;
// 计算领取任务的步长,根据cpu个数来计算,如果计算值小于默认步长16,那么就采用默认步长16,否则步长采用计算值。本文为方便作图,将stride设为4
if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE)
stride = MIN_TRANSFER_STRIDE; // subdivide range
以下以4个线程为例,table长度16,为了方便作图,本文的stride长度设为4,那么四个线程各自分配到桶位范围如下图所以 ![WX20210705-212539](https://img-blog.csdnimg.cn/img_convert/f84cad5ea7c6c4d721c6e3a65cb41900.png) 划分好每个线程自己的桶位区间有什么用?当然是各司其职,将自己管辖区间内的桶位节点迁移到新table上。 那么每个线程分配的桶位区间是如何以并发方式(无锁方式)计算出? 考察现在有线程1,2,3,4共4个线程同一时刻执行到`for (int i = 0, bound = 0;;) ` ,基于当前table=16和本文stride长度设为4。不妨假设线程1先执行,而且线程1是作为“第1个扩容线程”进入到transfer方法里面:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
    if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE)
stride = MIN_TRANSFER_STRIDE; // subdivide range
// 首次扩容,transfer(table,null),也即nextTab为null,所以进入if
if (nextTab == null) { // initiating
try {
@SuppressWarnings("unchecked")
// 线程1先创建一个新表,两倍于旧表容量
Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n << 1];
nextTab = nt;
// 在本节中,我们假设旧table是16,因此线程1创建的新表为32,因此不会进入catch逻辑
} catch (Throwable ex) { // try to cope with OOME
// 如果线程本次扩容的新表容量恰好达到最大,就不再扩容,直接返回。
sizeCtl = Integer.MAX_VALUE;
return;
}
nextTable = nextTab;
transferIndex = n;
}
线程1上面流程走完后有:`transferIndex=n=16,stride=4` 线程1:从for循环开始,因为i = 0,bound = 0,因此分支1条件不成立,因为nextIndex = transferIndex=16,显然分支2的条件也不成立,那么只能执行分支3,假设4个线程此时都来到分支3,不妨假设此时线程1抢到CAS,那么对于线程1就会进入分支3逻辑,其他3个线程继续`while`自旋再回到分支3的CAS。线程1执行流程如下:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
   boolean advance = true;
boolean finishing = false;
for (int i = 0, bound = 0;;) {
// ....省略部分
while (advance) {
int nextIndex, nextBound;
//分支1:线程1刚进入for循环时,i=0,bound=0,显然不满足分支1条件
if (--i >= bound || finishing)
advance = false;
//分支2:这里的transferIndex的值就是前面transferIndex = n,nextIndex = transferIndex=16,此分支作用:判断CHM剩余可分配给线程桶位的数量,如果<=0,说明CHM所有桶位都分配给线程们了。
else if ((nextIndex = transferIndex) <= 0) {
i = -1;
advance = false;
}
//分支3:我们已经假设线程1首先抢到cas,故进入if。cas比较前有:nextIndex=transferIndex=16
else if (U.compareAndSwapInt(this, TRANSFERINDEX, nextIndex,
// nextBound=(16>4?) 16-4:0=12
nextBound = (nextIndex > stride ?
nextIndex - stride : 0))) {
// cas成功后,那么TRANSFERINDEX(transferIndex)就从16被设置成12.
// bound=nextBound=12
bound = nextBound;
// i=16-1=15
i = nextIndex - 1;
// 这里advance=false后,线程1就可以退出while循环,表明线程1分配到了桶位区间[12,15]分配
advance = false;
}
线程2:线程1完成cas逻辑后退出while,其他3个线程还得继续竞争分支3的cas,不妨假设线程2成功操作cas(线程3、线程4只能继续循环cas),那么线程2会执行以下逻辑:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
for (int i = 0, bound = 0;;) {
// ....省略部分
while (advance) {
int nextIndex, nextBound;
//分支1:线程2刚进入for循环时,i=0,bound=0,显然不满足分支1条件
if (--i >= bound || finishing)
advance = false;
//分支2:注意这里的transferIndex已经被线程1在上面的分支3cas阶段设置成12,含义:旧表16个桶位,目前还剩12个桶位未分配出去(线程1分配了4个)。nextIndex = transferIndex=12
else if ((nextIndex = transferIndex) <= 0) {
i = -1;
advance = false;
}
//分支3:已经假设本次由线程2抢到cas
else if (U.compareAndSwapInt
(this, TRANSFERINDEX, nextIndex,
// nextBound=(12>4?) 12-4:0=8
nextBound = (nextIndex > stride ?
nextIndex - stride : 0))) {
// 线程2cas成功后,那么TRANSFERINDEX(transferIndex)就从12被设置成8
// bound=nextBound=8
bound = nextBound;
// i=12-1=11
i = nextIndex - 1;
// 这里advance=false后,线程2就可以退出while循环,表示线程2现在分配了对应的桶位区间[8,11]
advance = false;
}
线程3:线程2完成cas逻辑后退出while,其他2个线程还是同时竞争分支3的cas,不妨假设线程3成功操作cas(此时线程4只能继续循环cas),那么线程3会执行以下逻辑:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
for (int i = 0, bound = 0;;) {
// ....省略部分
while (advance) {
int nextIndex, nextBound;
//分支1:线程3刚进入for循环时,i=0,bound=0,显然不满足分支1条件
if (--i >= bound || finishing)
advance = false;
//分支2:注意这里的transferIndex已经被线程2在上面的分支3cas阶段设置成8,含义:旧表16个桶位,目前还剩8个桶位未分配出去(线程1分配了4个,线程2分配了4个)。nextIndex = transferIndex=8
else if ((nextIndex = transferIndex) <= 0) {
i = -1;
advance = false;
}
//分支3:已经假设本次由线程3抢到cas
else if (U.compareAndSwapInt
(this, TRANSFERINDEX, nextIndex,
// nextBound=(8>4?) 8-4:0=4
nextBound = (nextIndex > stride ?
nextIndex - stride : 0))) {
// 线程3 cas成功后,那么TRANSFERINDEX(transferIndex)就从8被设置成4
// bound=nextBound=4
bound = nextBound;
// i=8-1=7
i = nextIndex - 1;
// 这里advance=false后,线程3就可以退出while循环,表示线程3现在分配了对应的桶位区间[4,7]
advance = false;
}
线程4:线程3完成cas逻辑后退出while,目前只有线程4操作分支3的cas,肯定能进入cas,那么线程4会执行以下逻辑:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
for (int i = 0, bound = 0;;) {
// ....省略部分
while (advance) {
int nextIndex, nextBound;
//分支1:线程4刚进入for循环时,i=0,bound=0,显然不满足分支1条件
if (--i >= bound || finishing)
advance = false;
//分支2:注意这里的transferIndex已经被线程3在上面的分支3cas阶段设置成4,含义:旧表16个桶位,目前还剩4个桶位未分配出去(线程1分配了4个、线程2分配了4个、线程3分配了4个)。nextIndex = transferIndex=4
else if ((nextIndex = transferIndex) <= 0) {
i = -1;
advance = false;
}
//分支3:本次由线程4抢到cas
else if (U.compareAndSwapInt
(this, TRANSFERINDEX, nextIndex,
// nextBound=(4>4?) 4-4:0=0
nextBound = (nextIndex > stride ?
nextIndex - stride : 0))) {
// 线程4 cas成功后,那么TRANSFERINDEX(transferIndex)就从4被设置成0,表示旧表16个桶位中,已经全部分配出去(给到各个线程)
// bound=nextBound=0
bound = nextBound;
// i=4-1=3
i = nextIndex - 1;
// 这里advance=false后,线程4就可以退出while循环,表示线程4现在分配了对应的桶位区间[0,3]
advance = false;
}
经过以上四轮while推演,可以发现四个线程并发同一时刻进入`for (int i = 0, bound = 0;;) `并来到`whlie(advance)` 的实际目的:就是为扩容线程们分配好各自要迁移的桶位区间,分配方式是从table尾部往头部方向的倒序分配,正如上面计算的线程1=>15到12,线程2=>11到8,线程3=>7到4,线程4=>3到0,而且巧妙的是:每个线程分配的桶位区间都不会重叠,原因在于:分支3需要线程自己抢到cas权才能对transferIndex扣减步长值

在4个线程分配好桶位区间的情况下,此时若存在第5个线程、第6个线程…..它们能分配到桶位区间吗?答案是不能,可以分开两种线程时刻讨论:

  • 第一种线程时刻:

不妨假设第5个线程、第6个线程…都是和线程1、线程2、线程3、线程4一起进入到for (int i = 0, bound = 0;;)并来到whlie(advance),由于前面4个线程分配好了桶位区间(但这个4个线程正准备扩容),此时考察第5个线程,不妨假设从while的①步骤开始

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
 for (int i = 0, bound = 0;;) {
// ....省略部分
boolean advance = true;
boolean finishing = false;
//① 线程5的首次advance为true,进入while
//⑤ 来自④的advance=false,go to ⑥
while (advance) {
int nextIndex, nextBound;
//② 线程5首次进入for循环时,i=0,bound=0,(且前面4个线程还没扩容完),显然不满足分支1条件finishing
if (--i >= bound || finishing)
advance = false;
//③ 从线程4的分支3可知,transferIndex已经被设置为0,含义:旧表16个桶位已经全部分配出去,所以线程5会进入该分支
else if ((nextIndex = transferIndex) <= 0) {
//④ 回到⑤while
i = -1;
advance = false;
}
// 线程5不会到此次分支
else if (U.compareAndSwapInt
(this, TRANSFERINDEX, nextIndex,
nextBound = (nextIndex > stride ?
nextIndex - stride : 0))) {
bound = nextBound;
i = nextIndex - 1;
advance = false;
}
//⑥ 因为④的i=-1,满足条件1,进入⑦
if (i < 0 || i >= n || i + n >= nextn) {
int sc;
// 前面4个线程还没扩容完,不会进入此if
if (finishing) {
nextTable = null;
table = nextTab;
sizeCtl = (n << 1) - (n >>> 1);
return;
}
//⑦ 线程5对sc进行cas减1计数:表示在整个CHM有1个线程正准备离开扩容队伍
if (U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1)) {
//⑧ 由“前面4个线程还没扩容完”可知,线程5在⑦中对sc进行减1后,sc肯定还不是基础值,故去到⑨(在这里我们也可以知道,线程5不是“最后一个完成扩容的线程”)
if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT)
//⑨ 线程5返回到外部的addCount方法⑪。
return;
finishing = advance = true;
i = n; // recheck before commit
}
}

由上面的①到⑨执行流,可以看出,由于transferIndex已经被线程4设置为0后(表示16个桶位都分配给前面4个线程),第5个线程就会进入③代码片区,最终从⑨结束并返回到外部addCount方法,可以清楚看到线程5在整个过程即没有“分配到桶位区间”也没有参与“桶位节点迁移工作”就直接返回了,继续看线程5返回点,从⑪开始:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
//新put一个节点后,addCount(x=1, check=binCount=1)
private final void addCount(long x, int check) {
CounterCell[] as; long b, s;
// .......省略部分

if (check >= 0) {
Node<K,V>[] tab, nt; int n, sc;
//⑫ 因为“前面4个线程还没扩容完”,且sc还是一个负数,因此s必然>=sc = sizeCtl,故线程5go to ⑬
while (s >= (long)(sc = sizeCtl) && (tab = table) != null &&
(n = tab.length) < MAXIMUM_CAPACITY) {
int rs = resizeStamp(n);
//⑬ 因为“前面4个线程还没扩容完”,那么sc还是个负值,故线程5go to ⑭
if (sc < 0) {
//⑭ 在上面tansfer方法的③步骤中已经知道,transferIndex已经是等于0,故满足条件5:transferIndex <= 0,所以线程5会被break并退出了while循环。
if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
sc == rs + MAX_RESIZERS || (nt = nextTable) == null ||
transferIndex <= 0)
break;
if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1))
transfer(tab, nt);
}
else if (U.compareAndSwapInt(this, SIZECTL, sc,
(rs << RESIZE_STAMP_SHIFT) + 2))
transfer(tab, null); //⑪ 线程5返回点,接着线程5会回到while
s = sumCount();
}
}
}

在这里⑭步骤可以知道,线程5就退出了。

由此,我们终于搞清楚这样的扩容安排:对于旧表16开始扩容时,已经有4个线程分配好各自待迁移桶位区间后,如果此时后续再来第5个线程、第6个线程等更多线程进入到扩容逻辑transfer方法,这些后来者线程(或者在分配桶位区间竞争失败的线程)最终即没有“分配到桶位区间”也没有参与“桶位节点迁移工作”,直接结束addCount方法调用。
  • 第二种线程时刻:

假设线程1、线程2、线程3、线程4已经分配好了桶位区间,并假设此时第5个线程、第6个线程等其他线程开始进入AddCount方法内部的while位置:此时考察第5个线程,由于transferIndex已经等于0可知,其实此时线程5(线程6等)就会回到上面第一种情况分析的⑭步骤:

1
2
3
4
5
6
7
//⑬ 因为“前面4个线程还没扩容完”,那么sc还是个负值,故线程5go to ⑭
if (sc < 0) {
//⑭ 在上面tansfer方法的③步骤中已经知道,transferIndex已经是等于0,故满足条件5:transferIndex <= 0,所以线程5会被break并退出了while循环。
if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
sc == rs + MAX_RESIZERS || (nt = nextTable) == null ||
transferIndex <= 0)
break;

结论跟第一种情况一样,此处不再累赘。

关于以上四个线程并发分配桶位区间的计算过程,总结如下图

四个线程桶位分配过程

2.2 桶位区间迁移节点的设计

前面给4个线程分配好各自的桶位区间后,接下来四个线程要完成自己管辖的桶位区间节点迁移任务。

不妨以线程1作为考察:线程1管辖的桶位区间是[12,15],从第i=15号桶位开始进行节点迁移,直到4个桶位节点都迁移完毕的图示流程:

线程1迁移桶位区间过程

图中的fwd节点:线程每完成一个节点迁移,就在旧表桶位放置一个fwd节点

以下的桶位区间节点迁移过程是基于i=15桶位开始:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
      // Forwarding中文译为:转发、转投,因此ForwardingNode表示“转发节点”,这个类也是jdk1.8CHM一个关键设计,起着“通告作用”,只要有写线程进行put、replace、clear、compute看到桶位是一个fwd节点,那么这些线程就会转而去加入扩容队伍(帮助扩容helpTransfer)、而读线程约到fwd节点就会“转到”新表去读数据节点
ForwardingNode<K,V> fwd = new ForwardingNode<K,V>(nextTab);
// 例如,线程1管辖的桶位区间是`[bound,i]`,第i个捅位节点迁移完毕后,advance为true时,表示线程1向前移动1步(i指针向右移动1步)以便继续迁移第--i个桶位节点
boolean advance = true;
boolean finishing = false; // to ensure sweep before committing nextTab
for (int i = 0, bound = 0;;) {
Node<K,V> f; int fh;
// 逻辑1:线程1首次拿到桶位区间[12,15]会退出while,走到逻辑2
while (advance) {
int nextIndex, nextBound;
// 分支1:接续下面的逻辑3、逻辑4、逻辑5,请看完逻辑3、逻辑4、逻辑5后再回来分支1的条件
if (--i >= bound || finishing)
advance = false;
// 分支2
else if ((nextIndex = transferIndex) <= 0) {
i = -1;
advance = false;
}
// 分支3:
else if (U.compareAndSwapInt
(this, TRANSFERINDEX, nextIndex,
nextBound = (nextIndex > stride ?
nextIndex - stride : 0))) {
bound = nextBound;
i = nextIndex - 1;
advance = false;
}
}
// 逻辑2:接逻辑1,i=15,不满足这三个判断条件,因此会跳过逻辑2走到逻辑3
if (i < 0 || i >= n || i + n >= nextn) {
int sc;
// 分支2.1
if (finishing) {
nextTable = null;
table = nextTab;
sizeCtl = (n << 1) - (n >>> 1);
return;
}
// 分支2.2
if (U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1)) {
if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT)
return;
finishing = advance = true;
i = n; // recheck before commit
}
}
/* 逻辑3:旧表中不是每个桶位都有节点数据,当线程1发现桶位15为null时,就在此放入一个ForwardingNode节点,告知外界:当前桶位正在迁移节点,这样当其他线程看到桶位节点是fwd时,就转去执行帮助扩容逻辑,如下所示
还记得putVal方法里面的以下设计吗:
其他线程看到桶位节点是fwd时,就转去执行帮助扩容的逻辑。
else if ((fh = f.hash) == MOVED)
tab = helpTransfer(tab, f);
*/
else if ((f = tabAt(tab, i)) == null)
advance = casTabAt(tab, i, null, fwd);

// 逻辑4:如果逻辑3条件不成立,就会走到逻辑4,也即线程1发现第15号桶位已经是一个fwd节点,说明当前桶位节点已经被其他线程迁移完毕,那么线程1可以跳过第15号桶位继续向前走1步,advance = true,也即去第14桶号桶,然后回到while逻辑1里面的分支1
else if ((fh = f.hash) == MOVED)
advance = true; // already processed
// 逻辑5:若线程1走到这里,说明当前桶位节点是一个正常的数据节点(链表、红黑树),直接给当前桶位头节点加独占锁后,执行迁移节点到新表的逻辑
else {
synchronized (f)
// 省略部分
//将i位置低位节点链迁移到新表的i位置
setTabAt(nextTab, i, ln);
//将i位置高位节点链迁移到新表的i+n位置
setTabAt(nextTab, i + n, hn);
//在当前桶位放入fwd,告知外界:当前桶位已经迁移完
setTabAt(tab, i, fwd);
//advance = true让线程1回到逻辑1while里面的分支1,表示继续处理第14号(--i)桶位节点
advance = true;
}

有了以上扩容线程1对i=15桶节点迁移流程的说明后,那么接下里继续讨论线程1如何知道已经完成[bound=12,i=15]桶位区间所有节点迁移且退出for循环的流程。

不妨考察线程1当前迁移的桶位是第i=13桶号,之后的执行流程按以下序列号进行,从底部的①开始按升序知道第⑰步的return,以下逻辑一定耐心看完,

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
      //addCount调用transfer(tab, null);
private final void transfer(Node<K,V>[] tab, Node<K,V>[] nextTab) {
// ......省略部分
for (int i = 0, bound = 0;;) {
Node<K,V> f; int fh;
//③ 从底部②的advance为ture,故进入while,go to ④
//⑥ 从⑤的advance为false,故退出while,go to ⑦
//⑪ 从底部⑩的advance为ture,故进入while,go to ⑫
//⑮ 从⑭的advance为false,故退出while,go to ⑯
while (advance) {
int nextIndex, nextBound;
//④ --i等于12>=bound=12,进入if
//⑫ --i等于11已经越过左边界bound=12,不进入if,go to ⑬
if (--i >= bound || finishing)
//⑤ 回到while⑥
advance = false;
//⑬ 由于16个桶位已全部分配出去,因此transferIndex=0,满足条件,进入if
else if ((nextIndex = transferIndex) <= 0) {
i = -1;
//⑭ 说明线程1已经迁移完自己管辖的4个桶位节点,不需要继续前进处理下一个桶位了,go to⑮
advance = false;
}
// 线程1已经分配好桶位区间,故分支与本次解析无关,可忽略
else if (U.compareAndSwapInt
(this, TRANSFERINDEX, nextIndex,
nextBound = (nextIndex > stride ?
nextIndex - stride : 0))) {
bound = nextBound;
i = nextIndex - 1;
advance = false;
}
}
//⑯ 因为⑭已经将i设为-1,这里满足条件i<0,进入if,go to ⑰
if (i < 0 || i >= n || i + n >= nextn) {
int sc;
// 我们假设线程1不是“最后一个完成扩容的线程”,故跳过逻辑
if (finishing) {
nextTable = null;
table = nextTab;
sizeCtl = (n << 1) - (n >>> 1);
return;
}
//⑰ 线程1对sc进行cas减1操作:表示在整个CHM有1个线程正准备离开扩容队伍
if (U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1)) {
// 由于我们假设线程1不是“最后一个完成扩容的线程”,满足条件,执行return,退出for循环
if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT)
return;
finishing = advance = true;
i = n; // recheck before commit
}
}
//⑦ 在这里我们假设第12号桶不为空,因此go to ⑧
// (如果第12号(i)桶为空,就直接给该桶位设为fwd表示已处理,将advance设为true:表示让线程1继续迁移下一个(第--i个)桶位节点。回到上面while循环)
else if ((f = tabAt(tab, i)) == null)
advance = casTabAt(tab, i, null, fwd);
//⑧ 在这里我们假设第12号桶不是fwd节点而是正常节点,因此go to ⑨
else if ((fh = f.hash) == MOVED)
//(如果第12号(i)桶是一个fwd节点,表示有其他“协助型线程”处理过(所以源码注释为already processed),那么线程1更省功夫了,直接将advance设为true:表示让线程1继续迁移下一个(第--i个)桶位节点。回到上面while循环)
advance = true;
//① 准备迁移i=13号桶位,去setTabAt等逻辑
//⑨ 准备迁移i=12号桶位,去setTabAt等逻辑
else {
synchronized (f) {
// 如果是桶位节点是普通链表
if (tabAt(tab, i) == f) {
// 将低位链表、高位链表迁移到新表对应的i、i+n位置
setTabAt(nextTab, i, ln);
setTabAt(nextTab, i + n, hn);
// 在第i=13号桶位放置fwd节点,表明该桶位已经处理完
setTabAt(tab, i, fwd);
// ② 将advance设为true,回到while③
// ⑩ 再次将advance设为true,回到while⑪
advance = true;
}
// (类似链表的逻辑)如果是桶位节点是TreeBin(也即桶位是一棵红黑树)
else if (f instanceof TreeBin) {
//(类似链表的逻辑)
setTabAt(nextTab, i, ln);
setTabAt(nextTab, i + n, hn);
setTabAt(tab, i, fwd);
advance = true;
}
}
}
}
}
}

现在考察线程1执行完⑰的return后,它会在回到⑲while处,然后流程如下:最后发现线程1完成自己桶位区间节点迁移后,会在以下的㉑步骤退出while循环结束addCount方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
// addCount(long x, int check)
private final void addCount(long x, int check) {
// ......省略部分
if (check >= 0) {
Node<K,V>[] tab, nt; int n, sc;
//⑲ 由于线程2、3、4还在迁移桶位节点中,因此这里sc = sizeCtl还是负值,而s=sumCount()是正值,因此能进入while,go to ⑳
while (s >= (long)(sc = sizeCtl) && (tab = table) != null &&
(n = tab.length) < MAXIMUM_CAPACITY) {
int rs = resizeStamp(n);
//⑳ 由于线程2、3、4还在迁移桶位节点中,因此这里sc = sizeCtl还是负值
if (sc < 0) {
//㉑ 基于上面线程1到线程4的桶位分配完毕,因此transferIndex还是等于0,故线程1在这里就会break退出addCount方法
if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
sc == rs + MAX_RESIZERS || (nt = nextTable) == null ||
transferIndex <= 0)
break;
if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1))
transfer(tab, nt);
}
else if (U.compareAndSwapInt(this, SIZECTL, sc,
(rs << RESIZE_STAMP_SHIFT) + 2))
//⑱ 来自内部transfer第⑰步的return,线程1继续返回到while
transfer(tab, null);
s = sumCount();
}
}
}
2.3 桶位节点转移到新表

2.1关注的是线程自己如何一个桶位一个桶位迁移的控制逻辑,那么接下来就要解释扩容线程如何将旧表桶位节点转移到新表中的过程,其实只要掌握jdk1.8的HashMap双向链表到树和jdk1.7的ConcurrentHashMap链表转移设计思想,以下逻辑则很好理解:

针对两种数据节点类型对应两种迁移设计:

  • 链表迁移思路:

1、synchronized锁住当前桶位头节点f,保证当前线程独占操作。换句话说:只能有1个线程负责迁移当前桶位节点到新数组,请别误解为:jdk1.8多线程并发协同扩容是指多个线程一起在同一个桶位上“并发协同”迁移桶位节点。

2、如果桶位头节点是一条链表,先找出lastRun节点为首的子链,然后根据高位节点(p.hash & n 不等于0)和低位节点特征(p.hash & n等于0)分别构建一条低位节点链ln和一条高位节点链hn

(如果你已经掌握jdk1.7CHM的Segment数组扩容算法,你会发现jdk1.8的处理方式比1.7更优,因为jdk1.8引入了低位节点链和高位节点链作为迁移前的中间容器,而这里的低位链和高位链的设计需要你掌握jdk1.8HashMap的数组扩容算法)

3、链表除去lastRun子链,还剩下位于lsatRun节点前面的节点,将这些节点使用头插法插入到ln或者hn中

4、此时冲突链被完整的拆开成低位链和高位链,接下来就好办了:使用cas方式,低位链放在新表的第i位置,高位链放在新表的第i+n位置

5、第4点迁移完后,线程在当前桶位放置一个fwd,表示当前桶位已迁移完。

6、线程将advance设为true,回到while继续处理“桶位区间”的下一个待迁移桶位

1
2
3
4
while (advance) {
int nextIndex, nextBound;
if (--i >= bound || finishing)
advance = false;

这里再次提醒:“桶位区间”的迁移顺序是倒序的,例如当前线程负责的桶位区间为[12,15],当第15号桶位迁移完后,—i继续处理下一个第14号桶位节点,以此类推,直到—i等于11越过左边界12,说明此时桶位区间包含的4个桶位节点全部被当前线程迁移到新表里面。

lastRun子链是什么?

就是从链表的尾部节点开始,找到具有相同的(e.hash & n)值的连续最大子链,图示如下:

其中e.hash & n的计算公式可找出低位节点和高位节点。

lastRun示意图.001

  • TreeBin(红黑树)迁移思路

因为TreeBin持有红黑树的root节点,关于红黑树扩容的分析在在jdk1.8的HashMap文章有详细的过程,CHM也类似逻辑,这里不再累赘。

以上两种迁移思路就是下面transfer方面里面:synchronized (f)代码片段要实现的逻辑

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
private final void transfer(Node<K,V>[] tab, Node<K,V>[] nextTab) {
//......省略部分
else if ((f = tabAt(tab, i)) == null)
advance = casTabAt(tab, i, null, fwd);
else if ((fh = f.hash) == MOVED)
advance = true; // already processed

// 线程若能进入以下分支,说明此桶位头节点既不是null也不是转发节点fwd,那么该桶位头节点要么是一条冲突链表、要么是一个TreeBin节点(也即一棵红黑树)
else {
synchronized (f) {
// 链表的迁移逻辑
if (tabAt(tab, i) == f) {
// ln:用于存放低位节点链,hn:用于存放高位节点链
Node<K,V> ln, hn;
if (fh >= 0) {
int runBit = fh & n;
Node<K,V> lastRun = f;
//①
for (Node<K,V> p = f.next; p != null; p = p.next) {
int b = p.hash & n;
// 找出有相同特征的、以lastRun节点为首的子链
if (b != runBit) {
runBit = b;
lastRun = p;
}
}
// runBit也即p.hash & n=0,说明是低位节点
if (runBit == 0) {
// runBit == 0,说明lastRun子链是低位节点构成的,因此理应将lastRun放在低位链ln,此时hn还是设为null,hn在第②步骤会被用到
ln = lastRun;
hn = null;
}
// 也即runBit不为0,p.hash & n !=0,说明是高位节点
else {
hn = lastRun;
ln = null;
}

//②
for (Node<K,V> p = f; p != lastRun; p = p.next) {
int ph = p.hash; K pk = p.key; V pv = p.val;
//(ph & n) == 0的节点是低位特征节点
if ((ph & n) == 0)
// 将剩余的低位节点使用头插法插入到低位链中
ln = new Node<K,V>(ph, pk, pv, ln);
//(ph & n) !=0的节点是高位特征节点
else
// 将剩余的低位节点使用头插法插入到高位链中
hn = new Node<K,V>(ph, pk, pv, hn);
}

//使用cas机制:低位链放在新表的第i位置,
setTabAt(nextTab, i, ln);
//使用cas机制:高位链放在新表的第i+n位置
setTabAt(nextTab, i + n, hn);
//在旧表当前i桶位放置fwd节点,表示已迁移完
setTabAt(tab, i, fwd);
// 回到while继续处理下一个桶位
advance = true;
}
else if (f instanceof TreeBin) {
TreeBin<K,V> t = (TreeBin<K,V>)f;
// 用于存放红黑树中低位节点的单向链表
TreeNode<K,V> lo = null, loTail = null;
// 用于存放红黑树中高位节点的单向链表
TreeNode<K,V> hi = null, hiTail = null;
// lc计数器,用于判断低位节点链是否需要树化,hc同理
int lc = 0, hc = 0;

//③ jdk1.8的CHM红黑树本身也是一条单向链表
for (Node<K,V> e = t.first; e != null; e = e.next) {
int h = e.hash;
TreeNode<K,V> p = new TreeNode<K,V>
(h, e.key, e.val, null, null);
// 构建低位节点单向链表,并计数
if ((h & n) == 0) {
//如果p.prev和hiTail是空,说明当前p节点是头节点,放在lo位置
if ((p.prev = loTail) == null)
lo = p;
else
// 其他非头节点,按尾插法插入到链表尾部
loTail.next = p;
//遍历下一个节点
loTail = p;
//链表长度计数
++lc;
}
// 构建高位节点单向链表,并计数
else {
if ((p.prev = hiTail) == null)
hi = p;
else
hiTail.next = p;
hiTail = p;
++hc;
}
}
// 低位链表是否需要树化,计数小于等于6不进行树化,如果不存在高位链表,
/* 低位链表需要树化:
1、若hc!=0,将低位节点链表包装成TreeBin类型(内部会重新构建树),以便放入到新表对应的桶位头节点位置
2、若hc=0,说明新构建的低位链还是原来红黑树本身的链表,无需改动,直接返回ln=t。 TreeBin<K,V> t = (TreeBin<K,V>)f;
*/
ln = (lc <= UNTREEIFY_THRESHOLD) ? untreeify(lo) :
(hc != 0) ? new TreeBin<K,V>(lo) : t;
// 同上,不再赘述。
hn = (hc <= UNTREEIFY_THRESHOLD) ? untreeify(hi) :
(lc != 0) ? new TreeBin<K,V>(hi) : t;
setTabAt(nextTab, i, ln);
setTabAt(nextTab, i + n, hn);
setTabAt(tab, i, fwd);
advance = true;
}
}
}
}
}
}

2.4 transfer扩容并发度讨论

在jdk1.7的CHM中,它的并发度设计是相对受限的,在创建CHM中,可以指定CONCURRENCY_LEVEL的值来设并发度,遗憾的是:一旦创建好,CHM支持的并发度就不能更改。并且在jdk1.7的CHM中,指定的并发度设计的太大或者太小也会有问题:

  • 并发太小时,多个线程都在竞争数量不多的segment加剧竞争,可谓“僧多粥少”
  • 并发度设为太大时(例如segment并发度设为65535最大值),使得key原本可定位在同一个segment的读、写操作就会被分配到不同的segment中,结果是cpu cache命中率会下降,会在一定程度上降低CHM性能

而在jdk1.8创建CHM中,它不会限制并行度:

考察put/transfer方法,内部在桶位迁移用时synchronized锁住桶位头节点f的设计,也即锁粒度为桶位,那么随着transfer方法扩容CHM使得底层table更大后,因为锁粒度为桶位,因此并发度也随着变大,换句话说,jdk1.8的CHM的并发度是动态变化:并发度大小等于底层数组长度,底层数组每扩容1次,并发度就变大1倍,而不是像jdk1.7那样再创建期间并发度就被限制死了。

当然jdk1.8CHM的并发度因为是动态变大,当table容量太大时,也会面临cpu cache命中率会下降的性能问题。

2.5 CHM扩容时,可以同时支持读取get节点吗?

答案是支持的,CHM扩容完全不影响其他读线程读取key,为何?在“transfer方法桶位节点转移的处理逻辑”小节可以得到答案:

例如桶位的冲突链正在扩容,你发现它扩容时只是复制该链表的节点(ln = lastRun),也就是原链表还在桶位上,

1
2
3
4
5
6
7
8
9
10
11
// runBit也即p.hash & n=0,说明是低位节点
if (runBit == 0) {
// runBit == 0,说明lastRun子链是低位节点构成的,因此理应将lastRun放在低位链ln,此时hn还是设为null,hn在第②步骤会被用到
ln = lastRun;
hn = null;
}
// 也即runBit不为0,p.hash & n !=0,说明是高位节点
else {
hn = lastRun;
ln = null;
}

从红黑树的迁移更容易找到答案:

迁移线程将红黑树的节点复制到一条(lo—->loTail或者hi—->hiTail)或者两条新链表上(lo—->loTail以及hi—->hiTail),桶位上原链表结构(TreeNode的next属性构成)未发生任何改变,因此若此时有“读线程”来读桶位上key是完全OK的,不受扩容线程影响。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
else if (f instanceof TreeBin) {
TreeBin<K,V> t = (TreeBin<K,V>)f;
// 用于存放红黑树中低位节点的单向链表
TreeNode<K,V> lo = null, loTail = null;
// 用于存放红黑树中高位节点的单向链表
TreeNode<K,V> hi = null, hiTail = null;
// lc计数器,用于判断低位节点链是否需要树化,hc同理
int lc = 0, hc = 0;

//③ 将原链节点拷贝到hi/lo链上,这里可以证明:CHM扩容时也支持并发get原链表
for (Node<K,V> e = t.first; e != null; e = e.next) {
int h = e.hash;
TreeNode<K,V> p = new TreeNode<K,V>
(h, e.key, e.val, null, null); // 用原节点的key和value创建一个新的TreeNode,不就相当于拷贝节点,且原链表结构没有收到影响。
// 构建低位节点单向链表,并计数
if ((h & n) == 0) {
//如果p.prev和hiTail是空,说明当前p节点是头节点,放在lo位置
if ((p.prev = loTail) == null)
lo = p;
else
// 其他非头节点,按尾插法插入到链表尾部
loTail.next = p;
//遍历下一个节点
loTail = p;
//链表长度计数
++lc;
}

3、helpTransfer方法解析

有了上面transfer详细的多线程协同扩容细节铺垫后,那么helpTransfer方法则相当好理解:

该方法在CHM的相关“写操作”方法被调用,如:put、remove、replace、clear、compute、computeIfAbsent、merge,实际含义为:

当一个线程去对CHM进行写操作时(put、remove、replace、clear、compute、computeIfAbsent、merge)”,如果恰好遇到桶位节点是一个ForwardingNode节点,那么这个线程先转而去帮助当前CHM扩容。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32

/**留意源码官方注释
* Helps transfer if a resize is in progress.
*/
final Node<K,V>[] helpTransfer(Node<K,V>[] tab, Node<K,V> f) {
Node<K,V>[] nextTab; int sc;
//① 条件1和条件3都是判断底层table还在扩容中,条件2再次检查当前桶位是否是一个ForwardingNode节点
if (tab != null && (f instanceof ForwardingNode) &&
(nextTab = ((ForwardingNode<K,V>)f).nextTable) != null) {
int rs = resizeStamp(tab.length);
//③ while自旋配合cas进制是一套很常用的逻辑。三个条件都是能够判断“只要底层table还在扩容”
//条件1:nextTab == nextTable,说明nextTable还未指向table变量,
//条件2:table == tab,table变量还是指向旧表tab
//条件3:有了前面小节内容的铺垫,这里sizeCtl是负数表示CHM在扩容中
while (nextTab == nextTable && table == tab &&
(sc = sizeCtl) < 0) {
//④ 这4个条件都是判断是否协助扩容,任意一个条件成立都使得线程不需要再去参与扩容,直接break返回,当然前面3个判断条件的理解目前相对困难,可看完后面章节即可立即。
if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
sc == rs + MAX_RESIZERS || transferIndex <= 0)
break;
//⑤ 每个进入扩容的线程都要对sc进行加1计数,由于采用cas方法,一次cas不成功就继续回到③while重试,直到sc加1成功或者满足④判断条件才不会继续while重试
if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1)) {
transfer(tab, nextTab);
break;
}
}
//⑥ 只要③的while条件成立,说明底层table已经扩容完成,可直接返回新table引用:nextTab给到外部调用点:tab = helpTransfer(tab, f),这是tab指向的是扩容后的table
return nextTab;
}
//② 如果①有个一个条件不成立,说明已经完成底层数组扩容,那当前线程就不需要“帮助扩容”,直接返回table引用(注意此table引用其实是指向了newTab,也即扩容后的那个table)
return table;
}

4、addCount主分支2为何使用while循环

4.1 背景

只要CHM节点数量size达到扩容阈值且数组长度还未到最大值MAXIMUM_CAPACITY就可以进行扩容操作,这一点其实跟HashMap的扩容判断是一致的。

主分支2里面为何使用while 循环判断是否需要扩容呢? 就不能用if来取代吗?答案:不能,因为CHM是并发扩容,不是单线程的HashMap扩容操作,如果将主分支2的while改为if

1
2
3
4
5
6
7
8
9
10
11
   //addCount的主分支2  
if (check >= 0) {
Node<K,V>[] tab, nt; int n, sc;
//将while改成if
if (s >= (long)(sc = sizeCtl) && (tab = table) != null &&
(n = tab.length) < MAXIMUM_CAPACITY) {
//扩容逻辑
s = sumCount();
}
}
}

考察并发量高两种情形:

  • 情况1:使得扩容完成的线程能继续加入未完成的扩容任务

    在原table还有桶位未分配出去时,也即transferIndex>0,那么某线程A完成自己的桶位区间的节点迁移后,主分支2使用while语义就可以保证线程A继续加入到还未完成的扩容任务,而不是退出,显然线程A得到复用,而不需要创建新的线程,提高cpu效率。若使用if语义,当线程A完成自己的桶位区间的节点迁移后会直接退出,未充分利用已在执行态的线程A。

  • 情况2:上一阶段刚扩容完因并发量高导致节点数量立即达到下一阶段扩容阈值

    在CHM容量n0扩容到容量n1,有一扩容线程A完成扩容后退出,并发的其他线程很快把线程A扩容的容量n1的table写满,此时需要继续将容量n1的table扩容到容量n2(但线程A已经退出扩容处理),结果容量n2的新表很快又被并发的其他线程put满,此时急需第3次扩容(但线程A已经退出扩容处理),结果容量n3的新表很快又被并发的其他线程put满,以此类推…

    所以你会发现:既然线程第1次扩容的n1很快会被并发的其他线程put满,那么可以这么设计:线程完成扩容transfer逻辑后先不结束线程,而是使用`while`强制安排线程继续判断CHM是否需要下一阶段扩容,如果需要下一阶段扩容,线程立即设置基础值然后进入transfer开始下一阶段扩容,复用已经处在执行态的线程,而不是去创建新线程。
4.2 线程会不会无限次扩容

按情况2的解释,线程岂不是无限次扩容?这就是次分支1要解决的情况,里面有一个break,显然这就是while循环结束点,所以线程不会无限扩容下去,那么满足什么条件线程才会break,这里就需要第5节的resizeStamp 技术点来解释,请参考之。

1
2
3
4
5
6
7
//分支1
if (sc < 0) {
// 次分支1
if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
sc == rs + MAX_RESIZERS || (nt = nextTable) == null ||
transferIndex <= 0)
break;

5、resizeStamp(n)以及sc变量的解析(CHM最关键的设计之一)

5.1 rs设计意图?

resizeStamp方法和sc变量在transfer方法里面给我们造成很大困扰,它们的设计意图不好理解,resizeStamp是CHM的最关键设计之一,因此需要一步一步用位运算去探索和推演Doug Lea为何采用二进制的角度来实现sizeCtl支持多种状态标记以及rs扩容印记的设计,具体如下:

首先,使用场景肯定是用于扩容,不管从其方法命名还是从下面源码官方注释:

1
2
3
4
5
6
7
8
/* ---------------- Table Initialization and Resizing -------------- */

/**
* Returns the stamp bits for resizing a table of size n.
* Must be negative when shifted left by RESIZE_STAMP_SHIFT.
*/
static final int resizeStamp(int n) {
return Integer.numberOfLeadingZeros(n) | (1 << (RESIZE_STAMP_BITS - 1));

第二点,可以用resizeStamp的值和sc变量来判断整个CHM扩容是否结束:

1
2
3
4
5
if (sc < 0) {
if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
sc == rs + MAX_RESIZERS || (nt = nextTable) == null ||
transferIndex <= 0)
break;

第三点,sc变量可以记录进入扩容逻辑的线程数量

1
2
if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1))
transfer(tab, nt);

还可以实现对退出扩容逻辑线程的计数以及找出最后一个完成扩容的线程

1
2
3
4
5
if (U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1)) {
if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT)
return;
finishing = advance = true;
i = n; // recheck before commit

带着以上思路,现在以table长度n=16需要扩容进行逐步深入分析,以下全部计算过程采用位计算方式,Int类型为32位,分为高16位和低16位,第32位是符号位。

对于resizeStamp(16)有以下计算结果,其中Integer.numberOfLeadingZeros(n) 返回n对应的二进制高位0的个数,例如n=16,其高位0个数为32-5=27,假设现有一个线程A作为首个扩容线程,开始执行AddCount分支2流程:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
    //AddCount方法的分支2
if (check >= 0) {
Node<K,V>[] tab, nt; int n, sc;
while (s >= (long)(sc = sizeCtl) && (tab = table) != null &&
(n = tab.length) < MAXIMUM_CAPACITY) {
// 从这里开始推演相关位运算的逻辑
int rs = resizeStamp(n);
if (sc < 0) {
if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
sc == rs + MAX_RESIZERS || (nt = nextTable) == null ||
transferIndex <= 0)
break;
if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1))
transfer(tab, nt);
}
else if (U.compareAndSwapInt(this, SIZECTL, sc,
(rs << RESIZE_STAMP_SHIFT) + 2))
transfer(tab, null);
s = sumCount();
}
}
}

计算int rs = resizeStamp(n=16);

1
2
3
4
5
0000 0000 0000 0000 0000 0000 0001 1011  # 27
0000 0000 0000 0000 1000 0000 0000 0000 # 1<<15

或运算结果如下,得到容量16的rs:
0000 0000 0000 0000 1000 0000 0001 1011 # 32795

可以看出:rs的低16位记录了“table长度16信息”,其设计意图是?

目的就是为了给table等于16这一容量值打上一个“戳(印记)”,或者称为”table长度为16的扩容印记”,简称“扩容印记”。
需要注意的是:下面小节的sc变量的“table长度16的信息”是记录在sc的高16位上,区别于rs的低16位记录“table长度16信息”。

作为对比,假设table容量32需要扩容处理,那么table此时对应的“扩容印记”rs:

1
2
3
4
0000 0000 0000 0000 0000 0000 0001 1010  # 26
0000 0000 0000 0000 1000 0000 0000 0000 # 32768
或运算结果如下,得到容量32的rs:
0000 0000 0000 0000 1000 0000 0001 1010 # 32794

通过对比容量16和容量32的table扩容印记可知,不同容量下的扩容阶段都有其唯一“扩容印记”rs,那么只要线程在扩容时发现:前后读取的rs值不同,说明整个CHM已经完成上一阶段的扩容且正在进行下一阶段扩容,那么线程就可以直接退出,无需进入transfer方法,对应下面的代码片段的条件1用法:

1
2
3
4
5
6
7
8
9
10
11
12
13
if (check >= 0) {
Node<K,V>[] tab, nt; int n, sc;
while (s >= (long)(sc = sizeCtl) && (tab = table) != null &&
(n = tab.length) < MAXIMUM_CAPACITY) {
// 从这里开始推演相关位运算的逻辑
int rs = resizeStamp(n);
if (sc < 0) {
/*后面时刻读取的rs:sc >>> RESIZE_STAMP_SHIFT
前面时刻读取的rs: rs
*/
//后面时刻读取的rs与前面时刻读取的rs不等,说明整个CHM已经完成了扩容,那么线程就可以直接退出,无需进入transfer方法 // 条件1
if ((sc >>> RESIZE_STAMP_SHIFT) != rs || 其他4个条件)
break;

因此(sc >>> RESIZE_STAMP_SHIFT) != rs 是表达这种场景:线程进入CHM扩容任务发现CHM已经结束了本阶段的扩容,因此线程无需参与到扩容任务中,退出即可。

5.2 sc相关位运算的设计意图?

条件1:sc >>> RESIZE_STAMP_SHIFT) != rs 已经由上面的小节详细分析,此小节尝试回答条件2:sc == rs + 1 和条件3:sc == rs + MAX_RESIZERS 设计目的?

不妨做个猜测:通过对比条件2和条件3发现,条件2像是sc变量的最小值,条件3像是sc变量允许的最大值,从最大值的MAX_RESIZERS含义可推出:扩容线程数量最大值,说明条件3可用于限制同时参与扩容线程的最大数量。根据条件3设计了扩容线程的上限,我们可以大胆推理出:条件2指的是当前CHM已经没有扩容线程在扩容了,也即所有扩容线程已经退出扩容任务,此时再来新的线程即可break

1
2
3
4
5
6
7
if (sc < 0) {		
// 条件2:sc == rs + 1
// 条件3:sc == rs + MAX_RESIZERS
if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
sc == rs + MAX_RESIZERS || (nt = nextTable) == null ||
transferIndex <= 0)
break;

将上面基本猜测进行以下位计算的原理论证:

在第1节addCount给出的重要设计点“第一个执行扩容的线程”,它进入transfer方法前,将sc设为一个值:sc=(rs << RESIZE_STAMP_SHIFT) + 2,下面看看这个二进制串隐藏了什么“隐秘信息”,以容量16的戳印记rs为例

1
2
3
0000 0000 0000 0000 1000 0000 0001 1011 #  
右移RESIZE_STAMP_SHIFT=16位后如下:
1000 0000 0001 1011 0000 0000 0000 0000 # 第32位是符号位,1表示负值

先看高16的含义:对比原来rs二进制串可知,“容量16的扩容印记”放在sc的高16位上。

再看低16位的含义:

第1个进入扩容的线程执行:sc=(rs << RESIZE_STAMP_SHIFT) + 2,对应二进制串:

1
1000 0000 0001 1011 0000 0000 0000 0010 # 低16位+2

第2个进入扩容transfer方法前,线程都会对sc加1操作

1
1000 0000 0001 1011 0000 0000 0000 0011 # 低16位+3

第3个进入扩容transfer方法前,线程都会对sc加1操作

1
1000 0000 0001 1011 0000 0000 0000 0100 # 低16位+4

易归纳:
当n个线程加1时使得sc的低16位达到以下值时,参与扩容线程总数达到最大值:

1
1000 0000 0001 1011 1111 1111 1111 1111 # 低16位=65535

此时,低16位的值65535不就是Doug Lea设计的最大扩容线程数量:MAX_RESIZERS = (1 << (32 - RESIZE_STAMP_BITS)) - 1的值吗

以上流程可以得出这样一个规则:同一阶段扩容期,sc的高16位保持不变,对于sc低16位,除去第1个线程,后续每进一个扩容线程,低16位加1计数

综上,可以得出这样的结论:

① sc高16位是存放当前table容量的印记(扩容戳)信息,目的:只要是同一容量下CHM扩容,那么高16位的值固定不变

② sc低16位用于对参与扩容线程的计数。计数范围为: 0000 0000 0000 0010到1111 1111 1111 1111

在CHM还是扩容状态时,由高16位扩容印记的第32位1符号位可知,不管低16位现在记录了多少个扩容下线程,sc一定是负数,

因此在源码设计才有以下if (sc < 0) 成立就能判断出CHM还在扩容状态中。

1
2
3
4
5
6
7
8
9
10
11
            int rs = resizeStamp(n); // rs是正值:0000 0000 0000 0000 1000 0000 0001 1011 
//①
if (sc < 0) {
//② 这里也有rs,是正值还是负值呢?
if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
sc == rs + MAX_RESIZERS || (nt = nextTable) == null ||
transferIndex <= 0)
break;
//③
else if (U.compareAndSwapInt(this, SIZECTL, sc,
(rs << RESIZE_STAMP_SHIFT) + 2))
5.3 解析sc == rs + 1的真实含义(这是官方bug)

这个条件很诡异,首先线程进入if (sc < 0) 分支,说明sc是负数,在此之前rs = resizeStamp(n)扩容印记的计算结果是正值,那么sc == rs+1 以及sc == rs + MAX_RESIZERS (负数==正数)永远都不会成立,这个正是open jdk1.8的CHM出现一个重点bug。

本人一开始对此条件成立情况很是费解,但因为一直认为Doug Lea以及JSR-166 expert group在JUC造诣很深,因此还不曾怀疑此处设计有逻辑问题,接着出于要对resizeStamp方法和sc移位计算的设计彻底掌握的要求,去查阅了相关文章,恰好找到一篇高质量文章,它提到了这其实一个官方的bug:文章连接,到此本人才真正对sc == rs + 1诡异设计得到最官方的解决方案!! 这是官方bug描述的链接,bug已经java 12版本得到修复。本人在另外一篇文章中也详细给出了官方对这个bug的处理过程。

( [这篇关于CHM源码研究的文章](https://mp.weixin.qq.com/s/6HWJDKnNaceIEZQMOLZkog),作者水平确实不错,解析也足够细腻,但本人还是建议那些想要彻底掌握或者提升高级技术的人,切勿抄袭别人文章,务必独立完成相关源码的核心设计分析才能真正提升个人能力)

在去查阅了resizeStamp讨论的相关文章的过程中,你发下绝大部分的文章给出有效讨论真的很少,而且你会发现以下现象:

当前CSDN大量文章、微信公众号文章、以及所谓的JAVA进阶培训班绝大部分是基于针对jdk1.8CHM的源码解析,而且他们也不能解释`sc == rs + 1`的上下文计算过程及其设计目的,他们无法发现或者感知如此隐秘而有意义的bug,所以他们一般都会停留在表面类似这样源码解释:当前已经没有扩容线程在扩容或者参与扩容线程总数量达到最大值”

如果你已经吃透本文的所有分析,那么你应该可以猜出Doug Lea写这两行代码的含义如下:

sc=rs+1 其实是想表达sc=(resizeStamp(n)<<RESIZE_STAMP_SHIFT)+1

sc=rs+MAX_RESIZERS 其实是想表达sc=(resizeStamp(n)<<RESIZE_STAMP_SHIFT)+MAX_RESIZERS

用代码来表示其真正的写法如下所示

1
2
3
4
int rs = resizeStamp(n) << RESIZE_STAMP_SHIFT;
if(sc<0)
if(条件1||sc == rs + 1 || sc == rs + MAX_RESIZERS)

所以rs其实应该是这样的二进制串,显然是一个负数,这样才能使得sc == rs + 1在某个时刻成立

1
1000 0000 0001 1011 0000 0000 0000 0000

此外,因为rs已经是负值,那么在transfer有个地方不能直接用一开始计算的rs值,如下:

1
2
3
4
5
6
if (U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1)) {
/* 大家第一次代码解析到这里时:应该会无法理解,为何不用rs,而是要用resizeStamp(n) 重新计算rs:
((sc - 2) != rs << RESIZE_STAMP_SHIFT)
*/
if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT)
return;

因为transfer方法执行过程中,表示扩容状态下,rs的值已经是rs=resizeStamp(16) << RESIZE_STAMP_SHIFT ,也即rs位于sc的高16位上了:

1
1000 0000 0001 1011 0000 0000 0000 0000

如果使用((sc - 2) != rs << RESIZE_STAMP_SHIFT) 来判断,rs右移16位后,就变成0,此时sc没有实际含义:

1
0000 0000 0000 0000 0000 0000 0000 0000

因此这里不能用原来的rs值来判断条件,而是使用resizeStamp(n)重新计算rs值得出sc值是否等于原始基数,以此判断当前线程是否是最后一个退出扩容的线程

1
2
3
if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT)
等价于
if (sc != (resizeStamp(n) << RESIZE_STAMP_SHIFT) +2 ) //

基于以上详实的分析,才能真正从背后移位设计的原理去理解sc==rs+1含义:表示当前CHM已经没有扩容线程在扩容了,也即所有扩容线程已经退出扩容任务,此时再来新的线程即可break。

5.4 为何用sc=rs+1来判断所有扩容线程已经退出而不是用sc=rs+0或者sc=rs+2呢?

假设当前有1个线程是最后一个完成扩容的线程,它在transfer方法中准备执行以下逻辑:

1
2
3
4
5
6
7
8
// ① 假设当前线程是最后一个扩容线程并持有sc值为rs+2,然后通过cas将主存上sizeCtl设置为sc-1,也即此时主存上的sizeCtl=(rs+2)-1=rs+1。
if (U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1)) {
// ② 注意下面的sc不是主存的上值,而是线程在①持有的sc=rs+2的值,因此仅当sc-2=rs,其中rs= resizeStamp(n) << RESIZE_STAMP_SHIFT 那么这个线程就是要找的最后一个扩容线程。
if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT)
return;
finishing = advance = true;
i = n; // recheck before commit
}

以上逻辑等价于

1
2
3
4
//当前线程
int sc=getAndAddInt(this,sizeCtl,-1) //先取出sizeCtl的值:rs+2,然后再将cas设置sizeCtl在主存的值为rs+1
//其他线程:
//当其他线程进入transfer方法时,读取的sc就是rs+1这个值,满足扩容完成的退出条件`sc==rs+1`

因此如果当前线程最后一个完成扩容的线程,那么它执行完以上逻辑后,主存中sizeCtl的值为rs+1,因此之后的线程读取sc的值并来到以下逻辑就会发现② 位置的sc == rs + 1成立,说明“当前CHM所有扩容线程已经退出,本阶段的CHM扩容已完成,因此可直接break退出transfer”

1
2
3
4
5
6
7
8
9
10
            int rs = resizeStamp(n); // rs是正值:0000 0000 0000 0000 1000 0000 0001 1011 

if (sc < 0) {
//② 这里也有rs,是正值还是负值呢?
if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
sc == rs + MAX_RESIZERS || (nt = nextTable) == null ||
transferIndex <= 0)
break;
else if (U.compareAndSwapInt(this, SIZECTL, sc,
(rs << RESIZE_STAMP_SHIFT) + 2))

5.5 首次扩容时sc的基础值为何不是加1而是加2?

刚开始解析addCount源码时,让人无法理解的为何首个扩容线程进入扩容逻辑时是加2开始而不是加1开始计算呢,有点反正常思维。

1
2
3
4

else if (U.compareAndSwapInt(this, SIZECTL, sc,
(rs << RESIZE_STAMP_SHIFT) + 2))
transfer(tab, null);

考察sizeCtl=-1 这个特殊的-1,换成有符号的二进制串:

1
1000 0000 0000 0000 0000 0000 0000 0001 // ①低16位的最低位为1

对比容量16且从加1开始线程计数:

第1个扩容线程进入transfer时:sc=rs << RESIZE_STAMP_SHIFT) + 1

1
1000 0000 0001 1011 0000 0000 0000 0001 // ②低16位的最低位为1

第2个扩容线程进入transfer时:sc+1

1
1000 0000 0001 1011 0000 0000 0000 0002

对比①sizeCtl=-1和②的二进制串可知,两者低16位的最低位都是1,状态标记冲突了:

1
xxxx xxxx xxxx xxxx 0000 0000 0000 0001 

考虑到:sc是用来扩容状态标记、sizeCtl也是用在initTable对sizeClt cas置-1的初始化标记,如果两者都取-1,就不好判断当前CHM的状态,因此Doug Lea为避免这种状态位的冲突,因此将扩容线程计数从加2开始计数。

虽然本文给出这样相对”牵强“的解释,但无法理解为何Doug Lea不在源码中给大家注释清楚?

6、sizeCtl状态小结

有了以上第4节和第5节深入的细节分析,现在可以总结sizeCtl也即sc取不同值的实际含义

sizeCtl为正值部分好理解:

① sizeCtl=0,默认值,在反序列化CHM对象时,如果CHM的size为0,那么此时sizeCtl设为0

② 当new CHM阶段,sizeCtl=table的初始默认容量或者指定的容量(会被修正为2的整数幂)

③ 当CHM扩容结束后,sizeCtl=下一次扩容阈值

对于负值部分的理解需要结合resizeStamp章节内容:

以table容量等于16作为分析,sizeCtl主要是以下四种特殊情况的负值:

① sizeCtl=-1。在new CHM后,第一次put入key,table为null,需要使用tab = initTable()方法初始化,多线程环境下,线程使用cas将 sizeCtl设为-1加锁做初始化工作。

② 第一个进入扩容线程将sizeCtl设为一个基础值:

1
1000 0000 0001 1011 0000 0000 0000 0010 // 最高位1,所以此值为负值

③ 达到最大扩容线程数量sizeCtl的值为:

1
1000 0000 0001 1011 1111 1111 1111 1111 // 最高位1,所以此值为负值

也即当CHM处于并发线程扩容时,sizeCtl的取值范围为:[rs+2,rs+65535]

④ 所有扩容线程退出扩容时,对应sc=rs+1:

1
1000 0000 0001 1011 0000 0000 0000 0001  // 最高位1,所以此值为负值

以上sizeCtl用于管理多线程并发操作与状态标记的背后原理。

小节

本文研究的重点放在CHM扩容设计逻辑上,不得不说,其并发源码写的确实高级,尤其是关于sizeCtl移位计数的逻辑设计让人佩服,也值得在项目尝试应用相关并发控制思想!
下一篇文章将重点讨论:jdk1.8 ConcurrentHashMap的TreeBin读写锁竞争机制