Java 多线程与并发(七):ReentrantLock 与 ReentrantReadWriteLock

Wesley13
• 阅读 401

ReentrantLock

我们已经通过前几章学会了 synchronized 和 AQS 等相关只是。下面我们继续来学习 ReentrantLock 这个并发工具类,如果你已经了解了 AQS 的机制,那么你学习 ReentrantLock 将会非常轻松。

背景

Synchronized 关键字虽然在 JDK 1.6 做了很多优化,但是它的底层是由 JVM 通过 CPU 指令去实现的,这就使得程序员无法对他进行扩展和优化。比如线程获取不到锁就会一直阻塞,也无法中断一个正在获取锁的线程。所以大神 Doug Lea,在 AQS 的基础上为我们构建了一个更加灵活的锁,他可以实现定时获取锁,公平/非公平锁,获取锁的过程中被中断等功能,这个就是 ReentrantLock。

代码分析

ReentrantLock 的字面意思是重入锁,synchronized 也是可重入的,所以我们可以认为 ReentrantLock 的作用就是用来替换 synchronized,实现它所不能实现的功能。

我们来通过 ReentrantLock 的 acquire 和 release 来了解一下:

final boolean nonfairTryAcquire(int acquires) {
    final Thread current = Thread.currentThread();//当前线程
    int c = getState();
    if (c == 0) {//表示锁未被抢占
        if (compareAndSetState(0, acquires)) {//使用 CAS 尝试获取到同步状态,获取到同步状态就是获取到了锁。
            setExclusiveOwnerThread(current); //当前线程占有锁
            return true;
        }
    }
    else if (current == getExclusiveOwnerThread()) {//线程已经占有锁了 重入
        int nextc = c + acquires;//同步状态记录重入的次数
        if (nextc < 0) // overflow
            throw new Error("Maximum lock count exceeded");
        setState(nextc);
        return true;
    }
    return false;
}

protected final boolean tryRelease(int releases) {
    int c = getState() - releases; //既然可重入 就需要释放重入获取的锁
    if (Thread.currentThread() != getExclusiveOwnerThread())
        throw new IllegalMonitorStateException();
    boolean free = false;
    if (c == 0) {
        free = true;//只有线程全部释放才返回true
        setExclusiveOwnerThread(null); //同步队列的线程都可以去获取同步状态了
    }
    setState(c); 
    return free;
}

看了这段代码我们也就理解了 ReentrantLock 如何实现加锁解锁以及重入锁的功能了。

  1. 通过 CAS 操作尝试更改 state,更改成功视为获取到锁。
  2. 如果是同一个线程再次获得锁,增加重入次数,存入到 state 中。
  3. release 操作正好和 acquire 相反。

其实如果你明白了上面这段代码,整个 ReentrantLock 你也就明白了,同理我们可以通过控制 State(AQS 提供了操作 State 的各种方法)来实现自己的锁。

上面的方法是 ReentrantLock 操作 State 的方法,在将它封装以下就可以完成锁的获取和释放方法了。

释放锁

public final boolean release(int arg) {
        if (tryRelease(arg)) {//调用子类的tryRelease 实际就是Sync的tryRelease
            Node h = head;//取同步队列的头节点
            if (h != null && h.waitStatus != 0)//同步队列头节点不为空且不是初始状态
                unparkSuccessor(h);//释放头节点 唤醒后续节点
            return true;
        }
        return false;
}

Sync 的 tryRelease 就是上一小节的重入锁释放方法,如果是同一线程,那么锁的重入次数就依次递减,直到重入次数为 0,此方法才会返回 ture,此时断开头节点唤醒后续节点去获取 AQS 的同步状态。

获取锁

非公平锁

static final class NonfairSync extends Sync {
    private static final long serialVersionUID = 7316153563782823691L;

    /**
         * Performs lock.  Try immediate barge, backing up to normal
         * acquire on failure.
         */
    final void lock() {
        if (compareAndSetState(0, 1))//通过CAS来获取同步状态 也就是锁
            setExclusiveOwnerThread(Thread.currentThread());//获取成功线程占有锁
        else
            acquire(1);//获取失败 进入AQS同步队列排队等待 执行AQS的 acquire  方法 
    }

    protected final boolean tryAcquire(int acquires) {
        return nonfairTryAcquire(acquires);
    }
}

当调用 lock 方法后,首先去尝试获取 state,这也是非公平的体现,抢到 AQS 的同步状态的未必是队列的首节点。抢不到就进入 AQS 的 acquire 方法。

公平锁

static final class FairSync extends Sync {
    private static final long serialVersionUID = -3000897897090466540L;

    final void lock() {
        acquire(1);//严格按照AQS的同步队列要求去获取同步状态
    }

