yield-bytes

沉淀、分享与无限进步

本文接前面ConcurrentHashMap文章的内容,继续深入到TreeBin这个特殊节点的读写锁竞争机制。

7、TreeBin类设计原理

对TreeBin类深入分析,不仅能够理解为何CHM能支持并发读的底层实现,而且也能加深jk1.8的ConcurrentHashMap整体设计原理。本文将详细深入地解析TreeBin优秀的读写锁控制设计,此部分内容在全网的相关文章很少涉及。

7.1 保证加锁对象不改变的设计思想

首先看其源码的注释说明:

TreeNodes used at the heads of bins. TreeBins do not hold user keys or values, but instead point to list of TreeNodes and their root. They also maintain a parasitic read-write lock forcing writers (who hold bin lock) to wait for readers (who do not) to complete before tree restructuring operations.

TreeBins节点不是用于放置key和value,而是用于指向TreeNodes和TreeNodes的根节点。TreeBins内部会维护一把读写锁,用于保证在树重构前,持有锁的写线程被强制等待无锁读线程完成。

当然这注释也未能回答这样核心问题:为何对于桶位是红黑树的情况下,CHM放置的一个TreeBin节点,而不像HashMap那样放置一个TreeNode节点?

首先看看TreeBin的使用场景,在put场景下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
    synchronized (f) {
if (tabAt(tab, i) == f) {
//
if (fh >= 0) {
// ..
}
//
else if (f instanceof TreeBin) {
Node<K,V> p;
binCount = 2;
if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,
value)) != null) {
oldVal = p.val;
if (!onlyIfAbsent)
p.val = value;
}
}
}
}

当key所在桶位可以放入key时,先对头节点加锁synchronized (f),注意这是独占锁,要求头节点对象在锁期间不会改变,否则就不能锁住同一对象。

为论证f头节点是TreeNode类型不适用并发的CHM场景,不妨假设CHM使用TreeNode作为CHM桶位上红黑树的头节点,于是在put场景下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
            // ① 假设当前头节点f是一个TreeNode节点,则执行流会进入②分支
synchronized (f) {
if (tabAt(tab, i) == f) {
//
if (fh >= 0) {
// ..
}
// ② 这里已经将TreeBin换成TreeNode类型
else if (f instanceof TreeNode) {
Node<K,V> p;
binCount = 2;
// ③ 将key和value插入到红黑树当中
if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,
value)) != null) {
oldVal = p.val;
if (!onlyIfAbsent)
p.val = value;
}
}
}
}

这里关键是第③步逻辑:将key和value插入到红黑树当中,会发生什么事情?

我们知道,在HashMap节点,将一个节点put入红黑树后,需要做插入平衡处理和将红黑树root节点移到双向链表的头节点位置,目的是为了保证桶位上的头节点即是红黑树的根节点也是双向链表的头结点,下面就是HashMap对应的操作

1
2
3
4
5
6
7
8
final TreeNode<K,V> putTreeVal(HashMap<K,V> map, Node<K,V>[] tab,
int h, K k, V v) {
// 插入平衡处理和将红黑树root节点移到双向链表的头节点位置
moveRootToFront(tab, balanceInsertion(root, x));
return null;
}
}

但对于并发CHM来说,红黑树的插入平衡处理会导致root节点发生了改变(例如插入平衡前根节点是treeNodeA,插入平衡后根节点是treeNodeB)而不是位于桶位头节点上,如果CHM桶位头节点还是TreeNode,那么就会出现以下图示的不能保证写线程独占操作的线程不安全情况。

TreeBin改为TreeNode的锁情况.001

如何解决以上遇到的问题?

阅读全文 »

有了《基于AQS实现的Condition底层原理解析》文章关于条件队列底层设计的讨论后,那么关于使用BlockingQueue接口和AQS设计各种阻塞队列实现——ArrayBlockingQueue、LinkedBlockingQueue、PriorityBlockingQueue的数据结构设计和源代码实现则非常好理解。注意本文的讨论的“阻塞队列”是指代BlockingQueue接口,不是指代AQS源码分析文章中所说的CLH的FIFO阻塞队列。

