Java ReentrantLock 解析

[toc]

ReentrantLock 特性(对比 synchronized)
– JDK 实现的(synchronized 是基于 JVM 实现的)
– 需要代码控制获得锁、释放锁(synchronized 只需要把同步代码添加关键字 )
– 支持获取锁超时(synchronized 不支持)
– 支持获取到锁的线程能够响应中断(synchronized 不支持)

构造函数

Java 中 Lock 的实现有:ReentrantLock、ReentrantReadWriteLock(ReadLock、WriteLock) 等。

ReentrantLock 实现 Lock 接口,在 ReentrantLock 用到了 AbstractQueuedSynchronizer(队列同步器,AQS)的子类,所有的同步操作都是依靠 AQS 实现。

ReentrantLock 支持两种锁模式,公平锁非公平锁。默认的实现是非公平的。

公平锁指的是线程获取锁的顺序是按照加锁顺序来的,而非公平锁指的是抢锁机制,先 lock 的线程不一定先获得锁。

public class ReentrantLock implements Lock, java.io.Serializable {
    private final Sync sync;

    abstract static class Sync extends AbstractQueuedSynchronizer {
        // ...
    }

    // 非公平同步
    static final class NonfairSync extends Sync {

        final void lock() {
            if (compareAndSetState(0, 1))
                setExclusiveOwnerThread(Thread.currentThread());
            else
                acquire(1);
        }

        protected final boolean tryAcquire(int acquires) {
            return nonfairTryAcquire(acquires);
        }
    }

    // 公平同步
    static final class FairSync extends Sync {

        final void lock() {
            acquire(1);
        }

        protected final boolean tryAcquire(int acquires) {
            final Thread current = Thread.currentThread();
            int c = getState();
            if (c == 0) {
                if (!hasQueuedPredecessors() &&
                    compareAndSetState(0, acquires)) {
                    setExclusiveOwnerThread(current);
                    return true;
                }
            }
            else if (current == getExclusiveOwnerThread()) {
                int nextc = c + acquires;
                if (nextc < 0)
                    throw new Error("Maximum lock count exceeded");
                setState(nextc);
                return true;
            }
            return false;
        }
    }

    // 默认构造函数,非公平
    public ReentrantLock() {
        sync = new NonfairSync();
    }

    // 构造函数指定公平/非公平
    public ReentrantLock(boolean fair) {
        sync = fair ? new FairSync() : new NonfairSync();
    }

    // 获取锁方法
    public void lock() {
        sync.lock();
    }

    // 获取锁方法
    public void lockInterruptibly() throws InterruptedException {
        sync.acquireInterruptibly(1);
    }

    // 尝试获取锁,返回值表示是否成功获取到锁
    public boolean tryLock() {
        return sync.nonfairTryAcquire(1);
    }

    // 释放锁
    public void unlock() {
        sync.release(1);
    }

    // 创建条件
    public Condition newCondition() {
        return sync.newCondition();
    }
}

非公平锁实现

关注默认的非公平锁的实现:在 ReentrantLock 调用 lock() 的时候,调用的是下面的代码:

class NonfairSync extends Sync {
  final void lock() {
    if (compareAndSetState(0, 1))
      setExclusiveOwnerThread(Thread.currentThread());
    else
      acquire(1);
  }

  protected final boolean tryAcquire(int acquires) {
    return nonfairTryAcquire(acquires);
  }
}

获取锁

compareAndSetState(0, 1) 进行锁的获取操作:

/**
 * 1. compareAndSetState 是通过 Unsafe 类实现的
 * 2. Unsafe 类对于 JVM 来说是不安全的,内封装了一些可以直接操作指定内存位置的接口
 * 3. Unsafe 类封装了 CAS 操作,实现乐观锁
 */
protected final boolean compareAndSetState(int expect, int update) {
    return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
}

对于竞争成功的线程会调用 setExclusiveOwnerThread()

private transient Thread exclusiveOwnerThread;

// 设置为当前线程
protected final void setExclusiveOwnerThread(Thread t) {
    exclusiveOwnerThread = t;
}

获取锁失败

竞争失败的线程,会调用 acquire(1) 方法,这个方法也是 ReentrantLock 设计的核心之一。

/**
 * AbstractQueuedSynchronizer.class
 *
 * tryAcquire:重新进行一次锁获取和进行锁重入的处理
 * addWaiter:将线程添加到等待队列中
 * acquireQueued:自旋获取锁      
 * 
 * 如果 acquireQueued 返回true,表示线程被中断
 * selfInterrupt:响应中断线程
 */
