yield-bytes

沉淀、分享与无限进步

Java高级主题:深度讨论jdk1.7的ConcurrentHashMap设计及其核心方法实现

阅读本文前,需要部分JDK源码深度知识储备:Unsafe与CAS原理、jdk1.7的HashMap原理设计以及分析过其源代码

在这里插入图片描述

《gitee 博客封面图》

1、为何引入ConcurrentHashMap这种适应并发场景数据结构?

  • HashTable

从Java7和Java8的HashMap源码设计可以得出基本结论:两者都不是线程安全,如果非得在并发情况下使用它们,会出现一些问题,如Java7的HashMap死循环导致CPU利用率飙高、put/get不一致问题等,当然也有绝对线程安全的HashTable,但从其源码实现来看,HashTable简单粗暴地在put/get/size等方法前加一个synchronized 修饰:

1
2
3
4
public synchronized V get(Object key) 
public synchronized V put(K key, V value)
public synchronized int size()
//...

在并发操作HashTable的线程数量不多的场景下,其性能影响不大,而且线程隔离程度最高(保证线程安全);但在高并发场景下,HashTable的性能显得力不从心,考察以下情况:

例如有1000个线程对HashTable(数组长度为1024)进行读写操作,由于synchronized是对整个数组进行加锁,只要有一个线程在数组的某个桶位上进行put/get等操作,其他999个线程都会被阻塞无法做其他事情,就连读操作也会被阻塞而不能并发读。

  • Collections.synchronizedMap(map)

此方法也能实现线程安全:用它可以将非线程安全版本的HashMap“包装”为线程安全的HashMap

1
2
3
4
Map<Integer,String> map=Collections.synchronizedMap(new HashMap<Integer,String>());
map.put(1,"a");
map.put(2,"b");
System.out.println(map);

其内部实现方式,如外界不指定加锁对象,那么它会对“当前SynchronizedMap对象的引用”加锁,然后再套用HashMap的方法即可完成线程安全的设计,简单、粗暴、直接:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
SynchronizedMap(Map<K,V> m) {
if (m==null)
throw new NullPointerException();
this.m = m; //
mutex = this;// 当前SynchronizedMap对象的引用,后面的get、put会对其加锁
}

public V get(Object key) {
// HashTable是在方法前面加synchronized修饰
synchronized (mutex) {return m.get(key);}
}

public V put(K key, V value) {
synchronized (mutex) {return m.put(key, value);}
}
  • Collections.unmodifiableMap(map)

当然也还有一个冷门的unmodifiableMap类支持多线程并发的读的场景,但不支持并发写,因为unmodifiableMap类是只读的map,如下所示:

1
2
3
4
5
HashMap<Integer,String> map=new HashMap<>();
map.put(1,"a");
map.put(2,"b");
Map<Integer, String> map1 = Collections.unmodifiableMap(map);
map1.put(1,"1"); // 抛出UnsupportedOperationException

查看unmodifiableMap源码即可了解其设计思想:对“写入操作”相关方法改造为抛出异常,从而实现将可读写的map“包装”成只读map

1
2
3
4
5
6
public V put(K key, V value) {
throw new UnsupportedOperationException();
}
public V remove(Object key) {
throw new UnsupportedOperationException();
}

此外unmodifiableMap类业务场景受限,它不支持并发写入等复杂操作的场景。

以上三种map虽然都是线程安全map,但它们在高并发场景下,读写(当然unmodifiableMap不支持写入操作)性能低,因此有必要介绍能支持高并发的HashMap,也即ConcurrentHashMap简称CHM,当然本文特指jdk1.7版本的CHM。

2、ConcurrentHashMap双层结构图及其基本术语

在这里插入图片描述

从图中可以看出,ConcurrentHashMap具有两层结构:Segment数组+HashEntry数组

  • 第一层结构:是一个Segment数组,该数组每个元素都是一个Segment对象,Segment对象是什么?首先它是ConcurrentHashMap的一个内部“辅佐类”,本质是一个锁对象,继承了ReentrantLock,只不过在ReentrantLock基础上增加各种功能,Segment定义源码如下:
1
2
3
4
5
6
7
8
9
static final class Segment<K,V> extends ReentrantLock implements Serializable {
// ...
Segment(float lf, int threshold, HashEntry<K,V>[] tab) {
this.loadFactor = lf;
this.threshold = threshold;
this.table = tab;
}
//...
}

所谓的“分段锁”、“锁分段”说的就是Segment数组里每个元素(每个桶位)都是一个锁,换句话说:Segment数组的每个桶位都持有一个锁,都可以独立加锁和解锁,桶位之间的读写操作互不影响,这种直接继承ReentrantLock来封装成为新的“工具”的设计思想确实巧妙。

  • 第二层结构:从Segment类的定义可以看出,每个Segment对象又包含了一个HashEntry数组,它才是键值对或者冲突链真正存放的地方,显然这一层结构就是大家所熟悉的Java7 HashMap结构。在Segment对象内部,对HashEntry数组的并发写操作采用CAS无锁原子操作机制,并发读操作使用volatile和Unsafe机制实现。

这里还是以前面第1节的案例说明:

例如有1000个线程对ConcurrentHashMap(数组长度为1024)进行(key,value)读写操作,假设1000个线程能够均匀分别1000个不同Segment数组桶位上,由于Segment数组每个桶位都是一把锁,每个线程对自己桶位进行加锁后独立进行读写操作,完全不会阻塞其他桶位上的线程操作,因此1000个线程可以同一时刻并发执行读写,整个ConcurrentHashMap的操作效率大增。

3、重点成员变量以及构造方法

从最熟悉的用法put开始:

1
2
ConcurrentHashMap<Integer,String> map=new ConcurrentHashMap<>();
map.put(1,"a");

通常是以该类的重点成员变量以及最多参数那个构造方法作为分析切入口

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
		// 第一层结构Segment数组的默认初始容量
static final int DEFAULT_INITIAL_CAPACITY = 16;
//第二层结构里面HashMap的负载因子,注意:这里不是Segment数组的负载因子,Segment数组没有扩容机制,初始化设定多少就是多少(原因看后面的源码分析),因此不需要LOAD_FACTOR
static final float DEFAULT_LOAD_FACTOR = 0.75f;
// 写并发度,也即允许同时操作多少个线程来操作该Segment数组,数组的一个桶位对应一个写操作并发度,读并发度不限制
static final int DEFAULT_CONCURRENCY_LEVEL = 16;
// 第二层结构里面HashMap的最大容量
static final int MAXIMUM_CAPACITY = 1 << 30;
// 第二层结构里面HashMap的初始默认容量
static final int MIN_SEGMENT_TABLE_CAPACITY = 2;

