Fork me on GitHub

【Java多线程】JUC锁 06. ReentrantReadWriteLock

ReentrantReadWriteLock

1. 前言

  • 读写锁,它维护了一对相关的锁 :“读取锁”和“写入锁”,一个用于读取操作,另一个用于写入操作
  • 默认不公平模式,支持公平模式,不会强加对锁的读取或写入优先访问顺序
  • 写入锁可以降级为读取锁,读取锁不能升级为写入锁

2. 源码解析

2.1 数据结构

  • ReentrantReadWriteLock包含一对读锁ReadLock和写锁WriteLock,Sync对象;读锁ReadLock和写锁WriteLock继承自Lock接口,内部的Sync对象和ReentrantReadWriteLock的Sync对象是一样的
  • ReentrantReadWriteLock的公平锁和非公平锁由Sync对象实例决定

2.2 内部类

  • ReadLock
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
public static class ReadLock implements Lock, java.io.Serializable {
private static final long serialVersionUID = -5992448646407690164L;
private final Sync sync;

protected ReadLock(ReentrantReadWriteLock lock) {
sync = lock.sync; // 同ReentrantReadWriteLock Sync一样
}

//获取锁
public void lock() {
sync.acquireShared(1);
}

//获取锁,除非当前线程被中断
public void lockInterruptibly() throws InterruptedException {
sync.acquireSharedInterruptibly(1);
}

//尝试获取锁,立即返回
public boolean tryLock() {
return sync.tryReadLock();
}

//获取读锁,除非在指定时间内写锁被占用或者当前线程中断
public boolean tryLock(long timeout, TimeUnit unit)
throws InterruptedException {
return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));
}

//释放锁
public void unlock() {
sync.releaseShared(1);
}

//读锁不支持Condition条件
public Condition newCondition() {
throw new UnsupportedOperationException();
}

public String toString() {
int r = sync.getReadLockCount();
return super.toString() +
"[Read locks = " + r + "]";
}
}
  • WriteLock
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
public static class WriteLock implements Lock, java.io.Serializable {
private static final long serialVersionUID = -4992448646407690164L;
private final Sync sync;

protected WriteLock(ReentrantReadWriteLock lock) {
sync = lock.sync; //sync 同ReentrantReadWriteLock.sync
}

//获取锁
public void lock() {
sync.acquire(1);
}

//获取锁除非当前线程中断
public void lockInterruptibly() throws InterruptedException {
sync.acquireInterruptibly(1);
}

//尝试获取锁
public boolean tryLock( ) {
return sync.tryWriteLock();
}

//指定时间尝试获取锁,除非线程中断
public boolean tryLock(long timeout, TimeUnit unit)
throws InterruptedException {
return sync.tryAcquireNanos(1, unit.toNanos(timeout));
}

//释放锁
public void unlock() {
sync.release(1);
}

public Condition newCondition() {
return sync.newCondition();
}

public String toString() {
Thread o = sync.getOwner();
return super.toString() + ((o == null) ?
"[Unlocked]" :
"[Locked by thread " + o.getName() + "]");
}

//查询锁是否被当前线程占据
public boolean isHeldByCurrentThread() {
return sync.isHeldExclusively();
}

//当前线程重入锁数
public int getHoldCount() {
return sync.getWriteHoldCount();
}
}

2.3 共享锁

2.3.1 获取锁

获取共享锁ReadLock思想:首先通过tryAcquireShared()尝试获取;失败则通过doAcquireShared()不断的循环并尝试获取锁,若有需要,则阻塞等待。

  • lock()

在ReadLock中获取锁

1
2
3
public void lock() {
sync.acquireShared(1);
}
1
2
3
4
5
//AQS中实现,获取共享锁
public final void acquireShared(int arg) {
if (tryAcquireShared(arg) < 0)
doAcquireShared(arg);
}
  • tryAcquireShared

