yield-bytes

沉淀、分享与无限进步

文章说明:因为CSM解析内容较多,因此全文分为“深度讨论高并发跳表数据结构ConcurrentSkipListMap的源代码实现(上)”和“深度讨论高并发跳表数据结构ConcurrentSkipListMap的源代码实现(下)”两篇文章
上篇:CSM数据结构设计原理、doGet、doPut核心方法解析
下篇:doRemove核心方法解析、总结

在这里插入图片描述

《gitee 博客文章封面》

remove方法:

删除操作的设计原理

这里先介绍Doug Lea在源代码注释给出算法设计说明:n.helpDelete(b,f)的设计原理

上一篇文章的doGet、doPut方法中都有涉及到遇到被标记”删除“的节点时都会加入n.helpDelete(b,f)的处理逻辑,

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
* In addition to using deletion markers, the lists also use
* nullness of value fields to indicate deletion, in a style
* similar to typical lazy-deletion schemes. If a node's value is
* null, then it is considered logically deleted and ignored even
* though it is still reachable. This maintains proper control of
* concurrent replace vs delete operations -- an attempted replace
* must fail if a delete beat it by nulling field, and a delete
* must return the last non-null value held in the field. (Note:
* Null, rather than some special marker, is used for value fields
* here because it just so happens to mesh with the Map API
* requirement that method get returns null if there is no
* mapping, which allows nodes to remain concurrently readable
* even when deleted. Using any other marker value here would be
* messy at best.)
* n是当前要删除的节点,b是n的前驱节点,f是n的后继节点,开始删除前,(b,n,f)的指向关系如下
* Here's the sequence of events for a deletion of node n with
* predecessor b and successor f, initially:
*
* +------+ +------+ +------+
* ... | b |------>| n |----->| f | ...
* +------+ +------+ +------+
*
* 1. CAS n's value field from non-null to null.
* From this point on, no public operations encountering
* the node consider this mapping to exist. However, other
* ongoing insertions and deletions might still modify
* n's next pointer.
*
* 1、 将删除节点n的value使用cas设成null,表示此刻起,n节点处于待删除状态,虽然其value为null,对于public相关方法如get等,在执行遇到此类型节点时不会将该节点视为有效节点(也即会被跳过处理),但是如果对于正在执行的插入和删除操作的方法来说,可能也可以去更改“此待删除节点n”的后继节点。
* 2. CAS n's next pointer to point to a new marker node.
* From this point on, no other nodes can be appended to n.
* which avoids deletion errors in CAS-based linked lists.
*
* +------+ +------+ +------+ +------+
* ... | b |------>| n |----->|marker|------>| f | ...
* +------+ +------+ +------+ +------+
* 2、使用cas将n节点的next字段指向一个marker节点后,从此刻起,任何节点都不会被放在n的后继节点位置,也即不可能出现n->n1、n->n2等,只有n->marker
* 3. CAS b's next pointer over both n and its marker.
* From this point on, no new traversals will encounter n,
* and it can eventually be GCed.
* +------+ +------+
* ... | b |----------------------------------->| f | ...
* +------+ +------+
* 3、通过cas将b的next指针(越过n节点以及n的后继marker节点)指向f节点,从此刻起,n节点不会被相关操作遍历到,n最终会被GC。

可以看出CSM删除操作方面,借用marker节点来实现,将待删除节点的value设为null值来表示该节点处在删除状态但还未真正删除,这种设计风格就像“惰性删除语义(lazy-deletion schemes)”,先标记删,等某个时机再真正执行之。如果CSM的一个节点的value是null,说明该节点在逻辑上已经被删除。

阅读全文 »

在单线程场景下,HashMap适用于key为无序的键值对存放场景,而TreeMap适用于key为有序的键值对存放场景。

在高并发场景下,ConcurrentHashMap适用于key为无序的键值对存场景,但对于高并发且要求key有序的场景下,TreeMap非线程安全显然无法满足此场景, 在Concurrent包里面只有跳表:ConcurrentSkipListMap可以满足”基于乐观锁高性能的并发读写、key有序”的需求,而且其设计不会像ConcurrentHashMap这么复杂,但确有着恰当的应用场景,例如对于时序流式数据的存放(最近比较热门的物联网大数据引擎TDengine),可以将乱序的记录以时间戳作为key插入到跳表中,跳表内部处理插入时会比较key的hash值大小以找到节点合适的插入位置,那么在读取时跳表返回的记录就是有序了。

