回到顶部 暗色模式

StampedLock源码解读

1. 字段

// 处理器数量
private static final int NCPU = Runtime.getRuntime().availableProcessors();
// 入队前的最大重试次数
private static final int SPINS = (NCPU > 1) ? 1 << 6 : 0;
// 作为首节点阻塞前的最大重试次数
private static final int HEAD_SPINS = (NCPU > 1) ? 1 << 10 : 0;
// 再次阻塞前的最大重试次数
private static final int MAX_HEAD_SPINS = (NCPU > 1) ? 1 << 16 : 0;
// 等待自旋上溢的闲置时期,必须为2的幂次-1
private static final int OVERFLOW_YIELD_RATE = 7;
// 在上溢前记录读锁的位数,占据低位
private static final int LG_READERS = 7;
// 每次读增加的单位
private static final long RUNIT = 1L;
// 写锁所在位数
// 1000 0000
private static final long WBIT = 1L << LG_READERS;
// 读锁所在位数
// 0111 1111
private static final long RBITS = WBIT - 1L;
// 读锁上限
// 0111 1110
private static final long RFULL = RBITS - 1L;
// 掩码
// 1111 1111
private static final long ABTIS = RBITS | WBITS;
// 读锁反数
// 1111 1111 1111 1111 1111 1111 1111 1111 1111 1111 1111 1111 1111 1111 1000 0000
private static final long SBITS = ~RBITS;
// 锁状态初始值
// 0001 0000 0000
private static final long ORIGIN = WBIT << 1;
// 取消获取锁时的特殊状态,表示中断
private static final long INTERRUPTED = 1L;
// 等待状态
private static final int WAITING = -1;
// 取消状态
private static final int CANCELLED = 1;
// 节点读模式
private static final int RMODE = 0;
// 节点写模式
private static final int WMODE = 1;
// CLH队列首节点
private transient volatile WNode whead;
// CLH队列尾节点
private transient volatile WNode wtail;
// 视图
transient ReadLockView readLockView;
transient WriteLockView writeLockView;
transient ReadWriteLockView readWriteLockView;
// 锁队列状态
// 高24位为版本号,每次获取和释放写锁都会+1,25位为写锁标志,剩余7位为读锁计数,记录当前读锁数量
private transient volatile long state;
// 额外的读锁计数
private transient int readerOverflow;

2. 内部类

// 节点在队列中排列,连续的写节点会在队列中,连续的读节点会在cowait中
// 等待节点
static final class WNode {
    volatile WNode prev;
    volatile WNode next;
    volatile WNode cowait; // 读线程所在链表
    volatile Thread thread; // 当前节点阻塞的线程
    volatile int status; // 0、WAITING或者CANCELLED
    final int mode; // RMODE或者WMODE
    WNode(int m, WNode p) { mode = m; prev = p; }
}

3. 构造函数

public StampedLock() { state = ORIGIN; }

4. 写锁

// 尝试获取独占写锁,阻塞直到成功获取
public long writeLock() {
    long s, next;
    return ((((s = state) & ABITS) == 0L && // 写锁未被获取
             // 尝试更新状态,增加一个写计数
             U.compareAndSwapLong(this, STATE, s, next = s + WBIT)) ?
             next : acquireWrite(false, 0L));
}

