ReentrantLock源码解析 | 京东云技术团队

京东云开发者
• 阅读 158

并发指同一时间内进行了多个线程。并发问题是多个线程对同一资源进行操作时产生的问题。通过加锁可以解决并发问题,ReentrantLock是锁的一种。

1 ReentrantLock

1.1 定义

ReentrantLock是Lock接口的实现类,可以手动的对某一段进行加锁。ReentrantLock可重入锁,具有可重入性,并且支持可中断锁。其内部对锁的控制有两种实现,一种为公平锁,另一种为非公平锁.

1.2 实现原理

ReentrantLock的实现原理为volatile+CAS。想要说明volatile和CAS首先要说明JMM。

1.2.1 JMM

JMM(java 内存模型 Java Memory Model 简称JMM) 本身是一个抽象的概念,并不在内存中真实存在的,它描述的是一组规范或者规则,通过这组规范定义了程序中各个变量的访问方式.

由于 JMM 运行程序的实体是线程.而每个线程创建时JMM都会为其创建一个自己的工作内存(栈空间),工作内存是每个线程的私有 数据区域.而java内存模型中规定所有的变量都存储在主内存中,主内存是共享内存区域,所有线程都可以访问,但线程的变量的操作(读取赋值等)必须在自己的工作内存中去进行,首先要 将变量从主存拷贝到自己的工作内存中,然后对变量进行操作,操作完成后再将变量操作完后的新值写回主内存,不能直接操作主内存的变量,各个线程的工作内存中存储着主内存的变量拷贝的副本,因不同的线程间无法访问对方的工作内存,线程间的通信必须在主内存来完成。

ReentrantLock源码解析 | 京东云技术团队

如图所示:线程A对变量A的操作,只能是从主内存中拷贝到线程中,再写回到主内存中。

1.2.2 volatile

volatile 是JAVA的关键字用于修饰变量,是java虚拟机的轻量同步机制,volatile不能保证原子性。
作用:

  • 线程可见性:一个变量在某个线程里修改了它的值,如果使用了volatile关键字,那么别的线程可以马上读到修改后的值。
  • 指令重排序:没加之前,指令是并发执行的,第一个线程执行到一半另一个线程可能开始执行了。加了volatile关键字后,不同线程是按照顺序一步一步执行的。1.2.3 CASCAS是Compare and Swap,就是比较和交换,而比较和交换是一个原子操作。线程基于CAS修改数据的方式:先获取主内存数据,在修改之前,先比较数据是否一致,如果一致修改主内存数据,如果不一致,放弃这次修改。

作用:CAS会使用现代处理器上提供的高效机器级别原子指令,这些原子指令以原子方式对内存执行读-改-写操作。1.2.4 AQSAQS的全称是AbstractQueuedSynchronizer(抽象的队列式的同步器),AQS定义了一套多线程访问共享资源的同步器框架。

AQS主要包含两部分内容:共享资源和等待队列。AQS底层已经对这两部分内容提供了很多方法。

  • 共享资源:共享资源是一个volatile的int类型变量。
  • 等待队列:等待队列是一个线程安全的队列,当线程拿不到锁时,会被park并放入队列。

2 源码解析

ReentrantLock在包java.util.concurrent.locks下,实现Lock接口。

2.1 lock方法

lock分为公平锁和非公平锁。

公平锁:

final void lock() {
    acquire(1);
}

非公平锁:上来先尝试将state从0修改为1,如果成功,代表获取锁资源。如果没有成功,调用acquire。state是AQS中的一个由volatile修饰的int类型变量,多个线程会通过CAS的方式修改state,在并发情况下,只会有一个线程成功的修改state。

