回到顶部 暗色模式

ReentranLock源码解读

        $ReentrantLock$ 是一个可重入的独占锁,与 $synchronized$ 行为类似,但提供了额外的功能。

1. 构造函数

// 默认为非公平锁
public ReentrantLock() { sync = new NonfairSync(); }

public ReentrantLock(boolean fair) { sync = fair ? new FairSync() : new NonfairSync(); }

2. 内部类

abstract static class Sync extends AbstractQueuedSynchronizer {
    private static final long serialVersionUID = -5179523762034025860L;

    abstract void lock();

    // 非公平上锁, tryAcquire方法在子类中实现
    final boolean nonfairTryAcquire(int acquires) {
        final Thread current = Thread.currentThread();
        int c = getState();
        if (c == 0) { // 锁未被占有
            if (compareAndSetState(0, acquires)) { // 尝试获取锁
                setExclusiveOwnerThread(current); // 设置独占线程
                return true;
            }
        }
        else if (current == getExclusiveOwnerThread()) { // 当前线程已经占有锁
            int nextc = c + acquires;
            if (nextc < 0)
                throw new Error("Maximum lock count exceeded");
            setState(nextc); // 添加上锁次数
            return true;
        }
        return false;
    }

    protected final boolean tryRelease(int releases) {
        int c = getState() - releases;
        if (Thread.currentThread() != getExclusiveOwnerThread()) // 当前线程非独占线程
            throw new IllegalMonitorStateException();
        boolean free = false;
        if (c == 0) { // 锁被释放
            free = true;
            setExclusiveOwnerThread(null); // 取消独占状态
        }
        setState(c);
        return free;
    }

    // 当前实现是否持有锁
    protected final boolean isHeldExclusively() {
        return getExclusiveOwnerThread() == Thread.currentThread();
    }

    final ConditionObject newCondition() {
        return new ConditionObject();
    }

    // 返回持有者线程
    final Thread getOwner() { return getState() == 0 ? null : getExclusiveOwnerThread(); }

    // 返回当前线程上锁次数
    final int getHoldCount() { return isHeldExclusively() ? getState() : 0; }

    final boolean isLocked() { return getState() != 0; }

    private void readObject(java.io.ObjectInputStream s) { /* 省略 */ }
}
// 非公平锁
static final class NonfairSync extends Sync {
    private static final long serialVersionUID = 7316153563782823691L;

    final void lock() {
        if (compareAndSetState(0, 1)) // 尝试上锁
            setExclusiveOwnerThread(Thread.currentThread());
        else // 自旋获取锁
            acquire(1);
    }

    protected final boolean tryAcquire(int acquires) { return nonfairTryAcquire(acquires);} // 调用父类方法
}
// 公平锁
static final class FairSync extends Sync {
    private static final long serialVersionUID = -3000897897090466540L;

    final void lock() { acquire(1); } // 公平方式获取锁,获取失败则自旋

    // 公平方式获取锁
    protected final boolean tryAcquire(int acquires) {
        final Thread current = Thread.currentThread();
        int c = getState();
        if (c == 0) {
            if (!hasQueuedPredecessors() && // 没有前驱节点
                compareAndSetState(0, acquires)) { // 尝试获取锁
                setExclusiveOwnerThread(current);
                return true;
            }
        }
        else if (current == getExclusiveOwnerThread()) { // 当前线程为独占线程
            int nextc = c + acquires; // 添加获取次数
            if (nextc < 0)
                throw new Error("Maximum lock count exceeded");
            setState(nextc);
            return true;
        }
        return false;
    }
}

        其中 $acquire$ 方法的实现可以在这里查看。

3. 上锁和解锁

// 声明了一个内部类字段作为同步器
private final Sync sync;

public void lock { sync.lock(); } // 调用lock方法,该方法在子类中被实现

// 响应中断式上锁
public void lockInterruptibly() throws InterruptedException {
    sync.acquireInterruptibly(1);
}

