文章目录
  1. 1. 一. Unsafe类
  2. 2. 二. AQS
  3. 3. 三. 独占功能-ReentrantLock&Condition
  4. 4. 四. 共享锁功能-ReentrantReadWriteLock&Semaphore
  5. 5. 五. 参考文献

在JDK的concurrent包中,提供了丰富的并发处理工具,其中的很多同步器是基于AQS(AbstractQueuedSynchronizer)这个类来实现的,如ReentrantLock、ReentrantReadWriteLock、Semaphore、CountDownLatch、FutureTask(JDK1.7开始不依赖AQS)等。在JDK1.8中,还有如下的类继承了AQS:

AQS子类

一. Unsafe类

AQS底层调用了很多Unsafe类里的方法来进行并发的处理,Unsafe类里的方法基本都是native的,通过利用系统硬件的支持,来实现并发的控制。其中利用到硬件的最重要的一个指令是CAS(compare and swap),CAS包含了3个操作数:需要读写的内存位置V、进行比较的值A和拟写入的新值B,当且仅当V的值等于A时,CAS才会通过原子方式用新值B来更新V的值,否则不会执行任何操作。

在concurrent包的atomic目录下包含了很多原子变量类,如AtomicInteger、AtomicLong、AtomicBoolean、AtomicReference等,这些原子变量类的底层都是利用unsafe类提供的方法来实现的。

二. AQS

AQS是一个用于构造锁和同步器的框架,许多同步器都可通过AQS很容易且高效的构造出来。它解决了在实现同步器时涉及的大量细节问题。

AQS有一个整数状态信息,可以通过getState、setState以及compareAndSetState等方法来进行操作,这个整数可以用于表示任意的状态。例如ReentrantLock用它来表示所有者线程已经重复获取该锁的次数,Semaphore用它来表示剩余的许可数量,FutureTask(JDK1.7以前)用它来表示任务的状态(尚未开始、正在运行、已完成、已取消),ReentrantReadWriteLock用它的低16位来表示写操作的计数,高16位表示读操作的计数等等。此外,AQS的父类有一个exclusiveOwnerThread变量,可以用来存储独占锁情形下当前的执行线程。

在AQS中还维护着一个队列(采用双向链表实现),用来存储等待获取锁的线程,队列由Node类型的节点组成:

1
2
3
4
5
6
static final class Node {
volatile int waitStatus;
volatile Node prev;
volatile Node next;
volatile Thread thread;
Node nextWaiter;

waitStatus用来表示:

  • CANCELLED 表示当前的线程被取消
  • SIGNAL 只有SIGNAL状态的才能调用LockSupport.park将线程挂起,刚加入等待队列时都为0(Condition为CONDITION)
  • CONDITION 表示当前节点在等待condition,也就是在condition队列中
  • PROPAGATE 表示当前场景下后续的acquireShared能够得以执行
  • 0

AQS维护的队列
AQS队列
队列的第一个节点head就是代表持有锁的节点

在使用AQS的同步器中,并不是直接继承AQS,而是将一些功能委托给AQS,在同步器类内部中定义内部类,通过内部类对象来调用AQS的功能。因为如果直接继承AQS,那么AQS中的很多在同步器中的无用的方法,也会暴露给调用者,调用者很容易勿用,破坏简洁性。

AQS的功能可以分为两类:独占功能和共享锁功能,下面用ReentrantLock源码来分析独占功能的在源码中的使用方法,以ReentrantReadWriteLock源码来分析共享锁功能的使用方法。

三. 独占功能-ReentrantLock&Condition

本文以ReentrantLock为例来分析独占锁的使用,ReentrantLock的类图如下所示:

ReentrantLock类图

源码及核心逻辑解释写在了下面的源码注释中:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
//FairSync
final void lock() {
acquire(1);
}

//NonfairSync
final void lock() {
if (compareAndSetState(0, 1))
setExclusiveOwnerThread(Thread.currentThread());
else
acquire(1);

}

//AQS
public final void acquire(int arg) {
//尝试获取锁,如果没获取到,放入等待队列中,同时调用acquireQueued尝试park,挂起线程
if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();//如果是因为被中断而unpark的,就使线程中断
}
//FaireSync
protected final boolean tryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
//如果当前锁刚好空着,且队列前面没有在排队的,通过尝试更改state获取锁,获取锁后将exclusiveOwnerThread设为当前线程
if (!hasQueuedPredecessors() && compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}else if (current == getExclusiveOwnerThread()) {//可重入锁
int nextc = c + acquires;
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
//NonfairSync
protected final boolean tryAcquire(int acquires) {
return nonfairTryAcquire(acquires);
}
final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
//不公平锁,直接尝试更改state来获取锁,不查看是否有其它线程在队列等待获取锁
if (compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}


//AQS
//返回线程是否被中断
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
//node的前驱节点
final Node p = node.predecessor();
//如果node的前驱节点为head(head为当前占有锁的节点),则下一个获取锁的节点就是node,尝试获取锁,获取到锁后更改head为node
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
//如果node的前驱节点p的waitStatus为SIGNAL,表示p已被设置为:在release时,要通知后继节点(节点p对应的线程在执行release时,会unpark后继节点),因此p的后继节点node可以安心的park了,因此在parkAndCheckInterrupt方法里调用了LockSupport.park来使线程不被调度(park后,只有在被其它线程unpark或interrupt等情况时才恢复执行);unpark函数可以先于park调用。比如线程B调用unpark函数,给线程A发了一个“许可”,那么当线程A调用park时,它发现已经有“许可”了,那么它会马上再继续运行,所以在上面tryAcquire到下面代码park之间,不用担心因为其它线程释放了锁,而导致当前线程park后一直无法unpark的问题
if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}

//AQS
//如果node的前驱节点pred的waitStatus为SINGAL,表示pred在release会unparkpred的后继节点node,因此node可以安心park;如果waitStatus>0表示已经被取消,将node的前驱节点指向前面最近的未被取消的节点;否则将前驱节点waitStatus设置为SIGNAL,调用者需要再尝试获取锁,获取不到再park
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
int ws = pred.waitStatus;
if (ws == Node.SIGNAL)
/*
* This node has already set status asking a release
* to signal it, so it can safely park.
*/

return true;
if (ws > 0) {
/*
* Predecessor was cancelled. Skip over predecessors and
* indicate retry.
*/

do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
/*
* waitStatus must be 0 or PROPAGATE. Indicate that we
* need a signal, but don't park yet. Caller will need to
* retry to make sure it cannot acquire before parking.
*/

compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}

//
public void unlock() {
sync.release(1);
}
//AQS
public final boolean release(int arg) {
if (tryRelease(arg)) {//tryRelease只更改status和持有锁的线程变量,不用更改head,在acquireQueued中会更改
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
//AQS
private void unparkSuccessor(Node node) {
/*
* If status is negative (i.e., possibly needing signal) try
* to clear in anticipation of signalling. It is OK if this
* fails or if status is changed by waiting thread.
*/

int ws = node.waitStatus;
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);

/*
* Thread to unpark is held in successor, which is normally
* just the next node. But if cancelled or apparently null,
* traverse backwards from tail to find the actual
* non-cancelled successor.
*/

Node s = node.next;
if (s == null || s.waitStatus > 0) {
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
//唤醒队列的后续节点,如果是共享锁,后继节点会递归的唤醒自己的后继节点
if (s != null)
LockSupport.unpark(s.thread);
}
public boolean tryLock() {
return sync.nonfairTryAcquire(1);
}

Condition之于wait/notify,就像Lock之于synchronize,在执行wait/notify时需要先获取对象的控制权,比如将对象用synchronize放在同步块中,在同步块里面执行wait/notify,否则会报IllegalMonitorStateException异常;同样的,在执行Condition的await/signal之前,需要先调用ReentrantLock.lock()等方法获取锁,在锁里面执行await/signal。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
public final void await() throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
Node node = addConditionWaiter();//创建waitStatus=CONDITION的node,添加到Condition自己维护的链表中
int savedState = fullyRelease(node);//释放锁;在执行await之前,都需要调用ReentrantLock的lock,await时需要释放持有的锁
int interruptMode = 0;
//释放完当前线程占有的锁后,便利AQS队列,看当前节点是否在队列中,如果不在,说明它还没有竞争锁的资格,继续park,直到它被加入到队列中。被加入到队列中是在其它线程调用signal的时候。
while (!isOnSyncQueue(node)) {
LockSupport.park(this);
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
}
//被其它线程调用signal唤醒后,重新开始竞争锁
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
if (node.nextWaiter != null) // clean up if cancelled
unlinkCancelledWaiters();
if (interruptMode != 0)
reportInterruptAfterWait(interruptMode);
}

public final void signal() {
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
Node first = firstWaiter;
if (first != null)
doSignal(first);//唤醒Condition等待队列的第一个节点
}
private void doSignal(Node first) {
do {
if ( (firstWaiter = first.nextWaiter) == null)
lastWaiter = null;
first.nextWaiter = null;
//将老的头节点,加入到AQS等待队列中
} while (!transferForSignal(first) && (first = firstWaiter) != null);
}
final boolean transferForSignal(Node node) {
/*
* If cannot change waitStatus, the node has been cancelled.
*/

if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
return false;

/*
* Splice onto queue and try to set waitStatus of predecessor to
* indicate that thread is (probably) waiting. If cancelled or
* attempt to set waitStatus fails, wake up to resync (in which
* case the waitStatus can be transiently and harmlessly wrong).
*/

Node p = enq(node);
int ws = p.waitStatus;
if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
LockSupport.unpark(node.thread);
return true;
}

四. 共享锁功能-ReentrantReadWriteLock&Semaphore

ReentrantReadWriteLock的ReadLock、Semaphore等都是利用AQS的共享方法来实现的,下文以ReentrantReadWriteLock来分析共享锁的使用方法,Semaphore的实现方式类似。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
//ReadLock
public void lock() {
sync.acquireShared(1);
}
//AQS
public final void acquireShared(int arg) {
if (tryAcquireShared(arg) < 0)
doAcquireShared(arg);
}

protected final int tryAcquireShared(int unused) {
/*
* Walkthrough:
* 1. If write lock held by another thread, fail.
* 2. Otherwise, this thread is eligible for
* lock wrt state, so ask if it should block
* because of queue policy. If not, try
* to grant by CASing state and updating count.
* Note that step does not check for reentrant
* acquires, which is postponed to full version
* to avoid having to check hold count in
* the more typical non-reentrant case.
* 3. If step 2 fails either because thread
* apparently not eligible or CAS fails or count
* saturated, chain to version with full retry loop.
*/

Thread current = Thread.currentThread();
int c = getState();
//status的低16位表示写锁的计数,低16位不等于0表示现在有进程持有写锁,其他线程不能获取锁,不过同一个线程在持有写锁的情况下,如果等待队列的第一个节点(head的后继节点)不是等待写锁,那么可以获取读锁
if (exclusiveCount(c) != 0 &&
getExclusiveOwnerThread() != current)
return -1;
int r = sharedCount(c);
//公平锁在要获取读锁时,如果已经有线程在等待队列等待,则不能抢占;非公平锁在等待队列的第一个线程不是等待写锁的话,且当前没有线程占有写锁,则可以直接抢占读锁
if (!readerShouldBlock() &&
r < MAX_COUNT &&
compareAndSetState(c, c + SHARED_UNIT)) {//读锁的计数用status的高16位表示
if (r == 0) {
firstReader = current;
firstReaderHoldCount = 1;
} else if (firstReader == current) {
firstReaderHoldCount++;
} else {
HoldCounter rh = cachedHoldCounter;
if (rh == null || rh.tid != current.getId())
cachedHoldCounter = rh = readHolds.get();
else if (rh.count == 0)
readHolds.set(rh);

rh.count++;
}
return 1;
}
return fullTryAcquireShared(current);
}

protected final boolean tryAcquire(int acquires) {
/*
* Walkthrough:
* 1. If read count nonzero or write count nonzero
* and owner is a different thread, fail.
* 2. If count would saturate, fail. (This can only
* happen if count is already nonzero.)
* 3. Otherwise, this thread is eligible for lock if
* it is either a reentrant acquire or
* queue policy allows it. If so, update state
* and set owner.
*/

Thread current = Thread.currentThread();
int c = getState();
int w = exclusiveCount(c);
if (c != 0) {
// (Note: if c != 0 and w == 0 then shared count != 0)
if (w == 0 || current != getExclusiveOwnerThread())
return false;
if (w + exclusiveCount(acquires) > MAX_COUNT)
throw new Error("Maximum lock count exceeded");
// Reentrant acquire
setState(c + acquires);
return true;
}
////获取写锁时,如果当前读写锁都没有被占有,公平锁会根据是否等待队列前面有等待的线程来判断能否获取写锁,非公平锁则直接获取写锁
if (writerShouldBlock() ||
!compareAndSetState(c, c + acquires))
return false;
setExclusiveOwnerThread(current);
return true;
}

private void doAcquireShared(int arg) {
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
if (p == head) {
int r = tryAcquireShared(arg);
if (r >= 0) {
//获取到锁后,修改head为当前的node,并将后继等待读锁的也唤醒,唤醒是递归的
setHeadAndPropagate(node, r);
p.next = null; // help GC
if (interrupted)
selfInterrupt();
failed = false;
return;
}
}
if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
private void setHeadAndPropagate(Node node, int propagate) {
Node h = head; // Record old head for check below
setHead(node);
/*
* Try to signal next queued node if:
* Propagation was indicated by caller,
* or was recorded (as h.waitStatus either before
* or after setHead) by a previous operation
* (note: this uses sign-check of waitStatus because
* PROPAGATE status may transition to SIGNAL.)
* and
* The next node is waiting in shared mode,
* or we don't know, because it appears null
*
* The conservatism in both of these checks may cause
* unnecessary wake-ups, but only when there are multiple
* racing acquires/releases, so most need signals now or soon
* anyway.
*/

if (propagate > 0 || h == null || h.waitStatus < 0 || (h = head) == null || h.waitStatus < 0) {
Node s = node.next;
if (s == null || s.isShared())
doReleaseShared();//如果后继节点是shared的,唤醒它
}
}
private void doReleaseShared() {
/*
* Ensure that a release propagates, even if there are other
* in-progress acquires/releases. This proceeds in the usual
* way of trying to unparkSuccessor of head if it needs
* signal. But if it does not, status is set to PROPAGATE to
* ensure that upon release, propagation continues.
* Additionally, we must loop in case a new node is added
* while we are doing this. Also, unlike other uses of
* unparkSuccessor, we need to know if CAS to reset status
* fails, if so rechecking.
*/

for (;;) {
Node h = head;
if (h != null && h != tail) {
int ws = h.waitStatus;
if (ws == Node.SIGNAL) {
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
continue; // loop to recheck cases
unparkSuccessor(h);
}
else if (ws == 0 && !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
continue; // loop on failed CAS

}
if (h == head) // loop if head changed
break;
}
}

Semaphore也是share模式,跟ReentrantReadWriteLock的读锁机制类似,只不过ReentrantReadWriteLock的ReadLock在lock()时status是递增的,而Semaphore调用acquire()方法是递减的,当减到0时就要在队列等待,直到其他线程调用了release()且status的值>=等待线程要获取的permit的数量时,才获取permit。

在Semaphore中,公平与不公平的一个区别为:公平的,假如当前有线程A和B在等待,A的permit=4,B的permit=1,当前的status=2,线程A比线程B早到,则虽然status>1,但是由于A排在B前面,因此B还是得等待;如果非公平的,则B可以直接获取permit;创建Semaphore对象时,如果没有指定是否公平的参数,则默认为不公平的。

五. 参考文献

  1. java并发编程实战
  2. 怎么理解Condition
  3. Java的LockSupport.park()实现分析
  4. AbstractQueuedSynchronizer的介绍和原理分析
  5. 深度解析Java8 – AbstractQueuedSynchronizer的实现分析
文章目录
  1. 1. 一. Unsafe类
  2. 2. 二. AQS
  3. 3. 三. 独占功能-ReentrantLock&Condition
  4. 4. 四. 共享锁功能-ReentrantReadWriteLock&Semaphore
  5. 5. 五. 参考文献