    /**
         * Fair version of tryAcquire.  Don't grant access unless
         * recursive call or no waiters or is first.
         */
    protected final boolean tryAcquire(int acquires) {
        final Thread current = Thread.currentThread();//获取当前线程
        int c = getState();
        if (c == 0) {//锁未被抢占
            if (!hasQueuedPredecessors() &&//没有前驱节点
                compareAndSetState(0, acquires)) {//CAS获取同步状态
                setExclusiveOwnerThread(current);
                return true;
            }
        }
        else if (current == getExclusiveOwnerThread()) {//锁已被抢占且线程重入
            int nextc = c + acquires;//同步状态为重入次数
            if (nextc < 0)
                throw new Error("Maximum lock count exceeded");
            setState(nextc);
            return true;
        }
        return false;
    }
}

公平锁直接调用 AQS 的 acquire 方法,acquire 中自旋调用 tryAcquire。于非公平锁相比,少了那次 CAS 直接抢锁的过程。接下来在tryAcquire去抢占锁的时候,也会先调用hasQueuedPredecessors看看前面是否有节点已经在等待获取锁了,如果存在则同步队列的前驱节点优先。

公平锁因为有了大量的线程的切换而使吞吐量有所下降。

使用

类似于 synchronized 的锁。

Lock lock = new ReentrantLock();
try{
    lock.lock();
    //...
}fianlly{
    lock.unlock();
}

带限制的锁:

public boolean tryLock()// 尝试获取锁,立即返回获取结果 轮询锁
public boolean tryLock(long timeout, TimeUnit unit)//尝试获取锁,最多等待 timeout 时长 超时锁
public void lockInterruptibly()//可中断锁,调用线程 interrupt 方法,则锁方法抛出 InterruptedException  中断锁

比较

上一篇文章提到过,AQS 里面提供了等待队列方便我们实现更细粒度的等待通知。

synchronized

ReentrantLock

使用Object本身的wait、notify、notifyAll调度机制

与Condition结合进行进行更细粒度的线程的调度

显式的使用在同步方法或者同步代码块

显式的声明指定起始和结束位置

托管给JVM执行,不会因为异常、或者未释放而发生死锁

手动释放锁

由于 JDK 1.6 对 synchronized 的优化,它现在性能也不差,所以可以优先使用 synchronized,如果需要更多功能的锁,可以使用 ReentrantLock。

ReentrantReadWriteLock

背景

ReentrantLock 归根结底还是一个互斥锁,同一时刻只能有一个线程持有这把锁。互斥虽然能够保证线程安全,避免读/读,读/写,写/写冲突,但是读/读这样问题其实根本不需要互斥,这就使得在读比较多的问题中,ReentrantLock 牺牲了一部分性能。

如果我们考虑适当的放宽这个条件,多个线程同时读不互斥,这样性能就会提升很多,因此就诞生了 ReentrantReadWriteLock。

ReentrantReadWriteLock 中的读锁是一把共享锁,基于 AQS 的共享模式实现的。

ReentrantReadWriteLock 中的写锁是一把互斥锁,基于 AQS 的独占模式实现的。

源码分析

我们直到 AQS 只提供了一个 int 类型的 state 来表示锁的状态。现在我们有两把锁,同时还要表示锁的重入状态,该怎么办那?

设计者将这个 32 位 的 int 值给切分开,高 16 位代表读状态,低 16 为代表写状态。因为写锁是互斥的,所以需要用低 16 为来表示重入次数,所以写锁的重入次数最大为 65535 个。可能会有多个线程同时获得读锁,所以无法通过高 16 为同时记录真么多个线程的重入次数,所以使用了 ThreadLocl 来做这件事情。

ThreadLocal 的典型用处,此处用来记录线程获取锁的重入次数。

先看使用 ReentrantReadWriteLock 获取读写锁的方式,会调用 readLock() 和 writeLock() 两个方法。

public ReentrantReadWriteLock.WriteLock writeLock() { return writerLock; }
public ReentrantReadWriteLock.ReadLock  readLock()  { return readerLock; }

再来看 WriteLock 和 ReadLock 两个静态内部类,它们对锁的实现如下:

public static class ReadLock implements Lock, java.io.Serializable {
    public void lock() {
        sync.acquireShared(1); //共享
    }

    public void unlock() {
        sync.releaseShared(1); //共享
    }
}

public static class WriteLock implements Lock, java.io.Serializable {
    public void lock() {
        sync.acquire(1); //独占
    }

    public void unlock() {
        sync.release(1); //独占
    }
}

abstract static class Sync extends AbstractQueuedSynchronizer {}

WriteLock

WriteLock 与 ReentrantLock 在获取锁的时候有区别,WriteLock 不但要考虑目前是否有写锁占用,同时还要考虑是否有读锁占用。

public void lock() {
    sync.acquire(1);
}

