/** * The entries in this hash map extend WeakReference, using * its main ref field as the key (which is always a * ThreadLocal object). Note that null keys (i.e. entry.get() * == null) mean that the key is no longer referenced, so the * entry can be expunged from table. Such entries are referred to * as "stale entries" in the code that follows. */ // ThreadLocalMap底层数组存放的WeakReference类型的entry,使用弱引用类型是为了能够高效GC,避免内存泄露,文章后面给出此设计的讨论 staticclassEntryextendsWeakReference<ThreadLocal<?>> { /** The value associated with this ThreadLocal. */ Object value; /* entry的key就是ThreadLocal对象,例如一个线程内部有10个ThreadLocal变量,那么此线程内部的ThreadLocalMap将存放这10个entry,这里的value就是ThreadLocal变量的“值”。 例如demo1中: ThreadLocal<String> var1=new ThreadLocal<>(); var1.set("foo A") 那么entry的key就是这个名称为var1的ThreadLocal对象,value就是字符串“foo A” */ Entry(ThreadLocal<?> k, Object v) { super(k); value = v; } }
/** * The initial capacity -- MUST be a power of two. */ // 数组的初始容量16 privatestaticfinalint INITIAL_CAPACITY = 16;
/** * The table, resized as necessary. * table.length MUST always be a power of two. */ //ThreadLocalMap的底层数组,这里也采用2的次方,原因在HashMap的源代码讨论已经给出深入的解析,这里不再累赘。 private Entry[] table;
/** * The number of entries in the table. */ // 数据含有entry的个数,注意即使entry的key处于stale状态,它也算一个entry privateint size = 0;
/** * The next size value at which to resize. */ privateint threshold; // Default to 0
/** * Set the resize threshold to maintain at worst a 2/3 load factor. */ // 注意区别于HashMap的扩容阈值是len*3/4 privatevoidsetThreshold(int len){ threshold = len * 2 / 3; }
/** * Increment i modulo len. */ // 返回数组当前位置i的下一个位置i+1,如果下一个位置超过数组长度,那么下一个位置又从下标0开始,这种方式实现了所谓的“环形数组”,在后面get、set方法中或者stale entry清空机制的处理中可以看到它的用处 privatestaticintnextIndex(int i, int len){ return ((i + 1 < len) ? i + 1 : 0); }
/** * Decrement i modulo len. */ // 逻辑同上,方向相反,返回当前位置i的前一个位置i+1,如果来到数组头部,那么前一个位置即回到数组末尾 privatestaticintprevIndex(int i, int len){ return ((i - 1 >= 0) ? i - 1 : len - 1); }
/** * Set the value associated with key. * * @param key the thread local object * @param value the value to be set */ privatevoidset(ThreadLocal<?> key, Object value){
// We don't use a fast path as with get() because it is at // least as common to use set() to create new entries as // it is to replace existing ones, in which case, a fast // path would fail more often than not. Entry[] tab = table; int len = tab.length; // 1.计算给定key对应的桶位,此hash算法能够最大程度将key平均分布到数组对应的桶位上,具体算法参考文后说明 int i = key.threadLocalHashCode & (len-1); /* 2.线性探测发的实现 从定位到i桶位开始遍历,直到遇到一个entry确实是null的空桶位,如果此遍历过程中遇到stale entry那么将其替换即可完成set操作。 */ for (Entry e = tab[i]; e != null; e = tab[i = nextIndex(i, len)]) { ThreadLocal<?> k = e.get(); // 3.如果此桶位的key恰好是给定给定的key,那么更新此桶位的value后可直接返回。 if (k == key) { e.value = value; return; } // 4. 当前桶位的entry的key为null(注意这个key是弱引用类型,说明此entry已经被GC),使用replaceStaleEntry放入新entry if (k == null) { replaceStaleEntry(key, value, i); return; } } // 5.在2的线性探测过程中,遇到entry为空的即可来到这流程,直接放入新entry,并且数组的entry数量加1,在这里应该可以猜到,当向数组添加一个新entry后接下来就要判断是否需要扩容 tab[i] = new Entry(key, value); int sz = ++size; // 6.满足扩容的条件:cleanSomeSlots返回False且entry数量达到扩容阈值 if (!cleanSomeSlots(i, sz) && sz >= threshold) rehash(); }
关于线性探测法的说明
1.为何会有“环形数组或者环形遍历”的设计:nextIndex
1 2 3
for (Entry e = tab[i]; e != null; e = tab[i = nextIndex(i, len)])
publicclassThreadLocal<T> { /** * ThreadLocals rely on per-thread linear-probe hash maps attached * to each thread (Thread.threadLocals and * inheritableThreadLocals). The ThreadLocal objects act as keys, * searched via threadLocalHashCode. This is a custom hash code * (useful only within ThreadLocalMaps) that eliminates collisions * in the common case where consecutively constructed ThreadLocals * are used by the same threads, while remaining well-behaved in * less common cases. */ privatefinalint threadLocalHashCode = nextHashCode();
/** * The next hash code to be given out. Updated atomically. Starts at * zero. */ // 巧妙利用了原子累加器 privatestatic AtomicInteger nextHashCode = new AtomicInteger();
/** * The difference between successively generated hash codes - turns * implicit sequential thread-local IDs into near-optimally spread * multiplicative hash values for power-of-two-sized tables. */ // hash值递增步长,每次对新的i计算hash值前先加上此数,计算结果能更加离散,这个值对应的十进制数为1640531527,这个值就是带符号的32位int的最大值的黄金分割值取正 privatestaticfinalint HASH_INCREMENT = 0x61c88647;
/** * Returns the next hash code. */ privatestaticintnextHashCode(){ // 每次对新的i计算hash值前先加上此数,计算结果能更加离散 return nextHashCode.getAndAdd(HASH_INCREMENT); }
模拟ThreadLocalMap的hash计算方式:
1 2 3 4 5 6 7 8 9 10 11
publicclassThreadLocalHashCode{ privatestaticfinalint HASH_INCREMENT = 0x61c88647; publicstaticvoidmain(String[] args){ int tableLength=16; for (int i = 0; i <32 ; i++) { int h=i*HASH_INCREMENT+HASH_INCREMENT; int index=h & (tableLength-1); System.out.println(i+"定位的桶位是:"+index); } } }
privatevoidreplaceStaleEntry(ThreadLocal<?> key, Object value, int staleSlot){ Entry[] tab = table; int len = tab.length; Entry e;
// Back up to check for prior stale entry in current run. // We clean out whole runs at a time to avoid continual // incremental rehashing due to garbage collector freeing // up refs in bunches (i.e., whenever the collector runs). int slotToExpunge = staleSlot; /* 1、staleSlot是对于给定key用线性探测法“前向遍历”找到的首次出现的stale entry对应的下标 为何第1步骤的前向遍历没有安排类似第2步骤的“替换操作等逻辑呢”,因为“ThreadLocal本身用的是开发地址法,冲突的key都被放置在后面空的slot,就算来到table末尾再从头遍历,它也是遵循“向后放置发生冲突的key”” */ for (int i = prevIndex(staleSlot, len); (e = tab[i]) != null; i = prevIndex(i, len)) if (e.get() == null) slotToExpunge = i;
// Find either the key or trailing null slot of run, whichever // occurs first // 2. 从set方法传入的staleSlot下标开始向后遍历 for (int i = nextIndex(staleSlot, len); (e = tab[i]) != null; i = nextIndex(i, len)) { ThreadLocal<?> k = e.get();
// If we find key, then we need to swap it // with the stale entry to maintain hash table order. // The newly stale slot, or any other stale slot // encountered above it, can then be sent to expungeStaleEntry // to remove or rehash all of the other entries in run. /* 以下逻辑非常关键: 3.1 如果从stale slot开始的“后向遍历”的第i下标又出现了key冲突,说明给定的key“本应放在stale slot 下标位置,但是因为冲突,被迫挪到比stale slot 更靠后的位置i”,既然现在stale slot已失效,那么就可以将给定key放回本应该更靠近hash定位的下标位置staleSlot。这里采用交换两者位置即可实现此逻辑。这就是”to swap it with the stale entry to maintain hash table order”所要表达的逻辑。 */ if (k == key) { e.value = value; tab[i] = tab[staleSlot]; tab[staleSlot] = e;
// Start expunge at preceding stale entry if it exists // 3.2 如果slotToExpunge还是staleSlot,说明第1步骤的“前向探测”没有stale entry,那么就将清理起始下标改到i,因为i下标位置存放的是在3.1交换过来的stale entry:tab[i] = tab[staleSlot] if (slotToExpunge == staleSlot) slotToExpunge = i; /* 3.3 到此,我们知道,截止到i下标的stale entry情况ß: [某个空slot,staleSlot):从staleSlot的前向位置都没有stale entry staleSlot:将i位置的有效entry交换过来tab[staleSlot] = e [staleSlot+1,i-1]:后向遍历没有出现stale entry i:存放的是从staleSlot交换过来的stale entry 因此slotToExpunge肯定是从i下标开始做清理工作。 */ cleanSomeSlots(expungeStaleEntry(slotToExpunge), len); return; }
// If we didn't find stale entry on backward scan, the // first stale entry seen while scanning for key is the // first still present in the run. // 3.4 后向遍历在第i下标发现了一个stale entry且在前向遍历没有出现stale entry,那么清理开始下标当然要重置为i,那么staleSlot位置还存放着stale entry且没有也没有像3.1这样的“swap it”的设计,那么staleSlot自己是如何处理呢。它会在接下里的第4步骤中被处理掉! 那么在这个步骤发现第i号下标新的stale entry又是如何处理呢? 它会在第5个步骤被清理掉 if (k == null && slotToExpunge == staleSlot) slotToExpunge = i; }
// If key not found, put new entry in stale slot /* 4. 在整个run都没有找到对应的key且也没有发现stale entry(除了staleSlot本身整个),那么好办,直接将staleSlot这个在set方法一开始就发现的stale entry的位置替换为新 entry即可,这就是为何方法名字命名为replaceStaleEntry。 */ tab[staleSlot].value = null; tab[staleSlot] = new Entry(key, value);
// If there are any other stale entries in run, expunge them // 第5步骤:接第3.4步骤出现的情况。 if (slotToExpunge != staleSlot) cleanSomeSlots(expungeStaleEntry(slotToExpunge), len); }
关于“A run”的理解(理解run及其内部清理标记逻辑才能透彻理解set背后的原理)
As a side effect, this method expunges all stale entries in the “run” containing the stale entry(A run is a sequence of entries between two null slots.)
/** * Expunge a stale entry by rehashing any possibly colliding entries * lying between staleSlot and the next null slot. This also expunges * any other stale entries encountered before the trailing null. See * Knuth, Section 6.4 * * @param staleSlot index of slot known to have null key * @return the index of the next null slot after staleSlot * (all between staleSlot and this slot will have been checked * for expunging). */ // 1.这里入参的staleSlot,就是replaceStaleEntry探测到的slotToExpunge下标 privateintexpungeStaleEntry(int staleSlot){ Entry[] tab = table; int len = tab.length;
// Rehash until we encounter null // 3. 在slotToExpunge+1到恰好遇到null slot之间进行逐个探测清理 Entry e; int i; for (i = nextIndex(staleSlot, len); (e = tab[i]) != null; i = nextIndex(i, len)) { ThreadLocal<?> k = e.get(); // 3.1 又出现stale entry可直接清空 if (k == null) { e.value = null; tab[i] = null; size--; } else { // 3.2 再次计算当前位置i放置的entry对应的hash值,如果hash值和当前i桶位一致,说明没有冲突,此entry恰好就是位于“本属于自己的桶位上”,如果hash值和当前i不一致,说明“此entry因为冲突被迫放到了第i位置,而第i位置不是此entry的直接定位”,可以将位于i桶位的“entry”放在“属于自己的h桶位”,这样就保证了entry能最大程度靠近或者就位于“本属于自己的桶位”范围以内,目的是为了提供线性探测查询效率! int h = k.threadLocalHashCode & (len - 1); // 从向后探测的开放地址法可知,h值更小,i值更大,正是因为原h位置有冲突,e才被放置到更靠后的第i位置 if (h != i) { // 因为发生冲突被迫放置在i位置的entry,后面会被放到它的直接定位h桶位,因此i位置可以置为null tab[i] = null;
// Unlike Knuth 6.4 Algorithm R, we must scan until // null because multiple entries could have been stale. /* 虽然h桶位就是此entry的直接定位,但是考虑到h桶位可能被放置了其他entry,因此需要加入“向后探测”的逻辑,直到发现下一个位置为null slot。 tab[h] = e 的写法就实现了“因为发生冲突被迫放置在i位置的entry,现在能够最接近地放到本属于自己直接定位的h桶位*/ while (tab[h] != null) h = nextIndex(h, len); tab[h] = e; } } }
return i; /* 显然此i就是第3步骤for循环里面从slotToExpunge向后探测到下一个null slot下标,此下标会被cleanSomeSlots方法中利用起来。 for (i = nextIndex(staleSlot, len); (e = tab[i]) != null; i = nextIndex(i, len)) */ }
/** * Re-pack and/or re-size the table. First scan the entire * table removing stale entries. If this doesn't sufficiently * shrink the size of the table, double the table size. */ privatevoidrehash(){ //1.扩容前,先从头到尾线性清理一下stale entry,运气好的话,清理的stale entry后恰好有足够多的null slot,这样省去真正的扩容操作,效率更高。 expungeStaleEntries();
// Use lower threshold for doubling to avoid hysteresis // 2.此时的size是在1步骤清理完stale entry后的实际entry个数,只有当此时的size达到了0.75threshold才会去扩容, if (size >= threshold - threshold / 4) resize(); }
/** * Double the capacity of the table. */ privatevoidresize(){ Entry[] oldTab = table; int oldLen = oldTab.length; // 这里看出是两倍扩容 int newLen = oldLen * 2; Entry[] newTab = new Entry[newLen]; int count = 0; // 1.从旧表开始逐个遍历 for (int j = 0; j < oldLen; ++j) { Entry e = oldTab[j]; if (e != null) { ThreadLocal<?> k = e.get(); // 2.旧表当前j位置出现stale entry,那么直接将entry的value强引用设为null,Help the GC if (k == null) { e.value = null; // Help the GC } else { // 3. 旧表当前遍历位置j是正常的entry,那么用新表newLen计算它在新表的桶位号 int h = k.threadLocalHashCode & (newLen - 1); // 4. 开放地址法在新表中为“当前旧表遍历位置下entry”找到对应的null slot新表h位置 while (newTab[h] != null) h = nextIndex(h, newLen); // 注意这里是在新表计算 newTab[h] = e; count++; } } }
/** * Returns the value in the current thread's copy of this * thread-local variable. If the variable has no value for the * current thread, it is first initialized to the value returned * by an invocation of the {@link #initialValue} method. * * @return the current thread's value of this thread-local */ public T get(){ Thread t = Thread.currentThread(); ThreadLocalMap map = getMap(t); // 1、ThreadLocalMap已经存在时 if (map != null) { // getEntry才是正在在底层table去查找给定key对应的entry ThreadLocalMap.Entry e = map.getEntry(this); if (e != null) { @SuppressWarnings("unchecked") T result = (T)e.value; return result; } } // 2. 如果线程t的ThreadLocal内部ThreadLocalMap还未初始化,直接返回ThreadLocal初始化时设定的初始值 return setInitialValue(); }
/** * Get the map associated with a ThreadLocal. Overridden in * InheritableThreadLocal. * * @param t the current thread * @return the map */ ThreadLocalMap getMap(Thread t){ return t.threadLocals; }
/** * Create the map associated with a ThreadLocal. Overridden in * InheritableThreadLocal. * * @param t the current thread * @param firstValue value for the initial entry of the map */ // 这里可以看出,原来ThreadLocal并不是独立存在,而是它里面的ThreadLocalMap绑定当前线程的成员变量threadLocals,因此ThreadLocalMap的生命周期和线程同在 voidcreateMap(Thread t, T firstValue){ t.threadLocals = new ThreadLocalMap(this, firstValue); }
/** * Construct a new map initially containing (firstKey, firstValue). * ThreadLocalMaps are constructed lazily, so we only create * one when we have at least one entry to put in it. */ ThreadLocalMap(ThreadLocal<?> firstKey, Object firstValue) { table = new Entry[INITIAL_CAPACITY]; int i = firstKey.threadLocalHashCode & (INITIAL_CAPACITY - 1); table[i] = new Entry(firstKey, firstValue); size = 1; setThreshold(INITIAL_CAPACITY); }
/** * Get the entry associated with key. This method * itself handles only the fast path: a direct hit of existing * key. It otherwise relays to getEntryAfterMiss. This is * designed to maximize performance for direct hits, in part * by making this method readily inlinable. * * @param key the thread local object * @return the entry associated with key, or null if no such */ private Entry getEntry(ThreadLocal<?> key){ int i = key.threadLocalHashCode & (table.length - 1); Entry e = table[i]; // 1. 对应上面源代码注释提到“Only the fast path:a direct hit of existing key” 逻辑,也即给定key对应的i桶位的entry恰好存放的就是key的entry。 if (e != null && e.get() == key) return e; // 2. 说明这个key在之前是发生冲突了,放置到比i更靠后的位置,需要采用“后向探测”去检索。 else return getEntryAfterMiss(key, i, e); }
/** * Version of getEntry method for use when key is not found in * its direct hash slot. * * @param key the thread local object * @param i the table index for key's hash code * @param e the entry at table[i] * @return the entry associated with key, or null if no such */ private Entry getEntryAfterMiss(ThreadLocal<?> key, int i, Entry e){ Entry[] tab = table; int len = tab.length; // 从给定的entry开始后向遍历探测 while (e != null) { ThreadLocal<?> k = e.get(); // 1.找到,则返回 if (k == key) return e; // 2.遇到stale entry,调用expungeStaleEntry清理它,此时i位置就是slotToExpunge起始清理的下标 if (k == null) expungeStaleEntry(i); else // 3.继续向后探测下一个entry i = nextIndex(i, len); e = tab[i]; } returnnull; }
remove方法
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19
/** * Remove the entry for key. */ privatevoidremove(ThreadLocal<?> key){ Entry[] tab = table; int len = tab.length; int i = key.threadLocalHashCode & (len-1); for (Entry e = tab[i]; e != null; e = tab[i = nextIndex(i, len)]) { if (e.get() == key) { // 目的是主动让entry的父类成员变量置null,那么entry自然就不会存在任何引用了,直接从正常的entry变成stale entry e.clear(); // expungeStaleEntry(i); return; } } }
/** * Clears this reference object. Invoking this method will not cause this * object to be enqueued. * * <p> This method is invoked only by Java code; when the garbage collector * clears references it does so directly, without invoking this method. */ publicvoidclear(){ this.referent = null; }
/** 注意以下源代码的解析 To help deal with * very large and long-lived usages, the hash table entries use * WeakReferences for keys. */ staticclassThreadLocalMap{
/** * The entries in this hash map extend WeakReference, using * its main ref field as the key (which is always a * ThreadLocal object). Note that null keys (i.e. entry.get() * == null) mean that the key is no longer referenced, so the * entry can be expunged from table. Such entries are referred to * as "stale entries" in the code that follows. */ staticclassEntryextendsWeakReference<ThreadLocal<?>> { /** The value associated with this ThreadLocal. */ Object value;
Entry(ThreadLocal<?> k, Object v) { super(k); // key 是弱引用类型 value = v; // value 是强引用类型 } }
/** 定义一个给每个线程自己的内部读锁计数器 * A counter for per-thread read hold counts. * Maintained as a ThreadLocal; cached in cachedHoldCounter */ staticfinalclassHoldCounter{ int count = 0; // Use id, not reference, to avoid garbage retention finallong tid = getThreadId(Thread.currentThread()); }
/** * ThreadLocal subclass. Easiest to explicitly define for sake * of deserialization mechanics. */ // 如何实现每个线程独立记录的读锁计数器? 使用ThreadLocal即可保证线程隔离的计数,互不影响。 staticfinalclassThreadLocalHoldCounter extendsThreadLocal<HoldCounter> { public HoldCounter initialValue(){ // 线程自己持有的读锁计数器初始值0 returnnew HoldCounter(); } }
/** * The number of reentrant read locks held by current thread. * Initialized only in constructor and readObject. * Removed whenever a thread's read hold count drops to 0. */ privatetransient ThreadLocalHoldCounter readHolds;
Sync() { readHolds = new ThreadLocalHoldCounter(); setState(getState()); // ensures visibility of readHolds }
// 线程释放自己持有的读锁 protectedfinalbooleantryReleaseShared(int unused){ Thread current = Thread.currentThread(); // 第一个持有读锁的线程恰好是当前线程, if (firstReader == current) { // assert firstReaderHoldCount > 0; // “第一个持有读锁的线程” 也准备释放读锁,firstReader不再指向任何读线程 if (firstReaderHoldCount == 1) firstReader = null; // 否则“第一个持有读锁的线程”重入锁次数减1 else firstReaderHoldCount--; } else { HoldCounter rh = cachedHoldCounter; // 如果线程自己缓存的读锁计数器对象为空,或者线程自己的读锁计数器缓存的线程不是当前线程 if (rh == null || rh.tid != getThreadId(current)) // 当前线程持有读锁的计数器readHolds rh = readHolds.get(); int count = rh.count; // 当前线程持有读锁的计小于等于1,说明在本次读锁退出后,当前线程不再持有任何读锁,也即不再使用“计数器ThreadLocalHoldCounter”,因此用在它身上的ThreadLocal<HoldCounter>对象需要马上移除,避免ThreadLocal发生内存泄露。 if (count <= 1) { readHolds.remove(); if (count <= 0) throw unmatchedUnlockException(); } // 线程自己持有读锁自减1 --rh.count; } for (;;) { // 总的读锁锁-1 int c = getState(); int nextc = c - SHARED_UNIT; if (compareAndSetState(c, nextc)) // Releasing the read lock has no effect on readers, // but it may allow waiting writers to proceed if // both read and write locks are now free. return nextc == 0; } }
This class provides thread-local variables. These variables differ from their normal counterparts in that each thread that accesses one (via its get or set method) has its own, independently initialized copy of the variable. ThreadLocal instances are typically private static fields in classes that wish to associate state with a thread (e.g., a user ID or Transaction ID).
尤其这句ThreadLocal instances are typically private static fields in classes that wish to associate state with a thread (e.g., a user ID or Transaction ID).
import java.util.concurrent.atomic.AtomicInteger; publicclassThreadId{ // Atomic integer containing the next thread ID to be assigned privatestaticfinal AtomicInteger nextId = new AtomicInteger(0); // Thread local variable containing each thread's ID // 放在类的静态字段位置,这样类的其他方法可以直接使用“此线程局部变量” privatestaticfinal ThreadLocal<Integer> threadId = new ThreadLocal<Integer>() { @Override protected Integer initialValue(){ return nextId.getAndIncrement(); } }; // Returns the current thread's unique ID, assigning it if necessary publicstaticintget(){ return threadId.get(); } }
其实很好理解,类的静态变量确保在类的多次实例化后仍然保持在内存中仅有一份副本,或者说为了避免重复创建thread specific object(与线程相关的变量),例如ThreadLocal变量管理了一个Session对象,那么当然希望在同一个线程中,此Session对象仅有一份实例,如果存在多份,那么就无法实现所谓“在同一session完成相关业务”的设计,导致逻辑出错。
为何ThreadLocal没有直接采用ConcurrentHashMap这样的Map数据结构?
首先,如果ThreadLocal使用ConcurrentHashMap来达到key-value管理目的,那么是无法实现“线程本地变量即:每个线程持有自己的本地实例”这样的需求,因此对于Josh Bloch and Doug Lea来说,需要给ThreadLocal设计全新一套的数据结构及其一些算法细节,以打造出可以支持和实现“线程本地变量且不需要基于任何锁的支持即可实现线程隔离”功能的数据结构,这显然是非常创新的工作,虽然ConcurrentHashMap的源代码设计已经堪称十分优秀。