AQS概述

AQS是juc包下的一个抽象类,很多juc包下的工具类都是根据AQS是实现的,比如ThreadPoolExecutor、CountDownLatch、ReetrantLock、ReetrantWriteLock、Semaphore

img

AQS的核心内容

核心属性state

1
2
3
4
/**
* The synchronization state.
*/
private volatile int state;

不同的实现,state的意义不同。以ReetrantLock为例:

  • state为0,没有线程持有这个lock锁
  • state > 0,当前lock锁被某个线程持有
  • 不存在state小于0的情况

同步队列

1
2
3
4
5
6
7
8
9
10
11
12
13
/**
* Head of the wait queue, lazily initialized. Except for
* initialization, it is modified only via method setHead. Note:
* If head exists, its waitStatus is guaranteed not to be
* CANCELLED.
*/
private transient volatile Node head;

/**
* Tail of the wait queue, lazily initialized. Modified only via
* method enq to add new wait node.
*/
private transient volatile Node tail;
  • Node
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
static final class Node {
/** Marker to indicate a node is waiting in shared mode */
static final Node SHARED = new Node();
/** Marker to indicate a node is waiting in exclusive mode */
static final Node EXCLUSIVE = null;

/** waitStatus value to indicate thread has cancelled */
static final int CANCELLED = 1;
/** waitStatus value to indicate successor's thread needs unparking */
static final int SIGNAL = -1;
/** waitStatus value to indicate thread is waiting on condition */
static final int CONDITION = -2;
/**
* waitStatus value to indicate the next acquireShared should
* unconditionally propagate
*/
static final int PROPAGATE = -3;

/**
* Status field, taking on only the values:
* SIGNAL: The successor of this node is (or will soon be)
* blocked (via park), so the current node must
* unpark its successor when it releases or
* cancels. To avoid races, acquire methods must
* first indicate they need a signal,
* then retry the atomic acquire, and then,
* on failure, block.
* CANCELLED: This node is cancelled due to timeout or interrupt.
* Nodes never leave this state. In particular,
* a thread with cancelled node never again blocks.
* CONDITION: This node is currently on a condition queue.
* It will not be used as a sync queue node
* until transferred, at which time the status
* will be set to 0. (Use of this value here has
* nothing to do with the other uses of the
* field, but simplifies mechanics.)
* PROPAGATE: A releaseShared should be propagated to other
* nodes. This is set (for head node only) in
* doReleaseShared to ensure propagation
* continues, even if other operations have
* since intervened.
* 0: None of the above
*
* The values are arranged numerically to simplify use.
* Non-negative values mean that a node doesn't need to
* signal. So, most code doesn't need to check for particular
* values, just for sign.
*
* The field is initialized to 0 for normal sync nodes, and
* CONDITION for condition nodes. It is modified using CAS
* (or when possible, unconditional volatile writes).
*/
volatile int waitStatus;

/**
* Link to predecessor node that current node/thread relies on
* for checking waitStatus. Assigned during enqueuing, and nulled
* out (for sake of GC) only upon dequeuing. Also, upon
* cancellation of a predecessor, we short-circuit while
* finding a non-cancelled one, which will always exist
* because the head node is never cancelled: A node becomes
* head only as a result of successful acquire. A
* cancelled thread never succeeds in acquiring, and a thread only
* cancels itself, not any other node.
*/
volatile Node prev;

/**
* Link to the successor node that the current node/thread
* unparks upon release. Assigned during enqueuing, adjusted
* when bypassing cancelled predecessors, and nulled out (for
* sake of GC) when dequeued. The enq operation does not
* assign next field of a predecessor until after attachment,
* so seeing a null next field does not necessarily mean that
* node is at end of queue. However, if a next field appears
* to be null, we can scan prev's from the tail to
* double-check. The next field of cancelled nodes is set to
* point to the node itself instead of null, to make life
* easier for isOnSyncQueue.
*/
volatile Node next;

/**
* The thread that enqueued this node. Initialized on
* construction and nulled out after use.
*/
volatile Thread thread;

/**
* Link to next node waiting on condition, or the special
* value SHARED. Because condition queues are accessed only
* when holding in exclusive mode, we just need a simple
* linked queue to hold nodes while they are waiting on
* conditions. They are then transferred to the queue to
* re-acquire. And because conditions can only be exclusive,
* we save a field by using special value to indicate shared
* mode.
*/
Node nextWaiter; //用于Condition,因此Condition是单向链表(不用prev和next)

/**
* Returns true if node is waiting in shared mode.
*/
final boolean isShared() {
return nextWaiter == SHARED;
}

/**
* Returns previous node, or throws NullPointerException if null.
* Use when predecessor cannot be null. The null check could
* be elided, but is present to help the VM.
*
* @return the predecessor of this node
*/
final Node predecessor() throws NullPointerException {
Node p = prev;
if (p == null)
throw new NullPointerException();
else
return p;
}

Node() { // Used to establish initial head or SHARED marker
}

Node(Thread thread, Node mode) { // Used by addWaiter
this.nextWaiter = mode;
this.thread = thread;
}

Node(Thread thread, int waitStatus) { // Used by Condition
this.waitStatus = waitStatus;
this.thread = thread;
}
}