jdk1.8的ConcurrentSkipListMap在本文简写为CSM,Dung Lea在源代码开头的注释详细介绍了CSM总体设计思路并给出字符型展示的CSM结构图,如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
* Head nodes          Index nodes
* +-+ right +-+ +-+
* |2|---------------->| |--------------------->| |->null
* +-+ +-+ +-+
* | down | |
* v v v
* +-+ +-+ +-+ +-+ +-+ +-+
* |1|----------->| |->| |------>| |----------->| |------>| |->null
* +-+ +-+ +-+ +-+ +-+ +-+
* v | | | | |
* Nodes next v v v v v
* +-+ +-+ +-+ +-+ +-+ +-+ +-+ +-+ +-+ +-+ +-+ +-+
* | |->|A|->|B|->|C|->|D|->|E|->|F|->|G|->|H|->|I|->|J|->|K|->null
* +-+ +-+ +-+ +-+ +-+ +-+ +-+ +-+ +-+ +-+ +-+ +-+

CSM源代码解析文章说明:由于CSM解析内容较多,因此全文分为“深度讨论高并发跳表数据结构ConcurrentSkipListMap的源代码实现(上)”和“深度讨论高并发跳表数据结构ConcurrentSkipListMap的源代码实现(下)”两篇文章
上篇关注的重点:CSM数据结构设计原理、doGet、doPut核心方法解析
下篇关注的中断:doRemove核心方法解析、总结

在这里插入图片描述

《gitee 博客文章封面》

阅读全文 »

ThreadLocal可以实现完全基于无锁且也不是基于CAS的线程隔离需求,让每个线程可以有自己的本地实例,但如果对ThreadLocal底层设计不了解,那么对甚至无法正确ThreadLocal及其可能出现的内存泄露问题。可以说ThreadLocal的源代码设计也一种非常优秀的可支持“高并发”的实现。

在这里插入图片描述

《gitee 博客文章封面》

阅读全文 »

本文基于本博客已发布的CHM文章中分析fullAddCount方法基础上,介绍LongAdder这个高并发计数性能,并通过与原子计数器AtomicLong进行比较,最后给出Striped64类相关分析(因其内部真正实现了高并发计数逻辑)。

为何需要介绍下LongAdder类?
因为在fullAddCount方法的定义上,因为在CHM源代码上,Doug Lea有提到:fullAddCount的设计原理可通过LongAdder这个类得到相关解释::

