yield-bytes

沉淀、分享与无限进步

Java高级主题:深入解析ThreadLocal的数据结构设计原理及其源代码实现

ThreadLocal可以实现完全基于无锁且也不是基于CAS的线程隔离需求,让每个线程可以有自己的本地实例,但如果对ThreadLocal底层设计不了解,那么对甚至无法正确ThreadLocal及其可能出现的内存泄露问题。可以说ThreadLocal的源代码设计也一种非常优秀的可支持“高并发”的实现。

在这里插入图片描述

《gitee 博客文章封面》

基本用法:

demo1

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
public class ThreadLocalDemo {
static class ProducerA extends Thread{
@Override
public void run(){
ThreadLocal<String> var1=new ThreadLocal<>();
ThreadLocal<String> var2=new ThreadLocal<>();
// 普通用法
var1.set("foo A");
var2.set("bar A");
System.out.println(var1.get()); // 输出foo A
System.out.println(var2.get()); // 输出bar A
}
}

static class ProducerB extends Thread{
@Override
public void run(){
ThreadLocal<String> var1=new ThreadLocal<>();
ThreadLocal<String> var2=new ThreadLocal<>();
// 普通用法
var1.set("foo B");
var2.set("bar B");
System.out.println(var1.get()); // 输出foo B
System.out.println(var2.get()); // 输出bar B
}
}


public static void main(String[] args) {

new ProducerA().start();
new ProducerB().start();
}
}

这里创建了两个线程,每个线程内部有自己的ThreadLocal变量,线程之间ThreadLocal变量内部的set和get互相独立,互不影响,无需使用锁即可实现了线程安全操作。线程内部的变量使用set方法给定初始值、get方法取值,可以猜测其内部有类似HashMap这样的设计,但是否照搬HashMap数据结构设计呢? 其实不然。

在这里,ProducerA内部其实是创建了一个称为“ThreadLocalMap”的Map结构用于存放ThreadLocal变量和它的value,ProducerB内部也创建了一个ThreadLocalMap,也即每个线程绑定一个自己内部ThreadLocalMap,这里提到的ThreadLocalMap就是提供了set、get方法的底层Map数据结构,所谓的ThreadLocal数据结构分析其实就是特指其内部的ThreadLocalMap的数据结构分析。

demo2

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
public class ThreadLocalDemo {

static class HoldCount{
int count;
final long tid=0;
}
public static void main(String[] args) {

ThreadLocal<HoldCount> rh=ThreadLocal.withInitial(HoldCount::new); // 设定rh这个ThreadLocal变量的初始值
rh.set(new HoldCount()); // 将计数器放在rh中缓存
HoldCount h= rh.get();
System.out.println(h.count); // 这里输出的rh初始值,也即HoldCount的count属性初始值:0。
for (int i = 0; i < 10; i++) {
h.count++;
}
System.out.println(h.count);

HoldCount newh=rh.get();// 更新缓存计数器后,再从ThreadLocal重新读取
System.out.println(newh.count); // 可以读取新的计数值
// 在rh这个ThreadLocal里面的Map结构中移除HoldCount实例对象
rh.remove();
System.out.println(rh.get()); // 此时rh里面Map已经不存在HoldCount对象,因此这里返回NUll

}
}

在demo2中,给出了使用ThreadLocal后需要及时删除其实例对象的情况,这部分原因将在文章后面给出深入分析。

ThreadLocal内部数据结构简析

可以看到set方法是由内部ThreadLocalMap实现的set方法,既然是个“Map”,那么当然可以猜测是否跟HashMap的数据结构:数据+链表+红黑树类似呢?

1
2
3
4
5
6
7
8
public void set(T value) {
Thread t = Thread.currentThread();
ThreadLocalMap map = getMap(t);
if (map != null)
map.set(this, value);
else
createMap(t, value);
}

其实ThreadLocalMap的数据结构没有HashMap数据结构复杂,ThreadLocalMap底层仅有一个table数组,这里,也许你会好奇:HashMap为了解决hash冲突,在数组的桶位上加入一条单向链表,冲突的entry自然会放入到此链表中(或者红黑树),那么问题来了,ThreadLocalMap底层仅有一个table数组,它是如何解决hash冲突?以下正式其设计原理之一,这里的数组给出最简单的情况,不包括“stale entry(无效entry)”的情况,以便让读者快速理解ThreadLocalMap设计原理:
在这里插入图片描述
可以看到图中所说的“解决冲突的方式:从i=3开始向后遍历出首个空slot,也即i=5,将keyC放入此空slot即可”的逻辑被称为“线性探测法”,所谓的“线性”就是o(n)复杂度的遍历操作,所谓的“探测”就是不断向后“探测、寻找”,直到找到首个空slot位置。

以上内容为ThreadLocalMap的放入new Entry的简单情况,如果有理解HashMap源代码设计的读者应该可以猜到其他重要设计:例如,当数组容量不够时,如何扩容,也即rehash(注意ThreadLocalMap里面的resize和rehash不是同一个逻辑),再例如ThreadLocalMap里面已经存在的entry,如果它的key已经变成无效(stale),那么如何该清理,或者说在set和get的线性探测过程中遇到有stale entry时,该如何清理?这些问题将在后面逐个深入探讨。

