yield-bytes

沉淀、分享与无限进步

Java高级主题:Unsafe类实现的JUC框架free-lock设计原理讨论

Unsafe类在整套JUC框架中绝对是核心的一个概念,它是实现free-lock的底层核心设计,它内部直接调用的是Java的JNI,只有理解它的CAS原子操作的内部设计原理,才能更加深入理解JUC的free-lock设计。

使用unsafe操作数组

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
package hashmap.demo;

import java.lang.reflect.Field;
import sun.misc.Unsafe;


public class Unsafe3 {

private static Unsafe unsafe;

static{
try {
Field field = Unsafe.class.getDeclaredField("theUnsafe");
field.setAccessible(true);
unsafe = (Unsafe)field.get(null);
} catch (Exception e) {
e.printStackTrace();
}
}

public static void main(String[] args) throws Exception {

int[] intArr1={10,20,30,40};
int[] intArr2={1,3,5,7,8,9,10,1,1,1,1,1,1,1,1};

// byte类型数组内存每个"单元格"容量是1个字节,对应Scale就是1
int byteScale=unsafe.arrayIndexScale(byte[].class);
// short类型数组内存每个"单元格"容量是2个字节,对应Scale就是2
int shortScale=unsafe.arrayIndexScale(short[].class);
// int类型数组内存每个"单元格"容量是4个字节,对应Scale就是4
int intScale=unsafe.arrayIndexScale(int[].class);
// long类型数组内存每个"单元格"容量是8个字节,对应Scale就是8
int longScale=unsafe.arrayIndexScale(long[].class);
// Integer类型数组内存每个"单元格"容量是4个字节,对应Scale就是4
int integerScale=unsafe.arrayIndexScale(Integer[].class);
// String类型数组内存每个"单元格"容量是4个字节,对应Scale就是4
int stringScale=unsafe.arrayIndexScale(String[].class);

//
boolean b=intArr1.getClass().equals(intArr2.getClass()) && int[].class.equals(intArr1.getClass()) && int[].class.equals(intArr2.getClass());
System.out.println(b); // true

// intArr1数组实例和intArr2的数组实例都是同一对象,指向int[]
System.out.println(intArr1.getClass().equals(intArr2.getClass()));


// 获取当前intArr1数组实例的相对基址
long baseOffset1=unsafe.arrayBaseOffset(intArr1.getClass());

System.out.println(baseOffset1); // 16
// 获取当前intArr2数组实例的相对基址
long baseOffset2=unsafe.arrayBaseOffset(intArr2.getClass());
System.out.println(baseOffset2); // 16
// 从baseOffset1和baseOffset2的值都是16可以看出,这是他们指向对象的相对基址,而不是指向对象绝对地址(如果是执行对象绝对地址,那么这里两个值一定不同)


System.out.println(unsafe.getIntVolatile(intArr1,baseOffset1)); //打印intArr1第1个(首地址)对应的元素,也就是intArr1[0]
System.out.println(unsafe.getIntVolatile(intArr1,baseOffset1+intScale)); //打印intArr1的第2个元素,也就是intArr1[1]
System.out.println(unsafe.getIntVolatile(intArr1,baseOffset1+intScale* 2L)); //打印intArr1的第3个元素,也就是intArr1[2]
System.out.println(unsafe.getIntVolatile(intArr1,baseOffset1+intScale* 3L)); //打印intArr1的第4个元素,也就是intArr1[3]

// 获取intArr2的数据的第3个元素
System.out.println(unsafe.getIntVolatile(intArr2,baseOffset2+intScale* 2L)); //打印intArr2第3个对应的元素,也就是intArr2[2]

}
}

使用Unsafe操作自定义类(非并发条件下)

以下定义个三个类及其属性,可以清楚看到unsafe操作对象的逻辑:

首先使用objectFieldOffset取到该对象的字段相对基址

其次,getXX取值方面,也即“读”:

  • A、非并发编程条件下,基础类型使用getInt、getDouble等方法获取字段值,引用类型使用getObject取值

  • B、并发编程条件下,基础类型使用getIntVolatile、getDoubleVolatile等方法获取字段值,引用类型使用getObjectVolatile取值