1
2
3
4
// See LongAdder version for explanation
private final void fullAddCount(long x, boolean wasUncontended) {
int h;
// 省略部分 ......

1、两种并发计数器的性能比较

longAdderCost和atomicLongCost的目的:指定一定数量的线程并发执行该逻辑:每个线程计数从0增加到1000*1000

阅读全文 »

在文章中《深度解析官方关于jdk1.8的resizeStamp的bug处理过程》,我们讨论关于CHM的核心设计——resizeStam需要修复的处理过程,本文再次基于openJDK的bugs讨论组提出的CHM源代码另外一个会造成死循环的bug,默认读者已经掌握CHM的核心源代码实现,否则无法从本文的讨论中获益。文章前部分先把computeIfAbsent的bug成因分析清楚,再来介绍官网ConcurrentHashMap.computeIfAbsent stuck in an endless loop的讨论过程,这样更容易看懂相关内容。

研究openJDK官方公布的相关源码bug有何“收益”:

虽然这些bug不是特别严重,修复起来也即几行代码,但如果想要解决这种看似“简单的bug”,要求对CHM设计原理、类、方法实现细节足够熟悉,也就是说,你要具备(至少在这个bug上下文的类、方法范围内)和源代码设计者同等思考视角才能去挖掘bug的本质原因并提出合理的修复建议。换句话说,你研究的不是这个bug本身,而是深入精通整个类的源代码实现,这种高级收益在日常业务开发几乎无法获得。

在这里插入图片描述

《gitee 博客文章封面》

认识computeIfAbsent用法

理解computeIfAbsent在一些场合下的用法,有助于帮助切入源代码分析。

computeIfAbsent使用场景1:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
package concurrent.demo;
import java.util.concurrent.ConcurrentHashMap;

public class Demo1 {
static int computeKeyLength(String key){ // 计算key的长度,将其作为该key对应的value
return key.length();
}
public static void main(String[] args){
ConcurrentHashMap<String,Integer> map=new ConcurrentHashMap<>();
map.put("foo",1);
map.computeIfAbsent("foobar",key->computeKeyLength(key));
System.out.println(map); //输出 {foobar=6, foo=1}
}

computeIfAbsent字面意思:如果key不在map里面,那么就使用给定的匿名函数(也叫映射函数)将key对应的value“计算出来”。(匿名函数也即lambda语法是jdk1.8语法新特性,这一点不必多说)

按这个思路可以有以下解释:

阅读全文 »

前言

首先给出以下open JDK版本的序号说明和Oracle JDK序号说明

(1)对于JDK8或者Java 8

即可指代openjdk-8-jdk或者java-1.8.0-openjdk,

也可指代Oracle家的Java SE 8或者JDK 8u211 and later

(1)对于JDK16或者Java 16

即可指代openjdk的JDK 16.0.2 ,也可指代Oracle家的Java SE 16或者 jdk16.0.1,这里为何给出Java 8和Java 16版本说明?

首先resizeStamp的bug在Java8出现,并在Java 12被修复,因此本文直接给出最新版Java 16作为bug修复前后对比即可。

以下做个约定:统一以Java X形式作为版本称号,CHM:ConcurrentHashMap的简称,以此减少阅读障碍。

在前面的文章中,关于Java 8 的CHM addCount方法里面分支2:resizeStampsc==rs+1、sc==rs+MAX_RESIZERS的讨论中,已经指出其bug嫌疑:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
private final void addCount(long x, int check) {
// 分支1 省略...

// 分支2
if (check >= 0) {
Node<K,V>[] tab, nt; int n, sc;
while (s >= (long)(sc = sizeCtl) && (tab = table) != null &&
(n = tab.length) < MAXIMUM_CAPACITY) {
int rs = resizeStamp(n); // 注意这里计算出的rs是正值
if (sc < 0) {
// sc是负值,怎么会等于rs+1或者rs + MAX_RESIZERS这个正值呢? 有可能是个bug
if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
sc == rs + MAX_RESIZERS || (nt = nextTable) == null ||
transferIndex <= 0)
break;
if (U.compareAndSetInt(this, SIZECTL, sc, sc + 1))
transfer(tab, nt);
}
else if (U.compareAndSetInt(this, SIZECTL, sc,
(rs << RESIZE_STAMP_SHIFT) + 2))
transfer(tab, null);
s = sumCount();
}
}

官方bug描述

其实这个bug在open jdk的官方bugs主页已经给出相关解释和修复过程,链接:官方bug描述页面
开发者在官方提交的resizeStamp的bug描述

从Detail这一块描述得到信息如下:

bug的描述:ConcurrentHashMap.addCount()设计逻辑中可能存在bug。(这里虽然提到addCount()方法,但本人更想强调的是扩容分支的resizeStamp的bug)

级别是:bug

当前状态:已经修复

影响的版本:Java 11、Java12

在哪个版本得到修复:Java 12

使用操作系统平台:所有

bug页面创建时间:2018-11-26

解决bug的最后时间:2018-12-11

bug所属库:Java的核心库——core-libs

提交者的修复建议

In the above code, condition of (sc == rs + 1 || sc == rs + MAX_RESIZERS ) would never be true , since the value of rs is positive and the value of sc is negative .

译:条件 (sc == rs + 1 || sc == rs + MAX_RESIZERS )永远不可能true,因为rs的值为正数,而sc值为负数

并建议修改为:

The correct condition should be (sc >>> RESIZE_STAMP_SHIFT) == rs + 1 || (sc >>> RESIZE_STAMP_SHIFT) == rs + MAX_RESIZERS, which can be used to dedect if resizing process finished or resizing threads reaches maxmium limitation

译:正常的条件应该是这样: (sc >>> RESIZE_STAMP_SHIFT) == rs + 1 || (sc >>> RESIZE_STAMP_SHIFT) == rs + MAX_RESIZERS,这两个条件表示扩容任务已结束或者参与扩容的线程总数达到最大值

阅读全文 »

在前面有多篇关于jdk1.8的ConcurrentHashMap研究是基于源代码给出的深度分析,要知道多线程环境下的ConcurrentHashMap内部运行机制是相对复杂的,好在IDEA提供的相关断点和Debug功能确实好用,使得多线程调试起来直观,通过这种方式能加深多线程操作CHM的执行流程。

前期准备

这部内容请参考文章中的小节部分,本文不再累赘。

使用埋点打印法观测

此方法相对繁琐,难度并不大,要求使用者对源代码设计足够理解,否则埋点位置不佳影响观测效果

1、测试代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
package concurrent.demo;

public class ResizeStampBugTest {
public static void main(String[] args) {
// 设置64个线程并发put
int maxThreads = 64;
// 初始容量为8,内部会被调整为16
ConcurrentHashMap<Long, String> map = new ConcurrentHashMap<>(8);
for (int i = 0; i < maxThreads; i++) {
Thread t = new Thread(() -> map.put(Thread.currentThread().getId(),Thread.currentThread().getName()));
t.setName("Thread-"+i);
t.start();
// 因为多个线程并发执行不方便查看打印结果,可以让前一个线程领先后面线程一丁点,以便观察打印结果,当然也可以不需要,多执行几次看看打印结果即可。
// Thread.sleep(1);

}
}
}

可以看到这里ConcurrentHashMap类用的是项目concurrent.demo包下的ConcurrentHashMap.java 源码文件

2、更改桶位分配步长,将源码的16改为4,方便观察

1
2
private static final int MIN_TRANSFER_STRIDE = 4;
// private static final int MIN_TRANSFER_STRIDE = 16;

3、transfer方法加入打印每个线程分配的桶位区间

1
2
3
4
5
6
7
8
9
10
11
12
13
14
           else if (U.compareAndSwapInt
(this, TRANSFERINDEX, nextIndex,
nextBound = (nextIndex > stride ?
nextIndex - stride : 0))) {
bound = nextBound;
i = nextIndex - 1;
advance = false;
// 以下三行是新增代码
String s=String.format("%s分配的捅位区间为:[%d,%d]并挂起",Thread.currentThread().getName(),bound,i);
System.out.println(s);
// 在这里加一句挂起已经分配好桶位区间的线程,用于观察线程
LockSupport.park(this);

}

4、其他地方需要埋入打印语句

pulVal方法:当key定位到的桶位为空,直接放入key节点。

1
2
3
4
5
6
7
8
9
10
         
else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
if (casTabAt(tab, i, null,
new Node<K,V>(hash, key, value, null))) {
// 埋入打印语句
System.out.println(Thread.currentThread().getName() + "在桶位["+i+"]put入Node节点并退出");

break; // no lock when adding to empty bin
}
}

addCount方法:

1
2
3
4
5
6
7
8
9
10
11
12
if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1)){
// 埋入打印语句
System.out.println(Thread.currentThread().getName()+"后续进入扩容逻辑transfer方法");
transfer(tab, nt);
}
}
else if (U.compareAndSwapInt(this, SIZECTL, sc,
(rs << RESIZE_STAMP_SHIFT) + 2)){
// 埋入打印语句
System.out.println(Thread.currentThread().getName()+"第1个进入扩容逻辑transfer方法");
transfer(tab, null);
}
阅读全文 »

