AQS底层调用了很多Unsafe类里的方法来进行并发的处理,Unsafe类里的方法基本都是native的,通过利用系统硬件的支持,来实现并发的控制。其中利用到硬件的最重要的一个指令是CAS(compare and swap),CAS包含了3个操作数:需要读写的内存位置V、进行比较的值A和拟写入的新值B,当且仅当V的值等于A时,CAS才会通过原子方式用新值B来更新V的值,否则不会执行任何操作。
//NonfairSync finalvoidlock(){ if (compareAndSetState(0, 1)) setExclusiveOwnerThread(Thread.currentThread()); else acquire(1); }
//AQS publicfinalvoidacquire(int arg){ //尝试获取锁,如果没获取到,放入等待队列中,同时调用acquireQueued尝试park,挂起线程 if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt();//如果是因为被中断而unpark的,就使线程中断 } //FaireSync protectedfinalbooleantryAcquire(int acquires){ final Thread current = Thread.currentThread(); int c = getState(); if (c == 0) { //如果当前锁刚好空着,且队列前面没有在排队的,通过尝试更改state获取锁,获取锁后将exclusiveOwnerThread设为当前线程 if (!hasQueuedPredecessors() && compareAndSetState(0, acquires)) { setExclusiveOwnerThread(current); returntrue; } }elseif(current == getExclusiveOwnerThread()) {//可重入锁 int nextc = c + acquires; if (nextc < 0) thrownew Error("Maximum lock count exceeded"); setState(nextc); returntrue; } returnfalse; } //NonfairSync protectedfinalbooleantryAcquire(int acquires){ return nonfairTryAcquire(acquires); } finalbooleannonfairTryAcquire(int acquires){ final Thread current = Thread.currentThread(); int c = getState(); if (c == 0) { //不公平锁,直接尝试更改state来获取锁,不查看是否有其它线程在队列等待获取锁 if (compareAndSetState(0, acquires)) { setExclusiveOwnerThread(current); returntrue; } }elseif(current == getExclusiveOwnerThread()) { int nextc = c + acquires; if (nextc < 0) // overflow thrownew Error("Maximum lock count exceeded"); setState(nextc); returntrue; } returnfalse; }
//AQS //返回线程是否被中断 finalbooleanacquireQueued(final Node node, int arg){ boolean failed = true; try { boolean interrupted = false; for (;;) { //node的前驱节点 final Node p = node.predecessor(); //如果node的前驱节点为head(head为当前占有锁的节点),则下一个获取锁的节点就是node,尝试获取锁,获取到锁后更改head为node if (p == head && tryAcquire(arg)) { setHead(node); p.next = null; // help GC failed = false; return interrupted; } //如果node的前驱节点p的waitStatus为SIGNAL,表示p已被设置为:在release时,要通知后继节点(节点p对应的线程在执行release时,会unpark后继节点),因此p的后继节点node可以安心的park了,因此在parkAndCheckInterrupt方法里调用了LockSupport.park来使线程不被调度(park后,只有在被其它线程unpark或interrupt等情况时才恢复执行);unpark函数可以先于park调用。比如线程B调用unpark函数,给线程A发了一个“许可”,那么当线程A调用park时,它发现已经有“许可”了,那么它会马上再继续运行,所以在上面tryAcquire到下面代码park之间,不用担心因为其它线程释放了锁,而导致当前线程park后一直无法unpark的问题 if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true; } } finally { if (failed) cancelAcquire(node); } }
//AQS //如果node的前驱节点pred的waitStatus为SINGAL,表示pred在release会unparkpred的后继节点node,因此node可以安心park;如果waitStatus>0表示已经被取消,将node的前驱节点指向前面最近的未被取消的节点;否则将前驱节点waitStatus设置为SIGNAL,调用者需要再尝试获取锁,获取不到再park privatestaticbooleanshouldParkAfterFailedAcquire(Node pred, Node node){ int ws = pred.waitStatus; if (ws == Node.SIGNAL) /* * This node has already set status asking a release * to signal it, so it can safely park. */ returntrue; if (ws > 0) { /* * Predecessor was cancelled. Skip over predecessors and * indicate retry. */ do { node.prev = pred = pred.prev; } while (pred.waitStatus > 0); pred.next = node; } else { /* * waitStatus must be 0 or PROPAGATE. Indicate that we * need a signal, but don't park yet. Caller will need to * retry to make sure it cannot acquire before parking. */ compareAndSetWaitStatus(pred, ws, Node.SIGNAL); } returnfalse; }
// publicvoidunlock(){ sync.release(1); } //AQS publicfinalbooleanrelease(int arg){ if (tryRelease(arg)) {//tryRelease只更改status和持有锁的线程变量,不用更改head,在acquireQueued中会更改 Node h = head; if (h != null && h.waitStatus != 0) unparkSuccessor(h); returntrue; } returnfalse; } //AQS privatevoidunparkSuccessor(Node node){ /* * If status is negative (i.e., possibly needing signal) try * to clear in anticipation of signalling. It is OK if this * fails or if status is changed by waiting thread. */ int ws = node.waitStatus; if (ws < 0) compareAndSetWaitStatus(node, ws, 0);
/* * Thread to unpark is held in successor, which is normally * just the next node. But if cancelled or apparently null, * traverse backwards from tail to find the actual * non-cancelled successor. */ Node s = node.next; if (s == null || s.waitStatus > 0) { s = null; for (Node t = tail; t != null && t != node; t = t.prev) if (t.waitStatus <= 0) s = t; } //唤醒队列的后续节点,如果是共享锁,后继节点会递归的唤醒自己的后继节点 if (s != null) LockSupport.unpark(s.thread); } publicbooleantryLock(){ return sync.nonfairTryAcquire(1); }
publicfinalvoidawait()throws InterruptedException { if (Thread.interrupted()) thrownew InterruptedException(); Node node = addConditionWaiter();//创建waitStatus=CONDITION的node,添加到Condition自己维护的链表中 int savedState = fullyRelease(node);//释放锁;在执行await之前,都需要调用ReentrantLock的lock,await时需要释放持有的锁 int interruptMode = 0; //释放完当前线程占有的锁后,便利AQS队列,看当前节点是否在队列中,如果不在,说明它还没有竞争锁的资格,继续park,直到它被加入到队列中。被加入到队列中是在其它线程调用signal的时候。 while (!isOnSyncQueue(node)) { LockSupport.park(this); if ((interruptMode = checkInterruptWhileWaiting(node)) != 0) break; } //被其它线程调用signal唤醒后,重新开始竞争锁 if (acquireQueued(node, savedState) && interruptMode != THROW_IE) interruptMode = REINTERRUPT; if (node.nextWaiter != null) // clean up if cancelled unlinkCancelledWaiters(); if (interruptMode != 0) reportInterruptAfterWait(interruptMode); }
publicfinalvoidsignal(){ if (!isHeldExclusively()) thrownew IllegalMonitorStateException(); Node first = firstWaiter; if (first != null) doSignal(first);//唤醒Condition等待队列的第一个节点 } privatevoiddoSignal(Node first){ do { if ( (firstWaiter = first.nextWaiter) == null) lastWaiter = null; first.nextWaiter = null; //将老的头节点,加入到AQS等待队列中 } while (!transferForSignal(first) && (first = firstWaiter) != null); } finalbooleantransferForSignal(Node node){ /* * If cannot change waitStatus, the node has been cancelled. */ if (!compareAndSetWaitStatus(node, Node.CONDITION, 0)) returnfalse;
/* * Splice onto queue and try to set waitStatus of predecessor to * indicate that thread is (probably) waiting. If cancelled or * attempt to set waitStatus fails, wake up to resync (in which * case the waitStatus can be transiently and harmlessly wrong). */ Node p = enq(node); int ws = p.waitStatus; if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL)) LockSupport.unpark(node.thread); returntrue; }
protectedfinalinttryAcquireShared(int unused){ /* * Walkthrough: * 1. If write lock held by another thread, fail. * 2. Otherwise, this thread is eligible for * lock wrt state, so ask if it should block * because of queue policy. If not, try * to grant by CASing state and updating count. * Note that step does not check for reentrant * acquires, which is postponed to full version * to avoid having to check hold count in * the more typical non-reentrant case. * 3. If step 2 fails either because thread * apparently not eligible or CAS fails or count * saturated, chain to version with full retry loop. */ Thread current = Thread.currentThread(); int c = getState(); //status的低16位表示写锁的计数,低16位不等于0表示现在有进程持有写锁,其他线程不能获取锁,不过同一个线程在持有写锁的情况下,如果等待队列的第一个节点(head的后继节点)不是等待写锁,那么可以获取读锁 if (exclusiveCount(c) != 0 && getExclusiveOwnerThread() != current) return -1; int r = sharedCount(c); //公平锁在要获取读锁时,如果已经有线程在等待队列等待,则不能抢占;非公平锁在等待队列的第一个线程不是等待写锁的话,且当前没有线程占有写锁,则可以直接抢占读锁 if (!readerShouldBlock() && r < MAX_COUNT && compareAndSetState(c, c + SHARED_UNIT)) {//读锁的计数用status的高16位表示 if (r == 0) { firstReader = current; firstReaderHoldCount = 1; } elseif(firstReader == current){ firstReaderHoldCount++; } else { HoldCounter rh = cachedHoldCounter; if (rh == null || rh.tid != current.getId()) cachedHoldCounter = rh = readHolds.get(); elseif(rh.count == 0) readHolds.set(rh); rh.count++; } return1; } return fullTryAcquireShared(current); }
protectedfinalbooleantryAcquire(int acquires){ /* * Walkthrough: * 1. If read count nonzero or write count nonzero * and owner is a different thread, fail. * 2. If count would saturate, fail. (This can only * happen if count is already nonzero.) * 3. Otherwise, this thread is eligible for lock if * it is either a reentrant acquire or * queue policy allows it. If so, update state * and set owner. */ Thread current = Thread.currentThread(); int c = getState(); int w = exclusiveCount(c); if (c != 0) { // (Note: if c != 0 and w == 0 then shared count != 0) if (w == 0 || current != getExclusiveOwnerThread()) returnfalse; if (w + exclusiveCount(acquires) > MAX_COUNT) thrownew Error("Maximum lock count exceeded"); // Reentrant acquire setState(c + acquires); returntrue; } ////获取写锁时,如果当前读写锁都没有被占有,公平锁会根据是否等待队列前面有等待的线程来判断能否获取写锁,非公平锁则直接获取写锁 if (writerShouldBlock() || !compareAndSetState(c, c + acquires)) returnfalse; setExclusiveOwnerThread(current); returntrue; }
privatevoiddoAcquireShared(int arg){ final Node node = addWaiter(Node.SHARED); boolean failed = true; try { boolean interrupted = false; for (;;) { final Node p = node.predecessor(); if (p == head) { int r = tryAcquireShared(arg); if (r >= 0) { //获取到锁后,修改head为当前的node,并将后继等待读锁的也唤醒,唤醒是递归的 setHeadAndPropagate(node, r); p.next = null; // help GC if (interrupted) selfInterrupt(); failed = false; return; } } if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true; } } finally { if (failed) cancelAcquire(node); } } privatevoidsetHeadAndPropagate(Node node, int propagate){ Node h = head; // Record old head for check below setHead(node); /* * Try to signal next queued node if: * Propagation was indicated by caller, * or was recorded (as h.waitStatus either before * or after setHead) by a previous operation * (note: this uses sign-check of waitStatus because * PROPAGATE status may transition to SIGNAL.) * and * The next node is waiting in shared mode, * or we don't know, because it appears null * * The conservatism in both of these checks may cause * unnecessary wake-ups, but only when there are multiple * racing acquires/releases, so most need signals now or soon * anyway. */ if (propagate > 0 || h == null || h.waitStatus < 0 || (h = head) == null || h.waitStatus < 0) { Node s = node.next; if (s == null || s.isShared()) doReleaseShared();//如果后继节点是shared的,唤醒它 } } privatevoiddoReleaseShared(){ /* * Ensure that a release propagates, even if there are other * in-progress acquires/releases. This proceeds in the usual * way of trying to unparkSuccessor of head if it needs * signal. But if it does not, status is set to PROPAGATE to * ensure that upon release, propagation continues. * Additionally, we must loop in case a new node is added * while we are doing this. Also, unlike other uses of * unparkSuccessor, we need to know if CAS to reset status * fails, if so rechecking. */ for (;;) { Node h = head; if (h != null && h != tail) { int ws = h.waitStatus; if (ws == Node.SIGNAL) { if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0)) continue; // loop to recheck cases unparkSuccessor(h); } elseif(ws == 0 && !compareAndSetWaitStatus(h, 0, Node.PROPAGATE)) continue; // loop on failed CAS } if (h == head) // loop if head changed break; } }