以ReetrantLock为例,如果A线程想获取锁,但是发现锁资源被其他线程占用,A线程需要被封装成一个Node对象,进入到同步队列尾部排队,并挂起线程等待锁资源

Condition的单向链表

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
public class ConditionObject implements Condition, java.io.Serializable {
private static final long serialVersionUID = 1173984872572414699L;
/** First node of condition queue. */
private transient Node firstWaiter;
/** Last node of condition queue. */
private transient Node lastWaiter;

/**
* Creates a new {@code ConditionObject} instance.
*/
public ConditionObject() { }

// Internal methods

/**
* Adds a new waiter to wait queue.
* @return its new wait node
*/
private Node addConditionWaiter() {
Node t = lastWaiter;
// If lastWaiter is cancelled, clean out.
if (t != null && t.waitStatus != Node.CONDITION) {
unlinkCancelledWaiters();
t = lastWaiter;
}
Node node = new Node(Thread.currentThread(), Node.CONDITION);
if (t == null)
firstWaiter = node;
else
t.nextWaiter = node;
lastWaiter = node;
return node;
}

当持有lock锁的线程,执行了await方法,会将当前线程封装成Node,插入到Condition单向链表。等到其他线程执行singal唤醒,进入同步队列等到竞争锁资源

lock锁和AQS的继承关系

基本关系

img

非公平锁的逻辑

img

  1. 基于CAS操作,尝试将state从0变成1(在lock方法,非公平锁才有步骤1)

     1.1 成功拿到锁,执行
    
     1.2 竞争锁资源失败,进行后续的竞争 (进入2)
    
  2. 执行tryAcquire的逻辑

     2.1 查看state是否为0,如果为0就再次执行cas尝试拿锁
    
     2.2 查看是否是锁重入的逻辑,直接对state+1,锁重入成功
    
     2.3 再次尝试拿锁的操作失败 (进入3) 
    
  3. 执行addWaiter,准备进入同步队列排队

     3.1 将当前线程封装为Node对象
    
     3.2 将当前Node添加到同步队列的末尾
    
  4. 执行accquireQueued的逻辑,要做的不只是单纯的阻塞线程,还有被唤醒或者自旋获取锁后出队列

     4.1 获取node节点的前驱节点,判断其是否是head,是则继续抢锁(可能刚入队列就排在head后面,也有可能自旋后,有其他节点获取锁出队列,而使得node排在head后面),抢锁成功则出队换头
    
     4.2 node的前驱节点不是head或者抢锁失败,进入阻塞判断shouldParkAfterFailedAcquire
      