// AQS中实现
public final void acquireInterruptibly(int arg)
        throws InterruptedException {
    if (Thread.interrupted())
        throw new InterruptedException();
    if (!tryAcquire(arg))
        doAcquireInterruptibly(arg);
}

// 尝试以非公平方式获取锁
public boolean tryLock() { return sync.nonfairTryAcquire(1); }

public boolean tryLock(long timeout, TimeUnit unit)
        throws InterruptedException {
    return sync.tryAcquireNanos(1, unit.toNanos(timeout));
}

// AQS中实现
public final void tryAcquireNanos(int arg, long nanosTimeout)
        throws InterruptedException {
    if (Thread.interrupted())
        throw new InterruptedException();
    return tryAcquire(arg) ||
        doAcquireNanos(arg, nanosTimeout);
}

// 每次只能解一个锁,所以如果多次上锁就需要多次解锁
public void unlock() { sync.release(1); }

        其中 $doAcquireInterruptibly$ 方法和 $doAcquireNanos$ 方法的实现可以在这里查看。

4. 条件

// 创建条件
public Condition newCondition() { return sync.newCondition(); }

// 是否有线程在条件上等待
public boolean hasWaiters(Condition condition) {
    if (condition == null)
        throw new NullPointerException();
    if (!(condition instanceof AbstractQueuedSynchronizer.ConditionObject))
        // 非AQS中定义的ConditionObject类型对象
        throw new IllegalArgumentException("not owner");
    // 使用AQS中的方法判断,检查条件队列中是否有状态为CONDITION的节点
    return sync.hasWaiters((AbstractQueuedSynchronizer.ConditionObject)condition);
}

// 获取条件上等待的线程数量
public int getWaitQueueLength(Condition condition) {
    if (condition == null)
        throw new NullPointerException();
    if (!(condition instanceof AbstractQueuedSynchronizer.ConditionObject))
        // 非AQS中定义的ConditionObject类型对象
        throw new IllegalArgumentException("now owner");
    // 使用AQS中的方法统计,遍历条件队列并统计CONDITION节点数量
    return sync.getWaitQueueLength((AbstractQueuedSynchronizer.ConditionObject)condition);
}

// 获取条件上等待的线程集合
protected Collection<Thread> getWaitingThreads(Condition condition) {
    if (condition == null)
        throw new NullPointerException();
    if (!(condition instanceof AbstractQueuedSynchronizer.ConditionObject))
        // 非AQS中定义的ConditionObject类型对象
        throw new IllegalArgumentException("not owner");
    // 使用AQS中的方法统计,遍历条件队列并返回CONDITION节点中持有的线程集合
    return sync.getWaitingThreads((AbstractQueuedSynchronizer.ConditionObject)condition);
}

5. 其他方法

// 当前线程上锁次数
public int getHoldCount() { return sync.getHoldCount(); }

// 是否被当前线程持有
public boolean isHeldByCurrentThread() { return sync.isHeldExclusively(); }

// 是否上锁,即状态是否非零
public boolean isLocked() { return sync.isLocked(); }

// 是否公平锁
public final boolean isFair() { return sync instanceof FairSync; }

// 获取持有者线程
protected Thread getOwner() { return sync.getOwner(); }

// 是否有等待中的线程
public final boolean hasQueuedThreads() { return sync.hasQueuedThreads(); }

// AQS中实现
public final boolean hasQueuedThreads() {
    return head != tail; // 首节点为哑节点,因此只要首尾不等即可
}

// 线程是否在队列中等待,从尾部遍历等待队列,判断线程是否在其中
public final boolean hasQueuedThread(Thread thread) { return sync.isQueued(thread); }

// 获取等待线程的数量,从尾部遍历等待队列,统计等待线程数
public final int getQueueLength() { return sync.getQueueLength(); }

// 获取等待线程的集合,从尾部遍历等待队列,返回等待线程集合
protected Collection<Thread> getQueuedThreads() { return sync.getQueuedThreads(); }

ReentranLock源码解读