// 第一层结构Segment数组的最大容量65536,Doug Lea自己也认为该最值设定有点保守,因为同一台服务器,cpu、内存等配置不同,同时开启的线程数量也不同,性能好的服务器,同时打开10万个线程也不成问题
static final int MAX_SEGMENTS = 1 << 16; // slightly conservative

/**
* Number of unsynchronized retries in size and containsValue
* methods before resorting to locking.
*/
// 在计算size时,加阻塞锁的最大尝试次数,后面的size方法会提到。
static final int RETRIES_BEFORE_LOCK = 2;

// 对应第一层结构的Segment数组,注意这里是final修饰,也再次能说明:Segment数组一旦创建,以后就不能修改和扩容了
final Segment<K,V>[] segments;
@SuppressWarnings("unchecked")
public ConcurrentHashMap(int initialCapacity,
float loadFactor, int concurrencyLevel) {
if (!(loadFactor > 0) || initialCapacity < 0 || concurrencyLevel <= 0)
throw new IllegalArgumentException();
// 如果给定的并发度例如10万,显然大于默认Segment数组的长度,只能取到65536,
if (concurrencyLevel > MAX_SEGMENTS)
concurrencyLevel = MAX_SEGMENTS;

// Find power-of-two sizes best matching arguments
// 用于算出给定相关参数下的最佳Segment数组容量,2的n次幂,以下以默认构造方法的入参作为说明,因此给定initialCapacity=16作为实例说明,其他参数loadFactor=0.75,concurrencyLevel=16。
int sshift = 0;
int ssize = 1; // ssize变量就是创建Segment数组的容量值
while (ssize < concurrencyLevel) {
// 找出大于等于并发度的2整数幂值,因为concurrencyLevel默认为16,因此经过以下简单计算,sshift=4,ssize=16
++sshift;
ssize <<= 1;
}
// segmentShift=32-4=28,用于计算key的hash值,采用这种方式能够使得key分布更加离散
this.segmentShift = 32 - sshift;
// segmentMask=16-1=15<==>1111,使用二进制能够快速计算定位到key对应的桶位号,这个知识点已经在jdk1.8HashMap解析的文章详细解析过。
this.segmentMask = ssize - 1;
if (initialCapacity > MAXIMUM_CAPACITY)
initialCapacity = MAXIMUM_CAPACITY;
int c = initialCapacity / ssize; // 由于initialCapacity=16, ssize=16,那么c计算为1

// 由于c*ssize=1*16恰好能满足c * ssize>=initialCapacity
if (c * ssize < initialCapacity)
++c; // 此时c=1
int cap = MIN_SEGMENT_TABLE_CAPACITY; //默认创建HashEntry[]的容量,值为2
while (cap < c) // 由于c=1,显然这里cap不再调整,cap还是取默认值MIN_SEGMENT_TABLE_CAPACITY=2
cap <<= 1;

// create segments and segments[0]
// 创建第0个segment对象,用于未来其他线程创建新segment对象时以s0作为模板去“克隆创建”
Segment<K,V> s0 =
new Segment<K,V>(loadFactor, (int)(cap * loadFactor),
(HashEntry<K,V>[])new HashEntry[cap]); // 从上面可知,这里cap就是2,因此位于s0位置的HashEntry数组长度为2
// 创建一个容量为ssize的segments数组
Segment<K,V>[] ss = (Segment<K,V>[])new Segment[ssize];
// 数组的segments[0]=s0
UNSAFE.putOrderedObject(ss, SBASE, s0); // ordered write of segments[0]
this.segments = ss; // segments数组内部变量名
}

该构造方法有以下几个重要细节:

  • 1、Segment数组是在new ConcurrentHashMap<>()时创建的,默认容量为16,同时会在Segment数组的第0个桶位上创建一个Segment对象,赋给变量s0,而Segment数组其他1~15的位置则为null,也即惰性创建,在以后有新的key put进来时再到相应的下标创建新的Segment。此外,在Segment数组第0号下标也创建好一个默认容量为2的HashEntry数组。

  • 2、这里使用UNSAFE.putOrderedObject(ss, SBASE, s0) ,为何不使用 putObjectVolatile()方法?

首先putOrderedObject()是有序、延迟版本的 putObjectVolatile方法,用它写入值不会立即被其他线程获取到,这样做的好处是写入性能比 putObjectVolatile() 高,

  • 3、如何理解“这样做的好处是写入性能比 putObjectVolatile() 高?

首先,假设在new构造方法阶段和put阶段,采用putObjectVolatile(),那么s0会马上写入主存,由于new阶段之后紧接着put操作,而put方法里有相关操作也涉及到立即写入主存的需求,如果这些写入操作都直接使用putObjectVolatile,那么相当于主存与cpu缓存之间来回跑了几次,其实完全可以这么设计:“从new到put方法的finally之前,积攒多个写入操作后再一次性写入主存”

Doug Lea就是这么干的:在new构造方法开始使用putOrderedObject,让s0变量写入主存的时间延迟到put方法的结尾前,也即在finally的unclock执行前,就能实现“积攒多个写入操作后一次性写入主存”,因此写入性能有提高的,而且保证的数据一致性。

注意:JMM规定,对一个变量执行unlock操作之前,必须先把此变量同步到主内存中(执行store和write操作)

参考:https://www.zhihu.com/question/60888757

4 put方法解析

基于ConcurrentHashMap两层结构可知,要put一个key进去,需要两次hash定位,第一次定位在Segment数组对应的Segment上,第二次定位:在第一次定位的基础上,确定key位于HashEntry数组的哪个桶位上,显然,第二次定位才是key真正要put的位置,由于是多线程并发put,因此代码设计相对复杂:

