前言
Java中的FutureTask作为可异步执行任务并可获取执行结果而被大家所熟知。通常可以使用future.get()来获取线程的执行结果,在线程执行结束之前,get方法会一直阻塞状态,
直到call()返回,其优点是使用线程异步执行任务的情况下还可以获取到线程的执行结果,但是FutureTask的以上功能却是依靠通过一个叫AbstractQueuedSynchronizer的类来实现,
至少在JDK1.5、JDK1.6版本是这样的(从1.7开始FutureTask已经被其作者Doug Lea修改为不再依赖AbstractQueuedSynchronizer实现了,这是JDK1.7的变化之一)。
但是AbstractQueuedSynchronizer在JDK1.8中还有如下图所示的众多子类:
这些JDK中的工具类或多或少都被大家用过不止一次,比如ReentrantLock,我们知道ReentrantLock的功能是实现代码段的并发访问控制,
也就是通常意义上所说的锁,在没有看到AbstractQueuedSynchronizer前,可能会以为它的实现是通过类似于synchronized,通过对对象加锁来实现的。
但事实上它仅仅是一个工具类!没有使用更“高级”的机器指令,不是关键字,也不依靠JDK编译时的特殊处理,仅仅作为一个普普通通的类就完成了代码块的并发访问控制,
这就更让人疑问它怎么实现的代码块的并发访问控制的了。那就让我们一起来仔细看下Doug Lea怎么去实现的这个锁。为了方便,本文中使用AQS代替AbstractQueuedSynchronizer。
细说AQS
在深入分析AQS之前,我想先从AQS的功能上说明下AQS,站在使用者的角度,AQS的功能可以分为两类:独占功能和共享功能,它的所有子类中,要么实现并使用了它独占功能的API,要么使用了共享锁的功能,而不会同时使用两套API,即便是它最有名的子类ReentrantReadWriteLock,也是通过两个内部类:读锁和写锁,分别实现的两套API来实现的,为什么这么做,后面我们再分析,到目前为止,我们只需要明白AQS在功能上有独占控制和共享控制两种功能即可。
独占锁
在真正对解读AQS之前,我想先从使用了它独占控制功能的子类ReentrantLock说起,分析ReentrantLock的同时看一看AQS的实现,再推理出AQS独特的设计思路和实现方式。最后,再看其共享控制功能的实现。
对于ReentrantLock,使用过的同学应该都知道,通常是这么用它的:1
2
3reentrantLock.lock()
//do something
reentrantLock.unlock()
ReentrantLock会保证 do something在同一时间只有一个线程在执行这段代码,或者说,同一时刻只有一个线程的lock方法会返回。其余线程会被挂起,直到获取锁。从这里可以看出,其实ReentrantLock实现的就是一个独占锁的功能:有且只有一个线程获取到锁,其余线程全部挂起,直到该拥有锁的线程释放锁,被挂起的线程被唤醒重新开始竞争锁。没错,ReentrantLock使用的就是AQS的独占API实现的。
那现在我们就从ReentrantLock的实现开始一起看看重入锁是怎么实现的。
首先看lock方法:
1 | public void lock() { |
如FutureTask(JDK1.6)一样,ReentrantLock内部有代理类完成具体操作,ReentrantLock只是封装了统一的一套API而已。值得注意的是,使用过ReentrantLock的同学应该知道,ReentrantLock又分为公平锁和非公平锁,所以,ReentrantLock内部只有两个sync的实现:
公平锁:每个线程抢占锁的顺序为先后调用lock方法的顺序依次获取锁,类似于排队吃饭。
非公平锁:每个线程抢占锁的顺序不定,谁运气好,谁就获取到锁,和调用lock方法的先后顺序无关,类似于堵车时,加塞的那些XXXX。
到这里,通过ReentrantLock的功能和锁的所谓排不排队的方式,我们是否可以这么猜测ReentrantLock或者AQS的实现(现在不清楚谁去实现这些功能):有那么一个被volatile修饰的标志位叫做key,用来表示有没有线程拿走了锁,或者说,锁还存不存在,还需要一个线程安全的队列,维护一堆被挂起的线程,以至于当锁被归还时,能通知到这些被挂起的线程,可以来竞争获取锁了。
至于公平锁和非公平锁,唯一的区别是在获取锁的时候是直接去获取锁,还是进入队列排队的问题了。为了验证我们的猜想,我们继续看一下ReentrantLock中公平锁的实现:1
2
3final void lock() {
acquire(1);
}
调用到了AQS的acquire方法:1
2
3
4
5public final void acquire(int arg) { //tryAcquire 由子类实现,模板方法模式
if (!tryAcquire(arg) && //如果没有获取到,则addWaiter加入等待队列,并挂起线程
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt(); //挂起后唤醒返回的中断状态是true的话,这里会中断当前线程
}
从方法名字上看语义是,尝试获取锁,获取不到则创建一个waiter(当前线程)后放到队列中,这和我们猜测的好像很类似。[G1]先看下tryAcquire方法:1
2
3protected boolean tryAcquire(int arg) {
throw new UnsupportedOperationException();
}
留空了,Doug Lea是想留给子类去实现(既然要给子类实现,应该用抽象方法,但是Doug Lea没有这么做,原因是AQS有两种功能,面向两种使用场景,需要给子类定义的方法都是抽象方法了,会导致子类无论如何都需要实现另外一种场景的抽象方法,显然,这对子类来说是不友好的。)
看下FairSync的tryAcquire方法:
getState方法是AQS的方法,因为在AQS里面有个叫statede的标志位 :
事实上,这个state就是前面我们猜想的那个“key”!
回到tryAcquire方法:
1 | protected final boolean tryAcquire(int acquires) { |
到此,如果如果获取锁,tryAcquire返回true,反之,返回false,回到AQS的acquire方法。
如果没有获取到锁,按照我们的描述,应该将当前线程放到队列中去,只不过,在放之前,需要做些包装。
先看addWaiter方法:1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19private Node addWaiter(Node mode) {
//初始化一个node节点
Node node = new Node(Thread.currentThread(), mode);
//先尝试直接加入到尾节点后面
// Try the fast path of enq; backup to full enq on failure
Node pred = tail;
if (pred != null) {
node.prev = pred;//从这里也可以看出,先将node的pre指向尾节点,然后cas设置tail,再将原tail的next指向新节点,
//所以可能next为空的情况存在,但是已经加入的节点的pre肯定是存在
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
//失败的话,for循环loop加入
enq(node);
return node;
}
用当前线程去构造一个Node对象,mode是一个表示Node类型的字段,仅仅表示这个节点是独占的,还是共享的,或者说,AQS的这个队列中,哪些节点是独占的,哪些是共享的。
这里lock调用的是AQS独占的API,当然,可以写死是独占状态的节点。
创建好节点后,将节点加入到队列尾部,此处,在队列不为空的时候,先尝试通过cas方式修改尾节点为最新的节点,如果修改失败,意味着有并发,这个时候才会进入enq中死循环,“自旋”方式修改。
将线程的节点接入到队里中后,当然还需要做一件事:将当前线程挂起!这个事,由acquireQueued来做。
在解释acquireQueued之前,我们需要先看下AQS中队列的内存结构,我们知道,队列由Node类型的节点组成,其中至少有两个变量,一个封装线程,一个封装节点类型。
而实际上,它的内存结构是这样的(第一次节点插入时,第一个节点是一个空节点,代表有一个线程已经获取锁,事实上,队列的第一个节点就是代表持有锁的节点):
黄色节点为队列默认的头节点,每次有线程竞争失败,进入队列后其实都是插入到队列的尾节点(tail后面)后面。这个从enq方法可以看出来,上文中有提到enq方法为将节点插入队列的方法:1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18private Node enq(final Node node) {
//loop操作,tail不存在的情况会初始化一个空节点,并将head和tail都指向空节点,
//然后cas加入node,确保节点一定会加入
for (;;) {
Node t = tail;
if (t == null) { // Must initialize
if (compareAndSetHead(new Node()))
tail = head;
} else {
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
再回来看看
1 | final boolean acquireQueued(final Node node, int arg) { |
这块代码有几点需要说明:
- Node节点中,除了存储当前线程,节点类型,队列中前后元素的变量,还有一个叫waitStatus的变量,改变量用于描述节点的状态,为什么需要这个状态呢?
原因是:AQS的队列中,在有并发时,肯定会存取一定数量的节点,每个节点[G4] 代表了一个线程的状态,有的线程可能“等不及”获取锁了,需要放弃竞争,退出队列,有的线程在等待一些条件满足,满足后才恢复执行(这里的描述很像某个J.U.C包下的工具类,ReentrankLock的Condition,事实上,Condition同样也是AQS的子类)等等,总之,各个线程有各个线程的状态,但总需要一个变量来描述它,这个变量就叫waitStatus,它有四种状态:
分别表示:
- 节点取消
- 节点等待触发
- 节点等待条件
- 节点状态需要向后传播。
只有当前节点的前一个节点为SIGNAL时,才能当前节点才能被挂起。
- 对线程的挂起及唤醒操作是通过使用UNSAFE类调用JNI方法实现的。当然,还提供了挂起指定时间后唤醒的API,在后面我们会讲到。
到此为止,一个线程对于锁的一次竞争才告于段落,结果有两种,要么成功获取到锁(不用进入到AQS队列中),要么,获取失败,被挂起,等待下次唤醒后继续循环尝试获取锁,值得注意的是,AQS的队列为FIFO队列,所以,每次被CPU假唤醒,且当前线程不是出在头节点的位置,也是会被挂起的。AQS通过这样的方式,实现了竞争的排队策略。
看完了获取锁,在看看释放锁,具体看代码之前,我们可以先继续猜下,释放操作需要做哪些事情:
因为获取锁的线程的节点,此时在AQS的头节点位置,所以,可能需要将头节点移除。
而应该是直接释放锁,然后找到AQS的头节点,通知它可以来竞争锁了。
是不是这样呢?我们继续来看下,同样我们用ReentrantLock的FairSync来说明:1
2
3
4
5
6
7
8
9
10
11
12
13public void unlock() {
sync.release(1);
}
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
unlock方法调用了AQS的release方法,同样传入了参数1,和获取锁的相应对应,获取一个锁,标示为+1,释放一个锁,标志位-1。
同样,release为空方法,子类自己实现逻辑:1
2
3
4
5
6
7
8
9
10
11
12protected final boolean tryRelease(int releases) {
int c = getState() - releases;
if (Thread.currentThread() != getExclusiveOwnerThread()) //如果释放的线程和获取锁的线程不是同一个,抛出非法监视器状态异常。
throw new IllegalMonitorStateException();
boolean free = false;
if (c == 0) {//因为是重入的关系,不是每次释放锁c都等于0,直到最后一次释放锁时,才通知AQS不需要再记录哪个线程正在获取锁。
free = true;
setExclusiveOwnerThread(null);
}
setState(c);
return free;
}
释放锁,成功后,找到AQS的头节点,并唤醒它即可:1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27private void unparkSuccessor(Node node) {
/*
* If status is negative (i.e., possibly needing signal) try
* to clear in anticipation of signalling. It is OK if this
* fails or if status is changed by waiting thread.
*/
// 尝试将node的等待状态置为0,这样的话,后继争用线程可以有机会再尝试获取一次锁。
int ws = node.waitStatus;
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);
/*
* Thread to unpark is held in successor, which is normally
* just the next node. But if cancelled or apparently null,
* traverse backwards from tail to find the actual
* non-cancelled successor.
*/
Node s = node.next;
if (s == null || s.waitStatus > 0) {
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
if (s != null)
LockSupport.unpark(s.thread);
}
值得注意的是,寻找的顺序是从队列尾部开始往前去找的最前面的一个waitStatus小于0的节点。
到此,ReentrantLock的lock和unlock方法已经基本解析完毕了,唯独还剩下一个非公平锁NonfairSync没说,其实,它和公平锁的唯一区别就是获取锁的方式不同,一个是按前后顺序一次获取锁,一个是抢占式的获取锁,那ReentrantLock是怎么实现的呢?再看两段代码:
非公平锁的lock方法的处理方式是: 在lock的时候先直接cas修改一次state变量(尝试获取锁),成功就返回,不成功再排队,从而达到不排队直接抢占的目的。1
2
3
4
5
6static final class FairSync extends Sync {
private static final long serialVersionUID = -3000897897090466540L;
final void lock() {
acquire(1);
}
而对于公平锁:则是老老实实的开始就走AQS的流程排队获取锁。如果前面有人调用过其lock方法,则排在队列中前面,也就更有机会更早的获取锁,从而达到“公平”的目的。
总结
这篇文章,我们从ReentrantLock出发,完整的分析了AQS独占功能的API及内部实现,总的来说,思路其实并不复杂,还是使用的标志位+队列的方式,记录获取锁、竞争锁、释放锁等一系列锁的状态,或许用更准确一点的描述的话,应该是使用的标志位+队列的方式,记录锁、竞争、释放等一系列独占的状态,因为站在AQS的层面state可以表示锁,也可以表示其他状态,它并不关心它的子类把它变成一个什么工具类,而只是提供了一套维护一个独占状态。甚至,最准确的是AQS只是维护了一个状态,因为,别忘了,它还有一套共享状态的API,所以,AQS只是维护一个状态,一个控制各个线程何时可以访问的状态,它只对状态负责,而这个状态表示什么含义,由子类自己去定义。
原网页地址:http://www.infoq.com/cn/articles/jdk1.8-abstractqueuedsynchronizer
本站评论搭建在 Github Issue 上,请点击进行评论。