BlockingQueue

BlockingQueue是一个接口,它有Queue接口常见的方法:add、offer,而put方法和take方法则是BlockingQueue特有的

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
public interface BlockingQueue<E> extends Queue<E> {

/**
* Inserts the specified element into this queue, waiting if necessary
* for space to become available.
*
* @param e the element to add
* @throws InterruptedException if interrupted while waiting
* @throws ClassCastException if the class of the specified element
* prevents it from being added to this queue
* @throws NullPointerException if the specified element is null
* @throws IllegalArgumentException if some property of the specified
* element prevents it from being added to this queue
*/
void put(E e) throws InterruptedException;

/**
* Retrieves and removes the head of this queue, waiting if necessary
* until an element becomes available.
*
* @return the head of this queue
* @throws InterruptedException if interrupted while waiting
*/
E take() throws InterruptedException;

}

其实put和take就是经典的生产者-消费者模型中的逻辑:生产者使用put操作不断向“容器”中放入元素,消费者使用take操作不断从“容器”中取出(消费)元素。只不过对于BlockingQueue接口来说,put和take都要求实现此BlockingQueue接口的子类需实现“阻塞机制”,也即:

1、对于put操作来说:生产者线程(1个或者多线程并发)向容器(该容器可以是基于数组实现或者基于链表实现)put一个元素(item),如果此容器已满,则所有生产者线程的put操作都会被阻塞直到“容器不满notFull”时。

结合AQS的条件队列的阻塞机制设计可推出:对于put的阻塞,其实就是让执行put操作的生产者线程在一个“特定的”条件队列中阻塞,这里的阻塞实现就是使用该条件队列的await方法达成。

2、对于take操作来说:消费者线程(1个或者多线程并发)从容器(该容器可以是基于数组实现或者基于链表实现)中take一个元素(item),如果此容器为空,则所有消费者线程的take操作都会被阻塞直到“容器不空notEmtpy”时。

结合AQS的条件队列的阻塞机制设计可推出:对于take的阻塞,其实就是让执行take操作的消费者线程在一个“特定的”条件队列中阻塞,这里的阻塞实现就是使用该条件队列的await方法达成。

阅读全文 »

关于AQS的独占模式同步器设计原理(以ReentrantLock为例)以及共享模式的同步器设计原理(以Semaphore为例)在前面文章的已经讨论完毕,这两种模式让我们理解了AQS通过底层的FIFO阻塞队列(又称同步队列/变体的CLH队列/Sync queue)实现了相当巧妙的多线程协调调度的复杂逻辑。当然AQS还有一个更为关键的设计:结合FIFO阻塞队列+条件队列(又称condition queue/wait queue)实现一种基于条件的await和signal的多线程间的协调机制,也即本文内容。

关于条件队列和阻塞队列的说明

对于阻塞队列,这里只给出独占模式的线程节点说明:阻塞队列其实有AQS内部定义的双向链表节点的属性如下:

1
2
3
4
5
6
7
8
   static final class Node {
volatile int waitStatus;
volatile Node prev;
volatile Node next;
volatile Thread thread;
}
Node head; // Head of the wait queue
Node tail; // Tail of the wait queue

而条件队列中,它是有单向链表实现,该单向链表的节点的属性为:

1
2
3
Node firstWaiter; // First node of condition queue.
Node lastWaiter; // Last node of condition queue.
Node nextWaiter; // Link to next node waiting on condition,

可以看到三个属性用于构成单向链表,而条件队里的nextWaiter指针是为了区别阻塞队列中的next指针。

阻塞队列的作用在前面的文章已经给出很明确的说明:只要当前线程没有请求到锁资源(state),则需要进入CLH阻塞队列进行排队等候,那么AQS给出的条件队列到底是解决什么场景呢?

这里不妨考虑这种场景:

消费者线程:消费者线程原本在阻塞队列等待,当外界有线程释放了锁资源,那么此消费者线程从阻塞队列被唤醒后出队并拿到锁资源后,发现“存放货物的队列是空的”,这种未加入“条件限制”的等待线程调度策略则显得不够明智。

因此可以这么设计:给当前独占锁lock对象添加添加一个条件队列:如果有一个或者多消费者线程过来取“货物”,当遇到“仓库没有货物可取”这种条件时,那么这些消费者线程先被安排在条件队列等待(阻塞自己):

firstWaiter(c0)->c1->c2-c3->c4->lastWaiter(c5)->null

直到条件“仓库有货物可取时”,那么条件队列的消费者线程再转移到阻塞队列里面排队等候被唤醒去抢占锁资源以实施消费行为

阅读全文 »

在前面文章中,addCount方法分为两部分:

第一部分是put一个节点后,需要对size加1计数,这部分交由fullAddCount完成,它的设计和逻辑可谓精妙,非常值得在实际项目参考其代码实现。

第二部分是加1计数后,需要判断是否需要对table进行扩容,扩容思想设计及其源代码实现同样非常精妙,值得多次阅读和学以致用!

本文将重点深入分析CHM核心扩容逻辑:transfer、helpTransfer、以及resizeStamp。

CHM的sc设置基数图示

《gitee 博客文章封面》

1、addCount的扩容判断设计

第1个执行扩容线程

本章节最精彩的地方:分析Doug Lea 如何安排“每个加入扩容任务线程对sc进行cas加1计数”、“每个结束自己扩容任务线程对sc进行减1”、以及“最后一个结束扩容线程要干些什么收尾工作”。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
// putVal调用addCount(1L, binCount),因此这里x就是1,check即binCount,对于put入一个key(key已存在,则binCount=0,新key,binCount>0),那么binCount自然是>=0
private final void addCount(long x, int check) {
CounterCell[] as; long b, s;
// 主分支1:完成加1计数逻辑,之前文章fullAddCount已经详细分析,本文重点讲解addCount主分支2
// ......忽略部分


// 主分支2:当前线程检查是否需要扩容,若需要,则执行transfer扩容逻辑
// check即binCount,每次新增节点,当然要检查是否需要扩容
if (check >= 0) {
Node<K,V>[] tab, nt; int n, sc;
// 这里的s就是主分支1里面的 s = sumCount(),含义:完成加1计数后,统计当前CHM节点总数量s,看看s有无达到扩容阈值sizeCtl
while (s >= (long)(sc = sizeCtl) && (tab = table) != null &&
(n = tab.length) < MAXIMUM_CAPACITY) {
int rs = resizeStamp(n); // jdk 1.8这里的写法是一个bug,后面有指出原因。
//分支2.1:sc的值为负数时,表明CHM还在扩容期,原因参考后面小节的sc、resizeStamp方法的解析
if (sc < 0) {
// 分支2.1.1:while循环扩容结束点,扩容结束条件有5个,(nt = nextTable) == null 以及transferIndex <= 0条件再看完transfer源码解析后,可以很容易理解,但前面3个条件目前理解会很困难,需要理解后面小节的sc、resizeStamp方法解析后才能准确理解其含义,也即这里先跳过这个5个条件的解释。
if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
sc == rs + MAX_RESIZERS || (nt = nextTable) == null ||
transferIndex <= 0)
break;
// 分支2.1.2:第1个执行扩容线程在分支2.2将sc设置基数值后,以后每进来一个扩容线程都会对sc进行cas加1
if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1))
transfer(tab, nt);
}
//分支2.2:第1个执行扩容线程会执行此逻辑,将sc(sizeCtl)设为一个基础数(该数为负数),为什么设置一个负数呢?后面resizeStamp方法给出非常完整解答!没有充分积累,此处看起来将很难理解其设计意图。
else if (U.compareAndSwapInt(this, SIZECTL, sc,
(rs << RESIZE_STAMP_SHIFT) + 2))
transfer(tab, null);
s = sumCount();
}
}
}
阅读全文 »