第一次定位

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
public V put(K key, V value) {
Segment<K,V> s;
// 这里可以看出ConcurrentHashMap不允许放入null
if (value == null)
throw new NullPointerException();
int hash = hash(key);
// 对比HashMap的index=hashCode&table.length-1的桶位计算即可知道,j就是确定key所在的Segment数据下标位置
// 第一次定位,也即找出key所在的Segment数组的下标位置,这里的hash右移了segmentShift位,目的是让key的高位特征参与桶位计算,使得hash进一步分散
int j = (hash >>> segmentShift) & segmentMask;

// 投机写法:若当前还未有其他线程写入,那么可以使用getObject快速获取segment实例
// 如果key对应的Segment对象恰好为null,说明还未创建Segment对象,因此需要在ensureSegment方法内部创建key对应的Segment对象,具体逻辑参考ensureSegment方法的解析
if ((s = (Segment<K,V>)UNSAFE.getObject // nonvolatile; recheck in ensureSegment
(segments, (j << SSHIFT) + SBASE)) == null)
s = ensureSegment(j);
// 真正将键值对put入HashEntry数组
return s.put(key, hash, value, false);
}

第二次定位 s.put(key, hash, value, false)

该put方法是Segment对象自己的内部方法,其内部设计也很巧妙,这里给出其总体设计思想:

1、此put方法是真正将key放入HashEntry数组的逻辑,那么当前线程必须要获得锁才能保证独占put

2、使用非租塞方式自旋方式获取锁,但不会让当前线程白白浪费自旋,因此在自旋期间给当前线程安排了任务(对应下面的scanAndLockForPut方法)

3、任务是:麻烦你(当前线程)在自旋的过程中,顺便帮我检查key是否存在链表中,如果不存在,顺便帮我提前创建key对应的Entry节点,以便我获取锁后可以马上使用。(对应下面的HashEntry node = tryLock() ? null :scanAndLockForPut(key, hash, value);)

4、拿到锁后,即可在HashEntry数组上找到合适位置并插入新key

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
// 对应上面的 s.put(key, hash, value, false)调用       
final V put(K key, int hash, V value, boolean onlyIfAbsent) {
//由于segment对象自己就是一个ReentrantLock,因此当然有tryLock()加锁方法,所以在这里先尝试第一次加锁
// 若第一次加锁成功,则node没机会提前被创建,因此对应为null。若第一次加锁失败(说明有其他线程竞争并抢占成功),当前线程会在scanAndLockForPut里面尝试加锁且顺便提前为key创建好对应的node节点。
HashEntry<K,V> node = tryLock() ? null :scanAndLockForPut(key, hash, value);
// ===========================加锁成功(进入临界区)===========================
V oldValue;
try {
// 这一句代码非常能体现作者对并发性能的理解是有多细腻!参考后面解释
HashEntry<K,V>[] tab = table;
int index = (tab.length - 1) & hash;
// 使用Volatile语义获取key对应的桶位头节点,也即冲突链的头结点,用于下面遍历
HashEntry<K,V> first = entryAt(tab, index);
// 以下内容其实在HashMap解析中已经讲过,逻辑不难
// 从冲突链的头节点开始遍历,在合适位置插入key,若key已经在链表中存在,则更新其value
for (HashEntry<K,V> e = first;;) {
//冲突链的头节点不为空,则进入链表遍历,若在链中找到对应key,则结束遍历。
if (e != null) {
K k;
if ((k = e.key) == key ||
(e.hash == hash && key.equals(k))) {
oldValue = e.value;
if (!onlyIfAbsent) {
e.value = value;
// 更新value后,ConcurrentMap结构修改次数加1
++modCount;
}
break;
}
// 用于保持链表的遍历
e = e.next;
}
// 程序运行到该分支,说明key所在的桶位头节点是null
else {
// 若上面的自旋加锁期间顺便把key对应的node节点创建好,那么就直接使用头插法将此node插入桶位上
if (node != null)
node.setNext(first);
else
// 若上面 的tryLock() ? 没有机会提前将key对应的node节点创建好,在这里也可以为该key创建节点,显然是用头插法
node = new HashEntry<K,V>(hash, key, value, first);
// HashEntry容量统计加1
int c = count + 1;
// 添加新节点后,若HashEntry数组容量达到扩容阈值,则进行扩容rehash
if (c > threshold && tab.length < MAXIMUM_CAPACITY)
rehash(node);
else
/*
注意,这里是在HashEntry数组的index桶位创建key对应的node节点,也是用了延迟写入主存的策略(putOrderedObject):
setEntryAt(tab,i,e)
{
UNSAFE.putOrderedObject(tab, ((long)i << TSHIFT) + TBASE, e);
}
*/
setEntryAt(tab, index, node);
++modCount;
count = c;
oldValue = null;
break;
}
}
} finally {
/*
put方法在代码开始位置就获得了锁,那么key写入后,自然需要释放锁,注意这里需联系前面构造方法里面的
UNSAFE.putOrderedObject(ss, SBASE, s0)使用延迟写入主存的原因
*/
unlock();// 执行unlock之前,jvm能把之前每个调用putOrderedObject的写入操作在此刻全部一起写入到主存
// ===========================成功释放锁(离开临界区)===========================
}
return oldValue;
}

在put方法的前面,Dung Lea安排了HashEntry<K,V>[] tab = table,将原本volatile修饰的table变量转为普通变量,

这是因为:在这一句之前,当前线程已经通过tryLock获取了独占锁,因此只会有一个线程对table进行写操作,既然如此,那就不必再使用volatile语义的table变量,将table赋值给put方法中的一个局部变量,从而使得能够减少volatile带来的不必要性能损耗。

ensureSegment方法解析

该方法的解析主要是来自前面put方面里面的s = ensureSegment(j),主要设计思想:

1、先投机性去主存Segment数组j下标取segment对象,若该位置对应的Segment对象不为空,则直接返回它,

2、若在1步骤发现j位置下Segment为null,那么当前线程准备把它创建了,创建方式当然基于自旋+CAS

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40