在ReentrantReadWriteLock内部类Sync实现tryAcquireShared()

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
//尝试获取共享锁
protected final int tryAcquireShared(int unused) {
/*
* Walkthrough:
* 1. If write lock held by another thread, fail.
* 2. Otherwise, this thread is eligible for
* lock wrt state, so ask if it should block
* because of queue policy. If not, try
* to grant by CASing state and updating count.
* Note that step does not check for reentrant
* acquires, which is postponed to full version
* to avoid having to check hold count in
* the more typical non-reentrant case.
* 3. If step 2 fails either because thread
* apparently not eligible or CAS fails or count
* saturated, chain to version with full retry loop.
*/
Thread current = Thread.currentThread();
int c = getState(); //锁状态
// 如果“锁”是“互斥锁”,并且获取锁的线程不是current线程;则返回-1。
if (exclusiveCount(c) != 0 &&
getExclusiveOwnerThread() != current)
return -1;
// 获取“读取锁”的共享计数
int r = sharedCount(c);
// 如果“不需要阻塞等待”,并且“读取锁”的共享计数小于MAX_COUNT;
// 则通过CAS函数更新“锁的状态”,将“读取锁”的共享计数+1。
if (!readerShouldBlock() &&
r < MAX_COUNT &&
compareAndSetState(c, c + SHARED_UNIT)) {
if (r == 0) {
firstReader = current;
firstReaderHoldCount = 1;
} else if (firstReader == current) {
firstReaderHoldCount++;
} else {
//HoldCounter是用来统计该线程获取“读取锁”的次数。
HoldCounter rh = cachedHoldCounter;
if (rh == null || rh.tid != getThreadId(current))
cachedHoldCounter = rh = readHolds.get();
else if (rh.count == 0)
readHolds.set(rh);
rh.count++;
}
return 1;
}
return fullTryAcquireShared(current);
}

其中判断是否需要阻塞readerShouldBlock与锁模式有关:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
//FairSync:
final boolean readerShouldBlock() {
return hasQueuedPredecessors();
}
//判断当前线程前是否有线程在等待
public final boolean hasQueuedPredecessors() {
Node t = tail; // Read fields in reverse initialization order
Node h = head;
Node s;
return h != t &&
((s = h.next) == null || s.thread != Thread.currentThread());
}

//NonfairSync:
final boolean readerShouldBlock() {
return apparentlyFirstQueuedIsExclusive();
}
//AQS中实现,等待队列首线程不是独占锁
final boolean apparentlyFirstQueuedIsExclusive() {
Node h, s;
return (h = head) != null &&
(s = h.next) != null &&
!s.isShared() &&
s.thread != null;
}
  • fullTryAcquireShared
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
final int fullTryAcquireShared(Thread current) {
HoldCounter rh = null;
for (;;) {
// 获取“锁”的状态
int c = getState();
// 如果“锁”是“互斥锁”,并且获取锁的线程不是current线程;则返回-1。
if (exclusiveCount(c) != 0) {
if (getExclusiveOwnerThread() != current)
return -1;
// 如果“需要阻塞等待”。
// (01) 当“需要阻塞等待”的线程是第1个获取锁的线程的话,则继续往下执行。
// (02) 当“需要阻塞等待”的线程获取锁的次数=0时,则返回-1。
} else if (readerShouldBlock()) {
// 如果想要获取锁的线程(current)是第1个获取锁(firstReader)的线程
if (firstReader == current) {
} else {
if (rh == null) {
rh = cachedHoldCounter;
if (rh == null || rh.tid != current.getId()) {
rh = readHolds.get();
if (rh.count == 0)
readHolds.remove();
}
}
// 如果当前线程获取锁的计数=0,则返回-1。
if (rh.count == 0)
return -1;
}
}
// 如果“不需要阻塞等待”,则获取“读取锁”的共享统计数;
// 如果共享统计数超过MAX_COUNT,则抛出异常。
if (sharedCount(c) == MAX_COUNT)
throw new Error("Maximum lock count exceeded");
// 将线程获取“读取锁”的次数+1。
if (compareAndSetState(c, c + SHARED_UNIT)) {
// 如果是第1次获取“读取锁”,则更新firstReader和firstReaderHoldCount。
if (sharedCount(c) == 0) {
firstReader = current;
firstReaderHoldCount = 1;
// 如果想要获取锁的线程(current)是第1个获取锁(firstReader)的线程,
// 则将firstReaderHoldCount+1。
} else if (firstReader == current) {
firstReaderHoldCount++;
} else {
if (rh == null)
rh = cachedHoldCounter;
if (rh == null || rh.tid != current.getId())
rh = readHolds.get();
else if (rh.count == 0)
readHolds.set(rh);
// 更新线程的获取“读取锁”的共享计数
rh.count++;
cachedHoldCounter = rh; // cache for release
}
return 1;
}
}
}
  • doAcquireShared