前言

虽然jdk1.7的CHM能够解决非阻塞高并发的读写情况,但它仍然不够理想,例如写并发度受限于Segment数组初始大小,一旦创建就无法再扩容并发度;如果key非常多且分布不均匀,例如都落在同一个Segment位置上,相当于多个线程竞争同一把“全局锁”,CHM“仿佛”退化为HashTable。此外,因为CHM的Segment上使用jdk1.7的HashMap,它的性能显然没有jdk1.8的HashMap强,jdk1.7resize扩容处理时只能单线程完成扩容操作,jdk1.7计算size方法可能要对每个Segment加阻塞锁,基于以上jdk1.7的CHM缺点,Doug Lea重新设计新版本的CHM,其源代码行数总共6312行,性能确实高了,但代价是代码逻辑的复杂度要高出很多。

理解本文所有源码分析以及相关技术都需要读者已经掌握1.8/1.7HashMap、1.7 CHM、Unsafe、CAS这些进阶知识,否则难以消化相关知识,可以提前阅读本博客相关文章:

(以下“CHM”表示ConcurrentHashMap简写)

数据结构图示

分为普通状态下(非扩容期间)和扩容期间

(1)CHM非扩容时,也即普通正常状态下的内部数据结构图:

可以看到跟jdk1.8的HashMap数据结构不同的地方:CHM用了一个称为TreeBin的节点作为桶位头节点,不是HashMap的TreeNode:
在这里插入图片描述
(2)CHM正在扩容时的内部数据结构图:

可以看到正在扩容时的内部节点结构,数组尾部的一些桶位头节点放入了ForwardingNode节点。

阅读全文 »

信号量是典型的共享模式:

何为共享模式:同一时间可以有多个线程同时持有锁资源,强调的是线程并行概念。

独占模式:同一时间只能有一个线程持有锁资源

1、Semaphore信号量说明

底层实现原理:使用AQS同步状态的state来保存信号量的当前计数。release(1)表示线程释放了1个锁资源,对应的state值加1,acquire(1)表示线程消耗1个数量的锁资源,对应的state值减1。一般用于限制线程并发工作数量。

2、Semaphore使用acquire()的小demo

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
public class SemaphoreDemo {
public static void main(String[] args) throws InterruptedException {
Semaphore semaphore = new Semaphore(5);
List<Thread> threadList=new ArrayList<>();
for (int i = 0; i < 10; i++) {
Thread t= new Thread(()->{
try {
semaphore.acquire();
System.out.println("线程" + Thread.currentThread().getId() + "已拿到锁资源");
Thread.sleep(5000);
} catch (InterruptedException e) {
e.printStackTrace();
}finally {
semaphore.release();
}
});
threadList.add(t);
}
for (Thread t : threadList) {
t.start();
}
for (Thread t : threadList) {
t.join();
}
}
}

可以观察到结果:5个线程能够同时成功抢到锁资源并执行了5秒,后面5个线程因为已经没有可用资源,只能等待前面5个线程运行结束后才能继续。

注意Semaphore在初始化的构造器也是可以指定公平模式和非公平模式,以下将以默认的公平模式去讨论AQS是如何支撑Semaphore的工作过程

3、Semaphore获取锁资源acquire的工作流程

阅读全文 »

Unsafe类在整套JUC框架中绝对是核心的一个概念,它是实现free-lock的底层核心设计,它内部直接调用的是Java的JNI,只有理解它的CAS原子操作的内部设计原理,才能更加深入理解JUC的free-lock设计。

使用unsafe操作数组

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
package hashmap.demo;

import java.lang.reflect.Field;
import sun.misc.Unsafe;