putXX设置值方面(也即set字段值),也即“写”:

  • A、非并发编程条件下,基础类型使用putInt、putDouble等方法对字段进行赋值,引用类型使用putObject等方法对字段进行赋值

  • B、并发编程条件下,由于涉及多线程对临界区的写入,因此需要使用CAS机制去写入,而不是简单的putObject方法,对于基础类型和引用类型常用的CAS方法:

  • ```
    compareAndSwapInt、compareAndSwapLong、compareAndSwapObject

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    62
    63
    64
    65
    66
    67
    68
    69
    70
    71
    72
    73
    74
    75
    76
    77
    78
    79
    80
    81
    82
    83
    84
    85

    对应的demo程序如下:

    ```java
    package hashmap.demo;

    import sun.misc.Unsafe;

    import java.lang.reflect.Field;
    import java.util.concurrent.ConcurrentHashMap;

    class A {
    private int a = 1;
    private String aa;
    }


    class B {
    private int b = 2;
    private String bb = "Unsafe looks like a C Pointer";
    }

    class C {
    private int c = 3;
    private String cc;
    }

    public class Unsafe4 {
    private static Unsafe unsafe;

    static {
    try {
    Field field = Unsafe.class.getDeclaredField("theUnsafe");
    field.setAccessible(true);
    unsafe = (Unsafe) field.get(null);
    } catch (Exception e) {
    e.printStackTrace();
    }
    }

    public static void main(String[] args) throws NoSuchFieldException {
    long baseOffset1 = unsafe.objectFieldOffset(A.class.getDeclaredField("a"));
    long baseOffset2 = unsafe.objectFieldOffset(B.class.getDeclaredField("b"));
    long baseOffset3 = unsafe.objectFieldOffset(C.class.getDeclaredField("c"));
    long baseOffset4 = unsafe.objectFieldOffset(B.class.getDeclaredField("bb"));
    long baseOffset5 = unsafe.objectFieldOffset(C.class.getDeclaredField("cc"));

    System.out.println(baseOffset1); // 12
    System.out.println(baseOffset2); // 12
    System.out.println(baseOffset3); // 12
    System.out.println(baseOffset4); // 16
    System.out.println(baseOffset5); // 16

    // 非并发情况下,使用getInt方法获取字段值
    System.out.println(unsafe.getInt(new A(), baseOffset1)); // a=1
    System.out.println(unsafe.getInt(new B(), baseOffset2)); // a=1
    System.out.println(unsafe.getInt(new C(), baseOffset3)); // c=3


    // 引用类型的字段需要使用getObject或者getObjectVolatile获取其值
    System.out.println(unsafe.getObject(new B(), baseOffset4)); // Unsafe looks like a C Pointer
    System.out.println(unsafe.getObject(new C(), baseOffset5)); // null


    // 并发情况下,因为同一对象的同一字段可能有多线程并发get或者set,线程栈内部的字段值和主存字段值可能会不一致,因此需要使用getIntVolatile方法获取字段值
    System.out.println(unsafe.getIntVolatile(new A(), baseOffset1)); // a=1
    System.out.println(unsafe.getIntVolatile(new B(), baseOffset2)); // a=1
    System.out.println(unsafe.getIntVolatile(new C(), baseOffset3)); // c=3


    System.out.println(unsafe.getObjectVolatile(new B(), baseOffset4)); // Unsafe looks like a C Pointer

    /* unsafe.getObjectVolatile返回的是Object类型,那么这个Object类型具体代表是Person类型、Integer类型、C类型还是其他什么类型?
    * 转型的依据:cc字段是什么类型就转型为什么类型,例如在ConcurrentHashMap的源码中:
    * seg = (Segment<K,V>)UNSAFE.getObjectVolatile(ss, u)
    * 直接从Segments数组取出位于u下标的Object具体来说一个Segment<K,V>类型元素,因此需将取出的Object转型为对应的Segment<K,V>类型
    * 其实这里取到的是cc字段,因此可以转型为String类型
    * 这里需要说明的是:String类型其实不需要显式转型
    */
    String ccField = (String) unsafe.getObjectVolatile(new C(), baseOffset5);
    System.out.println(ccField); // null

    }
    }