     4.3 判断应该放心阻塞,调用parkAndCheckInterrupt阻塞当前线程
    
  • accquireQueued
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
//拿node的前一个节点
final Node p = node.predecessor();
//若p是头节点,,说明自己排在队列的第一个尝试抢锁
if (p == head && tryAcquire(arg)) {
//node成为新的head
setHead(node);
p.next = null; // help GC
failed = false;
//拿到锁了返回false
return interrupted;
}
//1.应该阻塞,调用parkAndCheckInterrupt阻塞线程
//2.不应该阻塞,再给一次抢锁的机会
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
//基本不可能走到这一步,除非是系统级别的异常导致获取锁失败for循环意外退出,
cancelAcquire(node);
}
}

非公平锁和公平锁的直观体现

从源码上来看,公平和非公平的直观体现是lock方法和tryAcquire方法

lock方法一般是获取锁资源的入口方法,非公平锁会直接抢一次锁资源,而公平锁不会

img

acquire的底层逻辑

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
/**
* Acquires in exclusive mode, ignoring interrupts. Implemented
* by invoking at least once {@link #tryAcquire},
* returning on success. Otherwise the thread is queued, possibly
* repeatedly blocking and unblocking, invoking {@link
* #tryAcquire} until success. This method can be used
* to implement method {@link Lock#lock}.
*
* @param arg the acquire argument. This value is conveyed to
* {@link #tryAcquire} but is otherwise uninterpreted and
* can represent anything you like.
*/

// arg = 1
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}

就是之前提到的逻辑:

  1. 先走tryAcquire,再次抢锁,抢到了就结束
  2. 没抢到执行addWaiter方法准备排队,被封装成Node,进入同步队列
  3. 再走acquireQueued方法,获取锁还是挂起线程,基于内部细粒度的逻辑

tryAcquire的底层逻辑

有公平锁和非公平锁两种实现

非公平锁实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
/**
* Performs non-fair tryLock. tryAcquire is implemented in
* subclasses, but both need nonfair try for trylock method.
*/

//
final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();

// 步骤2.1
if (c == 0) {
// 执行CAS,尝试将state从0修改成1
if (compareAndSetState(0, acquires)) {
//修改成功
setExclusiveOwnerThread(current);
return true;
}
}
// 步骤2.2
// 判断是不是锁重入
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
// 锁重入成功
return true;
}
return false;
}

公平锁实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
protected final boolean tryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
// 如果当前锁资源没被占用,需要满足一定的条件才能通过CAS抢锁
// 1. 如果AQS的同步队列没有排队的Node,可以抢锁
// 2. 如果AQS的同步队列有排队的Node,并且排在“第一名”的是当前线程,可以抢锁
if (!hasQueuedPredecessors() &&
compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
  • hasQueuedPredecessors方法
1
2
3
4
5
6
7
8
9
10
public final boolean hasQueuedPredecessors() {
// The correctness of this depends on head being initialized
// before tail and on head.next being accurate if the current
// thread is first in queue.
Node t = tail; // Read fields in reverse initialization order
Node h = head;
Node s;
return h != t &&
((s = h.next) == null || s.thread != Thread.currentThread());
}

addWaiter的底层逻辑

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
/**
* Creates and enqueues node for current thread and given mode.
*
* @param mode Node.EXCLUSIVE for exclusive, Node.SHARED for shared
* @return the new node
*/
private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode);
// Try the fast path of enq; backup to full enq on failure
Node pred = tail;
if (pred != null) {
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
enq(node);
return node;
}

通过CAS保证将tali指向自己,从而保证了原子性

img

如果CAS失败,会不断死循环,不断指向1,2操作

accquireQueued的底层逻辑

img

img

img

img

源码补充

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}

private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode);
// Try the fast path of enq; backup to full enq on failure
Node pred = tail;
if (pred != null) {
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
enq(node);
return node;
}

final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}