public class Unsafe3 {

private static Unsafe unsafe;

static{
try {
Field field = Unsafe.class.getDeclaredField("theUnsafe");
field.setAccessible(true);
unsafe = (Unsafe)field.get(null);
} catch (Exception e) {
e.printStackTrace();
}
}

public static void main(String[] args) throws Exception {

int[] intArr1={10,20,30,40};
int[] intArr2={1,3,5,7,8,9,10,1,1,1,1,1,1,1,1};

// byte类型数组内存每个"单元格"容量是1个字节,对应Scale就是1
int byteScale=unsafe.arrayIndexScale(byte[].class);
// short类型数组内存每个"单元格"容量是2个字节,对应Scale就是2
int shortScale=unsafe.arrayIndexScale(short[].class);
// int类型数组内存每个"单元格"容量是4个字节,对应Scale就是4
int intScale=unsafe.arrayIndexScale(int[].class);
// long类型数组内存每个"单元格"容量是8个字节,对应Scale就是8
int longScale=unsafe.arrayIndexScale(long[].class);
// Integer类型数组内存每个"单元格"容量是4个字节,对应Scale就是4
int integerScale=unsafe.arrayIndexScale(Integer[].class);
// String类型数组内存每个"单元格"容量是4个字节,对应Scale就是4
int stringScale=unsafe.arrayIndexScale(String[].class);

//
boolean b=intArr1.getClass().equals(intArr2.getClass()) && int[].class.equals(intArr1.getClass()) && int[].class.equals(intArr2.getClass());
System.out.println(b); // true

// intArr1数组实例和intArr2的数组实例都是同一对象,指向int[]
System.out.println(intArr1.getClass().equals(intArr2.getClass()));


// 获取当前intArr1数组实例的相对基址
long baseOffset1=unsafe.arrayBaseOffset(intArr1.getClass());

System.out.println(baseOffset1); // 16
// 获取当前intArr2数组实例的相对基址
long baseOffset2=unsafe.arrayBaseOffset(intArr2.getClass());
System.out.println(baseOffset2); // 16
// 从baseOffset1和baseOffset2的值都是16可以看出,这是他们指向对象的相对基址,而不是指向对象绝对地址(如果是执行对象绝对地址,那么这里两个值一定不同)


System.out.println(unsafe.getIntVolatile(intArr1,baseOffset1)); //打印intArr1第1个(首地址)对应的元素,也就是intArr1[0]
System.out.println(unsafe.getIntVolatile(intArr1,baseOffset1+intScale)); //打印intArr1的第2个元素,也就是intArr1[1]
System.out.println(unsafe.getIntVolatile(intArr1,baseOffset1+intScale* 2L)); //打印intArr1的第3个元素,也就是intArr1[2]
System.out.println(unsafe.getIntVolatile(intArr1,baseOffset1+intScale* 3L)); //打印intArr1的第4个元素,也就是intArr1[3]

// 获取intArr2的数据的第3个元素
System.out.println(unsafe.getIntVolatile(intArr2,baseOffset2+intScale* 2L)); //打印intArr2第3个对应的元素,也就是intArr2[2]

}
}

阅读全文 »

本文是入门和理解AQS框架的重要文章,尽管AQS还有共享模式以及条件Condition等设计,但重入锁仍然是最适合理解AQS底层数据结构及其算法设计的切入点。

单线程使用可重入锁的内部简单工作机制

分别在以下两个断点位置进行debug,断点条件i==5,并且在variables窗口watch一个特殊的变量state

1
2
3
4
5
6
7
8
9
10
11
12
13
14
import java.util.concurrent.locks.ReentrantLock;

public class ReentrantLockDemo {
public static ReentrantLock lock=new ReentrantLock();
public static void main(String[] args) {
for (int i = 1; i <= 5; i++) {
lock.lock(); // 断点位置
}
for (int i = 1; i <= 5; i++) {
lock.unlock();// 断点位置
}
}
}

可以看到lock.lock()的内部执行流程如下所示,

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
// 1、java.util.concurrent.locks.ReentrantLock$NonfairSync@29444d75[State = 4, empty queue]
public void lock() {
sync.lock();
}