final void lock() {
//通过原子方式修改值
    if (compareAndSetState(0, 1))
        setExclusiveOwnerThread(Thread.currentThread());
    else
        acquire(1);
}


 /**
 * 获取锁的线程.
 */
 private transient Thread exclusiveOwnerThread;


  /**
  * 设置拥有锁的线程
  */
  protected final void setExclusiveOwnerThread(Thread thread) {
      exclusiveOwnerThread = thread;

2.2 acquire方法

acquire是一个业务方法,里面并没有实际的业务处理,都是在调用其他方法。

public final void acquire(int arg) {
    //调用tryAcquire方法:尝试获取锁资源(非公平、公平),拿到锁资源,返回true,直接结束方法
    if (!tryAcquire(arg) &&
    //当没有获取锁资源后,会先调用addWaiter:会将没有获取到锁资源的线程封装为Node对象,
    //并且插入到AQS的队列的末尾.
   //继续调用acquireQueued方法,查看当前排队的Node是否在队列的前面,如果在前面,尝试获取锁资源
    //如果没在前面,尝试将线程挂起,阻塞起来!
     acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
     selfInterrupt();
}

2.3 tryAcquire方法

tryAcquire分为公平和非公平两种。

公平:

 protected final boolean tryAcquire(int acquires) {
   //拿到当前线程
    final Thread current = Thread.currentThread();
   //拿到AQS的state
     int c = getState();
   // 如果state == 0,说明没有线程占用着当前的锁资源
     if (c == 0) {
   //如果没有线程排队,直接直接CAS尝试获取锁资源
      if (!hasQueuedPredecessors() &&
      compareAndSetState(0, acquires)) {
     //如果获取资源成功,将当前线程设置为持有锁资源的线程
       setExclusiveOwnerThread(current);
        return true;
          }
        }
     //如果有线程持有锁资源,判断持有锁资源的线程是否是当前线程
         else if (current == getExclusiveOwnerThread()) {
      //增加AQS的state的值
            int nextc = c + acquires;
             if (nextc < 0)
             throw new Error("Maximum lock count exceeded");
              setState(nextc);
               return true;
            }
          return false;
        }
    }

非公平:

protected final boolean tryAcquire(int acquires) {
    return nonfairTryAcquire(acquires);
}
final boolean nonfairTryAcquire(int acquires) {
   //拿到当前线程
    final Thread current = Thread.currentThread();
   //拿到AQS的state
     int c = getState();
   // 如果state == 0,说明没有线程占用着当前的锁资源
      if (c == 0) {
   //获取锁资源
      if (compareAndSetState(0, acquires)) {
   //将当前占用这个互斥锁的线程属性设置为当前线程
      setExclusiveOwnerThread(current);
       return true;
       }
       }
   //如果有线程持有锁资源,判断持有锁资源的线程是否是当前线程
       else if (current == getExclusiveOwnerThread()) {
          int nextc = c + acquires;
          if (nextc < 0) // overflow
          throw new Error("Maximum lock count exceeded");
           setState(nextc);
           return true;
          }
         return false;
  }

2.4 addWaiter方法

在获取锁资源失败后,需要将当前线程封装为Node对象,并且插入到AQS队列的末尾。

private Node addWaiter(Node mode) {
    // 将当前线程封装为Node对象,mode为null,代表互斥锁
    Node node = new Node(Thread.currentThread(), mode);
    // pred是tail节点
    Node pred = tail;
    // 如果pred不为null,有线程正在排队
    if (pred != null) {
        // 将当前节点的prev,指定tail尾节点
        node.prev = pred;
        // 以CAS的方式,将当前节点变为tail节点
        if (compareAndSetTail(pred, node)) {
            // 之前的tail的next指向当前节点
            pred.next = node;
            return node;
        }
    }
    // 添加的流程为,  自己prev指向、tail指向自己、前节点next指向我
    // 如果上述方式,CAS操作失败,导致加入到AQS末尾失败,如果失败,就基于enq的方式添加到AQS队列
    enq(node);
    return node;
}
// enq,无论怎样都添加进入
private Node enq(final Node node) {
    for (;;) {
        // 拿到tail
        Node t = tail;
        // 如果tail为null,说明当前没有Node在队列中
        if (t == null) { 
            // 创建一个新的Node作为head,并且将tail和head指向一个Node
            if (compareAndSetHead(new Node()))
                tail = head;
        } else {
            // 和上述代码一致!
            node.prev = t;
            if (compareAndSetTail(t, node)) {
                t.next = node;
                return t;
            }
        }
    }
}

2.5 acquireQueued方法

// acquireQueued方法
// 查看当前排队的Node是否是head的next,
// 如果是,尝试获取锁资源,
// 如果不是或者获取锁资源失败那么就尝试将当前Node的线程挂起(unsafe.park())
final boolean acquireQueued(final Node node, int arg) {
    boolean failed = true;
    try {
        for (;;) {
            // 拿到上一个节点
            final Node p = node.predecessor();
            if (p == head && // 说明当前节点是head的next
                tryAcquire(arg)) { // 竞争锁资源,成功:true,失败:false
                // 进来说明拿到锁资源成功
                // 将当前节点置位head,thread和prev属性置位null
                setHead(node);
                // 帮助快速GC
                p.next = null; 
                // 设置获取锁资源成功
                failed = false;
                // 不管线程中断。
                return interrupted;
            }
            // 如果不是或者获取锁资源失败,尝试将线程挂起
            // 第一个事情,当前节点的上一个节点的状态正常!
            // 第二个事情,挂起线程
            if (shouldParkAfterFailedAcquire(p, node) &&
                // 通过LockSupport将当前线程挂起
                parkAndCheckInterrupt())
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}


// 确保上一个节点状态是正确的
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
    // 拿到上一个节点的状态
    int ws = pred.waitStatus;
    // 如果上一个节点为 -1
    if (ws == Node.SIGNAL)
        // 返回true,挂起线程
        return true;
    // 如果上一个节点是取消状态
    if (ws > 0) {
        // 循环往前找,找到一个状态小于等于0的节点
        do {
            node.prev = pred = pred.prev;
        } while (pred.waitStatus > 0);
        pred.next = node;
    } else {
        // 将小于等于0的节点状态该为-1
        compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
    }
    return false;
}

2.6 unlock方法

释放锁资源,将state减1,如果state减为0了,唤醒在队列中排队的Node。

public final boolean release(int arg) {
    // 核心的释放锁资源方法
    if (tryRelease(arg)) {
        // 释放锁资源释放干净了。  (state == 0)
        Node h = head;
        // 如果头节点不为null,并且头节点的状态不为0,唤醒排队的线程
        if (h != null && h.waitStatus != 0)
            // 唤醒线程
            unparkSuccessor(h);
        return true;
    }
    // 释放锁成功,但是state != 0
    return false;
}
// 核心的释放锁资源方法
protected final boolean tryRelease(int releases) {
    // 获取state - 1
    int c = getState() - releases;
    // 如果释放锁的线程不是占用锁的线程,抛异常
    if (Thread.currentThread() != getExclusiveOwnerThread())
        throw new IllegalMonitorStateException();
    // 是否成功的将锁资源释放利索 (state == 0)
    boolean free = false;
    if (c == 0) {
        // 锁资源释放干净。
        free = true;
        // 将占用锁资源的属性设置为null
        setExclusiveOwnerThread(null);
    }
    // 将state赋值
    setState(c);
    // 返回true,代表释放干净了
    return free;
}


// 唤醒节点
private void unparkSuccessor(Node node) {
    // 拿到头节点状态
    int ws = node.waitStatus;
    // 如果头节点状态小于0,换为0
    if (ws < 0)
        compareAndSetWaitStatus(node, ws, 0);
    // 拿到当前节点的next
    Node s = node.next;
    // 如果s == null ,或者s的状态为1
    if (s == null || s.waitStatus > 0) {
        // next节点不需要唤醒,需要唤醒next的next
        s = null;
        // 从尾部往前找,找到状态正常的节点。(小于等于0代表正常状态)
        for (Node t = tail; t != null && t != node; t = t.prev)
            if (t.waitStatus <= 0)
                s = t;
    }
    // 经过循环的获取,如果拿到状态正常的节点,并且不为null
    if (s != null)
        // 唤醒线程
        LockSupport.unpark(s.thread);
}

3 使用实例

3.1 公平锁

1.代码:

public class ReentrantLockTest {


    public static void main(String[] args) {
        ReentrantLock lock = new ReentrantLock(true);
        new Thread(()->test(lock),"线程A").start();
        new Thread(()->test(lock),"线程B").start();
        new Thread(()->test(lock),"线程C").start();
    }
    public static   void test(ReentrantLock lock){
        for (int i = 0; i < 3;i++){
            try {
                lock.lock();
                System.out.println(Thread.currentThread().getName()+"获取了锁!");
                Thread.sleep(200);
            }catch (Exception e){
                e.printStackTrace();
            }finally {
                lock.unlock();
            }
        }
    }
}

2.执行结果:

ReentrantLock源码解析 | 京东云技术团队

3.小结:

公平锁可以保证每个线程获取锁的机会是相等的。

3.2 非公平锁

1.代码:

public class ReentrantLockTest {


    public static void main(String[] args) {
        ReentrantLock lock = new ReentrantLock();
        new Thread(()->test(lock),"线程A").start();
        new Thread(()->test(lock),"线程B").start();
        new Thread(()->test(lock),"线程C").start();
    }
    public static   void test(ReentrantLock lock){
        for (int i = 0; i < 3;i++){
            try {
                lock.lock();
                System.out.println(Thread.currentThread().getName()+"获取了锁!");
                Thread.sleep(200);
            }catch (Exception e){
                e.printStackTrace();
            }finally {
                lock.unlock();
            }
        }
    }
}

2.执行结果:

ReentrantLock源码解析 | 京东云技术团队

3.小结:

非公平锁每个线程获取锁的机会是随机的。

3.3 忽略重复操作

1.代码:

public class ReentrantLockTest {
    private ReentrantLock lock = new ReentrantLock();


    public void doSomething(){
        if(lock.tryLock()){
            try {
                System.out.println(Thread.currentThread().getName()+"获取了锁!");
                Thread.sleep(5);
            }catch (Exception e){
                e.printStackTrace();
            }finally {
                lock.unlock();
            }
        }
    }
    public static void main(String[] args) throws Exception {
        ReentrantLockTest test = new ReentrantLockTest();
        for (int i = 0; i < 10;i++){
            new Thread(()->{test.doSomething();},"线程"+i).start();
            Thread.sleep(1);
        }
    }
}

2.执行结果:

ReentrantLock源码解析 | 京东云技术团队

3.小结:

当线程持有锁时,不会重复执行,可以用来防止定时任务重复执行或者页面事件多次触发时不会重复触发。

3.4 超时不执行

1.代码:

public class ReentrantLockTest {


    public static void main(String[] args) {
        ReentrantLock lock = new ReentrantLock();
        new Thread(()->test(lock),"线程A").start();
        new Thread(()->test(lock),"线程B").start();
    }
    public static   void test(ReentrantLock lock){
            try {
                if(lock.tryLock(2, TimeUnit.SECONDS)){
                    try {
                        System.out.println(Thread.currentThread().getName()+"获取了锁!");
                        Thread.sleep(3000);
                    }finally {
                        lock.unlock();
                    }
                }
            }catch (Exception e){
                e.printStackTrace();
            }


    }

2.执行结果:

ReentrantLock源码解析 | 京东云技术团队

3.小结:

超时不执行可以防止由于资源处理不当长时间占用资源产生的死锁问题。

4 总结

并发是现在软件系统不可避免的问题,ReentrantLock是可重入的独占锁,比起synchronized功能更加丰富,支持公平锁实现,支持中断响应以及限时等待等,是处理并发问题很好的解决方案。

作者:京东物流 陈昌浩

来源:京东云开发者社区

点赞
收藏
评论区
推荐文章
灯灯灯灯 灯灯灯灯
2年前
Java并发之Semaphore源码解析
Semaphore前情提要在学习本章前,需要先了解ReentrantLock源码解析,ReentrantLock源码解析里介绍的方法有很多是本章的铺垫。下面,我们进入本章正题Semaphore。从概念上来讲,信号量(Semaphore)会维护一组许可证用于限制线程对资源的访问,当我们有一资源允许线程并发访问,但我们希望能限制访问量,就可以用信号量对访问线程
Wesley13 Wesley13
2年前
java锁学习(一)
作用能够保证同一时刻,最多只有一个线程执行该段代码,以达到并发安全的效果主要用于同时刻对线程间对任务进行锁地位synchronized是JAVA的原生关键字,是JAVA中最基本的互斥手段,是并发编程中的元老角色不使用并发的后果不使用并发会导致多线程情况下,同一个数据被多个线程同时更改,造成结果和预期不一致
Peter20 Peter20
3年前
mysql中like用法
like的通配符有两种%(百分号):代表零个、一个或者多个字符。\(下划线):代表一个数字或者字符。1\.name以"李"开头wherenamelike'李%'2\.name中包含"云",“云”可以在任何位置wherenamelike'%云%'3\.第二个和第三个字符是0的值wheresalarylike'\00%'4\
九鹤 九鹤
3年前
并发编程的基础概念
什么是线程?什么是进程?java可以开启线程吗?不能因为Java无法直接操硬件,他是运行在虚拟机上面的,什么是并发?什么是并行?并发就是多个线程去操作一个资源。并行是多个线程同时行,但是操作的资源不是同一个。线程的六个状态new(诞生)runnable(运行)Blocked(阻塞)waiiiing(等待)Tiemwaiing(超时等待)
Wesley13 Wesley13
2年前
JAVA中 ReentrantReadWriteLock读写锁详系教程,包会
一、读写锁简介现实中有这样一种场景:对共享资源有读和写的操作,且写操作没有读操作那么频繁。在没有写操作的时候,多个线程同时读一个资源没有任何问题,所以应该允许多个线程同时读取共享资源;但是如果一个线程想去写这些共享资源,就不应该允许其他线程对该资源进行读和写的操作了。 针对这种场景,JAVA的并发包提供了读写锁ReentrantReadW
Stella981 Stella981
2年前
ReentrantReadWriteLock实现原理
  在java并发包java.util.concurrent中,除了重入锁ReentrantLock外,读写锁ReentrantReadWriteLock也很常用。在实际开发场景中,在使用共享资源时,可能读操作远远多于写操作。这种情况下,如果对这部分共享资源能够让多个线程读的时候不受阻塞,仅仅在写的时候保证安全性,这样效率会得到显著提升。读写锁Reentra
Stella981 Stella981
2年前
Noark入门之线程模型
0x00单线程多进程单线程与单进程多线程的目的都是想尽可能的利用CPU,减少CPU的空闲时间,特别是多核环境,今天咱不做深度解读,跳过...0x01线程池锁最早的一部分游戏服务器是采用线程池的方式来处理玩家的业务请求,以达最大限度的利用多核优势来提高处理业务能力。但线程池同时也带来了并发问题,为了解决同一玩家多个业务请求不被
Stella981 Stella981
2年前
ReentrantReadWriteLock读写锁详解
一、读写锁简介现实中有这样一种场景:对共享资源有读和写的操作,且写操作没有读操作那么频繁。在没有写操作的时候,多个线程同时读一个资源没有任何问题,所以应该允许多个线程同时读取共享资源;但是如果一个线程想去写这些共享资源,就不应该允许其他线程对该资源进行读和写的操作了。 针对这种场景,JAVA的并发包提供了读写锁ReentrantReadW
Wesley13 Wesley13
2年前
MySQL部分从库上面因为大量的临时表tmp_table造成慢查询
背景描述Time:20190124T00:08:14.70572408:00User@Host:@Id:Schema:sentrymetaLast_errno:0Killed:0Query_time:0.315758Lock_
Wesley13 Wesley13
2年前
4种常用Java线程锁的特点,性能比较及使用场景
多个线程同时对同一个对象进行读写操作,很容易会出现一些难以预料的问题。所以很多时候我们需要给代码块加锁,同一时刻只允许一个线程对某个对象进行操作。多线程之所以会容易引发一些难以发现的bug,很多时候是写代码的程序员对线程锁不熟悉或者干脆就没有在必要的地方给线程加锁导致的。本篇我想分享java多线程中的4种常见线程锁的特点、性能比较及使用场景。一、多线