yield-bytes

沉淀、分享与无限进步

CopyOnWriteArrayList数据结构的源代码核心方法深入解析

使用场景

ArrayList并发读写不安全示例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
import java.util.*;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class ArrayListDemo {
public static void main(String[] args) throws InterruptedException {
// CopyOnWriteArrayList<String> list= new CopyOnWriteArrayList<>(Arrays.asList("foo","bar"));
ArrayList<String> list= new ArrayList<>(Arrays.asList("foo","bar"));
ArrayList<Thread> writeTreadList=new ArrayList<>();
ArrayList<Thread> readTreadList=new ArrayList<>();
int threadNum=10;
ExecutorService service= Executors.newFixedThreadPool(threadNum);
for(int i=0;i<threadNum;i++){
service.execute(new Writer(list));
service.execute(new Reader(list));
}
service.shutdown();
}
}

class Writer implements Runnable{
List<String> list;

public Writer(List<String> list){
this.list=list;
}
@Override
public void run() {
this.list.add("writer");
}
}

class Reader implements Runnable{
List<String> list;
public Reader(List<String> list){
this.list=list;
}
@Override
public void run() {
System.out.println(Thread.currentThread().getName()+":"+list);
}
}

提示ConcurrentModificationException:并发修改异常。这是因为有写线程和读线程同时操作list因为有写线程添加元素后,会改动modCount,导致某个读线程在读之前拿到的expectedModCount不等于正在读过程中modCount,故会在next()方法抛出异常

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
pool-1-thread-4:[foo, bar, writer, writer]
pool-1-thread-7:[foo, bar, writer, writer, writer]
pool-1-thread-9:[foo, bar, writer, writer, writer, writer]
pool-1-thread-11:[foo, bar, writer, writer, writer, writer, writer]
pool-1-thread-11:[foo, bar, writer, writer, writer, writer, writer, writer]
pool-1-thread-11:[foo, bar, writer, writer, writer, writer, writer, writer, writer]
pool-1-thread-11:[foo, bar, writer, writer, writer, writer, writer, writer, writer, writer]
pool-1-thread-11:[foo, bar, writer, writer, writer, writer, writer, writer, writer, writer, writer]
pool-1-thread-11:[foo, bar, writer, writer, writer, writer, writer, writer, writer, writer, writer, writer]
Exception in thread "pool-1-thread-2" java.util.ConcurrentModificationException
at java.util.ArrayList$Itr.checkForComodification(ArrayList.java:909)
at java.util.ArrayList$Itr.next(ArrayList.java:859)
at java.util.AbstractCollection.toString(AbstractCollection.java:461)
at java.lang.String.valueOf(String.java:2994)
at java.lang.StringBuilder.append(StringBuilder.java:131)
at concurrent.demo.Reader.run(ArrayListDemo.java:43)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)

ArrayList换成CopyOnWriteArrayList即可正常运行,因为CopyOnWriteArrayList 允许并发的读写操作,写操作不会影响到读操作,这就是所谓的读写分离设计。

构造方法

导读:CopyOnWriteArrayList是Java并发包中提供的一个并发容器,它是个线程安全且读操作无锁的ArrayList,写操作则通过创建底层数组的新副本来实现,是一种读写分离的并发策略,我们也可以称这种容器为”写时复制器”,Java并发包中类似的容器还有CopyOnWriteSet。本文会对CopyOnWriteArrayList的实现原理及源码进行分析

  • CopyOnWriteArrayList在修改容器元素的时候并不是直接在原来的数组上进行修改,它是先拷贝一份,然后在拷贝的数组上进行修改,在修改完成之后将引用赋给原来的数组。
关键成员参数
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
/** The lock protecting all mutators */
// 全局锁:仅在clone方法和readObject方法上有效
final transient ReentrantLock lock = new ReentrantLock();

/** The array, accessed only via getArray/setArray. */
// 注意这个array是volatile类型,意味着array发生改变时,读线程对这种改变是可见的
private transient volatile Object[] array;

/**
* Gets the array. Non-private so as to also be accessible
* from CopyOnWriteArraySet class.
*/
final Object[] getArray() {
return array;
}

/**
* Sets the array.
*/
final void setArray(Object[] a) {
array = a;
}