关于使用getObjectgetObjectVolatile返回值为Object的转型说明

两个方法源码:

1
2
public native Object getObject(Object var1, long var2);
public native Object getObjectVolatile(Object var1, long var2);

这里需要清楚一个基本常识:

往内存某个地址put一个“Person”类型对象,那么从相同的地址取出来也应该是相同的Person类型对象,如果取出来是Dog类型,那么就不符合设计规范了!或者取出的Object应该转型为Person类型,你给它强制转换为Dog类型,编译器当然会抛出一个:ClassCastException

因此使用getObject和getObjectVolatile方法时,需显式转型为具体类型的实例,以便操作该实例相关方法,这里以在ConcurrentHashMap的某个ensureSegment方法源码作为说明:

UNSAFE.getObjectVolatile(ss, u)表示直接Segments数组取出位于u下标的Object,而这个Object结合上下文可知它是一个Segment类型的实例,因此对取出的Object将其转型为对应的Segment类型

1
2
3
4
5
while ((seg = (Segment<K,V>)UNSAFE.getObjectVolatile(ss, u))
== null) {
if (UNSAFE.compareAndSwapObject(ss, u, null, seg = s))
break;
}

然后你会发现在ConcurrentHashMap有13个关于getObjectXX类型方法,无一例外都是这样固定转型用法,不管是上面的ensureSegment方法还是下面segmentForHash方法:

(Segment<K,V>) UNSAFE.getObjectVolatile(segments, u)

1
2
3
4
private Segment<K,V> segmentForHash(int h) {
long u = (((h >>> segmentShift) & segmentMask) << SSHIFT) + SBASE;
return (Segment<K,V>) UNSAFE.getObjectVolatile(segments, u);
}

使用Unsafe操作自定义类(并发写和读条件下)

首先使用以下demo程序在单线程情况下compareAndSwapInt的用法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
package hashmap.demo;

import sun.misc.Unsafe;
import java.lang.reflect.Field;

class Counter<V> {
public volatile V value;
}

public class Unsafe4 {
private static Unsafe unsafe;

static {
try {
Field field = Unsafe.class.getDeclaredField("theUnsafe");
field.setAccessible(true);
unsafe = (Unsafe) field.get(null);
} catch (Exception e) {
e.printStackTrace();
}
}


public static void main(String[] args) throws NoSuchFieldException {
long baseOffset = unsafe.objectFieldOffset(Counter.class.getDeclaredField("value"));


// 1、单线程情况,对Counter的value字段进行读和写
Counter<Integer> counter=new Counter<>();
System.out.println(unsafe.getInt(counter, baseOffset)); // 0

// 2、使用非CAS机制的putInt方法将counter的value字段写入10
unsafe.putInt(counter,baseOffset,10);
// 3、取出value字段值是否为10
System.out.println(unsafe.getInt(counter, baseOffset)); // 10

// 4、使用CAS机制的原子写入value的值,写入条件描述为:如果主存里面的value字段值等于所期待的10,那么就将该value字段值更新为新值20
boolean isSet= unsafe.compareAndSwapInt(counter,baseOffset,10,20);
// 5、从第3点可知,主存里面的value字段值一定为10,因此compareAndSwapInt会对value进行更新为20
System.out.println(isSet); // true
System.out.println(unsafe.getInt(counter, baseOffset)); // 20

// 6、使用CAS机制的原子写入value的值,写入条件描述为:如果主存里面的value字段值等于所期待的21,那么就将该value字段值使用原子操作更新为新值30
boolean isSet1= unsafe.compareAndSwapInt(counter,baseOffset,21,30);
// 7、从第5点可知,主存里面的value字段值一定为20,显然不符合所期待的21,因此compareAndSwapInt会放弃本次对value写入(更新)
System.out.println(isSet1); // false
System.out.println(unsafe.getInt(counter, baseOffset)); // 20

}
}

这里有个简单的类比:

counter.value就好比桌子的一张白纸,假设白纸上当前写的初始值为0,同学A此时准备走到桌子前使用compareAndSwapInt去更改白纸的数字,但要满足这样条件:

1、如果同学A走到桌子前发现白纸的数字是0,与他期待的数字0一致,说明当前没有其他同学抢先更改白纸数字,因此同学A能安全的原子性更改白纸的数字,对其加1,也即同学A使用CAS对白纸的数字改为1,CAS返回true。

2、如果同学A走到桌子前发现白纸的数字是1,与他期待的数字0一致,说明在他之前已经有另外一个同学早就把白纸数字改为1,因此同学A此时需要放弃对白纸数字的写入,CAS返回false,那么同学A只能重新再到桌子上该,并且他期待白纸的数字是1

再看一下说明:

1
unsafe.compareAndSwapInt(counter, baseOffest, expect, update);

可以这样理解:对应counter实例,value字段对应的在主存相对地址为baseOffest,如果该value在主存的值等于线程栈空间存放的expect值,说明没有其他线程去主动更新主存的value字段,此时本线程使用compareAndSwapInt原子更新value字段一定不会产生冲突,而且能成功更新主存value的值。

这里有个理解技巧:compareAndSwapInt(counter, baseOffest, expect, update) 首先expect跟update是没有任何联系的,不要混淆错看为:expect等于update时才写入,这是非常典型的误解。CAS整个用法只关注线程栈空间的expect值即可,expect会被native方法拿去跟主存中的value字段比较。

有了以上“只关注线程栈空间的expect去比较主存的值”,接着我们再来理解Unsafe类的getAndSet:

1
2
3
4
5
6
7
8
9
10
11
    public final V getAndSet(V newValue) {
while (true) {
V x = get();
if (compareAndSet(x, newValue))
return x;
}
}
// compareAndSet其实封装了`compareAndSwapInt(counter, baseOffest, expect, update)`
public final boolean compareAndSet(V expect, V update) {
return unsafe.compareAndSwapObject(this, valueOffset, expect, update);
}

显然getAndSet的思想是:

1、先去主存去取值回来:x = get()

2、将取出的值作为expect,也即compareAndSet(x, newValue)的x变量

3、按照“只关注线程栈空间的expect去比较主存的值”思路:当前期待的值即为x,它被取出来后(暂时存放到线程栈空间),然后再被拿去跟主存的x去比较,如果两者相等,说明在它之前没有其他线程去更改主存的x,这时当前线程就可以放心用newValue写入主存的x,而且写入操作是原子性的;否则进入下一次尝试

有了上面的CAS机制铺垫,以下是无锁并发的编程demo:

使用1万个线程对Counter类的value字段进行加1的CAS原子操作,如果能正确工作,那么最终打印counter.value的是10000

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
package hashmap.demo;
import sun.misc.Unsafe;
import java.lang.reflect.Field;
import java.util.ArrayList;
import java.util.List;

public class Unsafe5 {
static class Counter {
public volatile int value; // 多线程情况下爱,value字段需要使用volatile原语定义,以保证线程间对它的可见性
}

// 1、创建unsafe实例以及获取counter.value字段的相对地址,static块是按照源码组织形式的写法
private static final Counter counter=new Counter();
private static Unsafe unsafe;
private static long baseOffset;

static {
try {
Field field = Unsafe.class.getDeclaredField("theUnsafe");
field.setAccessible(true);
unsafe = (Unsafe) field.get(null);
baseOffset = unsafe.objectFieldOffset(Counter.class.getDeclaredField("value"));
//或者 baseOffset = unsafe.objectFieldOffset(counter.getClass().getDeclaredField("value"));

} catch (Exception e) {
e.printStackTrace();
}
}


public static void main(String[] args) throws InterruptedException {

List<Thread> threadList = new ArrayList<>();
// 2、创建1万个线程,每个线程对counter.value进行加1操作,这里使用的自旋+CAS,也即直到当前线程能够成功完成对主存的value进行
for (int i = 0; i < 10000; i++) {
Thread t = new Thread(new Runnable() {
@Override
public void run() {
for (; ; ) {
// 在每个线程内部,使用CAS无锁操作对主存中的 counter实例的value字段进行原子加1,如果写入成功,则当前线程退出
int x = counter.value;
if (unsafe.compareAndSwapInt(counter, baseOffset, x, x + 1)) {
break;
}
}
}
});

threadList.add(t);
t.start();
}
for(Thread t:threadList){
t.join();
}
// value最终的值应为100000
System.out.println(counter.value);

}

}