ThreadLocalMap的基本成员变量

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
static class ThreadLocalMap {

/**
* The entries in this hash map extend WeakReference, using
* its main ref field as the key (which is always a
* ThreadLocal object). Note that null keys (i.e. entry.get()
* == null) mean that the key is no longer referenced, so the
* entry can be expunged from table. Such entries are referred to
* as "stale entries" in the code that follows.
*/
// ThreadLocalMap底层数组存放的WeakReference类型的entry,使用弱引用类型是为了能够高效GC,避免内存泄露,文章后面给出此设计的讨论
static class Entry extends WeakReference<ThreadLocal<?>> {
/** The value associated with this ThreadLocal. */
Object value;
/* entry的key就是ThreadLocal对象,例如一个线程内部有10个ThreadLocal变量,那么此线程内部的ThreadLocalMap将存放这10个entry,这里的value就是ThreadLocal变量的“值”。
例如demo1中:
ThreadLocal<String> var1=new ThreadLocal<>();
var1.set("foo A")
那么entry的key就是这个名称为var1的ThreadLocal对象,value就是字符串“foo A”
*/
Entry(ThreadLocal<?> k, Object v) {
super(k);
value = v;
}
}

/**
* The initial capacity -- MUST be a power of two.
*/
// 数组的初始容量16
private static final int INITIAL_CAPACITY = 16;

/**
* The table, resized as necessary.
* table.length MUST always be a power of two.
*/
//ThreadLocalMap的底层数组,这里也采用2的次方,原因在HashMap的源代码讨论已经给出深入的解析,这里不再累赘。
private Entry[] table;

/**
* The number of entries in the table.
*/
// 数据含有entry的个数,注意即使entry的key处于stale状态,它也算一个entry
private int size = 0;

/**
* The next size value at which to resize.
*/
private int threshold; // Default to 0

/**
* Set the resize threshold to maintain at worst a 2/3 load factor.
*/
// 注意区别于HashMap的扩容阈值是len*3/4
private void setThreshold(int len) {
threshold = len * 2 / 3;
}

/**
* Increment i modulo len.
*/
// 返回数组当前位置i的下一个位置i+1,如果下一个位置超过数组长度,那么下一个位置又从下标0开始,这种方式实现了所谓的“环形数组”,在后面get、set方法中或者stale entry清空机制的处理中可以看到它的用处
private static int nextIndex(int i, int len) {
return ((i + 1 < len) ? i + 1 : 0);
}

/**
* Decrement i modulo len.
*/
// 逻辑同上,方向相反,返回当前位置i的前一个位置i+1,如果来到数组头部,那么前一个位置即回到数组末尾
private static int prevIndex(int i, int len) {
return ((i - 1 >= 0) ? i - 1 : len - 1);
}

set方法完全解析

set方法本身

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
  /**
* Set the value associated with key.
*
* @param key the thread local object
* @param value the value to be set
*/
private void set(ThreadLocal<?> key, Object value) {

// We don't use a fast path as with get() because it is at
// least as common to use set() to create new entries as
// it is to replace existing ones, in which case, a fast
// path would fail more often than not.

Entry[] tab = table;
int len = tab.length;
// 1.计算给定key对应的桶位,此hash算法能够最大程度将key平均分布到数组对应的桶位上,具体算法参考文后说明
int i = key.threadLocalHashCode & (len-1);
/* 2.线性探测发的实现
从定位到i桶位开始遍历,直到遇到一个entry确实是null的空桶位,如果此遍历过程中遇到stale entry那么将其替换即可完成set操作。
*/
for (Entry e = tab[i];
e != null;
e = tab[i = nextIndex(i, len)]) {
ThreadLocal<?> k = e.get();
// 3.如果此桶位的key恰好是给定给定的key,那么更新此桶位的value后可直接返回。
if (k == key) {
e.value = value;
return;
}
// 4. 当前桶位的entry的key为null(注意这个key是弱引用类型,说明此entry已经被GC),使用replaceStaleEntry放入新entry
if (k == null) {
replaceStaleEntry(key, value, i);
return;
}
}
// 5.在2的线性探测过程中,遇到entry为空的即可来到这流程,直接放入新entry,并且数组的entry数量加1,在这里应该可以猜到,当向数组添加一个新entry后接下来就要判断是否需要扩容
tab[i] = new Entry(key, value);
int sz = ++size;
// 6.满足扩容的条件:cleanSomeSlots返回False且entry数量达到扩容阈值
if (!cleanSomeSlots(i, sz) && sz >= threshold)
rehash();
}

关于线性探测法的说明

1.为何会有“环形数组或者环形遍历”的设计:nextIndex

1
2
3
for (Entry e = tab[i];
e != null;
e = tab[i = nextIndex(i, len)])

在这里插入图片描述

图1所示,假设目前有一个keyX定位到的桶位是i=13,但此桶位已存在entry,只能继续向后探测,来到数组尾部的桶位也不为空,此时经过e = tab[i = nextIndex(i, len)]的计算后,线性探测再次回到数组的头部位置重新遍历,如图2所示,当遍历到i=6时,发现此桶位为空,即可跳出循环接着在此位置放置新的entry,这就是“环形数组或者环形遍历”的底层设计逻辑。

更加离散的hash计算

ThreadLocalMap内部的hash计算方式没有采用类似HashMap的计算方式,而是自行设计了一套

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
public class ThreadLocal<T> {
/**
* ThreadLocals rely on per-thread linear-probe hash maps attached
* to each thread (Thread.threadLocals and
* inheritableThreadLocals). The ThreadLocal objects act as keys,
* searched via threadLocalHashCode. This is a custom hash code
* (useful only within ThreadLocalMaps) that eliminates collisions
* in the common case where consecutively constructed ThreadLocals
* are used by the same threads, while remaining well-behaved in
* less common cases.
*/
private final int threadLocalHashCode = nextHashCode();

/**
* The next hash code to be given out. Updated atomically. Starts at
* zero.
*/
// 巧妙利用了原子累加器
private static AtomicInteger nextHashCode =
new AtomicInteger();

/**
* The difference between successively generated hash codes - turns
* implicit sequential thread-local IDs into near-optimally spread
* multiplicative hash values for power-of-two-sized tables.
*/
// hash值递增步长,每次对新的i计算hash值前先加上此数,计算结果能更加离散,这个值对应的十进制数为1640531527,这个值就是带符号的32位int的最大值的黄金分割值取正
private static final int HASH_INCREMENT = 0x61c88647;

/**
* Returns the next hash code.
*/
private static int nextHashCode() {
// 每次对新的i计算hash值前先加上此数,计算结果能更加离散
return nextHashCode.getAndAdd(HASH_INCREMENT);
}

模拟ThreadLocalMap的hash计算方式:

1
2
3
4
5
6
7
8
9
10
11
public class ThreadLocalHashCode {
private static final int HASH_INCREMENT = 0x61c88647;
public static void main(String[] args) {
int tableLength=16;
for (int i = 0; i <32 ; i++) {
int h=i*HASH_INCREMENT+HASH_INCREMENT;
int index=h & (tableLength-1);
System.out.println(i+"定位的桶位是:"+index);
}
}
}

可以观察不同桶位计算出的hash值确实足够离散:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
0定位的桶位是:7
1定位的桶位是:14
2定位的桶位是:5
3定位的桶位是:12
4定位的桶位是:3
5定位的桶位是:10
6定位的桶位是:1
7定位的桶位是:8
8定位的桶位是:15
9定位的桶位是:6
10定位的桶位是:13
11定位的桶位是:4
12定位的桶位是:11
13定位的桶位是:2
14定位的桶位是:9
15定位的桶位是:0
16定位的桶位是:7
17定位的桶位是:14
18定位的桶位是:5
19定位的桶位是:12
20定位的桶位是:3
21定位的桶位是:10
22定位的桶位是:1
23定位的桶位是:8
24定位的桶位是:15
25定位的桶位是:6
26定位的桶位是:13
27定位的桶位是:4
28定位的桶位是:11
29定位的桶位是:2
30定位的桶位是:9
31定位的桶位是:0

replaceStaleEntry方法解析(核心内容)

在set方法中的第4点:替换失效的entry

1
2
3
4
// 4. 当前桶位的entry的key为null(注意这个key是弱引用类型,说明此entry已经被GC),使用replaceStaleEntry放入新entry
if (k == null) {
replaceStaleEntry(key, value, i);
return;

其源码设计包括两个重要的核心功能:替换对应位置失效的entry和具有顺带功能(As a side effect)的清理其他失效entry,其中清理entry的逻辑设计最为复杂。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
private void replaceStaleEntry(ThreadLocal<?> key, Object value,
int staleSlot) {
Entry[] tab = table;
int len = tab.length;
Entry e;

// Back up to check for prior stale entry in current run.
// We clean out whole runs at a time to avoid continual
// incremental rehashing due to garbage collector freeing
// up refs in bunches (i.e., whenever the collector runs).
int slotToExpunge = staleSlot;
/*
1、staleSlot是对于给定key用线性探测法“前向遍历”找到的首次出现的stale entry对应的下标
为何第1步骤的前向遍历没有安排类似第2步骤的“替换操作等逻辑呢”,因为“ThreadLocal本身用的是开发地址法,冲突的key都被放置在后面空的slot,就算来到table末尾再从头遍历,它也是遵循“向后放置发生冲突的key””
*/
for (int i = prevIndex(staleSlot, len);
(e = tab[i]) != null;
i = prevIndex(i, len))
if (e.get() == null)
slotToExpunge = i;

// Find either the key or trailing null slot of run, whichever
// occurs first
// 2. 从set方法传入的staleSlot下标开始向后遍历
for (int i = nextIndex(staleSlot, len);
(e = tab[i]) != null;
i = nextIndex(i, len)) {
ThreadLocal<?> k = e.get();

// If we find key, then we need to swap it
// with the stale entry to maintain hash table order.
// The newly stale slot, or any other stale slot
// encountered above it, can then be sent to expungeStaleEntry
// to remove or rehash all of the other entries in run.
/*
以下逻辑非常关键:
3.1 如果从stale slot开始的“后向遍历”的第i下标又出现了key冲突,说明给定的key“本应放在stale slot 下标位置,但是因为冲突,被迫挪到比stale slot 更靠后的位置i”,既然现在stale slot已失效,那么就可以将给定key放回本应该更靠近hash定位的下标位置staleSlot。这里采用交换两者位置即可实现此逻辑。这就是”to swap it with the stale entry to maintain hash table order”所要表达的逻辑。
*/
if (k == key) {
e.value = value;
tab[i] = tab[staleSlot];
tab[staleSlot] = e;

// Start expunge at preceding stale entry if it exists
// 3.2 如果slotToExpunge还是staleSlot,说明第1步骤的“前向探测”没有stale entry,那么就将清理起始下标改到i,因为i下标位置存放的是在3.1交换过来的stale entry:tab[i] = tab[staleSlot]
if (slotToExpunge == staleSlot)
slotToExpunge = i;
/* 3.3 到此,我们知道,截止到i下标的stale entry情况ß:
[某个空slot,staleSlot):从staleSlot的前向位置都没有stale entry
staleSlot:将i位置的有效entry交换过来tab[staleSlot] = e
[staleSlot+1,i-1]:后向遍历没有出现stale entry
i:存放的是从staleSlot交换过来的stale entry
因此slotToExpunge肯定是从i下标开始做清理工作。
*/
cleanSomeSlots(expungeStaleEntry(slotToExpunge), len);
return;
}

// If we didn't find stale entry on backward scan, the
// first stale entry seen while scanning for key is the
// first still present in the run.
// 3.4 后向遍历在第i下标发现了一个stale entry且在前向遍历没有出现stale entry,那么清理开始下标当然要重置为i,那么staleSlot位置还存放着stale entry且没有也没有像3.1这样的“swap it”的设计,那么staleSlot自己是如何处理呢。它会在接下里的第4步骤中被处理掉! 那么在这个步骤发现第i号下标新的stale entry又是如何处理呢? 它会在第5个步骤被清理掉
if (k == null && slotToExpunge == staleSlot)
slotToExpunge = i;
}

// If key not found, put new entry in stale slot
/* 4. 在整个run都没有找到对应的key且也没有发现stale entry(除了staleSlot本身整个),那么好办,直接将staleSlot这个在set方法一开始就发现的stale entry的位置替换为新 entry即可,这就是为何方法名字命名为replaceStaleEntry。

*/
tab[staleSlot].value = null;
tab[staleSlot] = new Entry(key, value);

// If there are any other stale entries in run, expunge them
// 第5步骤:接第3.4步骤出现的情况。
if (slotToExpunge != staleSlot)
cleanSomeSlots(expungeStaleEntry(slotToExpunge), len);
}

关于“A run”的理解(理解run及其内部清理标记逻辑才能透彻理解set背后的原理)

As a side effect, this method expunges all stale entries in the “run” containing the stale entry(A run is a sequence of entries between two null slots.)
在这里插入图片描述
如上图所示:

一个”run”就是在两个空slot之间的slots,例如上图,i=0和i=12之间就是一个“run”。

1.为何这“run”是以两个空slot作为边界呢?

这是因为replaceStaleEntry的第1步骤使用prevIndex前向探测,直到遇到null slot则结束循环,而第2步骤使用nextIndex后向探测,直到遇到null slot则结束循环,因此可以得出一前一后都是null slot作为边界。

2.结合replaceStaleEntry的源代码分析的第1点:显然经过prevIndex的“前向探测”探测到了首个i=1的stale entry,因此slotToExpunge指向i=1表示此下标是接下expungeStaleEntry清理的起始下标。

3.根据1可知,如果slotToExpunge下标和staleSlot下标相等,说明“前向探测”根本没发现stale entry,也即slotToExpunge指向没动过。

4.根据第3.4步骤可知,k == null && slotToExpunge == staleSlot,说明除了在set方法第一次发现的staleSlot,还在replaceStaleEntry的后向探测中的第i位置又发现了一个stale entry,因此起始清理下标要重置为slotToExpunge=i

在这里插入图片描述

关于replaceStaleEntry内部最关键的“替换算法”,也即对应第第3.1步骤到第3.3步骤如上图1和图2所示:

不妨假设i=8就是“给定的key”hash定位时发生的冲突下标,假设i=11的key1等于“给定的key”,也即对应第3.1步骤,

此时实施3.1步骤的“swap”逻辑:

将staleSlot=9的stale entry交换到key1的i=11位置,原i=11位置entry交换到staleSlot=9位置并且key不变但value被更新为“给定key对应的value”。

经过这么处理,“给定的key”所在桶位显然更靠近原本属于它的8号桶位,而不是像之前“被迫挪到”11号桶位,这就是源代码注释说提到的“we need to swap it with the stale entry to maintain hash table order”

以上的算法设计可以抽象为以下类比逻辑:

A本应坐在1号位,但发现来晚了,1号位置有人坐了,2、3、4也有人坐了,A被迫坐在5号位,某个时刻“新来的B”发现2号位已经变成staleSlot且1号位还有人在坐,那么此时B可以将A交换到2号位且把2号位的stale entry交换到5号位,那么此刻位于2号位的A显然更靠近“本属于自己的1号座位

关于staleSlot的清理逻辑设计

在4.2 中replaceStaleEntry为我们展示了精密的如何找出“起始清理下标”的算法设计,从

cleanSomeSlots(expungeStaleEntry(slotToExpunge), len);可知,清理逻辑被设计为两部分:
第一部分:expungeStaleEntry(slotToExpunge),此方法返回一个i下标。第一部分的清理可称为“线性地清理”——“linear expunge”。注意此过程还包括rehash过程!!

第二部分:cleanSomeSlots(i, len),第二部分的清理可以称为“Heuristically expunge”,这里并不打算翻译为“启发性清理”,因为此处不建议使用中文硬翻译。(若要翻译,则可以翻译为“试探性地清理”)

以下是关于“线性地清理-expungeStaleEntry”的解析:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
/**
* Expunge a stale entry by rehashing any possibly colliding entries
* lying between staleSlot and the next null slot. This also expunges
* any other stale entries encountered before the trailing null. See
* Knuth, Section 6.4
*
* @param staleSlot index of slot known to have null key
* @return the index of the next null slot after staleSlot
* (all between staleSlot and this slot will have been checked
* for expunging).
*/
// 1.这里入参的staleSlot,就是replaceStaleEntry探测到的slotToExpunge下标
private int expungeStaleEntry(int staleSlot) {
Entry[] tab = table;
int len = tab.length;

// expunge entry at staleSlot
// 2.首先清空当前slotToExpunge下标的stale entry
tab[staleSlot].value = null;
tab[staleSlot] = null;
size--;

// Rehash until we encounter null
// 3. 在slotToExpunge+1到恰好遇到null slot之间进行逐个探测清理
Entry e;
int i;
for (i = nextIndex(staleSlot, len);
(e = tab[i]) != null;
i = nextIndex(i, len)) {
ThreadLocal<?> k = e.get();
// 3.1 又出现stale entry可直接清空
if (k == null) {
e.value = null;
tab[i] = null;
size--;
} else {
// 3.2 再次计算当前位置i放置的entry对应的hash值,如果hash值和当前i桶位一致,说明没有冲突,此entry恰好就是位于“本属于自己的桶位上”,如果hash值和当前i不一致,说明“此entry因为冲突被迫放到了第i位置,而第i位置不是此entry的直接定位”,可以将位于i桶位的“entry”放在“属于自己的h桶位”,这样就保证了entry能最大程度靠近或者就位于“本属于自己的桶位”范围以内,目的是为了提供线性探测查询效率!

int h = k.threadLocalHashCode & (len - 1);
// 从向后探测的开放地址法可知,h值更小,i值更大,正是因为原h位置有冲突,e才被放置到更靠后的第i位置
if (h != i) {
// 因为发生冲突被迫放置在i位置的entry,后面会被放到它的直接定位h桶位,因此i位置可以置为null
tab[i] = null;

// Unlike Knuth 6.4 Algorithm R, we must scan until
// null because multiple entries could have been stale.
/*
虽然h桶位就是此entry的直接定位,但是考虑到h桶位可能被放置了其他entry,因此需要加入“向后探测”的逻辑,直到发现下一个位置为null slot。
tab[h] = e 的写法就实现了“因为发生冲突被迫放置在i位置的entry,现在能够最接近地放到本属于自己直接定位的h桶位*/
while (tab[h] != null)
h = nextIndex(h, len);
tab[h] = e;
}
}
}


return i;
/*
显然此i就是第3步骤for循环里面从slotToExpunge向后探测到下一个null slot下标,此下标会被cleanSomeSlots方法中利用起来。
for (i = nextIndex(staleSlot, len);
(e = tab[i]) != null;
i = nextIndex(i, len))
*/
}

expungeStaleEntry难点其实在第3.2步骤中遇到的情况,需要做个简单的rehash,保证entry更加靠近“本属于自己的直接定位h桶位”,过程解析参考以下算法流程:
在这里插入图片描述
当然,如果在图2的中i=8不是null slot,那么从h位置开始while (tab[h] != null)探测,也会探测到i=10位置是个null slot,结果就是table[h=i=10]=e,e还是位于第i位置上。

cleanSomeSlots(i, len)方法

注意,要清楚cleanSomeSlots(i, len) i和len含义,否则无法理解cleanSomeSlots目的,这里的i就是expungeStaleEntry返回的一个空slot,len是table长度。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
private boolean cleanSomeSlots(int i, int n) {
boolean removed = false;
Entry[] tab = table;
int len = tab.length;
do {
// 1.
i = nextIndex(i, len);
Entry e = tab[i];
//2.
if (e != null && e.get() == null) {
// 3.
n = len;
removed = true;
//4 .
i = expungeStaleEntry(i);
}
// 5.控制探测的次数
} while ( (n >>>= 1) != 0);
return removed;
}

1.首先,考虑最简单的情况,如果第i和len之前都不存在stale entry,那么就相当于在1和len范围内折半探测,时间复杂度为log2(n)

2.其次,考虑到探测在i和len过程中,出现了stale entry,此时会将n重置为len长度,继续while,再一轮log2(n)次遍历

这就是所谓的“Heuristically scan”,因为是log2(n),即使出现如2情况,此试探性的探测动作也是可以很快完成。

set方法内部的rehash

1
2
3
4
5
6
7
8
9
      tab[i] = new Entry(key, value);
int sz = ++size;
/*
1. 显然如果cleanSomeSlots返回true,表明在table中清理了不少于1个的stale entry,恰好可以腾出不少于1个空slot,显然不需要table扩容。
2. 当!cleanSomeSlots(i, sz) 表示没有遇到stale entry且table的entry数量已经达到了阈值,可以进入扩容逻辑
*/
if (!cleanSomeSlots(i, sz) && sz >= threshold)
rehash();
}

rehash内部设计:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
  /**
* Re-pack and/or re-size the table. First scan the entire
* table removing stale entries. If this doesn't sufficiently
* shrink the size of the table, double the table size.
*/
private void rehash() {
//1.扩容前,先从头到尾线性清理一下stale entry,运气好的话,清理的stale entry后恰好有足够多的null slot,这样省去真正的扩容操作,效率更高。
expungeStaleEntries();

// Use lower threshold for doubling to avoid hysteresis
// 2.此时的size是在1步骤清理完stale entry后的实际entry个数,只有当此时的size达到了0.75threshold才会去扩容,
if (size >= threshold - threshold / 4)
resize();
}

/**
* Double the capacity of the table.
*/
private void resize() {
Entry[] oldTab = table;
int oldLen = oldTab.length;
// 这里看出是两倍扩容
int newLen = oldLen * 2;
Entry[] newTab = new Entry[newLen];
int count = 0;
// 1.从旧表开始逐个遍历
for (int j = 0; j < oldLen; ++j) {
Entry e = oldTab[j];
if (e != null) {
ThreadLocal<?> k = e.get();
// 2.旧表当前j位置出现stale entry,那么直接将entry的value强引用设为null,Help the GC
if (k == null) {
e.value = null; // Help the GC
} else {
// 3. 旧表当前遍历位置j是正常的entry,那么用新表newLen计算它在新表的桶位号
int h = k.threadLocalHashCode & (newLen - 1);
// 4. 开放地址法在新表中为“当前旧表遍历位置下entry”找到对应的null slot新表h位置
while (newTab[h] != null)
h = nextIndex(h, newLen); // 注意这里是在新表计算
newTab[h] = e;
count++;
}
}
}

setThreshold(newLen);
size = count;
table = newTab; // table 指向新表引用
}

rehash的逻辑相对简单。

get方法解析

有了set方法完全解析流程后,对于get方法则很好理解

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
/**
* Returns the value in the current thread's copy of this
* thread-local variable. If the variable has no value for the
* current thread, it is first initialized to the value returned
* by an invocation of the {@link #initialValue} method.
*
* @return the current thread's value of this thread-local
*/
public T get() {
Thread t = Thread.currentThread();
ThreadLocalMap map = getMap(t);
// 1、ThreadLocalMap已经存在时
if (map != null) {
// getEntry才是正在在底层table去查找给定key对应的entry
ThreadLocalMap.Entry e = map.getEntry(this);
if (e != null) {
@SuppressWarnings("unchecked")
T result = (T)e.value;
return result;
}
}
// 2. 如果线程t的ThreadLocal内部ThreadLocalMap还未初始化,直接返回ThreadLocal初始化时设定的初始值
return setInitialValue();
}

getMap的逻辑

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
 /**
* Get the map associated with a ThreadLocal. Overridden in
* InheritableThreadLocal.
*
* @param t the current thread
* @return the map
*/
ThreadLocalMap getMap(Thread t) {
return t.threadLocals;
}

/**
* Create the map associated with a ThreadLocal. Overridden in
* InheritableThreadLocal.
*
* @param t the current thread
* @param firstValue value for the initial entry of the map
*/
// 这里可以看出,原来ThreadLocal并不是独立存在,而是它里面的ThreadLocalMap绑定当前线程的成员变量threadLocals,因此ThreadLocalMap的生命周期和线程同在
void createMap(Thread t, T firstValue) {
t.threadLocals = new ThreadLocalMap(this, firstValue);
}


/**
* Construct a new map initially containing (firstKey, firstValue).
* ThreadLocalMaps are constructed lazily, so we only create
* one when we have at least one entry to put in it.
*/
ThreadLocalMap(ThreadLocal<?> firstKey, Object firstValue) {
table = new Entry[INITIAL_CAPACITY];
int i = firstKey.threadLocalHashCode & (INITIAL_CAPACITY - 1);
table[i] = new Entry(firstKey, firstValue);
size = 1;
setThreshold(INITIAL_CAPACITY);
}

getEntry整体设计:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
/**
* Get the entry associated with key. This method
* itself handles only the fast path: a direct hit of existing
* key. It otherwise relays to getEntryAfterMiss. This is
* designed to maximize performance for direct hits, in part
* by making this method readily inlinable.
*
* @param key the thread local object
* @return the entry associated with key, or null if no such
*/
private Entry getEntry(ThreadLocal<?> key) {
int i = key.threadLocalHashCode & (table.length - 1);
Entry e = table[i];
// 1. 对应上面源代码注释提到“Only the fast path:a direct hit of existing key” 逻辑,也即给定key对应的i桶位的entry恰好存放的就是key的entry。
if (e != null && e.get() == key)
return e;
// 2. 说明这个key在之前是发生冲突了,放置到比i更靠后的位置,需要采用“后向探测”去检索。
else
return getEntryAfterMiss(key, i, e);
}

/**
* Version of getEntry method for use when key is not found in
* its direct hash slot.
*
* @param key the thread local object
* @param i the table index for key's hash code
* @param e the entry at table[i]
* @return the entry associated with key, or null if no such
*/
private Entry getEntryAfterMiss(ThreadLocal<?> key, int i, Entry e) {
Entry[] tab = table;
int len = tab.length;
// 从给定的entry开始后向遍历探测
while (e != null) {
ThreadLocal<?> k = e.get();
// 1.找到,则返回
if (k == key)
return e;
// 2.遇到stale entry,调用expungeStaleEntry清理它,此时i位置就是slotToExpunge起始清理的下标
if (k == null)
expungeStaleEntry(i);
else
// 3.继续向后探测下一个entry
i = nextIndex(i, len);
e = tab[i];
}
return null;
}

remove方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
/**
* Remove the entry for key.
*/
private void remove(ThreadLocal<?> key) {
Entry[] tab = table;
int len = tab.length;
int i = key.threadLocalHashCode & (len-1);
for (Entry e = tab[i];
e != null;
e = tab[i = nextIndex(i, len)]) {
if (e.get() == key) {
// 目的是主动让entry的父类成员变量置null,那么entry自然就不会存在任何引用了,直接从正常的entry变成stale entry
e.clear();
//
expungeStaleEntry(i);
return;
}
}
}

e.clear()关键逻辑:此方法来自import java.lang.ref.Reference;可以看到e.clear()目的是主动让entry的父类成员变量置null,那么entry自然就不会存在任何引用了,直接从正常的entry变成stale entry

1
2
3
4
5
6
7
8
9
10
11
/**
* Clears this reference object. Invoking this method will not cause this
* object to be enqueued.
*
* <p> This method is invoked only by Java code; when the garbage collector
* clears references it does so directly, without invoking this method.
*/
public void clear() {
this.referent = null;
}

ThreadLocal弱引用和内存泄露问题

在前面的所有内容中,我们都知道在线性探测中用if (k == null) 去判断当前桶位的entry是否为变为一个stale entry,放入一个正常的entry的为何会在某个时刻变成“失效的entry”?这是因为entry的key被设计为WeakReference ,这是ThreadLocalMap关键设计之一。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
/**
注意以下源代码的解析
To help deal with
* very large and long-lived usages, the hash table entries use
* WeakReferences for keys.
*/
static class ThreadLocalMap {

/**
* The entries in this hash map extend WeakReference, using
* its main ref field as the key (which is always a
* ThreadLocal object). Note that null keys (i.e. entry.get()
* == null) mean that the key is no longer referenced, so the
* entry can be expunged from table. Such entries are referred to
* as "stale entries" in the code that follows.
*/
static class Entry extends WeakReference<ThreadLocal<?>> {
/** The value associated with this ThreadLocal. */
Object value;

Entry(ThreadLocal<?> k, Object v) {
super(k); // key 是弱引用类型
value = v; // value 是强引用类型
}
}

所谓的java弱引用:一旦有gc,那么WeakReference类型的对象就会被回收,用以下demo说明:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
public class WeakReferenceDemo {
static class Entry{
private String key;
private String value;
Entry(String key,String value) {
this.key = key;
this.value=value;
}

@Override
public String toString() {
return "Entry{" +
"key='" + key + '\'' +
", value='" + value + '\'' +
'}';
}
}

public static void main(String[] args) {
Entry entry=new Entry("foo","bar"); // entry显然是一个强引用
WeakReference<Entry> entryWeakReference=new WeakReference<>(entry); // entryWeakReference是一个弱引用

System.gc();// gc1
System.out.println("after gc1,entry:"+entry); // Entry{key='foo', value='bar'}
System.out.println("after gc1,entryWeakReference:"+entryWeakReference.get()); // Entry{key='foo', value='bar'}


entry=null; // 此时entry强引用被置为null,那么会被gc回收
System.gc();// gc2
System.out.println("after gc2,entryWeakReference:"+entryWeakReference.get()); // null

}
}

输出:

1
2
3
after gc1,entry:Entry{key='foo', value='bar'}
after gc1,entryWeakReference:Entry{key='foo', value='bar'}
after gc2,entryWeakReference:null

注意到ThreadLocalMap中的Entry,key类型是WeakReference<ThreadLocal<?>> 弱引用的,因此一旦此key没有指向强引用,那么key显然会变为null,那么gc时作为key的ThreadLocal对象在jvm堆中就会被回收,对应的Entry就是一个stale entry,注意,如果Entry的value此时还不是null也即还是处于强引用类型状态,这会引出另外一个问题:ThreadLocal内存泄露问题,或者说:

为何ThreadLocal会有内存泄露问题?

其实比较好理解,首先ThreadLocal内部ThreadLocalMap存放的Entry对象和当前线程的生命周期一致,只要线程不结束,且Entry的value也即给ThreadLocal对象设置的value没有被删除(强引用还在),那么这个Entry就不会被回收,假设一个线程内部的ThreadLocalMap里面有很多这样的Entry,那么就会面临内存泄露的风险,

考虑线程池的情况,例如有线程使用ThreadlLocal对象,此线程位于线程池中会一直保持运行,对于它的ThreadlLocal对象内部的ThreadLocalMap来说,如果map中Entry的value没有被外界使用完后及时删除,就导致此Entry一直得不到回收,容易发生内存泄露。

Entry的key采用弱引用类型,value为何不采用同样的弱引用类型设计呢?

首先,key是线程本地变量ThreadLocal,它本身可以被回收,但是其变量的值value本身是在其他地方被使用着,例如value放着的是一个Session对象或者事务管理中的Connection对象,如果value被设计为弱引用类型,那么在也业务层面被使用“线程本地变量的value”——Session对象或者Connection对象就会随机被回收,导致业务层出错,显然无法接受这种情况。所以value保持强引用的设计才是符合实际情况的。

如何避免内存泄露

代码中满足一定ThreadLocal.get()、ThreadLocal.set()逻辑设计的情况下,主动调用ThreadLocalMap.remove 来移除Entry对象的引用关系,这种高级且科学用法,其实在ReentrantReadWriteLock的源代码设计有所体现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
     /**
定义一个给每个线程自己的内部读锁计数器
* A counter for per-thread read hold counts.
* Maintained as a ThreadLocal; cached in cachedHoldCounter
*/
static final class HoldCounter {
int count = 0;
// Use id, not reference, to avoid garbage retention
final long tid = getThreadId(Thread.currentThread());
}

/**
* ThreadLocal subclass. Easiest to explicitly define for sake
* of deserialization mechanics.
*/
// 如何实现每个线程独立记录的读锁计数器? 使用ThreadLocal即可保证线程隔离的计数,互不影响。
static final class ThreadLocalHoldCounter
extends ThreadLocal<HoldCounter> {
public HoldCounter initialValue() { // 线程自己持有的读锁计数器初始值0
return new HoldCounter();
}
}

/**
* The number of reentrant read locks held by current thread.
* Initialized only in constructor and readObject.
* Removed whenever a thread's read hold count drops to 0.
*/
private transient ThreadLocalHoldCounter readHolds;

Sync() {
readHolds = new ThreadLocalHoldCounter();
setState(getState()); // ensures visibility of readHolds
}


// 线程释放自己持有的读锁
protected final boolean tryReleaseShared(int unused) {
Thread current = Thread.currentThread();
// 第一个持有读锁的线程恰好是当前线程,
if (firstReader == current) {
// assert firstReaderHoldCount > 0;
// “第一个持有读锁的线程” 也准备释放读锁,firstReader不再指向任何读线程
if (firstReaderHoldCount == 1)
firstReader = null;
// 否则“第一个持有读锁的线程”重入锁次数减1
else
firstReaderHoldCount--;
} else {
HoldCounter rh = cachedHoldCounter;
// 如果线程自己缓存的读锁计数器对象为空,或者线程自己的读锁计数器缓存的线程不是当前线程
if (rh == null || rh.tid != getThreadId(current))
// 当前线程持有读锁的计数器readHolds
rh = readHolds.get();
int count = rh.count;
// 当前线程持有读锁的计小于等于1,说明在本次读锁退出后,当前线程不再持有任何读锁,也即不再使用“计数器ThreadLocalHoldCounter”,因此用在它身上的ThreadLocal<HoldCounter>对象需要马上移除,避免ThreadLocal发生内存泄露。
if (count <= 1) {
readHolds.remove();
if (count <= 0)
throw unmatchedUnlockException();
}
// 线程自己持有读锁自减1
--rh.count;
}
for (;;) {
// 总的读锁锁-1
int c = getState();
int nextc = c - SHARED_UNIT;
if (compareAndSetState(c, nextc))
// Releasing the read lock has no effect on readers,
// but it may allow waiting writers to proceed if
// both read and write locks are now free.
return nextc == 0;
}
}

从这里ReentrantReadWriteLock的关于ThreadLocal的使用中,再次理解了源代码注释提到的“可以使得状态和线程关联起来”,这里的“状态”就是读线锁中的每个线程持有读锁的数量,显然它和该线程绑定了起来,因此体现ThreadLocal变量使用的完美场景。

ThreadLocal在类中常见用法

在ThreadLocal源代码文件的注释开头有提到以下说明:

This class provides thread-local variables. These variables differ from their normal counterparts in that each thread that accesses one (via its get or set method) has its own, independently initialized copy of the variable. ThreadLocal instances are typically private static fields in classes that wish to associate state with a thread (e.g., a user ID or Transaction ID).

尤其这句ThreadLocal instances are typically private static fields in classes that wish to associate state with a thread (e.g., a user ID or Transaction ID).

ThreadLocal实例通常是位于类中的私有静态字段,目的是为了实现把“状态”与线程(例如,用户ID或事务ID)绑定起来。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
import java.util.concurrent.atomic.AtomicInteger;

public class ThreadId {
// Atomic integer containing the next thread ID to be assigned
private static final AtomicInteger nextId = new AtomicInteger(0);

// Thread local variable containing each thread's ID
// 放在类的静态字段位置,这样类的其他方法可以直接使用“此线程局部变量”
private static final ThreadLocal<Integer> threadId =
new ThreadLocal<Integer>() {
@Override
protected Integer initialValue() {
return nextId.getAndIncrement();
}
};

// Returns the current thread's unique ID, assigning it if necessary
public static int get() {
return threadId.get();
}
}

另外一个例子是在《Thinkinkg in Java 4》提供的demo

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
class Accessor implements Runnable{
private final int id;
public Accessor(int idn){id=idn;}

@Override
public void run() {
while (!Thread.currentThread().isInterrupted()){
ThreadLocalHolder.increment();
System.out.println(this);
Thread.yield();
}
}

@Override
public String toString() {
return "#" +id+":"+ThreadLocalHolder.get();
}
}

public class ThreadLocalHolder {
// 作为类ThreadLocalHolder的静态变量,并指定初始值的生成方式
private static ThreadLocal<Integer> value=new ThreadLocal<Integer>(){
private Random rand=new Random(10);
protected synchronized Integer initialValue(){
return rand.nextInt(10000);
}
};

public static void increment(){
value.set(value.get()+1);
}
public static int get(){return value.get();}

// 创建5个线程,每个线程都有ThreadLocalHolder的一个副本,且独立计数
public static void main(String[] args) throws InterruptedException {
ExecutorService exec= Executors.newCachedThreadPool();
for(int i=0;i<5;i++){
exec.execute(new Accessor(i));
}
TimeUnit.SECONDS.sleep(3);
exec.shutdownNow();
}

}

输出:可以看到5个线程实现自己内部的独立自增计数

1
2
3
4
5
6
7
8
9
10
11
#0:7114
#1:2381
#2:7294
#3:3291
#4:5247
#0:7115
#1:2382
#2:7295
#3:3292
#4:5248
....

这里也可以引出另外一个问题,ThreadLocal变量放在类中使用时,一般作为类的静态字段使用,为何?

其实很好理解,类的静态变量确保在类的多次实例化后仍然保持在内存中仅有一份副本,或者说为了避免重复创建thread specific object(与线程相关的变量),例如ThreadLocal变量管理了一个Session对象,那么当然希望在同一个线程中,此Session对象仅有一份实例,如果存在多份,那么就无法实现所谓“在同一session完成相关业务”的设计,导致逻辑出错。

为何ThreadLocal没有直接采用ConcurrentHashMap这样的Map数据结构?

  1. 首先,如果ThreadLocal使用ConcurrentHashMap来达到key-value管理目的,那么是无法实现“线程本地变量即:每个线程持有自己的本地实例”这样的需求,因此对于Josh Bloch and Doug Lea来说,需要给ThreadLocal设计全新一套的数据结构及其一些算法细节,以打造出可以支持和实现“线程本地变量且不需要基于任何锁的支持即可实现线程隔离”功能的数据结构,这显然是非常创新的工作,虽然ConcurrentHashMap的源代码设计已经堪称十分优秀。

  2. 其次,既然不采用ConcurrentHashMap这样内部复杂设计的Map结构,那么就要设计出非常高效、简约的数据结构,因此设计了底层只有一个数组table的ThreadLocalMap,不再有什么单链表、红黑树等结构,采用开放寻址法解决hash冲突,同时,只基于数组实现相关逻辑的代码会变得更加直观、简单,例如在扩容、清理stale entry方面,仅需基于数组的前后线性遍历即可。

  3. ThreadLocal底层只基于一个数组table,结合设计特定的hash魔数,可以使得Entry的hash在数组中分散很均匀,大大降低了冲突概率,提高查询效率。