public final void acquire(int arg) {
    if (!tryAcquire(arg) && //尝试获取独占锁
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) //获取失败后排队
        selfInterrupt();
}

protected final boolean tryAcquire(int acquires) {

    Thread current = Thread.currentThread();
    int c = getState();  //获取共享变量state
    int w = exclusiveCount(c); //获取写锁数量
    if (c != 0) { //有读锁或者写锁
        // (Note: if c != 0 and w == 0 then shared count != 0)
        if (w == 0 || current != getExclusiveOwnerThread()) //写锁为0(证明有读锁),或者持有写锁的线程不为当前线程,其实就是除了自己之外还有读锁或者写锁
            return false;
        if (w + exclusiveCount(acquires) > MAX_COUNT)
            throw new Error("Maximum lock count exceeded");
        // Reentrant acquire
        setState(c + acquires);  //当前线程持有写锁,为重入锁,增加重入锁的次数即可,因为当前线程持有锁,所以不需要 CAS。
        return true;
    }
    // 目前没有锁,直接使用 CAS 获取
    if (writerShouldBlock() ||
        !compareAndSetState(c, c + acquires)) //CAS操作失败,多线程情况下被抢占,获取锁失败。CAS成功则获取锁成功
        return false;
    setExclusiveOwnerThread(current);
    return true;
}

释放:

protected final boolean tryRelease(int releases) {
    if (!isHeldExclusively())//当前线程不持有写锁
        throw new IllegalMonitorStateException();
    int nextc = getState() - releases; //重入次数减少
    boolean free = exclusiveCount(nextc) == 0; //减少到0写锁释放
    if (free)
        setExclusiveOwnerThread(null); //写锁释放
    setState(nextc);
    return free;
}

ReadLock

我们再看看使用共享模式的 ReadLock 在获取锁的方式上有什么不同。