从此demo程序对于counter.value的加1操作设计,可以总结出一套基于unsafe的原子操作类的模板,这就是java.util.concurrent.atomic里面所有AtomicXXX的设计原理:

具体来说,java.util.concurrent.atomic中的类可以分成4种:

  • 基本类型:AtomicBoolean,AtomicInteger,AtomicLong

  • 引用类型:AtomicReference

  • 数组类:AtomicIntegerArray,AtomicLongArray,AtomicReferenceArray

  • 更新器类:AtomicLongFieldUpdater,AtomicIntegerFieldUpdater,AtomicReferenceFieldUpdater

  • 复合变量类:AtomicMarkableReference,AtomicStampedReference

CAS引起的ABA问题

CAS的思想主要是:如果主存的值跟expect值相同,说明主存的值没有被其他线程改动过,那么当前线程(以下称为T1)当然可以去原子写入

这里有个重要的前提,“如果主存的值跟expect值相同”,包含以下两种情况:

1、主存的值为A,在被T1更新前,这个A没有被其他线程更改过,那么就是我们熟悉的compareAndSet,T1一定可以CAS操作成功,这个没有问题。

2、主存的值为A,中间被其他线程改为B,随后又被其他线程改回A,那么对于T1来说它看到主存的值还是A(实际上此A已经非彼A),因此T1使用compareAndSet成功操作,尽管在这里T1是操作成功了,但CAS竟然无法发现“A变为B再变为A”的特殊情况,这会引起一些潜在的bug:

现在考察以下暴露的隐藏问题:

多个线程对栈进行操作,这个栈为:A->B->C->D,现在线程T1要使用compareAndSet(A,F)对这个栈的栈顶改为B,在T1准备CAS时,线程T2被cpu优先调度执行

线程T2对A,B,C出栈后,再对A入栈,此时栈变为A->D,线程T1此时被cpu调度执行

线程T1使用compareAndSet(A,F),此刻它认为栈顶的A和它期待的A显然是相同的,于是更新栈顶为B,T1返回的栈为为F->D,问题出现在哪里?

正常来说在阻塞式多线程并发情况下:T1执行compareAndSet(A,F)后,T1返回的栈为F->B->C->D

但是在并发CAS情况下,T1执行compareAndSet(A,F)后,T1返回的栈为F->D,B和C两个元素就这样凭空消失了!!!

这就是所谓的“ABA问题”,根本原因就是CAS只是简单比较主存的值和expect的值比较是否相同,它无法发现主存的值是否被改变了多次

如何解决ABA问题?

CAS为对A添加一个版本号,即[A,oldVersion],执行CAS时,不仅要比较主存的值和A是否相同,而且也要比较主存的值的版本号oldVersion有无变化,只有当

[主存的值,oldVersion] 等于[A,oldVersion]时,才可以执行原子写入操作。

这就是AtomicStampedReference的设计原理,它有一个内部类Pair,目的就是构造一个具有值和版本号标识的”结对子对象”:Pair(reference,stamp),它可以唯一表示出