/**
* Returns the segment for the given index, creating it and
* recording in segment table (via CAS) if not already present.
*
* @param k the index
* @return the segment
*/
@SuppressWarnings("unchecked")
private Segment<K,V> ensureSegment(int k) {
// 构造方法里面的初始化的Segment数组
final Segment<K,V>[] ss = this.segments;
// key定位到j下标对应的相对地址,注意j下标和u的不同
// int j = (hash >>> segmentShift) & segmentMask;
// s = ensureSegment(j);
long u = (k << SSHIFT) + SBASE; // raw offset
Segment<K,V> seg;
//到这这一步,可能出现多线程并发创建Segment,因此使用Volatile语义判断Segment数组j下标是否已经创建Segment实例以及该Segment对应的hashEntry数组
if ((seg = (Segment<K,V>)UNSAFE.getObjectVolatile(ss, u)) == null) {
// 若当前j下标元素为空,用Segment数组的第0号桶位上的segment实例作为模板为桶位j创建segment实例
Segment<K,V> proto = ss[0]; // use segment 0 as prototype
int cap = proto.table.length;
float lf = proto.loadFactor;
int threshold = (int)(cap * lf);
HashEntry<K,V>[] tab = (HashEntry<K,V>[])new HashEntry[cap];
// 再次检查当前j下标是否有其他线程创建了segment实例,若没有创建,就使用自旋+CAS方式来创建,这时一定原子操作,而且仅有一个线程会创建segment成功
if ((seg = (Segment<K,V>)UNSAFE.getObjectVolatile(ss, u))
== null) { // recheck
Segment<K,V> s = new Segment<K,V>(lf, threshold, tab);
// 自旋检查j下标位置是否有其他线程创建了segment实例,若自旋中恰好发现还未创建segment实例,则使用CAS原子操作创建
while ((seg = (Segment<K,V>)UNSAFE.getObjectVolatile(ss, u))
== null) {
if (UNSAFE.compareAndSwapObject(ss, u, null, seg = s))
// 若当前线程创建segment实例成果,则退出自旋
break;
}
}
}
return seg;
}

作者这样考虑ensureSegment的设计:考察多个线程一起调用ensureSegment情况,其实只要其中一个线程抢先在j位置创Segment对象,那么其他线程只需要判断该位置不为空时就不用重复在j位置创建segment实例,直接 return seg即可。

再使用并发思维理解以下代码片段:

1
2
3
4
5
6
7
 Segment<K,V> s = new Segment<K,V>(lf, threshold, tab);