private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
int ws = pred.waitStatus;
if (ws == Node.SIGNAL) //当前线程挂起
/*
* This node has already set status asking a release
* to signal it, so it can safely park.
*/
return true;
if (ws > 0) {
/*
* Predecessor was cancelled. Skip over predecessors and
* indicate retry.
*/
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
/*
* waitStatus must be 0 or PROPAGATE. Indicate that we
* need a signal, but don't park yet. Caller will need to
* retry to make sure it cannot acquire before parking.
*/
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}


private final boolean parkAndCheckInterrupt() {
LockSupport.park(this);
return Thread.interrupted();
}


static void selfInterrupt() {
Thread.currentThread().interrupt();
}

/*
在park中:
只要permit为1或者中断状态为true,那么执行park就不能够阻塞线程。park只可能消耗掉permit,
但不会去消耗掉中断状态。
因此需要interrupt()去消耗掉,并将这个中断状态暂时保存到一个局部变量interrupted中
在selfInterrupt中:
当parkAndCheckInterrupt()方法返回true后又调用了 selfInterrupt()方法重
新设置中断标记,这样做的目的是为了让用户代码(同步代码块)能够通过
Thread.isInterrupted()等方法 感知到线程在获取同步状态的过程中被中断过。
*/

AQS的Condition支持

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
static final class Node {
static final Node SHARED = new Node();

static final Node EXCLUSIVE = null;

static final int CANCELLED = 1;

static final int SIGNAL = -1;

static final int CONDITION = -2;

static final int PROPAGATE = -3;

/*
Node只要在Condition单向链表中,状态就是上面的-2
waitStatus简写wt
*/
volatile int waitStatus;

volatile Node prev;

volatile Node next;

volatile Thread thread;

/*
单向链表的下一个节点
*/
Node nextWaiter;

final boolean isShared() {
return nextWaiter == SHARED;
}

final Node predecessor() throws NullPointerException {
Node p = prev;
if (p == null)
throw new NullPointerException();
else
return p;
}

Node() { // Used to establish initial head or SHARED marker
}

Node(Thread thread, Node mode) { // Used by addWaiter
this.nextWaiter = mode;
this.thread = thread;
}

Node(Thread thread, int waitStatus) { // Used by Condition
this.waitStatus = waitStatus;
this.thread = thread;
}
}

img

  • Condition是基于Node对象组成的单向链表
  • 在Condition中,Node状态必须是-2,如果不是-2,就可以从中移除掉了
  • Condition的Node是利用nextWaiter属性连接下一个节点
  • Condition中还有指向头尾的两个属性,分别是firstWaiter和lastWaiter

Condition的挂起操作流程

当持有lock锁的线程,执行以下4个流程:

  1. 将当前对象封装成Node对象,加入单向链表中
  2. 释放锁资源
  3. 确认当前线程的Node,没有在AQS的同步队列中。如果在,说明执行了signal方法,那个线程已经进入了同步队列。不需要挂起
  4. 没有在同步队列,直接挂起

Condition的signal唤醒操作流程

  1. 确保执行signal的线程持有锁资源
  2. 将第一个Node从单向链表中断开
  3. 将Node的状态从-2改成0
  4. 将Node移到同步队列
  5. 确保Node在同步队列中可以被唤醒。直接唤醒线程和将prev指向的Node状态设置为-1

Condition在await被唤醒后的逻辑

Condition在await被唤醒后的逻辑
1、确认被唤醒的方式:

  1. 单纯地被signal方法唤醒

    • 被interrupt中断唤醒
    • 被signal唤醒后,然后执行了interrupt(保留中断标记位)
  2. 确保Node在同步队列后,就可以跳出while循环

  3. 执行acquireQueued方法后,等待获取锁资源

  4. 在获取锁资源的同时,如果被中断过,需要确认是否保留中断标记位

  5. 如果是中断唤醒,需要将当前Node断开单向链表连接

  6. 根据中断模型,执行抛出异常、方法