/** * The synchronization state. */ privatevolatileint state;
不同的实现,state的意义不同。以ReetrantLock为例:
state为0,没有线程持有这个lock锁
state > 0,当前lock锁被某个线程持有
不存在state小于0的情况
同步队列
1 2 3 4 5 6 7 8 9 10 11 12 13
/** * Head of the wait queue, lazily initialized. Except for * initialization, it is modified only via method setHead. Note: * If head exists, its waitStatus is guaranteed not to be * CANCELLED. */ privatetransientvolatile Node head; /** * Tail of the wait queue, lazily initialized. Modified only via * method enq to add new wait node. */ privatetransientvolatile Node tail;
staticfinalclassNode { /** Marker to indicate a node is waiting in shared mode */ staticfinalNodeSHARED=newNode(); /** Marker to indicate a node is waiting in exclusive mode */ staticfinalNodeEXCLUSIVE=null; /** waitStatus value to indicate thread has cancelled */ staticfinalintCANCELLED=1; /** waitStatus value to indicate successor's thread needs unparking */ staticfinalintSIGNAL= -1; /** waitStatus value to indicate thread is waiting on condition */ staticfinalintCONDITION= -2; /** * waitStatus value to indicate the next acquireShared should * unconditionally propagate */ staticfinalintPROPAGATE= -3; /** * Status field, taking on only the values: * SIGNAL: The successor of this node is (or will soon be) * blocked (via park), so the current node must * unpark its successor when it releases or * cancels. To avoid races, acquire methods must * first indicate they need a signal, * then retry the atomic acquire, and then, * on failure, block. * CANCELLED: This node is cancelled due to timeout or interrupt. * Nodes never leave this state. In particular, * a thread with cancelled node never again blocks. * CONDITION: This node is currently on a condition queue. * It will not be used as a sync queue node * until transferred, at which time the status * will be set to 0. (Use of this value here has * nothing to do with the other uses of the * field, but simplifies mechanics.) * PROPAGATE: A releaseShared should be propagated to other * nodes. This is set (for head node only) in * doReleaseShared to ensure propagation * continues, even if other operations have * since intervened. * 0: None of the above * * The values are arranged numerically to simplify use. * Non-negative values mean that a node doesn't need to * signal. So, most code doesn't need to check for particular * values, just for sign. * * The field is initialized to 0 for normal sync nodes, and * CONDITION for condition nodes. It is modified using CAS * (or when possible, unconditional volatile writes). */ volatileint waitStatus; /** * Link to predecessor node that current node/thread relies on * for checking waitStatus. Assigned during enqueuing, and nulled * out (for sake of GC) only upon dequeuing. Also, upon * cancellation of a predecessor, we short-circuit while * finding a non-cancelled one, which will always exist * because the head node is never cancelled: A node becomes * head only as a result of successful acquire. A * cancelled thread never succeeds in acquiring, and a thread only * cancels itself, not any other node. */ volatile Node prev; /** * Link to the successor node that the current node/thread * unparks upon release. Assigned during enqueuing, adjusted * when bypassing cancelled predecessors, and nulled out (for * sake of GC) when dequeued. The enq operation does not * assign next field of a predecessor until after attachment, * so seeing a null next field does not necessarily mean that * node is at end of queue. However, if a next field appears * to be null, we can scan prev's from the tail to * double-check. The next field of cancelled nodes is set to * point to the node itself instead of null, to make life * easier for isOnSyncQueue. */ volatile Node next; /** * The thread that enqueued this node. Initialized on * construction and nulled out after use. */ volatile Thread thread; /** * Link to next node waiting on condition, or the special * value SHARED. Because condition queues are accessed only * when holding in exclusive mode, we just need a simple * linked queue to hold nodes while they are waiting on * conditions. They are then transferred to the queue to * re-acquire. And because conditions can only be exclusive, * we save a field by using special value to indicate shared * mode. */ Node nextWaiter; //用于Condition,因此Condition是单向链表(不用prev和next) /** * Returns true if node is waiting in shared mode. */ finalbooleanisShared() { return nextWaiter == SHARED; } /** * Returns previous node, or throws NullPointerException if null. * Use when predecessor cannot be null. The null check could * be elided, but is present to help the VM. * * @return the predecessor of this node */ final Node predecessor()throws NullPointerException { Nodep= prev; if (p == null) thrownewNullPointerException(); else return p; } Node() { // Used to establish initial head or SHARED marker } Node(Thread thread, Node mode) { // Used by addWaiter this.nextWaiter = mode; this.thread = thread; } Node(Thread thread, int waitStatus) { // Used by Condition this.waitStatus = waitStatus; this.thread = thread; } }
finalbooleanacquireQueued(final Node node, int arg) { booleanfailed=true; try { booleaninterrupted=false; for (;;) { //拿node的前一个节点 finalNodep= node.predecessor(); //若p是头节点,,说明自己排在队列的第一个尝试抢锁 if (p == head && tryAcquire(arg)) { //node成为新的head setHead(node); p.next = null; // help GC failed = false; //拿到锁了返回false return interrupted; } //1.应该阻塞,调用parkAndCheckInterrupt阻塞线程 //2.不应该阻塞,再给一次抢锁的机会 if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true; } } finally { if (failed) //基本不可能走到这一步,除非是系统级别的异常导致获取锁失败for循环意外退出, cancelAcquire(node); } }
非公平锁和公平锁的直观体现
从源码上来看,公平和非公平的直观体现是lock方法和tryAcquire方法
lock方法一般是获取锁资源的入口方法,非公平锁会直接抢一次锁资源,而公平锁不会
acquire的底层逻辑
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19
/** * Acquires in exclusive mode, ignoring interrupts. Implemented * by invoking at least once {@link #tryAcquire}, * returning on success. Otherwise the thread is queued, possibly * repeatedly blocking and unblocking, invoking {@link * #tryAcquire} until success. This method can be used * to implement method {@link Lock#lock}. * * @param arg the acquire argument. This value is conveyed to * {@link #tryAcquire} but is otherwise uninterpreted and * can represent anything you like. */ // arg = 1 publicfinalvoidacquire(int arg) { if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); }
publicfinalbooleanhasQueuedPredecessors() { // The correctness of this depends on head being initialized // before tail and on head.next being accurate if the current // thread is first in queue. Nodet= tail; // Read fields in reverse initialization order Nodeh= head; Node s; return h != t && ((s = h.next) == null || s.thread != Thread.currentThread()); }
/** * Creates and enqueues node for current thread and given mode. * * @param mode Node.EXCLUSIVE for exclusive, Node.SHARED for shared * @return the new node */ private Node addWaiter(Node mode) { Nodenode=newNode(Thread.currentThread(), mode); // Try the fast path of enq; backup to full enq on failure Nodepred= tail; if (pred != null) { node.prev = pred; if (compareAndSetTail(pred, node)) { pred.next = node; return node; } } enq(node); return node; }
publicfinalvoidacquire(int arg) { if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); } private Node addWaiter(Node mode) { Nodenode=newNode(Thread.currentThread(), mode); // Try the fast path of enq; backup to full enq on failure Nodepred= tail; if (pred != null) { node.prev = pred; if (compareAndSetTail(pred, node)) { pred.next = node; return node; } } enq(node); return node; } finalbooleanacquireQueued(final Node node, int arg) { booleanfailed=true; try { booleaninterrupted=false; for (;;) { finalNodep= node.predecessor(); if (p == head && tryAcquire(arg)) { setHead(node); p.next = null; // help GC failed = false; return interrupted; } if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true; } } finally { if (failed) cancelAcquire(node); } } privatestaticbooleanshouldParkAfterFailedAcquire(Node pred, Node node) { intws= pred.waitStatus; if (ws == Node.SIGNAL) //当前线程挂起 /* * This node has already set status asking a release * to signal it, so it can safely park. */ returntrue; if (ws > 0) { /* * Predecessor was cancelled. Skip over predecessors and * indicate retry. */ do { node.prev = pred = pred.prev; } while (pred.waitStatus > 0); pred.next = node; } else { /* * waitStatus must be 0 or PROPAGATE. Indicate that we * need a signal, but don't park yet. Caller will need to * retry to make sure it cannot acquire before parking. */ compareAndSetWaitStatus(pred, ws, Node.SIGNAL); } returnfalse; } privatefinalbooleanparkAndCheckInterrupt() { LockSupport.park(this); return Thread.interrupted(); } staticvoidselfInterrupt() { Thread.currentThread().interrupt(); } /* 在park中: 只要permit为1或者中断状态为true,那么执行park就不能够阻塞线程。park只可能消耗掉permit, 但不会去消耗掉中断状态。 因此需要interrupt()去消耗掉,并将这个中断状态暂时保存到一个局部变量interrupted中 在selfInterrupt中: 当parkAndCheckInterrupt()方法返回true后又调用了 selfInterrupt()方法重 新设置中断标记,这样做的目的是为了让用户代码(同步代码块)能够通过 Thread.isInterrupted()等方法 感知到线程在获取同步状态的过程中被中断过。 */