StampedLock源码解读
1. 字段
// 处理器数量
private static final int NCPU = Runtime.getRuntime().availableProcessors();
// 入队前的最大重试次数
private static final int SPINS = (NCPU > 1) ? 1 << 6 : 0;
// 作为首节点阻塞前的最大重试次数
private static final int HEAD_SPINS = (NCPU > 1) ? 1 << 10 : 0;
// 再次阻塞前的最大重试次数
private static final int MAX_HEAD_SPINS = (NCPU > 1) ? 1 << 16 : 0;
// 等待自旋上溢的闲置时期,必须为2的幂次-1
private static final int OVERFLOW_YIELD_RATE = 7;
// 在上溢前记录读锁的位数,占据低位
private static final int LG_READERS = 7;
// 每次读增加的单位
private static final long RUNIT = 1L;
// 写锁所在位数
// 1000 0000
private static final long WBIT = 1L << LG_READERS;
// 读锁所在位数
// 0111 1111
private static final long RBITS = WBIT - 1L;
// 读锁上限
// 0111 1110
private static final long RFULL = RBITS - 1L;
// 掩码
// 1111 1111
private static final long ABTIS = RBITS | WBITS;
// 读锁反数
// 1111 1111 1111 1111 1111 1111 1111 1111 1111 1111 1111 1111 1111 1111 1000 0000
private static final long SBITS = ~RBITS;
// 锁状态初始值
// 0001 0000 0000
private static final long ORIGIN = WBIT << 1;
// 取消获取锁时的特殊状态,表示中断
private static final long INTERRUPTED = 1L;
// 等待状态
private static final int WAITING = -1;
// 取消状态
private static final int CANCELLED = 1;
// 节点读模式
private static final int RMODE = 0;
// 节点写模式
private static final int WMODE = 1;
// CLH队列首节点
private transient volatile WNode whead;
// CLH队列尾节点
private transient volatile WNode wtail;
// 视图
transient ReadLockView readLockView;
transient WriteLockView writeLockView;
transient ReadWriteLockView readWriteLockView;
// 锁队列状态
// 高24位为版本号,每次获取和释放写锁都会+1,25位为写锁标志,剩余7位为读锁计数,记录当前读锁数量
private transient volatile long state;
// 额外的读锁计数
private transient int readerOverflow;
2. 内部类
// 节点在队列中排列,连续的写节点会在队列中,连续的读节点会在cowait中
// 等待节点
static final class WNode {
volatile WNode prev;
volatile WNode next;
volatile WNode cowait; // 读线程所在链表
volatile Thread thread; // 当前节点阻塞的线程
volatile int status; // 0、WAITING或者CANCELLED
final int mode; // RMODE或者WMODE
WNode(int m, WNode p) { mode = m; prev = p; }
}
3. 构造函数
public StampedLock() { state = ORIGIN; }
4. 写锁
// 尝试获取独占写锁,阻塞直到成功获取
public long writeLock() {
long s, next;
return ((((s = state) & ABITS) == 0L && // 写锁未被获取
// 尝试更新状态,增加一个写计数
U.compareAndSwapLong(this, STATE, s, next = s + WBIT)) ?
next : acquireWrite(false, 0L));
}
private long acquireWrite(boolean interruptible, long deadline) {
WNode node = null, p;
// 第一次自旋
for (int spins = -1;;) {
long m, s, ns;
if ((m = (s = state) & ABITS) == 0L) {
// 再次尝试获取写锁
if (U.compareAndSwapLong(this, STATE, s, ns + WBIT))
return ns;
} else if (spins < 0)
// 存在独占写锁并且队列为空,自旋
spins = (m == WBITS && wtail == whead) ? SPINS : 0;
else if (spins > 0) {
// 随机减少一次自旋次数
if (LockSupport.nextSecondarySeed() >= 0)
--spins;
} else if ((p = wtail) == null) { // 队列为空
// 创建哑节点作为首节点
WNode hd = new WNode(WMODE, null);
if (U.compareAndSwapObject(this, WHEAD, null, hd))
wtail = hd;
} else if (node == null) // 队列非空,创建节点
node = new WNode(WMODE, p);
else if (node.prev != p) // 尾节点改变,更新尾节点
node.prev = p;
else if (U.compareAndSwapObject(this, WTAIL, p, node)) { // 尝试将当前节点设为尾节点,退出循环
p.next = node;
break;
}
}
// 第二次自旋
for (int spins = -1;;) {
WNode h, np, pp; int ps;
// 前驱节点为首节点
if ((h = whead) == p) {
if (spins < 0) // 设置自旋次数
spins = HEAD_SPINS;
else if (spins < MAX_HEAD_SPINS) // 增加自旋次数
spins <<= 1;
// 第三次自旋
for (int k = spins;;) {
long s, ns;
if (((s = state) & ABITS) == 0L) {
// 尝试获取写锁
if (U.compareAndSwapLong(this, STATE, s,
ns = s + WBIT)) {
// 获取成功,设置当前节点为首节点
whead = node;
node.prev = null;
return ns;
}
} else if (LockSupport.nextSecondarySeed() >= 0 &&
--k <= 0) // 随机减少自旋次数
break;
}
} else if (h != null) { // 前驱节点非空
WNode c; Thread w;
// 首节点相连的读线程链表非空
while ((c = h.cowait) != null) {
// 唤醒读线程节点
if (U.compareAndSwapObject(h, WCOWAIT, c, c.cowait) &&
(w = c.thread) != null)
U.unpark(w);
}
}
// 首节点未改变
if (whead == h) {
// 前驱节点改变,更新前驱节点
if ((np = node.prev) != p) {
if (np != null)
(p = np).next = node;
} else if ((ps = p.status) == 0)
// 设置前驱节点为WAITING状态
U.compareAndSwapInt(p, WSTATUS, 0, WAITING);
else if (ps == CANCELLED) { // 前驱节点被取消
// 删除前驱节点
if ((pp = p.prev) != null) {
node.prev = pp;
pp.next = node;
}
} else {
long time;
if (deadline = 0L) // 未设置超时时间
time = 0L;
else if ((time = deadline - System.nanoTime()) <= 0L)
// 超时,取消当前节点
return cancelWaiter(node, node, false);
Thread wt = Thread.currentThread();
// 记录节点阻塞的锁
U.putObject(wt, PARKLOCKER, this);
// 设置当前节点线程为当前线程
node.thread = wt;
// 前驱节点被阻塞、前驱节点非首节点或者写锁已经被获取、首节点未改变、前驱节点未改变
if (p.status < 0 && (p != h || (state & ABITS) != 0L) &&
whead == h && node.prev == p)
// 阻塞当前线程
U.park(false, time);
// 当前节点被唤醒,清除线程
node.thread = null;
U.putObject(wt, PARKLOCKER, null);
// 是否检查中断,如果中断,取消当前节点
if (interruptible && Thread.interrupted())
return cancelWaiter(node, node, true);
}
}
}
}
总结:
- 判断写锁是否被获取,没有则尝试获取;
- 前驱节点为首节点,自旋,自旋次数为0则入队
- 前驱节点为首节点,自旋获取锁
- 否则尝试唤醒头节点中阻塞的读线程
- 没有获取写锁,阻塞当前线程
public void unlockWrite(long stamp) {
WNode h;
// 检查版本号
if (state != stamp || (stamp & WBIT) == 0L)
throw new IllegalMonitorStateException();
// 更新版本号
state = (stamp += WBIT) == 0L ? ORIGIN : stamp;
// 队列非空,状态非0,唤醒下一个节点
if ((h = whead) != null && h.status != 0)
release(h);
}
private void release(WNode h) {
if (h != null) {
WNode q; Thread w;
// 状态设为0
U.compareAndSwapInt(h, WSTATUS, WAITING, 0);
// 首节点的下一个节点为空或者被取消
if ((q = h.next) == null || q.status == CANCELLED) {
// 尾遍历查找阻塞节点
for (WNode t = wtail; t != null && t != h; t = t.prev)
if (t.status <= 0)
q = t;
}
// 唤醒节点
if (q != null && (w = q.thread) != null)
U.unpark(w);
}
}
5. 读锁
public long readLock() {
long s = state, next;
return ((whead == wtail && (s & ABITS) < RFULL && // 没有写锁并且读锁未达上限
U.compareAndSwapLong(this, STATE, s, next = s + RUNIT)) ? // 增加读锁计数
next : acquireRead(false, 0L));
}
private long acquireRead(boolean interruptible, long deadline) {
WNode node = null, p;
// 第一次自旋
for (int spins = -1;;) {
WNode h;
// 队列为空
if ((h = whead) == (p = wtail)) {
// 第二次自旋
for (long m, s, ns;;) {
if ((m = (s = state) & ABITS) < RFULL ? // 读锁未达上限
U.compareAndSwapLong(this, STATE, s, ns = s + RUNIT) : // 增加读锁计数
(m < WBIT && (ns = tryIncReaderOverflow(s)) != 0L)) // 读锁达上限,溢出处理
return ns;
else if (m >= WBIT) { // 写锁被获取
if (spins > 0) { // 随机减少自旋次数
if (LockSupport.nextSecondarySeed() >= 0)
--spins;
} else {
if (spins == 0) {
WNode nh = whead, np = wtail;
// 头尾节点未改变或者队列非空
if ((nh == h && np == p) || (h = nh) != (p = np))
break;
}
// 重置自旋次数
spins = SPINS;
}
}
}
}
// 尾节点为空,初始化
if (p == null) {
WNode hd = new WNode(WMODE, null);
if (U.compareAndSwapObject(this, WHEAD, null, hd))
wtail = hd;
} else if (node == null) // 当前节点为空,初始化
node = new WNode(RMODE, p);
else if (h == p || p.mode != RMODE) { // 队列为空或者尾节点不是读模式
if (node.prev != p) // 入队
node.prev = p;
else if (U.compareAndSwapObject(this, WTAIL, p, node)) {
// 更新尾节点,进入第五次自旋
p.next = node;
break;
}
} else if (!U.compareAndSwapObject(p, WCOWAIT, node.cowait = p.cowait, node)) // 尾节点为读模式,设置当前节点cowait字段
node.cowait = null;
else {
// 第三次自旋
for (;;) {
WNode pp, c; Thread w;
// 头节点非空、头节点cowait非空、唤醒cowait节点中的线程
if ((h = whead) != null && (c = h.cowait) != null &&
U.compareAndSwapObject(h, WCOWAIT, c, c.cowait) &&
(w = c.thread) != null)
U.unpark(w);
// 前驱节点的前驱节点为头节点或者前驱节点为头节点
if (h == (pp = p.prev) || h == p || pp == null) {
long m, s, ns;
// 第四次自旋
do {
if ((m = (s = state) & ABITS) < RFULL ? // 读锁未达上限
U.compareAndSwapLong(this, STATE, s, // 增加读锁计数
ns = s + RUNIT) :
(m < WBIT && // 写锁未被获取
(ns = tryIncReaderOverflow(s)) != 0L)) // 溢出处理
} while (m < WBIT); // 写锁未被获取
}
// 头节点和前置节点未被改变
if (whead == h && p.prev == pp) {
long time;
// 前前驱节点为空或者前驱节点为头节点或者前驱节点被取消
if (pp == null || h == p || p.status > 0) {
// 跳出循环,进入第一次自旋
node = null;
break;
}
// 超时处理,没有设置则不处理
if (deadline == 0L)
time = 0L;
else if ((time = deadline - System.nanoTime()) <= 0L)
return cancelWaiter(node, p, false);
Thread wt = Thread.currentThread();
U.putObject(wt, PARKLOCKER, this);
node.thread = wt;
// 前前驱节点非头节点、写锁被获取并且头节点尾节点未改变
if ((h != pp || (state & ABITS) == WBIT) &&
whead == h && p.prev == pp)
// 阻塞当前线程
U.park(false, time);
node.thread = null;
U.putObject(wt, PARKLOCKER, null);
// 如果设置中断标志,判断是否中断
if (interruptible && Thread.interrupted())
return cancelWaiter(node, p, true);
}
}
}
}
// 第五次自旋
for (int spins = -1;;) {
WNode h, np, pp; int ps;
if ((h = whead) == p) { // 队列为空
// 设置自旋次数
if (spins < 0)
spins = HEAD_SPINS;
else if (spins < MAX_HEAD_SPINS)
spins <<= 1;
// 第六次自旋
for (int k = spins;;) {
long m, s, ns;
if ((m = (s = state) & ABTIS) < RFULL ? // 读锁未达上限
U.compareAndSwapLong(this, STATE, s, ns = s + RUNIT) : // 增加读锁计数
(m < WBIT && (ns = tryIncReaderOverflow(s)) != 0L)) { // 写锁未被获取,溢出处理
WNode c; Thread w;
whead = node;
node.prev = null;
// 唤醒等待线程
while ((c = node.cowait) != null) {
if (U.compareAndSwapObject(node, WCOWAIT,
c, c.cowait) &&
(w = c.thread) != null)
U.unpark(w);
}
return ns;
} else if (m >= WBIT && // 写锁被获取
LockSupport.nextSecondarySeed() >= 0 && --k <= 0) // 自旋次数达到上限
break;
}
} else if (h != null) { // 头节点非空
WNode c; Thread w;
// 唤醒等待线程
while ((c = h.cowait) != null) {
if (U.compareAndSwapObject(h, WCOWAIT, c, c.cowait) &&
(w = c.thread) != null)
U.unpark(w);
}
}
if (whead == h) { // 头节点未改变
if ((np = node.prev) != p) { // 前驱节点改变
if (np != null) // 更新前驱节点
(p = np).next = node;
} else if ((ps = p.status) == 0) // 前驱节点状态为0
U.compareAndSwapInt(p, WSTATUS, 0, WAITING); // 设置阻塞状态
else if (ps == CANCELLED) { // 前驱节点被取消
if ((pp = p.prev) != null) { // 移除前驱节点
node.prev = pp;
pp.next = node;
}
} else {
long time;
// 超时处理,未设置则不处理
if (deadline == 0L)
time = 0L;
else if ((time = deadline - System.nanoTime()) <= 0L)
return cancelWaiter(node, node, false);
Thread wt = Thread.currentThread();
U.putObject(wt, PARKLOCKER, this);
node.thread = wt;
// 阻塞当前线程
if (p.status < 0 &&
(p != h || (state & ABITS) == WBIT) &&
whead == h && node.prev == p)
U.park(false, time);
node.thread = null;
U.putObject(wt, PARKLOCKER, null);
// 中断处理
if (interruptible && Thread.interrupted())
return cancelWaiter(node, node, true);
}
}
}
}
总结:
- 如果首尾节点相同,自旋获取锁;
- 如果首尾节点不同,入队
- 如果当前节点之前的节点为写节点,进入第五次自旋;
- 如果当前节点之前的节点为读节点,进入之前读节点的cowait中;
- 再次检测首尾节点是否相同,如果相同,自旋获取锁;
- 如果不相同,阻塞当前线程。
public void unlockRead(long stamp) {
long s, m; WNode h;
for (;;) {
// 检查版本号是否一致
if (((s = state) & SBITS) != (stamp & SBITS) ||
(stamp & ABITS) == 0L || (m = s & ABITS) == 0L || m == WBIT)
throw new IllegalMonitorStateException();
// 读锁未达上限
if (m < RFULL) {
// 释放读锁,减少读锁计数
if (U.compareAndSwapLong(this, STATE, s, s - RUNIT)) {
if (m == RUNIT && (h = whead) != null && h.status != 0)
release(h);
break;
}
} else if (tryDecReaderOverflow(s) != 0L) // 溢出处理
break;
}
}
6. 乐观锁
public long tryOptimisticRead() {
long s;
// 写锁未被获取,返回高25位
return (((s = state) & WBIT) == 0L) ? (s & SBITS) : 0L;
}
public boolean validate(long stamp) {
// 加入内存屏障,刷新数据
U.loadFence();
// 检查版本号是否一致
return (stamp & SBITS) == (state & SBITS);
}