// 默认构造方法创建一个空数组
public CopyOnWriteArrayList() {
setArray(new Object[0]);
}

相关读方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
public int indexOf(Object o) {
// 取出volatile的array,显然读操作不用加锁
Object[] elements = getArray();
return indexOf(o, elements, 0, elements.length);


/**
* {@inheritDoc}
*
* @throws IndexOutOfBoundsException {@inheritDoc}
*/
public E get(int index) {
//取出volatile的array,因为volatile语义能保证array的修改能够对当前读线程可见
return get(getArray(), index);

这里只给出两个方法,可以看到读是不需要加锁的

1
2
3
4
5
6
7
8
9
10
public void forEach(Consumer<? super E> action) {
if (action == null) throw new NullPointerException();
// 取出volatile的array,其他线程对array的写操作对当前读线程可见
Object[] elements = getArray();
int len = elements.length;
for (int i = 0; i < len; ++i) {
@SuppressWarnings("unchecked") E e = (E) elements[i];
action.accept(e);
}
}

相关写方法

add
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
public boolean add(E e) {
//全局独占锁,写操作仅能由一个线程操作
final ReentrantLock lock = this.lock;
lock.lock();
try {
// 获取旧数组
Object[] elements = getArray();
int len = elements.length;
//创建新数组用于存放添加1个元素和旧数组元素,
Object[] newElements = Arrays.copyOf(elements, len + 1);
newElements[len] = e;
// 将volatile的array指向新数组,从这里也可以看出:只要写线程还未执行setArray(newElements)逻辑,那么同一时刻的读线程读取的数组必然是原数组,而非添加了元素的新数组。
setArray(newElements);
return true;
} finally {
lock.unlock();
}
}

remove
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
public E remove(int index) {
//和add方法一样,
final ReentrantLock lock = this.lock;
lock.lock();
try {
Object[] elements = getArray();
int len = elements.length;
E oldValue = get(elements, index);
int numMoved = len - index - 1; // (len-1)-index
if (numMoved == 0)
setArray(Arrays.copyOf(elements, len - 1));
else {
//创建新数组用于存放删除1个节点的旧数组元素
Object[] newElements = new Object[len - 1];
System.arraycopy(elements, 0, newElements, 0, index);
System.arraycopy(elements, index + 1, newElements, index,
numMoved);
//将volatile的array指向新数组,从这里也可以看出:只要写线程还未执行setArray(newElements)逻辑,那么同一时刻的读线程读取的数组必然是包含删除元素的原数组。
setArray(newElements);
}
return oldValue;
} finally {
lock.unlock();
}
}

这里还有一个remove方法:

1
2
3
4
5
6
public boolean remove(Object o) {
// 这里为何使用snapshot快照这样命名? 因为这是在未加锁条件下读取的array数组,不保证读取的array是修改的新array,而是指在读取时刻的array,类似mysql的不可重复读机制.
Object[] snapshot = getArray();
int index = indexOf(o, snapshot, 0, snapshot.length);
return (index < 0) ? false : remove(o, snapshot, index);

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
private boolean remove(Object o, Object[] snapshot, int index) {
final ReentrantLock lock = this.lock;
lock.lock();
try {
Object[] current = getArray();
int len = current.length;
// 前面说到snapshot是加锁之前的快照,因此在这里加锁之后,需要再次检查加锁前、后的过程中快照数组和当前array是否一致。
if (snapshot != current) findIndex: {
//snapshot时刻前,和snapshot时刻后两个时刻array元素个数不一样的话,取最小的
int prefix = Math.min(index, len);
for (int i = 0; i < prefix; i++) {
if (current[i] != snapshot[i] && eq(o, current[i])) {
index = i;
break findIndex;
}
}
if (index >= len)
return false;
if (current[index] == o)
break findIndex;
index = indexOf(o, current, index, len);
if (index < 0)
return false;
}
Object[] newElements = new Object[len - 1];
System.arraycopy(current, 0, newElements, 0, index);
System.arraycopy(current, index + 1,
newElements, index,
len - index - 1);
setArray(newElements);
return true;
} finally {
lock.unlock();
}
}
clear
1
2
3
4
5
6
7
8
9
public void clear() {
final ReentrantLock lock = this.lock;
lock.lock();
try { // clear 过程中也加了独占锁,而且不需要遍历,clear性能很高
setArray(new Object[0]);
} finally {
lock.unlock();
}
}

从以上相关的写操作可以得出一个结论:如果写操作很频繁,那么CopyOnWriteArrayList的写入性能很低,因为每次写操作都需要将原数组元素拷贝到一个新数组中,如果底层的array是大数组,那么将会占用两倍内存空间。

sort

sort也需要加独占锁,保证

1
2
3
4
5
6
7
8
9
10
11
12
13
public void sort(Comparator<? super E> c) {
final ReentrantLock lock = this.lock;
lock.lock();
try {
Object[] elements = getArray();
Object[] newElements = Arrays.copyOf(elements, elements.length);
@SuppressWarnings("unchecked") E[] es = (E[])newElements;
Arrays.sort(es, c);
setArray(newElements);
} finally {
lock.unlock();
}
}
关于迭代器snapshot机制
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
import java.util.*;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class ArrayListDemo {
public static void main(String[] args) throws InterruptedException {
CopyOnWriteArrayList<String> list= new CopyOnWriteArrayList<>(Arrays.asList("foo","bar"));
Thread t=new Thread(new Writer(list));
// 在“写线程启动”之前,创建了iterator,该迭代器内部会在此刻创建一个snapshot快照数组,该snapshot引用指向的是未被修改的原数组:Arrays.asList("foo","bar"),
ListIterator<String> iter=list.listIterator();
t.start();
t.join(); // 写线程结束后,内部array引用指向的是新数组对象
//iterator内部是用旧数组进行迭代:因此输出时foo、bar,而不是输出foo、bar、writer1、writer2
while (iter.hasNext()) System.out.println(iter.next());
}
}

class Writer implements Runnable{
List<String> list;

public Writer(List<String> list){
this.list=list;
}
@Override
public void run() {
this.list.add("writer1");
this.list.add("writer2");
}
}
}

理解以上demo输出,需要对其源代码掌握和理解:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
  public ListIterator<E> listIterator(int index) {
Object[] elements = getArray();
int len = elements.length;
if (index < 0 || index > len)
throw new IndexOutOfBoundsException("Index: "+index);

return new COWIterator<E>(elements, index);
}

static final class COWIterator<E> implements ListIterator<E> {
/** Snapshot of the array */
// 这里是关键,当执行Iterator<String> itr = list.iterator()时,迭代器内部其实先生成一份底层数组的snapshot快照,之后迭代器读取的数组都是这个快照数组(也即旧时刻下的数组),而不是读取最新的、正在修改的数组
private final Object[] snapshot;
/** Index of element to be returned by subsequent call to next. */
private int cursor;

private COWIterator(Object[] elements, int initialCursor) {
cursor = initialCursor;
snapshot = elements;
}

public boolean hasNext() {
return cursor < snapshot.length;
}

public boolean hasPrevious() {
return cursor > 0;
}

@SuppressWarnings("unchecked")
public E next() {
if (! hasNext())
throw new NoSuchElementException();
return (E) snapshot[cursor++];
}

@SuppressWarnings("unchecked")
public E previous() {
if (! hasPrevious())
throw new NoSuchElementException();
return (E) snapshot[--cursor];
}
CopyOnWrite的缺点

CopyOnWrite容器有很多优点,但是同时也存在两个问题,即内存占用问题和数据一致性问题。所以在开发的时候需要注意一下。

「内存占用问题」。因为CopyOnWrite的写时复制机制,所以在进行写操作的时候,内存里会同时驻扎两个对象的内存,旧的对象和新写入的对象(注意:在复制的时候只是复制容器里的引用,只是在写的时候会创建新对象添加到新容器里,而旧容器的对象还在使用,所以有两份对象内存)。如果这些对象占用的内存比较大,比如说200M左右,那么再写入100M数据进去,内存就会占用300M,那么这个时候很有可能造成频繁的Yong GC和Full GC。之前我们系统中使用了一个服务由于每晚使用CopyOnWrite机制更新大对象,造成了每晚15秒的Full GC,应用响应时间也随之变长。

ArrayList适合单线程情况下的数组场景,Vector适合多线程场景,但Vector每个方法前都有Synchronized修饰,因此写线程的操作会阻塞读线程的读操作,性能低。