private long acquireWrite(boolean interruptible, long deadline) {
    WNode node = null, p;
    // 第一次自旋
    for (int spins = -1;;) {
        long m, s, ns;
        if ((m = (s = state) & ABITS) == 0L) {
            // 再次尝试获取写锁
            if (U.compareAndSwapLong(this, STATE, s, ns + WBIT))
                return ns;
        } else if (spins < 0)
            // 存在独占写锁并且队列为空,自旋
            spins = (m == WBITS && wtail == whead) ? SPINS : 0;
        else if (spins > 0) {
            // 随机减少一次自旋次数
            if (LockSupport.nextSecondarySeed() >= 0)
                --spins;
        } else if ((p = wtail) == null) { // 队列为空
            // 创建哑节点作为首节点
            WNode hd = new WNode(WMODE, null);
            if (U.compareAndSwapObject(this, WHEAD, null, hd))
                wtail = hd;
        } else if (node == null) // 队列非空,创建节点
            node = new WNode(WMODE, p);
        else if (node.prev != p) // 尾节点改变,更新尾节点
            node.prev = p;
        else if (U.compareAndSwapObject(this, WTAIL, p, node)) { // 尝试将当前节点设为尾节点,退出循环
            p.next = node;
            break;
        }
    }
    // 第二次自旋
    for (int spins = -1;;) {
        WNode h, np, pp; int ps;
        // 前驱节点为首节点
        if ((h = whead) == p) {
            if (spins < 0) // 设置自旋次数
                spins = HEAD_SPINS;
            else if (spins < MAX_HEAD_SPINS) // 增加自旋次数
                spins <<= 1;
            // 第三次自旋
            for (int k = spins;;) {
                long s, ns;
                if (((s = state) & ABITS) == 0L) {
                    // 尝试获取写锁
                    if (U.compareAndSwapLong(this, STATE, s,
                                             ns = s + WBIT)) {
                        // 获取成功,设置当前节点为首节点
                        whead = node;
                        node.prev = null;
                        return ns;
                    }
                } else if (LockSupport.nextSecondarySeed() >= 0 &&
                           --k <= 0) // 随机减少自旋次数
                    break;
            }
        } else if (h != null) { // 前驱节点非空
            WNode c; Thread w;
            // 首节点相连的读线程链表非空
            while ((c = h.cowait) != null) {
                // 唤醒读线程节点
                if (U.compareAndSwapObject(h, WCOWAIT, c, c.cowait) &&
                    (w = c.thread) != null)
                    U.unpark(w);
            }
        }
        // 首节点未改变
        if (whead == h) {
            // 前驱节点改变,更新前驱节点
            if ((np = node.prev) != p) {
                if (np != null)
                    (p = np).next = node;
            } else if ((ps = p.status) == 0)
                // 设置前驱节点为WAITING状态
                U.compareAndSwapInt(p, WSTATUS, 0, WAITING);
            else if (ps == CANCELLED) { // 前驱节点被取消
                // 删除前驱节点
                if ((pp = p.prev) != null) {
                    node.prev = pp;
                    pp.next = node;
                }
            } else {
                long time;
                if (deadline = 0L) // 未设置超时时间
                    time = 0L;
                else if ((time = deadline - System.nanoTime()) <= 0L)
                    // 超时,取消当前节点
                    return cancelWaiter(node, node, false);
                Thread wt = Thread.currentThread();
                // 记录节点阻塞的锁
                U.putObject(wt, PARKLOCKER, this);
                // 设置当前节点线程为当前线程
                node.thread = wt;
                // 前驱节点被阻塞、前驱节点非首节点或者写锁已经被获取、首节点未改变、前驱节点未改变
                if (p.status < 0 && (p != h || (state & ABITS) != 0L) &&
                    whead == h && node.prev == p)
                    // 阻塞当前线程
                    U.park(false, time);
                // 当前节点被唤醒,清除线程
                node.thread = null;
                U.putObject(wt, PARKLOCKER, null);
                // 是否检查中断,如果中断,取消当前节点
                if (interruptible && Thread.interrupted())
                    return cancelWaiter(node, node, true);
            }
        }
    }
}

       总结:

  1. 判断写锁是否被获取,没有则尝试获取;
  2. 前驱节点为首节点,自旋,自旋次数为0则入队
  3. 前驱节点为首节点,自旋获取锁
  4. 否则尝试唤醒头节点中阻塞的读线程
  5. 没有获取写锁,阻塞当前线程
public void unlockWrite(long stamp) {
    WNode h;
    // 检查版本号
    if (state != stamp || (stamp & WBIT) == 0L)
        throw new IllegalMonitorStateException();
    // 更新版本号
    state = (stamp += WBIT) == 0L ? ORIGIN : stamp;
    // 队列非空,状态非0,唤醒下一个节点
    if ((h = whead) != null && h.status != 0)
        release(h);
}

private void release(WNode h) {
    if (h != null) {
        WNode q; Thread w;
        // 状态设为0
        U.compareAndSwapInt(h, WSTATUS, WAITING, 0);
        // 首节点的下一个节点为空或者被取消
        if ((q = h.next) == null || q.status == CANCELLED) {
            // 尾遍历查找阻塞节点
            for (WNode t = wtail; t != null && t != h; t = t.prev)
                if (t.status <= 0)
                    q = t;
        }
        // 唤醒节点
        if (q != null && (w = q.thread) != null)
            U.unpark(w);
    }
}

5. 读锁