1
2
3
4
5
6
7
8
private static class Pair<T> {
final T reference;
final int stamp;
private Pair(T reference, int stamp) {
this.reference = reference;
this.stamp = stamp;
}
private volatile Pair<V> pair;

还是以“A同学更改桌子上白纸上的数字(或者字符)”作为例子说明

1、首先白纸上已经写着一个初始值“A”,在A的旁边还写着一个版本0

2、A同学成功更改白纸字符对应的过程:A同学手上拿着期待值“A”和版本号0走到桌子前,看到白纸上值恰好写着A而且版本也是0,说明没有其他同学主动来更改白纸,因此A同学可以使用CAS原子更改白纸的值,对应代码如下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
public class Unsafe6 {
static String paper = "A";
//asr:模拟桌子上的白纸,白纸的初始值为A,初始版本为0
static AtomicStampedReference<String> asr = new AtomicStampedReference<>(paper, 0);
public static void main(String[] args) {
new Thread(new Runnable() {
@Override
public void run() {
// 白纸被更新前对应的值和版本号
System.out.println(asr.getReference() + ":" + asr.getStamp()); // A:0
// A同学手上拿着期待值A和期待版本0,以及准备写入的新值B,准备写入的新版号
boolean isSet = asr.compareAndSet("A", "B", 0, 1);
System.out.println(isSet); //true
// 白纸被CAS成功更新后对应的值和版本号
System.out.println(asr.getReference() + ":" + asr.getStamp());// B:1
}
}).start();
}
}

3、A同学放弃此次更改白纸字符对应的过程:同学A手上拿着期待值“A”和版本号0走到桌子,看到白纸上写着A但是版本号写着2,虽然值相同,但两者版本不一样,说明其他同学主动来更改白纸的值A,A同学放弃本次更新。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46

public class Unsafe6 {
static String paper = "A";
static AtomicStampedReference<String> asr = new AtomicStampedReference<>(paper, 0);
public static void main(String[] args) throws InterruptedException {
// B同学优先使用CAS对白纸进行写入
new Thread(new Runnable() {
@Override
public void run() {
System.out.println(asr.getReference() + ":" + asr.getStamp()); // A:0
boolean isSet = asr.compareAndSet("A", "B", 0, 1);
System.out.println(isSet);// true
System.out.println(asr.getReference() + ":" + asr.getStamp());//B:1
}
}).start();
Thread.sleep(1);

// C同学优先使用CAS对白纸进行写入
new Thread(new Runnable() {
@Override
public void run() {
System.out.println(asr.getReference() + ":" + asr.getStamp()); //B:1
boolean isSet = asr.compareAndSet("B", "A", 1, 2);
System.out.println(isSet);//true
System.out.println(asr.getReference() + ":" + asr.getStamp());//A:2
}
}).start();

Thread.sleep(1);

// 等前面两个同学对白纸完成CAS操作后,白纸的值和版本号为A:2
// 此时A同学手上拿着期待值A和期待版本0,以及准备写入的新值B,准备写入的新版号
// 结果发现白纸的值虽然为A,但是版本号为2,因此isSet为false也即CAS操作失败,白纸的值维持为A
new Thread(new Runnable() {
@Override
public void run() {
System.out.println(asr.getReference() + ":" + asr.getStamp());// A:2
boolean isSet = asr.compareAndSet("A", "B", 0, 1);
System.out.println(isSet);//false
System.out.println(asr.getReference() + ":" + asr.getStamp());//A:2
}
}).start();

}
}

有了demo例子的铺垫,结合前面的unsafe多个demo的案例,AtomicStampedReference就是基于compareAndSwapObject方法进行设计的

1
public final native boolean compareAndSwapObject(Object var1, long var2, Object var4, Object var5);

AtomicStampedReference在内部中

1
UNSAFE.compareAndSwapObject(this, pairOffset, expectPair, newPair);

源码解析:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
package java.util.concurrent.atomic;

public class AtomicStampedReference<V> {

private static class Pair<T> {
final T reference; // 用于被比较的引用类型(当然可以传入任意类型,因为这里是T泛型),在以下被称为值
final int stamp; // 版本号

// 内部结对类,是辅助类,用于解决ABA的关键手段之一
private Pair(T reference, int stamp) {
this.reference = reference;
this.stamp = stamp;
}
// 用于构建一个值和版本号绑在一起的“结对”实例
static <T> Pair<T> of(T reference, int stamp) {
return new Pair<T>(reference, stamp);
}
}

// unsafe进行CAS操作的实例object,考虑并发场景下,需要使用volatile内存可见性原语
private volatile Pair<V> pair;

/** 使用促使的引用类型实例和促使版本号构造AtomicStampedReference实例,例如
AtomicStampedReference<String> asr = new AtomicStampedReference<>("A", 0);
那么这里initialRef就是"A",initialStamp就是0
*/
public AtomicStampedReference(V initialRef, int initialStamp) {
pair = Pair.of(initialRef, initialStamp);
}

// 只能返回获取值
public V getReference() {
return pair.reference;
}
// 只能返回获取值对应的版本号
public int getStamp() {
return pair.stamp;
}



//可以同时返回值和版本号,注意这里返回的版本号是放在入参数组stampHolder[0]中,至于为何不是直接用整型变量返回版本号?使用整型数据更方便让get方法将版本号放在给定的入参数据里,算是普通思路。
/*常见用法:
int[] stampHolder=new int[1];
这时asr.get(stampHolder)取到值,而stampHolder[0]就取到值对应的版本号
System.out.println("ref:"+asr.get(stampHolder)+" stamp:"+stampHolder[0]);
*/
public V get(int[] stampHolder) {
Pair<V> pair = this.pair;
stampHolder[0] = pair.stamp;
return pair.reference;
}

// 跟compareAndSet类似,可以看做是弱CAS,不常用
public boolean weakCompareAndSet(V expectedReference,
V newReference,
int expectedStamp,
int newStamp) {
return compareAndSet(expectedReference, newReference,
expectedStamp, newStamp);
}

/**
该方法就是用户调用的CAS方法,这里做了一个优化小技巧:如果新值和新版本号恰好等于主存的的值以及主存的版本号,显然连CAS操作都省了,直接返回true,有点点耍小聪明和碰运气
*/
public boolean compareAndSet(V expectedReference,
V newReference,
int expectedStamp,
int newStamp) {
Pair<V> current = pair;
return
// 通过对比期待值和主存值、期待版本号和主存值对应的版本号,可以确保不再出现所谓的ABA问题,能“感知”值的改变次数
expectedReference == current.reference &&
expectedStamp == current.stamp &&
((newReference == current.reference &&
newStamp == current.stamp) ||
casPair(current, Pair.of(newReference, newStamp)));
}


// 没有使用CAS去更新,只是简单的更新操作,这一方法适合用在单线程或者冲突不严重的并发情况,看你对业务的经验
public void set(V newReference, int newStamp) {
Pair<V> current = pair;
if (newReference != current.reference || newStamp != current.stamp)
this.pair = Pair.of(newReference, newStamp);
}

/**
不管版本号是否相同,只要期待值和主存的值相等,就用新版本号更新主存值的旧版本号,
注意:每次调用该方法都可能会更新失败,需要结合自旋直到更新成功为止
*/
public boolean attemptStamp(V expectedReference, int newStamp) {
Pair<V> current = pair;
return
expectedReference == current.reference &&
(newStamp == current.stamp ||
casPair(current, Pair.of(expectedReference, newStamp)));
}

// Unsafe mechanics
// 以下就是前面我们熟悉的unsafe相关操作:创建unsafe实例、pair的相对地址
private static final sun.misc.Unsafe UNSAFE = sun.misc.Unsafe.getUnsafe();
private static final long pairOffset =
objectFieldOffset(UNSAFE, "pair", AtomicStampedReference.class);

// 到了这里才是真正执行对主存对象进行操作的逻辑,可以看到这里使用compareAndSwapObject(Object var1, long var2, Object var4, Object var5);
// 注意对比前面的unsafe.compareAndSwapInt(被更新的对象,相对地址,期待值,新值);
private boolean casPair(Pair<V> cmp, Pair<V> val) {
return UNSAFE.compareAndSwapObject(this, pairOffset, cmp, val);
}

static long objectFieldOffset(sun.misc.Unsafe UNSAFE,
String field, Class<?> klazz) {
try {
return UNSAFE.objectFieldOffset(klazz.getDeclaredField(field));
} catch (NoSuchFieldException e) {
// Convert Exception to corresponding Error
NoSuchFieldError error = new NoSuchFieldError(field);
error.initCause(e);
throw error;
}
}
}

与AtomicStampedReference的内部的结对对象是[值,版本]设计思想类似的还有AtomicMarkableReference,它内部是结对对象是[值,标志位],也即[值,1]或者[值,0],如下面所示

AtomicStampedReference内部是结对类

1
2
3
4
5
6
final T reference;
final int stamp;
private Pair(T reference, int stamp) {
this.reference = reference;
this.stamp = stamp;
}

AtomicMarkableReference内部是结对类

1
2
3
4
5
6
final T reference;
final boolean mark;
private Pair(T reference, boolean mark) {
this.reference = reference;
this.mark = mark;
}

它们两个在使用场合有什么不同呢?

AtomicStampedReference可以给值(reference)加上版本号后,可以追踪到这个值的整个变化过程,如:
[A,0] => [B,1] => [C,2] => [A,3],通过getStamp返回的版本号大小,可以知道值在中途被更改了3次。

场景另外一种并发场景:也需要你并不关心值(reference)中途更改了多少次,而是只关注这个主存的值是否有被修改过,AtomicMarkableReference显然很适合。

ABA 问题

https://blog.csdn.net/tiandao321/article/details/80811103?utm_medium=distribute.pc_relevant_t0.none-task-blog-2%7Edefault%7EBlogCommendFromMachineLearnPai2%7Edefault-1.control&depth_1-utm_source=distribute.pc_relevant_t0.none-task-blog-2%7Edefault%7EBlogCommendFromMachineLearnPai2%7Edefault-1.control

ReentrantLock里面的阻塞锁和非柱塞锁的区别

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
final ReentrantLock reentrantLock= new ReentrantLock();

new Thread(new Runnable() {
@Override
public void run() {
reentrantLock.lock();
System.out.println("线程A在运行中");
try {
Thread.sleep(5*1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
reentrantLock.unlock();

}
}).start();

Thread.sleep(1);// 让线程A先运行

// 线程A在工作时,线程B被阻塞了,啥事都做不了,干等,浪费资源
new Thread(new Runnable() {
@Override
public void run() {
reentrantLock.lock();
System.out.println("线程B在运行中");
reentrantLock.unlock();

}
}).start();

从上面的运行结果来看:线程A在工作时,线程B被阻塞了,啥事都做不了,干等,浪费资源

如果使用非阻塞锁,那么线程A在工作时,线程B不会被阻塞,线程B可以自由做其他事情,没有白白自旋

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
final ReentrantLock reentrantLock= new ReentrantLock();

new Thread(new Runnable() {
@Override
public void run() {
reentrantLock.lock();
System.out.println("线程A在运行中");
try {
Thread.sleep(5*1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
reentrantLock.unlock();

}
}).start();

Thread.sleep(1);// 让线程A先运行

// 线程A在工作时,使用tryLock非阻塞锁,这样线程B不会被线程A阻塞,线程A工作的同时,线程B也可以去干点别的事情!
new Thread(new Runnable() {
@Override
public void run() {
while (!reentrantLock.tryLock()){
System.out.println("线程B在工作中,对链表扫描是否存在key节点");
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}

}
reentrantLock.unlock();

}
}).start();

输出:可以看到在第5秒后,线程B获取到了锁因此退出while循环,使用trylock非阻塞锁后,在这5秒时间内,至少线程B没有白白浪费,而是干了一些查询key节点的工作,这就是scanAndLockForPut的设计思想

1
2
3
4
5
6
线程A在运行中
线程B在工作中,对链表扫描是否存在key节点
线程B在工作中,对链表扫描是否存在key节点
线程B在工作中,对链表扫描是否存在key节点
线程B在工作中,对链表扫描是否存在key节点
线程B在工作中,对链表扫描是否存在key节点

对比scanAndLockForPut的while (!tryLock())即可理解其设计思想:

1
2
3
4
5
6
7
8
9
10
11
private HashEntry<K,V> scanAndLockForPut(K key, int hash, V value) {
HashEntry<K,V> first = entryForHash(this, hash);
HashEntry<K,V> e = first;
HashEntry<K,V> node = null;
int retries = -1; // negative while locating node
while (!tryLock()) { // 某线程没有获取锁的期间,顺便完成了链表的遍历以及新key节点的创建工作
// ......
node = new HashEntry<K,V>(hash, key, value, null);
// ......
return node;
}