在ReentrantLock的独占模式下,当需要在一个读写高度竞争场景中使用它的lock.lock()时,你会发现这是独占锁,它会让大量同时请求锁的线程们都不得不进入CLH阻塞队列中等待,如果在“读多写少”的场景中,ReentrantLock的这种独占锁方式显然会降低并发性能,因此ReentrantReadWriteLock就是为了解决这种“读多写少”的场景:一个线程正在请求锁进行读操作可以不影响其他线程同时请求读锁(共享锁),意味着多个线程可以并发读。

这里首先通过Doug Lea在源码注释给出的demo作为对ReentrantReadWriteLock一般用法说明:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
class CachedData {
// 读线程需要对data读取,写线程可以对data进行更新,那么data显然是线程不安全的,需要借助锁进行相关操作
Object data;
volatile boolean cacheValid; // 判断data是否已经被缓存
// 1、创建一个读写锁
final ReentrantReadWriteLock rwl = new ReentrantReadWriteLock();
//该方式的功能就是对已经缓存的数据进行预处理
void processCachedData() {
// 2、由于一开始,我们乐观认为data可能已经被缓存,因此我们不急着申请独占锁,而是申请并发的共享读锁
rwl.readLock().lock();
// 3、结果发现:我们太乐观了,原来data没缓存,因此还不能读取数据(如果非得去读,只能读到旧数据)
if (!cacheValid) {
// Must release read lock before acquiring write lock
// 4、既然乐观错估了情况,那么只能用上“强大的独占锁”,保证自己能独占的实施“写操作、更新操作、删除操作”:因为读写锁是互斥的,需先释放读锁,然后在升级为写锁。
//
rwl.readLock().unlock();
// 这就是所谓的“锁升级”。虽然说升级,但这里是当前线程请求独占锁,若同一时刻有其他线程已经拿到写锁,那么在这里当前线程会被阻塞在AQS里面的CLH阻塞队列
rwl.writeLock().lock();
try {
// Recheck state because another thread might have
// acquired write lock and changed state before we did.
// 5、由于我们获取写锁的时机也许比其他线程晚一步拿到,因此在这里拿到独占锁后,还得重新检查是否data已经提前被其他线程更新过
if (!cacheValid) {
data = ... // 例如从数据库重新读取最新的data,然后将cacheValid标记为true,表示data已经更新(或已经更新缓存)
cacheValid = true;
}
// 6、由于我们还得使用更新后的data,因此可以申请读锁。注意,由于当前线程还持有写锁,因此其他线程不可能获得写锁进行写操作,因此当前线程此时可以申请读锁
// Downgrade by acquiring read lock before releasing write lock
rwl.readLock().lock();
} finally {
// 7、显然data已经被我“独占式”地更新过,可以释放写锁了
rwl.writeLock().unlock(); // Unlock write, still hold read
}
}

try {
// 8、在第6点获得共享的读锁后,在这里可以使用已经缓存的新data
use(data);
} finally {
// 9、释放读锁
rwl.readLock().unlock();
}
}
}