public long readLock() {
    long s = state, next;
    return ((whead == wtail && (s & ABITS) < RFULL && // 没有写锁并且读锁未达上限
             U.compareAndSwapLong(this, STATE, s, next = s + RUNIT)) ? // 增加读锁计数
             next : acquireRead(false, 0L));
}

private long acquireRead(boolean interruptible, long deadline) {
    WNode node = null, p;
    // 第一次自旋
    for (int spins = -1;;) {
        WNode h;
        // 队列为空
        if ((h = whead) == (p = wtail)) {
            // 第二次自旋
            for (long m, s, ns;;) {
                if ((m = (s = state) & ABITS) < RFULL ? // 读锁未达上限
                    U.compareAndSwapLong(this, STATE, s, ns = s + RUNIT) : // 增加读锁计数
                    (m < WBIT && (ns = tryIncReaderOverflow(s)) != 0L)) // 读锁达上限,溢出处理
                    return ns;
                else if (m >= WBIT) { // 写锁被获取
                    if (spins > 0) { // 随机减少自旋次数
                        if (LockSupport.nextSecondarySeed() >= 0)
                            --spins;
                    } else {
                        if (spins == 0) {
                            WNode nh = whead, np = wtail;
                            // 头尾节点未改变或者队列非空
                            if ((nh == h && np == p) || (h = nh) != (p = np))
                                break;
                        }
                        // 重置自旋次数
                        spins = SPINS;
                    }
                }
            }
        }
        // 尾节点为空,初始化
        if (p == null) {
            WNode hd = new WNode(WMODE, null);
            if (U.compareAndSwapObject(this, WHEAD, null, hd))
                wtail = hd;
        } else if (node == null) // 当前节点为空,初始化
            node = new WNode(RMODE, p);
        else if (h == p || p.mode != RMODE) { // 队列为空或者尾节点不是读模式
            if (node.prev != p) // 入队
                node.prev = p;
            else if (U.compareAndSwapObject(this, WTAIL, p, node)) {
                // 更新尾节点,进入第五次自旋
                p.next = node;
                break;
            }
        } else if (!U.compareAndSwapObject(p, WCOWAIT, node.cowait = p.cowait, node)) // 尾节点为读模式,设置当前节点cowait字段
            node.cowait = null;
        else {
            // 第三次自旋
            for (;;) {
                WNode pp, c; Thread w;
                // 头节点非空、头节点cowait非空、唤醒cowait节点中的线程
                if ((h = whead) != null && (c = h.cowait) != null &&
                    U.compareAndSwapObject(h, WCOWAIT, c, c.cowait) &&
                    (w = c.thread) != null)
                    U.unpark(w);
                // 前驱节点的前驱节点为头节点或者前驱节点为头节点
                if (h == (pp = p.prev) || h == p || pp == null) {
                    long m, s, ns;
                    // 第四次自旋
                    do {
                        if ((m = (s = state) & ABITS) < RFULL ? // 读锁未达上限
                            U.compareAndSwapLong(this, STATE, s, // 增加读锁计数
                                                 ns = s + RUNIT) :
                            (m < WBIT && // 写锁未被获取
                             (ns = tryIncReaderOverflow(s)) != 0L)) // 溢出处理
                    } while (m < WBIT); // 写锁未被获取
                }
                // 头节点和前置节点未被改变
                if (whead == h && p.prev == pp) {
                    long time;
                    // 前前驱节点为空或者前驱节点为头节点或者前驱节点被取消
                    if (pp == null || h == p || p.status > 0) {
                        // 跳出循环,进入第一次自旋
                        node = null;
                        break;
                    }
                    // 超时处理,没有设置则不处理
                    if (deadline == 0L)
                        time = 0L;
                    else if ((time = deadline - System.nanoTime()) <= 0L)
                        return cancelWaiter(node, p, false);
                    Thread wt = Thread.currentThread();
                    U.putObject(wt, PARKLOCKER, this);
                    node.thread = wt;
                    // 前前驱节点非头节点、写锁被获取并且头节点尾节点未改变
                    if ((h != pp || (state & ABITS) == WBIT) &&
                         whead == h && p.prev == pp)
                        // 阻塞当前线程
                        U.park(false, time);
                    node.thread = null;
                    U.putObject(wt, PARKLOCKER, null);
                    // 如果设置中断标志,判断是否中断
                    if (interruptible && Thread.interrupted())
                        return cancelWaiter(node, p, true);
                }
            }
        }
    }
    // 第五次自旋
    for (int spins = -1;;) {
        WNode h, np, pp; int ps;
        if ((h = whead) == p) { // 队列为空
            // 设置自旋次数
            if (spins < 0)
                spins = HEAD_SPINS;
            else if (spins < MAX_HEAD_SPINS)
                spins <<= 1;
            // 第六次自旋
            for (int k = spins;;) {
                long m, s, ns;
                if ((m = (s = state) & ABTIS) < RFULL ? // 读锁未达上限
                    U.compareAndSwapLong(this, STATE, s, ns = s + RUNIT) : // 增加读锁计数
                    (m < WBIT && (ns = tryIncReaderOverflow(s)) != 0L)) { // 写锁未被获取,溢出处理
                    WNode c; Thread w;
                    whead = node;
                    node.prev = null;
                    // 唤醒等待线程
                    while ((c = node.cowait) != null) {
                        if (U.compareAndSwapObject(node, WCOWAIT,
                                                   c, c.cowait) &&
                            (w = c.thread) != null)
                            U.unpark(w);
                    }
                    return ns;
                } else if (m >= WBIT && // 写锁被获取
                           LockSupport.nextSecondarySeed() >= 0 && --k <= 0) // 自旋次数达到上限
                    break;
            }
        } else if (h != null) { // 头节点非空
            WNode c; Thread w;
            // 唤醒等待线程
            while ((c = h.cowait) != null) {
                if (U.compareAndSwapObject(h, WCOWAIT, c, c.cowait) &&
                    (w = c.thread) != null)
                    U.unpark(w);
            }
        }
        if (whead == h) { // 头节点未改变
            if ((np = node.prev) != p) { // 前驱节点改变
                if (np != null) // 更新前驱节点
                    (p = np).next = node;
            } else if ((ps = p.status) == 0) // 前驱节点状态为0
                U.compareAndSwapInt(p, WSTATUS, 0, WAITING); // 设置阻塞状态
            else if (ps == CANCELLED) { // 前驱节点被取消
                if ((pp = p.prev) != null) { // 移除前驱节点
                    node.prev = pp;
                    pp.next = node;
                }
            } else {
                long time;
                // 超时处理,未设置则不处理
                if (deadline == 0L)
                    time = 0L;
                else if ((time = deadline - System.nanoTime()) <= 0L)
                    return cancelWaiter(node, node, false);
                Thread wt = Thread.currentThread();
                U.putObject(wt, PARKLOCKER, this);
                node.thread = wt;
                // 阻塞当前线程
                if (p.status < 0 &&
                    (p != h || (state & ABITS) == WBIT) &&
                    whead == h && node.prev == p)
                    U.park(false, time);
                node.thread = null;
                U.putObject(wt, PARKLOCKER, null);
                // 中断处理
                if (interruptible && Thread.interrupted())
                    return cancelWaiter(node, node, true);
            }
        }
    }
}

        总结:

  1. 如果首尾节点相同,自旋获取锁;
  2. 如果首尾节点不同,入队
  3. 如果当前节点之前的节点为写节点,进入第五次自旋;
  4. 如果当前节点之前的节点为读节点,进入之前读节点的cowait中;
  5. 再次检测首尾节点是否相同,如果相同,自旋获取锁;
  6. 如果不相同,阻塞当前线程。
public void unlockRead(long stamp) {
    long s, m; WNode h;
    for (;;) {
        // 检查版本号是否一致
        if (((s = state) & SBITS) != (stamp & SBITS) ||
            (stamp & ABITS) == 0L || (m = s & ABITS) == 0L || m == WBIT)
            throw new IllegalMonitorStateException();
        // 读锁未达上限
        if (m < RFULL) {
            // 释放读锁,减少读锁计数
            if (U.compareAndSwapLong(this, STATE, s, s - RUNIT)) {
                if (m == RUNIT && (h = whead) != null && h.status != 0)
                    release(h);
                break;
            }
        } else if (tryDecReaderOverflow(s) != 0L) // 溢出处理
            break;
    }
}

6. 乐观锁

public long tryOptimisticRead() {
    long s;
    // 写锁未被获取,返回高25位
    return (((s = state) & WBIT) == 0L) ? (s & SBITS) : 0L;
}

public boolean validate(long stamp) {
    // 加入内存屏障,刷新数据
    U.loadFence();
    // 检查版本号是否一致
    return (stamp & SBITS) == (state & SBITS);
}

StampedLock源码解读