// 自旋检查j下标位置是否有其他线程创建了segment实例
while ((seg = (Segment<K,V>)UNSAFE.getObjectVolatile(ss, u))
== null) {
if (UNSAFE.compareAndSwapObject(ss, u, null, seg = s))
// 若当前线程能成功创建segment实例,则退出自旋
break;

考察A、B两个线程同时执行到上面compareAndSwapObject,由于原子性操作,因此可保证只有A线程(假设A线程先抢到)去操作CAS且返回true,B线程去UNSAFE.compareAndSwapObject(ss, u, null, seg = s) 就会返回false,此时的u位置不再是null而是A线程已写入的segment实例,线程B在while循环发现seg已不为空,所以B线程不会再重复创建segment对象。

scanAndLockForPut解析1

put方法里面,因为要写入key,线程对当前segment写入操作前,当然需要先获取锁

1
2
HashEntry<K,V> node = tryLock() ? null :scanAndLockForPut(key, hash, value);
V oldValue;

为了能够通俗理解scanAndLockForPut逻辑设计,这先给出ReentrantLock阻塞锁和非阻塞锁的对比,因此这一节称为scanAndLockForPut解析1,真正的源码解析安排下一节:scanAndLockForPut解析2

  • 阻塞锁
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
final ReentrantLock Lock= new ReentrantLock();

new Thread(new Runnable() {
@Override
public void run() {
Lock.lock();
System.out.println("线程A在运行中");
try {
Thread.sleep(5*1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
Lock.unlock();

}
}).start();

Thread.sleep(1);// 让线程A先运行

// 线程A在工作时,线程B被阻塞了,啥事都做不了
new Thread(new Runnable() {
@Override
public void run() {
Lock.lock();
System.out.println("线程B在运行中");
Lock.unlock();
}
}).start();

从上面的运行结果来看:线程A在工作时,线程B被阻塞了,而且被阻塞了5秒。在5秒里,线程B啥事都做不了,线程B明明也可以干活,但却被阻塞,这种并发设计未能充分利用cpu。

  • 非租塞锁

如果使用非阻塞锁,那么线程A在工作时,线程B不会被阻塞,线程B可以自由做其他事情,没有白白自旋

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
    final ReentrantLock Lock= new ReentrantLock();

new Thread(new Runnable() {
@Override
public void run() {
Lock.lock();
System.out.println("线程A在运行中");
try {
Thread.sleep(5*1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
Lock.unlock();

}
}).start();
Thread.sleep(1);// 让线程A先运行

// 线程A在工作时,使用tryLock非阻塞锁,这样线程B不会被线程A阻塞,线程A工作的同时,线程B同时也可以去干点别的事情
new Thread(new Runnable() {
@Override
public void run() {
while (!Lock.tryLock()){
System.out.println("线程B在工作中,对链表扫描是否存在key节点");
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}

}
Lock.unlock();

}
}).start();

输出

1
2
3
4
5
6
线程A在运行中
线程B在工作中,对链表扫描是否存在key节点
线程B在工作中,对链表扫描是否存在key节点
线程B在工作中,对链表扫描是否存在key节点
线程B在工作中,对链表扫描是否存在key节点
线程B在工作中,对链表扫描是否存在key节点

使用trylock非阻塞锁后,在这5秒时间内线程B没有白白浪费CPU资源,它顺便完成了查询key节点的工作,对比scanAndLockForPut的while (!tryLock()),即可理解其设计思想:

1
2
3
4
5
6
7
8
9
10
11
private HashEntry<K,V> scanAndLockForPut(K key, int hash, V value) {
HashEntry<K,V> first = entryForHash(this, hash);
HashEntry<K,V> e = first;
HashEntry<K,V> node = null;
int retries = -1; // negative while locating node
while (!tryLock()) { // 若当前线程抢锁失败后不会白白浪费自旋cpu资源,而是顺便完成链表的遍历以及新key节点的创建
// ......
node = new HashEntry<K,V>(hash, key, value, null);
// ......
return node;
}

此外,在scanAndLockForPut 内部使用非阻塞锁,也不会阻塞读线程在桶位的并发读,一石二鸟!

scanAndLockForPut解析2

完整逻辑分析

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
	private HashEntry<K,V> scanAndLockForPut(K key, int hash, V value) {
// 获取key定位在HashEntry数组的桶位头节点
HashEntry<K,V> first = entryForHash(this, hash);
HashEntry<K,V> e = first;
// 该节点就是线程自旋抢锁过程中“顺便”把key节点提前创建好
HashEntry<K,V> node = null;
// 记录自旋次数
int retries = -1; // negative while locating node
// 自旋尝试抢锁(加锁)
while (!tryLock()) {
// 以下的执行流都是基于“未拿到锁的条件下”进行的
HashEntry<K,V> f; // to recheck first below
// 分支1:把能做的事情先做好了:创建了一个新的node节点,但还没获取到锁,只能继续循环
if (retries < 0) {
// 若当前key定位到的桶位为空
if (e == null) {
if (node == null) // speculatively create node
node = new HashEntry<K,V>(hash, key, value, null);
// 将retries置为0:是为了下一次循环跳到分支2或者分支3处理逻辑
retries = 0;
}
//如果给定的key能在链表中找到,就不需要new一个node节点,继续while去获取锁
else if (key.equals(e.key))
retries = 0;
else
// 如果给定的key不是头节点key,说明要继续遍历该链表
e = e.next;
} // 分支2:如果自旋获取锁的尝试大于64次,此时调用独占锁(阻塞锁),保证当前线程一定能获取到锁
else if (++retries > MAX_SCAN_RETRIES) {
lock();
break;
}// 分支3:注意,此分支还未加锁,因此多个线程可能并发执行以下逻辑,如果获取锁的尝试次数未超默认值,在1~64次重试过程中,每达到偶数次:(retries & 1) == 0,当前线程就去检查头节点是否发生了改变,如果头结点发生改变,说明有其他线程已经抢先将某个新节点放入链表头位置,因此当前线程只能重新实施while (!tryLock())。
else if ((retries & 1) == 0 &&
(f = entryForHash(this, hash)) != first) {
e = first = f; // re-traverse if entry changed
// 重置尝试次数
retries = -1;
}
}
return node;
}

这里有一个细节需要注意:代码最后部分“关于重试次数为偶数时就去判断当前桶位的头结点有无变化”,作者设计是基于这么思考:其实无需遍历64次,因为“安排当前线程在自旋过程中顺便去创建node节点”这个任务可要也可不要,如果在头节点恰好在奇数次例如(第3次)发生改变,那么在偶数次(第4次及其以后)就会错过判断,怎么办? 从代码逻辑可以看出,retries次数达到64后,就会直接加阻塞锁成功,然后退出自旋,那么node节点将会在scanAndLockForPut外部被创建。

1
2
3
4
5
6
7
while(!tryLock()){
//
else if (++retries > MAX_SCAN_RETRIES) {
lock();
break;
//
}

Doug Lea设计的scanAndLockForPut确实精妙,拿捏得死死的!( 让线程没有”空跑、摸鱼”的余地)

5、size方法

先理解主要设计设计思想:

1、从ConcurrentHashMap的两层结构可知:将每个segment段的HashEntry数组节点数进行累加即可得到总数,在多线程场景下,可以这样设计:先锁住所有segment段,累加后再解锁所有segment段。但这样设计有一个问题:直接加锁后,会阻塞其他线程对当前Segment段的并发写,有一丁点性能损耗。

2、基于1的优化:先“投机性赌一把”,赌遍历时没有其他线程来更新该map,那么当前线程可以先不加锁尝试3次遍历累计所有segment的更改次数(modCounts),如果相邻两次更改次数一样,说明这两次统计过程中没有其他线程来更新该map,赌成功了,此时size的计算结果相对很准确。(后面会解释为何是相对很准确,而不是100%准确)

如果相邻两次更改次数不一样,说明这两次统计过程中有其他线程来(put/remove/clear)该map,赌失败了,下一次尝试统计就需要对所有segment段加锁再统计size

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
   public int size() {
// 先尝试几次不加锁情况下的统计,如果统计失败,说明有持续的并发线程来更改HashEntry数组, resort to直译是使用(武力),在这里则表示使用”暴力“的阻塞锁
// 作者英文注释,简单明了解释了size方法设计思想:Try a few times to get accurate count. On failure due to continuous async changes in table, resort to locking.
final Segment<K,V>[] segments = this.segments;
int size;
// 统计size是否有溢出32位最大值
boolean overflow; // true if size overflows 32 bits
// sum 和last 用于比较相邻两次的map修改数
long sum; // sum of modCounts
long last = 0L; // previous sum
int retries = -1; // first iteration isn't retry
try {
for (;;) {
// 如果无锁情况下,统计尝试次数达到3次(从-1到1),则对所有segment段都加锁
if (retries++ == RETRIES_BEFORE_LOCK) {
for (int j = 0; j < segments.length; ++j)
ensureSegment(j).lock(); // force creation
}
sum = 0L;
size = 0;
overflow = false;
// 遍历所有segment并累计modCount,以及累计所有segment的HashEntry的节个数c
for (int j = 0; j < segments.length; ++j) {
Segment<K,V> seg = segmentAt(segments, j);
if (seg != null) {
sum += seg.modCount;
int c = seg.count;
ß // 判断累计的节个数c是否有溢出
if (c < 0 || (size += c) < 0)
overflow = true;
}
}
// 相邻两次modCount总修改数相等,说明当前没有其他线程来(put/remove/clear)map结构,可以结束统计
if (sum == last)
break;
last = sum;
}
} finally {
// 前面加锁统计完size后,这里就需要释放锁
if (retries > RETRIES_BEFORE_LOCK) {
for (int j = 0; j < segments.length; ++j)
segmentAt(segments, j).unlock();
}
}
// size溢出则返回Integer.MAX_VALUE,否则返回实际统计的size值
return overflow ? Integer.MAX_VALUE : size;
}

从finally的逻辑可以看出,size结果是相对很准确,而不是100%准确,因为它在逐个释放锁的过程中,有可能其他线程正在对“已释放锁的Segment”进行写操作(put/remove)。

6、rehash方法

rehash方法是Segment内部类方法,因此所谓的ConcurrentHashMap扩容,也是只是委托了对应的Segment段里面的HashEntry数组扩容,而不是Segment数组扩容!需要留意的是,rehash在put方法内部调用,执行rehash方法时,当前线程已经获得阻塞锁,因此可以独立扩容处理。

此外,java7的ConcurrentHashMap的原表节点转移到新表的设计跟java7的HashMap有点不一样

java7的HashMap扩容设计相对简单:针对冲突链的处理,将低位节点和高位节点区分开,分别放在新表的i位置、oldTab.length+i位置

而java7的ConcurrentHashMap的扩展设计:针对冲突链的处理,将“在新数组桶位相同的连续子链”和其他剩余节点区分开,然后两者放到新表对应位置。

新数组桶位相同的连续子链是指 :子链尾节点必须是该父链尾节点,此子链才能定义为”连续子类“,所以19->35->51不是rehash定义的“连续子链”, 参考后面的图示。

这里找子链的方法其实也不难,在一些leecode算法题例如在长字符串找一个“字符连续相同的子串”(abaaaab=>aaaa)、在链表里找一个子链等。

以下准备了两张图解用于解释其扩容过程,基本约定:

1、hash计算约定:假设使用简单hash方法,也即hash(数字)=数字本身,桶位计算方式:j=hash&oldTab.length-1

2、原数组长度为8,扩容阈值=8*0.75=6,链表已经达到6个节点,因此该桶位链表需要扩容,新表容量为16

3、在原数组桶位i=3位置上,形成冲突链,给定两种假设:

A、桶位含有“在新数组桶位相同的连续子链”的链表:3->11->19->35->51->67->null,如下图所示

(图中也清楚展示了“低位节点”和“高位节点”的情况,关于什么是低位和高位节点,只要有看过本博客关于HashMap的深度文章解析,这些是基本常识,这里不再累赘)
在这里插入图片描述

“桶位有连续子链”对应的扩容过程图解:

在这里插入图片描述

B、桶位不含有“子链”的链表:3->11->19->35->51->27->null,如下图所示

图中也清楚展示了“低位节点”和“高位节点”的情况

在这里插入图片描述

“桶位无连续子链”对应的扩容过程下面图解:

在这里插入图片描述

rehash源码详细解析

基于以上四张图,再理解rehash源码设计思路则会简单很多。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
        @SuppressWarnings("unchecked")
private void rehash(HashEntry<K,V> node) {
// 以下的原数组和新数组分别指代同一个segment段下,原HashEntry数组和新HashEntry数组
HashEntry<K,V>[] oldTable = table;
int oldCapacity = oldTable.length;
// 新数组容量=2倍原数组容量
int newCapacity = oldCapacity << 1;
// 新数组的扩容阈值
threshold = (int)(newCapacity * loadFactor);

// new一个新数组
HashEntry<K,V>[] newTable =(HashEntry<K,V>[]) new HashEntry[newCapacity];

// 使用位方式计算节点在新数组的桶位,在jdk1.8的HashMap源码解析中,已经对这部分知识讲得非常清楚,这里不再赘述
int sizeMask = newCapacity - 1;
// 遍历原数组每个桶位
for (int i = 0; i < oldCapacity ; i++) {
HashEntry<K,V> e = oldTable[i];
// 若原数组i桶位头节点不为空,且next节点为null,说明该桶位仅有一个头节点,计算出该节点在新数组的idx位置后,头插法:newTable[idx] = e

if (e != null) {
HashEntry<K,V> next = e.next;
int idx = e.hash & sizeMask;
if (next == null) // Single node on list
newTable[idx] = e;
// 若原数组i桶位头节点不为空,且next节点不为空,说明该桶位是一条冲突链,它的扩容处理比较复杂,可结合上面的图解内容进行综合分析
else { // Reuse consecutive sequence at same slot
HashEntry<K,V> lastRun = e;
int lastIdx = idx;
for (HashEntry<K,V> last = next;
last != null;
last = last.next) {
// 计算当前遍历节点在新数组桶位号
int k = last.hash & sizeMask;

// 如果前一个节点在新数组桶位号和后一个节点在新数组桶位号相同,说明找到“连续的、新表桶位相同的子链”(对应图解里面图4的情况),将lastRun指向该子链头节点,对应图中节点11。
if (k != lastIdx) {
lastIdx = k;
lastRun = last;
}
}
//
newTable[lastIdx] = lastRun;// lastRun指向的子链整体迁移
// Clone remaining nodes
for (HashEntry<K,V> p = e; p != lastRun; p = p.next) {// 迁移其他非lastRun节点
V v = p.value;
int h = p.hash;
int k = h & sizeMask;
HashEntry<K,V> n = newTable[k];
newTable[k] = new HashEntry<K,V>(h, p.key, v, n); // 注意这里是用旧桶位节点的key和value来创建新节点,因此原桶位节点不会有任何改动,也说明rehash的同时也支持并发读
}
}
}
}

// 完成扩容后,对rehash(node)里面的新节点node放入到新newTable里面
int nodeIndex = node.hash & sizeMask; // add the new node
/*
这里用putOrderedObject添加新节点到新数组里面,也是将其写入主存的操作延迟到外部方法put的finally的unlock位置,目的还是为减少频繁的“线程工作内存<-->主存”之间“来回跑”,等”凑齐“多个延迟写入的操作,然后在unlock前一并写入主存,提高put性能,作者对put性能提高的设计确定足够细腻!!!
final void setNext(HashEntry<K,V> n) {
UNSAFE.putOrderedObject(this, nextOffset, n);
}
*/
node.setNext(newTable[nodeIndex]);
newTable[nodeIndex] = node;
table = newTable; // 将table引用指向newTable,原table指向的对象将会被GC

}

7、其他方法

这里需要强调的是,前面第4到6节核心方法的设计及其源码都能理解的话,那么关于jdk1.7的ConcurrentHashMap其他方法的理解则变得相对简单很多

get方法:

两次定位,第一次定位到key对应segment位置,第二次定位到对应的HashEntry数组位置

两次定位都使用UNSAFE.getObjectVolatile方法实现,使用无锁实现高效率并发读,这也证明了读操作是并发的,不受写操作、独占锁影响,而且基于 happen-before

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
public V get(Object key) {
Segment<K,V> s; // manually integrate access methods to reduce overhead
HashEntry<K,V>[] tab;
int h = hash(key);
long u = (((h >>> segmentShift) & segmentMask) << SSHIFT) + SBASE;
// key对应的segment和HashEntry数组都在的情况下
if ((s = (Segment<K,V>)UNSAFE.getObjectVolatile(segments, u)) != null &&
(tab = s.table) != null) {
// 定位HashEntry数组的对应桶位,然后再桶位上查找是否key这个节点
for (HashEntry<K,V> e = (HashEntry<K,V>) UNSAFE.getObjectVolatile
(tab, ((long)(((tab.length - 1) & h)) << TSHIFT) + TBASE);
e != null; e = e.next) {
K k;
// 找到则返回值
if ((k = e.key) == key || (e.hash == h && key.equals(k)))
return e.value;
}
}
// 找不到key,返回null
return null;
}
remove方法

首先remove方法设计相对简单,它是委托key所在的Segment段实施的,因此不会影响其他Segment段的并发读写操作。

可以看出,remove属于写操作,跟put方法一样,首先需要获取独占锁。

remove两次hash定位和get方法一样,第一次先定位(路由、查找)到对应的Segment段,第二次定位HashEntry数组的桶位,remove方法内部内部封装了Segment的remove方法。

1
2
3
4
5
6
 // map.remove("foo")  
public V remove(Object key) {
int hash = hash(key);
Segment<K,V> s = segmentForHash(hash);
return s == null ? null : s.remove(key, hash, null);
}

Segment段内部remove方法设计:

设计说明

A、remove头节点情况:例如需要移除3->7->11->15->null的头节点3

e=3,next=7->11->15->null,

因此只需将节点7插入原桶位头节点即可:setEntryAt(tab, index, next),而且使用UNSAFE.putOrderedObject方式,不急着写入到主存,等到finally的unlock前才一并写入主存。

B、remove非头结点情况:例如需要移除3->7->11->15->null的节点7

pred=3,e=7,next=11->15->null

只需将节点3的next指向节点11即可:pred.setNext(next),也是使用UNSAFE.putOrderedObject方法,不急着写入到主存,等到finally的unlock前才一并写入主存

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
final V remove(Object key, int hash, Object value) {
// 先获取独占锁,跟scanAndLockForPut方法类似,但更简单,少了创建Node节点的任务。
if (!tryLock())
scanAndLock(key, hash);
//====tryLock加锁成功,进入临界区====
V oldValue = null;
try {
HashEntry<K,V>[] tab = table;
int index = (tab.length - 1) & hash;
// 定位key所在的桶位
HashEntry<K,V> e = entryAt(tab, index);
HashEntry<K,V> pred = null;
// 遍历链表,找出链表中与给定key相同的节点
while (e != null) {
K k;
HashEntry<K,V> next = e.next;
if ((k = e.key) == key ||
(e.hash == hash && key.equals(k))) {
V v = e.value;
if (value == null || value == v || value.equals(v)) {
// 对应A情况
if (pred == null)
setEntryAt(tab, index, next);
// 对应B情况
else
pred.setNext(next);
// 删除一个节点,map结构变化次数加1,节点数量当然需要减1
++modCount;
--count;
oldValue = v;
}
break;
}
pred = e;
e = next;
}
} finally {
//====释放锁,离开临界区====
// 释放独占锁, 前面所有使用UNSAFE.putOrderedObject的逻辑,将在这里一并写入到主存中
unlock();
}
return oldValue;
}

replace方法类似,此处不再累赘。

isEmpty方法

第一次遍历所有Segment,对每个Segment的modCount进行累加:sum += seg.modCount,而且在遍历过程中,只要出现Segment对应的HashEntry数组长度不为0时,即可直接返回结果,不需再遍历剩余的Segment。

第二次遍历所有Segment,如果Segment里面的HashEntry数组长度为0,则进行操作:sum -= seg.modCount

设计思路:

  • 要证明ConcurrentHashMap为空,那么以上两次操作后,sum变量必须为0,表示两次统计modCount没有发生变化且为0次ModCount。

  • 要证明ConcurrentHashMap不为空,那么在第二次遍历所有Segment,只要出现任意一个Segment的的HashEntry数据长度不为0,即可证明当前ConcurrentHashMap不为空

以上就是isEmpty的设计思想,当然遍历数组Segment过程中访问Segment段还是会使用UNSAFE.getObjectVolatile(ss, u)方式来取。

为何不用map.size()==0来判断是否为空?

对比size方法计算流程和isEmpty计算流程可以得出答案,isEmpty代码简洁且效率会快一些,因为isEmpty在遍历过程出现一个Segment的HashEntry数组长度不为0即可返回结果,而size方法要计算所有Segment且需要计算两次(运气不好时还需要加锁计算)才能得出结论。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
public boolean isEmpty() {
long sum = 0L;
final Segment<K,V>[] segments = this.segments;
// 对应第一次遍历
for (int j = 0; j < segments.length; ++j) {
Segment<K,V> seg = segmentAt(segments, j);
if (seg != null) {
// 只要有其中一个Segment的HashEntry数组长度不为0,即可返回结果:不空
if (seg.count != 0)
return false;
sum += seg.modCount;
}
}
// 对应第二次遍历的情况
if (sum != 0L) { // recheck unless no modifications
for (int j = 0; j < segments.length; ++j) {
Segment<K,V> seg = segmentAt(segments, j);
if (seg != null) {
// 只要有其中一个Segment的HashEntry数组长度不为0,即可返回结果:不空
if (seg.count != 0)
return false;
sum -= seg.modCount;
}
}

if (sum != 0L)
return false;
}
return true;
}

8、关键总结

HashEntry的成员变量valuenext是被关键字volatile修饰的,也就是说所有线程都可以及时检查到其他线程对这两个变量的改变,因而可以在不加锁的情况下读取到这两个引用的最新值

8.1 ConcurrentHashMap与HashMap不同点对比
  • 最简单区别当然线程安全:ConcurrentHashMap写操作:put/remove/replace都是需要加锁(scanAndLockForPut、scanAndLock)

  • HashMap允许Key和Value为null,而ConcurrentHashMap不允许key、value为空,参考8.5解释

  • HashMap不允许通过Iterator遍历的同时通过HashMap修改(强一致性要求),而ConcurrentHashMap允许该行为(本质原因CHM是弱一致性),并且该更新对后续的遍历可见,参考8.6

8.2 ConcurrentHashMap的并发度问题

ConcurrentHashMap的并发度concurrencyLevel在new 构造方法就已经固定(默认并发度16个线程),例如一开始给定Segment数组是128,那么并发度最多128线程同时写操作,但对于读操作,则不限制,可以是128也可以10000等不同数量线程并发读。

此提问可衍生另外一个知识点:

如果ConrruentHashMap需要扩容,通过第6节的rehash方法可知,它是委托key所在的Segment段去扩容该段里面的HashEntry数组,而不是对Segment数组本身扩容,对于这个问题,如果不了解ConrruentHashMap,应该也会惯性思维认为:“扩容时,Segment数组也会被扩容”这样的错误理解。

由于Segment数组初始化就限制了并发度,因此需要你提前根据业务场景设定号并发度值,这也算是jdk1.7 ConrruentHashMap需要优化的地方。

8.3 ConcurrentHashMap分段锁的巧妙设计思想是值得借鉴

尤其在一些计高并发计数场景,例如jdk1.8的LongAdder、jdk1.8 ConcurrentHashMap里面的fullAddCount方法,但jdk1.8的思路更优,采用分段无锁方式,比Segment分段锁lock更高性能。

8.4 ConcurrentHashMap有可能退化成SynchronizedMap

假设有些业务的key不够合理,绝大部分的key都hash到同一个segment段,那么容易导致多个线程仅在这个Segment段进行写操作,退化成SynchronizedMap,这个段就是全局锁,这也是jdk1.7 ConrruentHashMap需要优化的地方。

8.5 关于ConcurrentHashMap的key和value都不能为空的讨论
  • ConcurrentHashMap的value不能为空的原因

考察HashMap:在单线程操作的HashMap场景下,value可以放入null值,当使用get方法返回的值是null时,这个“null”存在二义性:要么key对应的value为null,要么key不在map里面,那么怎么唯一区分呢?很简单,只需结合containsKey方法就可以唯一确定取出的null是属于哪种情况,演示逻辑如下:

1
2
3
4
5
6
7
8
// 单线程能顺序执行以下所有逻辑
// 如果map包含该key,那么get返回的null是属于“key存在且对应的value为null”的情况
// 否则抛出key不存在的提示,这时就可以知道get返回的null是属于“key存不在map里面”的情况
if (m.containsKey(k)) {
return m.get(k);
} else {
throw new KeyNotPresentException();
}

基于以上的知识铺垫,下面我们再通过反证法来论证ConcurrentHashMap的value不能真实原因

在并发场景下 ,若将ConcurrentHashMap的value可以设为null,当使用get方法返回的值是null时,存在二义性:要么key对应的value为null,要么key不在map里面,考察使用containsKey方法来区分,参考图解

1
2
3
4
5
if (m.containsKey(k)) {
return m.get(k);
} else {
throw new KeyNotPresentException();
}

在这里插入图片描述

这张图很好解释了ConcurrentHashMap的value不能设为空的原因,并发条件下,执行m.containsKey(k)和m.get(k)之间会有其他线程“捣乱流程”:

线程A 执行m.containsKey(k)后返回true,线程A此时认为key存在map中,正准执行m.get前,线程B提前删除该key,接着线程A使用m.get得到的是null,然后线程A很自信得出“key是存在,只是value对应为null”的结论,而实际上呢,该key已经被线程B删除,实际情况为key不存在map中,ConcurrentHashMap这就是无法解决的二义性。

  • 至于ConcurrentHashMap的key不能为空的依据:

1、纯粹是基于Java关于null是否符合“程序优雅设计与否”的经验知识,其实Doug Lea认为map中允许键值为null是一种不合理的设计,HashMap虽然可以判断二义性,但是Doug Lea仍然觉得这样设计是不合理的。在java项目中,如果key为null通常意味着有些地方已经有出错的苗头,所以早点抛异常比允许null key更合适。

2、允许key为null另外一个不够优雅地方就是不方便遍历哈希表,尤其对于ConcurrentMap。

3、基于以上背景,Doug Lea在源码hash计算方法设计上就不支持key为null的处理,若为null,那么使用k.hashCode()抛出NPE,可以参考hash(key)方法。

8.5 Fast-fail产生原因

http://www.jasongj.com/java/concurrenthashmap/

在使用迭代器的过程中如果HashMap被修改,那么ConcurrentModificationException将被抛出,也即Fast-fail策略。

当HashMap的iterator()方法被调用时,会构造并返回一个新的EntryIterator对象,并将EntryIterator的expectedModCount设置为HashMap的modCount(该变量记录了HashMap被修改的次数)。

1
2
3
4
5
6
7
8
HashIterator() {
expectedModCount = modCount;
if (size > 0) { // advance to first entry
Entry[] t = table;
while (index < t.length && (next = t[index++]) == null)
;
}
}

在通过该Iterator的next方法访问下一个Entry时,它会先检查自己的expectedModCount与HashMap的modCount是否相等,如果不相等,说明HashMap被修改,直接抛出ConcurrentModificationException。该Iterator的remove方法也会做类似的检查。该异常的抛出意在提醒用户及早意识到线程安全问题。

8.6 ConcurrentHashMap高并发读为什么可以无锁?

首先:JMM实现了对volatile的保证:对volatile域的写入操作happens-before于每一个后续对同一个域的读写操作。

HashEntry定义中使用volatile修饰value、next字段,恰恰能享受到以上JMM所提及volatile两点收益:

1、volatile语义可以保证写操作在读操作之前(写操作happens-before于读操作),也即保证了写操作对后续的读操作都是可见的

2、其次在写入操作的时候使用`UNSAFE.putOrderedObjectE写入主存的时机延迟到put方法的unlock前,保证了数据的一致性。

有了以上两个机制后,那么使用 UNSAFE.getObjectVolatile即可支持并发无锁读

8.8 为何HashEntry节点类型设置为final

首先ConcurrentHashMap的HashEntry为final类型(一旦创建就成为不可变类),而HashMap的Entry节点是非final类型,ConcurrentHashMap为何这么设计?

要回答这一问题,其实需要理解final有什么用?jvm会对final定义的变量做怎样的处理?具体解释可参考此文章

简单来说:

1、对于使用final修饰的对象,java编译器会保证在读final域之前一定被写入过,即保证写在读之前发生,避免多线程并发条件下出现读后写,读到非预期的值。

ok,按这样的思路去解释HashEntry类:java编译器会保证在读HashEntry节点域里面的属性如key、value之前,一定会保证key、value先被写入后再来读,避免并发情况下出现“读后写,读到非预期的值”,也即保证线程安全。

2、对比volatile,final不需要额外的线程本地内存和主存之间的同步开销。

8.7 HashMap在第一次put时才完成初始化,那么ConcurrentHashMap在什么时机开始初始化?

ConcurrentHashMap的初始化在new 构造方法时就已经完成部分初始化,完成Segment数组创建、第0号位的Segment对象创建以及在其里面的HashEntry容量2的创建,因此,它的初始化可不是在第一次put才初始化