// 2、java.util.concurrent.locks.ReentrantLock
final void lock() {
if (compareAndSetState(0, 1)) // 重点:使用cas更新state的值
setExclusiveOwnerThread(Thread.currentThread());
else
acquire(1);
}

// 3、java.util.concurrent.locks.AbstractQueuedSynchronizer
protected final boolean compareAndSetState(int expect, int update) {
// See below for intrinsics setup to support this
return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
}

// 4、java.util.concurrent.locks.AbstractQueuedSynchronizer
public final void acquire(int arg) {
if (!tryAcquire(arg) && // 调用下面5在ReentrantLock定义的tryAcquire方法
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}

// 5、java.util.concurrent.locks.ReentrantLock
protected final boolean tryAcquire(int acquires) {
return nonfairTryAcquire(acquires);
}

// 6、java.util.concurrent.locks.ReentrantLock
final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();// 重点:获取state变量当前值
if (c == 0) {
if (compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc); // 重点:更新state变量值
return true;
}
return false;
}

经过这么一轮debug,可以观察到原来lock.lock() 通过cas更改state变量值来实现“可重入性”,例如,这里for循环5次使得该线程在同一锁对象上加锁了5次,可以看到对应的state累加计数等于5,基于此可以推出lock.unlock()操作则是每次unlock()就是对state进行cas减1操作,如下:

阅读全文 »

阅读本文前,需要部分JDK源码深度知识储备:Unsafe与CAS原理、jdk1.7的HashMap原理设计以及分析过其源代码

在这里插入图片描述

《gitee 博客封面图》

1、为何引入ConcurrentHashMap这种适应并发场景数据结构?

  • HashTable

从Java7和Java8的HashMap源码设计可以得出基本结论:两者都不是线程安全,如果非得在并发情况下使用它们,会出现一些问题,如Java7的HashMap死循环导致CPU利用率飙高、put/get不一致问题等,当然也有绝对线程安全的HashTable,但从其源码实现来看,HashTable简单粗暴地在put/get/size等方法前加一个synchronized 修饰:

1
2
3
4
public synchronized V get(Object key) 
public synchronized V put(K key, V value)
public synchronized int size()
//...

在并发操作HashTable的线程数量不多的场景下,其性能影响不大,而且线程隔离程度最高(保证线程安全);但在高并发场景下,HashTable的性能显得力不从心,考察以下情况:

例如有1000个线程对HashTable(数组长度为1024)进行读写操作,由于synchronized是对整个数组进行加锁,只要有一个线程在数组的某个桶位上进行put/get等操作,其他999个线程都会被阻塞无法做其他事情,就连读操作也会被阻塞而不能并发读。

阅读全文 »

这可能是全网最期待的jdk1.8的红黑树balanceDeletion的源代码解析技术文章!

其实掌握HashMap红黑树的同学都知道,balanceDeletion方法的源代码是HashMap红黑树部分最复杂也是最难理解的部分,目前少有coder对balanceDeletion有足够深入且可理解的分析,绝大部分关于深入HashMap分析的文章都会跳过balanceDeletion源代码,有部分文章的coder他并不直接给出balanceDeletion的源代码解析,而是自行实现非HashMap的“红黑树删除平衡”代码,如链接,但显然不能跟jdk源码高质量功能相比(HashMap源码里面的removeTreeNodebalanceDeletion的代码设计是最完整的),因此要想真正掌握完整jdk级别的HashMap红黑树的balanceDeletion逻辑,那么源代码解析肯定要搬出来。

目前个人认可的文章是这篇文章,个人也给它留了评论和鼓励(该博客作者能深钻JUC源代码实现),但也发现该文在解析balanceDeletion源码、图示(少部分)不够直观、简约、清晰,因此亲自实现一篇相对高质量且尽量可理解的removeTreeNodebalanceDeletion源代码分析,本文不会跟类似文章图或者文章组织或者思路重复。

在这里插入图片描述

《gitee 博客文章封面》

阅读全文 »