doAcquireShared()的作用是在CLH队列中自旋获取共享锁。doAcquireShared()在每一次尝试获取锁时,是通过tryAcquireShared()来执行的!

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
private void doAcquireShared(int arg) {
// 创建“当前线程”对应的Shared节点,并将该线程添加到CLH队列中。
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
// 获取“node”的前一节点
final Node p = node.predecessor();
// 如果“当前线程”是CLH队列的表头,则尝试获取共享锁。
if (p == head) {
int r = tryAcquireShared(arg);
if (r >= 0) {
setHeadAndPropagate(node, r);
p.next = null; // help GC
if (interrupted)
selfInterrupt();
failed = false;
return;
}
}
// 如果“当前线程”不是CLH队列的表头,则通过shouldParkAfterFailedAcquire()判断是否需要等待,
// 需要的话,则通过parkAndCheckInterrupt()进行阻塞等待。若阻塞等待过程中,线程被中断过,则设置interrupted为true。
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}

2.3.2 释放锁

释放共享锁的思想,是先通过tryReleaseShared()尝试释放共享锁。尝试成功的话,则通过doReleaseShared()唤醒“其他等待获取共享锁的线程”,并返回true;否则的话,返回flase。

  • unlock()
1
2
3
public void unlock() {
sync.releaseShared(1);
}
  • releaseShared()
1
2
3
4
5
6
7
8
//AQS中实现
public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) {
doReleaseShared();
return true;
}
return false;
}
  • tryReleaseShared()
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
protected final boolean tryReleaseShared(int unused) {
// 获取当前线程,即释放共享锁的线程。
Thread current = Thread.currentThread();
// 如果想要释放锁的线程(current)是第1个获取锁(firstReader)的线程,
// 并且“第1个获取锁的线程获取锁的次数”=1,则设置firstReader为null;
// 否则,将“第1个获取锁的线程的获取次数”-1。
if (firstReader == current) {
// assert firstReaderHoldCount > 0;
if (firstReaderHoldCount == 1)
firstReader = null;
else
firstReaderHoldCount--;
// 获取rh对象,并更新“当前线程获取锁的信息”。
} else {

HoldCounter rh = cachedHoldCounter;
if (rh == null || rh.tid != current.getId())
rh = readHolds.get();
int count = rh.count;
if (count <= 1) {
readHolds.remove();
if (count <= 0)
throw unmatchedUnlockException();
}
--rh.count;
}
for (;;) {
// 获取锁的状态
int c = getState();
// 将锁的获取次数-1。
int nextc = c - SHARED_UNIT;
// 通过CAS更新锁的状态。
if (compareAndSetState(c, nextc))
return nextc == 0;
}
}
  • doReleaseShared()
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
//AQS中实现,会释放“共享锁”。从前往后的遍历CLH队列,依次“唤醒”然后“执行”队列中每个节点对应的线程;最终的目的是让这些线程释放它们所持有的锁。
private void doReleaseShared() {
for (;;) {
// 获取CLH队列的头节点
Node h = head;
// 如果头节点不为null,并且头节点不等于tail节点。
if (h != null && h != tail) {
// 获取头节点对应的线程的状态
int ws = h.waitStatus;
// 如果头节点对应的线程是SIGNAL状态,则意味着“头节点的下一个节点所对应的线程”需要被unpark唤醒。
if (ws == Node.SIGNAL) {
// 设置“头节点对应的线程状态”为空状态。失败的话,则继续循环。
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
continue;
// 唤醒“头节点的下一个节点所对应的线程”。
unparkSuccessor(h);
}
// 如果头节点对应的线程是空状态,则设置“文件点对应的线程所拥有的共享锁”为其它线程获取锁的空状态。
else if (ws == 0 &&
!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
continue; // loop on failed CAS
}
// 如果头节点发生变化,则继续循环。否则,退出循环。
if (h == head) // loop if head changed
break;
}
}

3. 参考

http://www.cnblogs.com/skywang12345/p/3505809.html