protected final int tryAcquireShared(int unused) {

    Thread current = Thread.currentThread();
    int c = getState();
    if (exclusiveCount(c) != 0 &&
        getExclusiveOwnerThread() != current) //写锁不等于0的情况下,验证是否是当前写锁尝试获取读锁
        return -1;
    int r = sharedCount(c);  //获取读锁数量
    if (!readerShouldBlock() && //读锁不需要阻塞
        r < MAX_COUNT &&  //读锁小于最大读锁数量
        compareAndSetState(c, c + SHARED_UNIT)) { //CAS操作尝试设置获取读锁 也就是高位加1
        if (r == 0) {  //当前线程第一个并且第一次获取读锁,说明此线程是第一个获取读锁的,或者说在它前面获取读锁的都走光光了,它也算是第一个吧
            firstReader = current;
            firstReaderHoldCount = 1;
        } else if (firstReader == current) { //说明是 firstReader 重入获取读锁(这非常简单,count 加 1 结束)
            firstReaderHoldCount++;
        } else { // 当前线程不是第一个获取读锁的线程,更新 ThreadLocal
            HoldCounter rh = cachedHoldCounter;
            if (rh == null || rh.tid != getThreadId(current))
                cachedHoldCounter = rh = readHolds.get();
            else if (rh.count == 0)
                readHolds.set(rh);
            rh.count++;
        }
        return 1;
    }
    return fullTryAcquireShared(current);
}
  1. 只要没有写锁占用,读锁就可以尝试获取锁。

  2. ThreadLocalHoldCounter 是用来记录每个读锁的重入次数的。

    static final class ThreadLocalHoldCounter extends ThreadLocal { //ThreadLocal变量 public HoldCounter initialValue() { return new HoldCounter(); } }

    static final class HoldCounter { int count = 0; //当前线程持有锁的次数 // Use id, not reference, to avoid garbage retention final long tid = getThreadId(Thread.currentThread()); //当前线程ID }

释放:

protected final boolean tryReleaseShared(int unused) {
    Thread current = Thread.currentThread();
    if (firstReader == current) {//当前线程是读锁的第一个线程
        // assert firstReaderHoldCount > 0;
        if (firstReaderHoldCount == 1) //第一次占有读锁 直接清除该线程
            firstReader = null;
        else
            firstReaderHoldCount--;//读锁的第一个线程重入次数减少
    } else {
        HoldCounter rh = cachedHoldCounter;
        if (rh == null || rh.tid != getThreadId(current))
            rh = readHolds.get();
        int count = rh.count;
        if (count <= 1) {
            readHolds.remove();//读锁释放
            if (count <= 0)
                throw unmatchedUnlockException();
        }
        --rh.count; //重入次数减少
    }
    for (;;) {
        int c = getState();
        int nextc = c - SHARED_UNIT;
        //减少读锁的线程数量
        if (compareAndSetState(c, nextc))
            // Releasing the read lock has no effect on readers,
            // but it may allow waiting writers to proceed if
            // both read and write locks are now free.
            return nextc == 0;
    }
}

锁降级

锁降级指的是写锁降级为读锁。首先持有当前写锁,然后获取到读锁,随后就释放掉写锁。ReentrantReadWriteLock 不支持锁升级,因为如果有多个线程获取到了读锁,其中任何一个获取到了写锁,修改了数据,其他的线程感知不到更新,这样就无法保证数据的可见性。

点赞
收藏
评论区
推荐文章
秃头王路飞 秃头王路飞
4个月前
webpack5手撸vue2脚手架
webpack5手撸vue相信工作个12年的小伙伴们在面试的时候多多少少怕被问到关于webpack方面的知识,本菜鸟最近闲来无事,就尝试了手撸了下vue2的脚手架,第一次发帖实在是没有经验,望海涵。languageJavaScript"name":"vuecliversion2","version":"1.0.0","desc
浅梦一笑 浅梦一笑
4个月前
初学 Python 需要安装哪些软件?超级实用,小白必看!
编程这个东西是真的奇妙。对于懂得的人来说,会觉得这个工具是多么的好用、有趣,而对于小白来说,就如同大山一样。其实这个都可以理解,大家都是这样过来的。那么接下来就说一下python相关的东西吧,并说一下我对编程的理解。本人也是小白一名,如有不对的地方,还请各位大神指出01名词解释:如果在编程方面接触的比较少,那么对于软件这一块,有几个名词一定要了解,比如开发环
技术小男生 技术小男生
4个月前
linux环境jdk环境变量配置
1:编辑系统配置文件vi/etc/profile2:按字母键i进入编辑模式,在最底部添加内容:JAVAHOME/opt/jdk1.8.0152CLASSPATH.:$JAVAHOME/lib/dt.jar:$JAVAHOME/lib/tools.jarPATH$JAVAHOME/bin:$PATH3:生效配置
光头强的博客 光头强的博客
4个月前
Java面向对象试题
1、请创建一个Animal动物类,要求有方法eat()方法,方法输出一条语句“吃东西”。创建一个接口A,接口里有一个抽象方法fly()。创建一个Bird类继承Animal类并实现接口A里的方法输出一条有语句“鸟儿飞翔”,重写eat()方法输出一条语句“鸟儿吃虫”。在Test类中向上转型创建b对象,调用eat方法。然后向下转型调用eat()方
刚刚好 刚刚好
4个月前
css问题
1、在IOS中图片不显示(给图片加了圆角或者img没有父级)<div<imgsrc""/</divdiv{width:20px;height:20px;borderradius:20px;overflow:h
blmius blmius
1年前
MySQL:[Err] 1292 - Incorrect datetime value: ‘0000-00-00 00:00:00‘ for column ‘CREATE_TIME‘ at row 1
文章目录问题用navicat导入数据时,报错:原因这是因为当前的MySQL不支持datetime为0的情况。解决修改sql\mode:sql\mode:SQLMode定义了MySQL应支持的SQL语法、数据校验等,这样可以更容易地在不同的环境中使用MySQL。全局s
小森森 小森森
4个月前
校园表白墙微信小程序V1.0 SayLove -基于微信云开发-一键快速搭建,开箱即用
后续会继续更新,敬请期待2.0全新版本欢迎添加左边的微信一起探讨!项目地址:(https://www.aliyun.com/activity/daily/bestoffer?userCodesskuuw5n)\2.Bug修复更新日历2.情侣脸功能大家不要使用了,现在阿里云的接口已经要收费了(土豪请随意),\\和注意
晴空闲云 晴空闲云
4个月前
css中box-sizing解放盒子实际宽高计算
我们知道传统的盒子模型,如果增加内边距padding和边框border,那么会撑大整个盒子,造成盒子的宽度不好计算,在实务中特别不方便。boxsizing可以设置盒模型的方式,可以很好的设置固定宽高的盒模型。盒子宽高计算假如我们设置如下盒子:宽度和高度均为200px,那么这会这个盒子实际的宽高就都是200px。但是当我们设置这个盒子的边框和内间距的时候,那
艾木酱 艾木酱
3个月前
快速入门|使用MemFire Cloud构建React Native应用程序
MemFireCloud是一款提供云数据库,用户可以创建云数据库,并对数据库进行管理,还可以对数据库进行备份操作。它还提供后端即服务,用户可以在1分钟内新建一个应用,使用自动生成的API和SDK,访问云数据库、对象存储、用户认证与授权等功能,可专
helloworld_28799839 helloworld_28799839
4个月前
常用知识整理
Javascript判断对象是否为空jsObject.keys(myObject).length0经常使用的三元运算我们经常遇到处理表格列状态字段如status的时候可以用到vue