public final void acquire(int arg) {
    if (!tryAcquire(arg) &&
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        selfInterrupt();
}

/**
 * NonfairSync.class
 *
 * nonfairTryAcquire 实现了 ReentrantLock 可重入
 */
protected final boolean tryAcquire(int acquires) {
    return nonfairTryAcquire(acquires);
}

/**
 * Sync.class
 * 
 * 在锁状态为0时(空闲状态),尝试获取锁。
 * 在锁状态不为0(被占用状态),判断当前线程是否是占用锁的线程,如果是,那么进行计数(释放锁时会减掉计数)
 */
final boolean nonfairTryAcquire(int acquires) {
    final Thread current = Thread.currentThread();
    int c = getState();
    if (c == 0) {
        if (compareAndSetState(0, acquires)) {
            setExclusiveOwnerThread(current);
            return true;
        }
    }
    else if (current == getExclusiveOwnerThread()) {
        int nextc = c + acquires;
        if (nextc < 0) // overflow
            throw new Error("Maximum lock count exceeded");
        setState(nextc);
        return true;
    }
    return false;
}

等待队列

如果没有重新获取到锁或者锁的占用线程和当前线程不是一个线程,那么把当前线程添加到等待队列中,调用 addWaiter

// 等待队列头节点和尾节点
private transient volatile Node head;
private transient volatile Node tail;
/**
 * AbstractQueuedSynchronizer.class 
 * 添加到等待队列,是一个双向链表
 */
private Node addWaiter(Node mode) {
    Node node = new Node(Thread.currentThread(), mode);
    Node pred = tail;
    // 如果等待队列已经存在,添加到尾部并返回
    if (pred != null) {
        // 线程安全的代码
        node.prev = pred;
        // tail.next 是不断竞争的
        if (compareAndSetTail(pred, node)) {
            pred.next = node;
            return node;
        }
    }
    // 如果等地队列不存在,或添加到尾部失败
    enq(node);
    return node;
}

/**
 * 插入等待队列或初始化队列
 */
private Node enq(final Node node) {
    for (;;) {
        Node t = tail;
        if (t == null) { 
            // 初始化队列,CAS 设置 head(head 是一个虚拟节点)
            if (compareAndSetHead(new Node()))
                tail = head;
        } else {
            // 不断尝试设置 tail
            node.prev = t;
            if (compareAndSetTail(t, node)) {
                t.next = node;
                return t;
            }
        }
    }
}

自旋锁

加入等待队列后,自旋获取锁,调用 acquireQueued 方法

此处是 Node 节点的自旋过程,检查当前节点是不是 head 节点的 next 节点。如果是,则尝试获取锁,如果获取成功,释放当前节点,同时返回。

final boolean acquireQueued(final Node node, int arg) {
    boolean failed = true;
    try {
        boolean interrupted = false;
        for (;;) {
            // 前一个节点,如果是 head,获取锁
            final Node p = node.predecessor();
            if (p == head && tryAcquire(arg)) {
                setHead(node);
                p.next = null;
                failed = false;
                return interrupted;
            }
            // 避免线程一直轮训耗费性能
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                interrupted = true;
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}

如果这里一直不断的循环检查,其实是很耗费性能的,所以有了 shouldParkAfterFailedAcquireparkAndCheckInterrupt,这两个方法就实现了线程的等待从而避免无限的轮询,同时检查是否线程中断。

/**
 * 1. 检查一下当前节点(node)的前置节点(pred)的 waitStatus 是否是 SIGNAL,SIGNAL 表示 pred 线程已经 Park
 * 2. pred waitStatus>0,pred 线程已经 cancel,不断向前调整当前节点的 pred 节点(同时cancel 线程移除了队列)
 * 3. pred waitStatus<0,设置 pred 的 waitStatus 为 SIGNAL
 */
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
    int ws = pred.waitStatus;
    if (ws == Node.SIGNAL)
        return true;
    if (ws > 0) {
        do {
            node.prev = pred = pred.prev;
        } while (pred.waitStatus > 0);
        pred.next = node;
    } else {
        compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
    }
    return false;
}

/**
 * 当 shouldParkAfterFailedAcquire 返回 True 的时候执行 parkAndCheckInterrupt 方法
 * 当前的线程 park,暂停了线程的轮询
 */
private final boolean parkAndCheckInterrupt() {
    LockSupport.park(this);
    return Thread.interrupted();
}

释放锁

锁释放主要是通过 unlock() 方法实现。释放锁后,锁可能被等待线程中的锁获取,也可能被的线程获取

// ReentrantLock.class
public void unlock() {
        sync.release(1);
}

// AbstractQueuedSynchronizer.class
public final boolean release(int arg) {
    // 先看 tryRelease 的实现
    if (tryRelease(arg)) {
        Node h = head;
        // 检查 head 释放合法,unpark head
        if (h != null && h.waitStatus != 0)
            unparkSuccessor(h);
        return true;
    }
    return false;
}

/**
 * ReentrantLock.class
 * 主要是做了一个释放锁的过程,将同步状态state -1,直到减到0为止,setExclusiveOwnerThread(null) 清除当前占用的线程
 * 
 * setState(c),且c为0的时候,等待队列里的线程可能被 park 了还没有唤醒。此时新进入的线程是有机会获取到锁的。
 */
protected final boolean tryRelease(int releases) {
    int c = getState() - releases;
    if (Thread.currentThread() != getExclusiveOwnerThread())
        throw new IllegalMonitorStateException();
    boolean free = false;
    if (c == 0) {
        free = true;
        setExclusiveOwnerThread(null);
    }
    setState(c);
    return free;
}

进行线程的唤醒

private void unparkSuccessor(Node node) {
    int ws = node.waitStatus;
    if (ws < 0)
        compareAndSetWaitStatus(node, ws, 0);

    Node s = node.next;
    if (s == null || s.waitStatus > 0) {
        s = null;
        for (Node t = tail; t != null && t != node; t = t.prev)
            if (t.waitStatus <= 0)
                s = t;
    }
    if (s != null)
        LockSupport.unpark(s.thread);
}

至此锁的释放就结束了,可以看到 ReentrantLock 是一个不断的循环的状态模型。

公平锁和非公平锁的差别

ReentrantLock 具有公平和非公平两种模式,也各有优缺点:

  • 公平锁是严格的以 FIFO 的方式进行锁的竞争,但是非公平锁是无序的锁竞争。
  • 非公平模式刚释放锁的线程很大程度上能比较快的获取到锁,队列中的线程只能等待,所以非公平锁可能会有“饥饿”的问题,但是重复的锁获取能减小线程之间的切换。
  • 而公平锁则是严格的线程切换,这样对操作系统的影响是比较大的,所以非公平锁的吞吐量是大于公平锁的,JDK 将非公平锁作为默认的实现。

代码实现上的区别

// 非公平
final boolean nonfairTryAcquire(int acquires) {

    // ...

    if (compareAndSetState(0, acquires)) {
        setExclusiveOwnerThread(current);
        return true;
    }
}

// 公平
final boolean tryAcquire(int acquires) {

    // ...

    if (!hasQueuedPredecessors() &&
        compareAndSetState(0, acquires)) {
        setExclusiveOwnerThread(current);
        return true;
    }
}

// 公平模式,只有队列里的下一个节点可以获取锁
public final boolean hasQueuedPredecessors() {
    Node t = tail; 
    Node h = head;
    Node s;
    return h != t &&
        ((s = h.next) == null || s.thread != Thread.currentThread());
}

使用 ReentrantLock

public class Test {
    private Lock lock = new ReentrantLock();

    public void test() {
        lock.lock();

        // 同步代码

        lock.unlock();
    }
}

以上代码效果和 synchronized 一样,都可以同步执行,lock 方法获得锁,unlock 方法释放锁。

await 方法

public class Test {
    private Lock lock = new ReentrantLock();
    private Condition condition=lock.newCondition();

    public void test() {
        try {
            lock.lock();
            condition.await();

            // 同步代码
        } finally {
            lock.unlock();
        }
    }
}

通过创建 Condition 对象来使线程 wait,必须先执行 lock() 方法获得锁

signal 方法

try {
      lock.lock();
    condition.signal();

    // 同步代码
} finally {
    lock.unlock();
}

Condition 对象的 signal 方法可以唤醒 wait 线程,必须先执行 lock() 方法获得锁

创建多个condition对象

一个 Condition 对象的 signal(signalAll)方法和该对象的 await 方法是一一对应的,也就是一个 Condition 对象的 signal(signalAll)方法不能唤醒其他 Condition 对象的 await 方法。

比较 Condition 类和 Object 类:

  • Condition 类的 awiat 方法和 Object 类的 wait 方法等效
  • Condition 类的 signal 方法和 Object 类的 notify 方法等效
  • Condition 类的 signalAll 方法和 Object 类的 notifyAll 方法等效
  • Condition 类可以唤醒指定条件的线程,而 Object 类的唤醒是随机的。

读写锁 ReentrantReadWriteLock

ReentrantLock 是完全互斥排他的,这样其实效率是不高的。ReentrantReadWriteLock 是读写锁:读锁–多个线程可以获得读锁;写锁–只有一个线程能获得写锁。读锁和写锁是互斥的。

private ReentrantReadWriteLock lock = new ReentrantReadWriteLock();

public void read() {
    try {
        try {
            lock.readLock().lock();

            // 同步代码,读逻辑
        } finally {
            lock.readLock().unlock();
        }
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
}

public void write() {
    try {
        try {
            lock.writeLock().lock();

                        // 同步代码,写逻辑
        } finally {
            lock.writeLock().unlock();
        }
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
}
Tags:

Add a Comment

电子邮件地址不会被公开。 必填项已用*标注