关于读锁、写锁、读线程、写线程的一些说明

rwl.readLock().lock() `:很明显,线程在请求读锁,此锁是AQS的共享模式

rwl.writeLock().lock():很明显,线程在请求写锁,此锁是AQS的独占模式

严格来说:对于ReentrantReadWriteLock这种读写锁的设计,请求读锁的线程直接称为“读线程”可以吗,例如,请求写锁的线程,由于已经获取独占锁的线程可以去请求读锁,那么这个线程是应该称为“写线程”?“读线程”?还是“写、读线程”?还是“读写线程”呢? 这么看来,请求到读锁的线程似乎也不适合称为读线程?成功请求到写锁的线程也不适合称为写线程。也许换一个角度可以更容易区分:

在用户代码层面的角度出发:

1、用户设计此方法具有明显“读取”数据的逻辑时,该方法首先使用`rwl.readLock().lock() `,过程中不管是否需要再次请求写锁,都可以把执行用户方法的线程称为“读线程”

2、用户设计此方法具有明显“更新、删除、改、插入”数据的逻辑时,该方法首先使用`rwl.writeLock().lock() `,过程中不管是否需要再次请求写锁、还是读锁,都可以把执行用户方法的线程称为“写线程”

但是从ReentrantReadWriteLock内部设计来看,可能按以下思路去想会更清晰:

1、仅在ReentrantReadWriteLock内部调用tryAcquire方法来看,调用tryAcquire方法的线程可以看成是写线程,因为它是从rwl.writeLock().lock()来的

2、仅在ReentrantReadWriteLock内部调用tryAcquireShared方法来看,调用tryAcquireShared方法的线程可以看成是读线程,因为它是从rwl.readLock().lock()来的

阅读全文 »

CyclicBarrier的基本使用

这里的Barrier可以翻译为屏障、篱栅、栅栏、“关口”,本文统一称为屏障,因此CyclicBarrier如果非得翻译的话,可以理解为:一个可循环制造屏障的同步器。

CyclicBarrier是一个(用于多线程协调的)同步器,它允许一组线程实现互相等待,直到所有线程的执行流都来到一个公共屏障点 (然后再去做一个任务)

A synchronization aid that allows a set of threads to all wait for each other to reach a common barrier point

要理解CyclicBarrier的使用或者其设计意图,可以通过以下简单的demo来了解:

阅读全文 »

countdown:倒计时,latch:门闩;插销

其实CountDownLatch是一种共享锁资源的同步器,用于协调多个线程并行执行(注意不是并发),也即控制多个线程完成后接着在同一时刻去完成某事。当然了,这种协调的底层实现是基于AQS的共享模式实现。

CountDownLatch一般用法如下

1
2
countdownLatch.countDown()  // 使得内部的“计数器值”减1,一般由子线程去调用
countdownLatch.await() // 当内部的“计算器值”为0时,说明所有子线程都已经完成任务,那么阻塞的主线程就会被唤醒再去执行其他任务

第一种用法:主线程等所有子线程完成后再统一做某事

阅读全文 »