AQS 都看完了,Condition 原理可不能少!

Stella981
• 阅读 630

AQS 都看完了,Condition 原理可不能少!

前言

" 在介绍 AQS 时,其中有一个内部类叫做 ConditionObject,当时并没有进行介绍,并且在后续阅读源码时,会发现很多地方用到了 Condition ,这时就会很诧异,这个 Condition 到底有什么作用?那今天就通过阅读 Condition 源码,从而弄清楚 Condition 到底是做什么的?当然阅读这篇文章的时候希望你已经阅读了 AQS、ReentrantLock 以及 LockSupport 的相关文章或者有一定的了解(当然小伙伴也可以直接跳到文末看总结"

1

介绍

Object 的监视器方法:wait、notify、notifyAll 应该都不陌生,在多线程使用场景下,必须先使用 synchronized 获取到锁,然后才可以调用 Object 的 wait、notify。

Condition 的使用,相当于用 Lock 替换了 synchronized,然后用 Condition 替换 Object 的监视器方法。

Conditions(也称为条件队列或条件变量)为一种线程提供了一种暂停执行(等待),直到另一线程通知被阻塞的线程,某些状态条件现在可能为真。

因为访问到此共享状态信息发生在不同的线程中,因此必须对其进行保护,所以会使用某种形式的锁。等待条件提供的关键属性是它以原子地释放了关联的锁,并且挂起当前线程,就像 Object.wait 一样。

Condition 实例本质上要绑定到锁。为了获得 Condition 实例,一般使用 Lock 实例的 newCondition() 方法。

Lock lock = new ReentrantLock();Condition con = lock.newCondition();

基本使用

class BoundedBuffer {    final Lock lock = new ReentrantLock();    // condition 实例依赖于 lock 实例    final Condition notFull = lock.newCondition();    final Condition notEmpty = lock.newCondition();    final Object[] items = new Object[100];    int putPtr, takePtr, count;    public void put(Object x) throws InterruptedException {        lock.lock();        try {            //  put 时判断是否已经满了            // 则线程在 notFull 条件上排队阻塞            while (count == items.length) {                notFull.await();            }            items[putPtr] = x;            if (++putPtr == items.length) {                putPtr = 0;            }            ++count;            // put 成功之后,队列中有元素            // 唤醒在 notEmpty 条件上排队阻塞的线程            notEmpty.signal();        } finally {            lock.unlock();        }    }    public Object take() throws InterruptedException {        lock.lock();        try {            // take 时,发现为空            // 则线程在 notEmpty 的条件上排队阻塞            while (count == 0) {                notEmpty.await();            }            Object x = items[takePtr];            if (++takePtr == items.length) {                takePtr = 0;            }            --count;            // take 成功,队列不可能是满的            // 唤醒在 notFull 条件上排队阻塞的线程            notFull.signal();            return x;        } finally {            lock.unlock();        }    }}

上面是官方文档的一个例子,实现了一个简单的 BlockingQueue ,看懂这里,会发现在同步队列中很多地方都是用的这个逻辑。必要的代码说明都已经在代码中进行注释。

问题疑问

  1. Condition 和 AQS 有什么关系?

  2. Condition 的实现原理是什么?

  3. Condition 的等待队列和 AQS 的同步队列有什么区别和联系?

2

源码分析

基本结构

AQS 都看完了,Condition 原理可不能少!

通过 UML 可以看出,Condition 只是一个抽象类,它的主要实现逻辑是在 AQS 的内部类 ConditionObject 实现的。下面主要从 await 和 signal 两个方法入手,从源码了解 ConditionObject。

创建 Condition

Lock lock = new ReentrantLock();Condition con = lock.newCondition();

一般使用 lock.newCondition() 创建条件变量。

public class ReentrantLock implements Lock, java.io.Serializable {    private final Sync sync;    public Condition newCondition() {        return sync.newCondition();    }    // Sync 集成 AQS    abstract static class Sync extends AbstractQueuedSynchronizer {                final ConditionObject newCondition() {            return new ConditionObject();        }    }}

这里使用的是 ReentrantLock 的源码,里面调用的 sync.newCondition(),Sync 继承 AQS,其实就是创建了一个 AQS 内部类的 ConditionObject 的实例。

这里需要注意的是 lock 每调用一次 lock.newCondition() 都会有一个新的 ConditionObject 实例生成,就是说一个 lock 可以创建多个 Condition 实例。

Condition 参数

/** 条件队列的第一个节点 */private transient Node firstWaiter;/** 条件队列的最后一个节点 */private transient Node lastWaiter;

await 方法

await 方法,会造成当前线程在等待,直到收到信号或被中断。

与此 Condition 相关联的锁被原子释放,并且出于线程调度目的,当前线程被禁用,并且处于休眠状态,直到发生以下四种情况之一:

  1. 其他一些线程调用此 Condition 的 signal 方法,而当前线程恰好被选择为要唤醒的线程;

  2. 其他一些线程调用此 Condition 的 signalAll 方法;

  3. 其他一些线程中断当前线程,并支持中断线程挂起;

  4. 发生虚假唤醒。

在所有情况下,在此方法可以返回之前,当前线程必须重新获取与此条件关联的锁。当线程返回时,可以保证保持此锁。

现在来看 AQS 内部的实现逻辑:

public final void await() throws InterruptedException {    // 响应中断    if (Thread.interrupted())        throw new InterruptedException();    // 添加到条件队列尾部(等待队列)    // 内部会创建 Node.CONDITION 类型的 Node    Node node = addConditionWaiter();    // 释放当前线程获取的锁(通过操作 state 的值)    // 释放了锁就会被阻塞挂起    int savedState = fullyRelease(node);    int interruptMode = 0;    // 节点已经不在同步队列中,则调用 park 让其在等待队列中挂着    while (!isOnSyncQueue(node)) {        // 调用 park 阻塞挂起当前线程        LockSupport.park(this);        // 说明 signal 被调用了或者线程被中断,校验下唤醒原因        // 如果因为终端被唤醒,则跳出循环        if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)            break;    }    // while 循环结束, 线程开始抢锁    if (acquireQueued(node, savedState) && interruptMode != THROW_IE)        interruptMode = REINTERRUPT;    if (node.nextWaiter != null) // clean up if cancelled        unlinkCancelledWaiters();    // 统一处理中断的    if (interruptMode != 0)        reportInterruptAfterWait(interruptMode);}

await 方法步骤如下:

  1. 创建 Node.CONDITION 类型的 Node 并添加到条件队列(ConditionQueue)的尾部;

  2. 释放当前线程获取的锁(通过操作 state 的值)

  3. 判断当前线程是否在同步队列(SyncQueue)中,不在的话会使用 park 挂起。

  4. 循环结束之后,说明已经已经在同步队列(SyncQueue)中了,后面等待获取到锁,继续执行即可。

在这里一定要把条件队列和同步队列进行区分清楚!!

条件队列/等待队列:即 Condition 的队列 同步队列:AQS 的队列。

下面对 await 里面重要方法进行阅读:

- addConditionWaiter() 方法

private Node addConditionWaiter() {    Node t = lastWaiter;    // If lastWaiter is cancelled, clean out.    // 判断尾节点状态,如果被取消,则清除所有被取消的节点    if (t != null && t.waitStatus != Node.CONDITION) {        unlinkCancelledWaiters();        t = lastWaiter;    }    // 创建新节点,类型为 Node.CONDITION    Node node = new Node(Thread.currentThread(), Node.CONDITION);    // 将新节点放到等待队列尾部    if (t == null)        firstWaiter = node;    else        t.nextWaiter = node;    lastWaiter = node;    return node;}

addConditionWaiter 方法可以看出,只是创建一个类型为 Node.CONDITION 的节点并放到条件队列尾部。同时通过这段代码还可以得出其他结论:

  1. 条件队列内部的 Node,只用到了 thread、waitStatus、nextWaiter 属性;

  2. 条件队列是单向队列。

作为对比,这里把条件队列和同步队列做出对比:

AQS 都看完了,Condition 原理可不能少!

AQS 同步队列如下:

AQS 都看完了,Condition 原理可不能少!

再来看下 Condition 的条件队列

AQS 都看完了,Condition 原理可不能少!

waitStatus 在 AQS 中已经进行了介绍:

  1. 默认状态为 0;

  2. waitStatus > 0 (CANCELLED 1) 说明该节点超时或者中断了,需要从队列中移除;

  3. waitStatus = -1 SIGNAL 当前线程的前一个节点的状态为 SIGNAL,则当前线程需要阻塞(unpark);

  4. waitStatus = -2 CONDITION -2 :该节点目前在条件队列;

  5. waitStatus = -3 PROPAGATE -3 :releaseShared 应该被传播到其他节点,在共享锁模式下使用。

- fullyRelease 方法 (AQS)

final int fullyRelease(Node node) {    boolean failed = true;    try {        // 获取当前节点的 state        int savedState = getState();        // 释放锁        if (release(savedState)) {            failed = false;            return savedState;        } else {            throw new IllegalMonitorStateException();        }    } finally {        if (failed)            node.waitStatus = Node.CANCELLED;    }}

fullyRelease 方法是由 AQS 提供的,首先获取当前的 state,然后调用 release 方法进行释放锁。

public final boolean release(int arg) {    if (tryRelease(arg)) {        Node h = head;        if (h != null && h.waitStatus != 0)            unparkSuccessor(h);        return true;    }    return false;}

release 方法在 AQS 中做了详细的介绍。它的主要作用就是释放锁,并且需要注意的是:

  1. fullyRelease 会一次性释放所有的锁,所以说不管重入多少次,在这里都会全部释放的。

  2. 这里会抛出异常,主要是在释放锁失败时,这时就会在 finally 里面将节点状态置为 Node.CANCELLED。

**-**isOnSyncQueue(node)

通过上面的流程,节点已经放到了条件队列并且释放了持有的,而后就会挂起阻塞,直到 signal 唤醒。但是在挂起时要保证节点已经不在同步队列(SyncQueue)中了才可以挂起。

final boolean isOnSyncQueue(Node node) {    // 当前节点是条件队列节点,或者上一个节点是空    if (node.waitStatus == Node.CONDITION || node.prev == null)        return false;    if (node.next != null) // If has successor, it must be on queue        return true;    return findNodeFromTail(node);}// 从尾部开始遍历private boolean findNodeFromTail(Node node) {    Node t = tail;    for (;;) {        if (t == node)            return true;        if (t == null)            return false;        t = t.prev;    }}

如果一个节点(总是一个最初放置在条件队列中的节点)现在正等待在同步队列上重新获取,则返回true。

这段代码的主要作用判断节点是不是在同步队列中,如果不在同步队列中,后面才会调用 park 进行阻塞当前线程。这里就会有一个疑问:AQS 的同步队列和 Condition 的条件队列应该是无关的,这里为什么会要保证节点不在同步队列之后才可以进行阻塞?因为 signal 或者 signalAll 唤醒节点之后,节点就会被放到同步队列中。

线程到这里已经被阻塞了,当有其他线程调用 signal 或者 signalAll 时,会唤醒当前线程。

而后会验证是否因中断唤醒当前线程,这里假设没有发生中断。那 while 循环的 isOnSyncQueue(Node node) 必然会返回 true ,表示当前节点已经在同步队列中了。

后续会调用 acquireQueued(node, savedState) 进行获取锁。

final boolean acquireQueued(final Node node, int arg) {    // 是否拿到资源    boolean failed = true;    try {        // 中断状态        boolean interrupted = false;        // 无限循环        for (;;) {            // 当前节点之前的节点            final Node p = node.predecessor();            // 前一个节点是头节点, 说明当前节点是 头节点的 next 即真实的第一个数据节点 (因为 head 是虚拟节点)            // 然后再尝试获取资源            if (p == head && tryAcquire(arg)) {                // 获取成功之后 将头指针指向当前节点                setHead(node);                 p.next = null; // help GC                failed = false;                return interrupted;            }            // p 不是头节点, 或者 头节点未能获取到资源 (非公平情况下被别的节点抢占)             // 判断 node 是否要被阻塞,获取不到锁就会一直阻塞            if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt())                interrupted = true;        }    } finally {        if (failed)            cancelAcquire(node);    }}

这里就是 AQS 的逻辑了,同样可以阅读 AQS 的相关介绍。

  1. 不断获取本节点的上一个节点是否为 head,因为 head 是虚拟节点,如果当前节点的上一个节点是 head 节点,则当前节点为 第一个数据节点>

  2. 第一个数据节点不断的去获取资源,获取成功,则将 head 指向当前节点;

  3. 当前节点不是头节点,或者 tryAcquire(arg) 失败(失败可能是非公平锁)。这时候需要判断前一个节点状态决定 当前节点是否要被阻塞(前一个节点状态是否为 SIGNAL)。

值得注意的是,当节点放到 AQS 的同步队列时,也是进行争抢资源,同时设置 savedState 的值,这个值则是代表当初释放锁的时候释放了多少重入次数。

总体流程画图如下:

AQS 都看完了,Condition 原理可不能少!

signal

public final void signal() {    // 是否为当前持有线程    if (!isHeldExclusively())        throw new IllegalMonitorStateException();    Node first = firstWaiter;    if (first != null)        doSignal(first);}private void doSignal(Node first) {    do {        // firstWaiter 头节点指向条件队列头的下一个节点        if ( (firstWaiter = first.nextWaiter) == null)            lastWaiter = null;        // 将原来的头节点和同步队列断开        first.nextWaiter = null;    } while (!transferForSignal(first) &&                (first = firstWaiter) != null);}final boolean transferForSignal(Node node) {     // 判断节点是否已经在之前被取消了    if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))        return false;    // 调用 enq 添加到 同步队列的尾部    Node p = enq(node);    int ws = p.waitStatus;    // node 的上一个节点 修改为 SIGNAL 这样后续就可以唤醒自己了    if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))        LockSupport.unpark(node.thread);    return true;}

enq 同样可以阅读 AQS 的代码

private Node enq(final Node node) {    for (;;) {        Node t = tail;        // 尾节点为空 需要初始化头节点,此时头尾节点是一个        if (t == null) { // Must initialize            if (compareAndSetHead(new Node()))                tail = head;        } else {            // 不为空 循环赋值            node.prev = t;            if (compareAndSetTail(t, node)) {                t.next = node;                return t;            }        }    }}

通过 enq 方法将节点放到 AQS 的同步队列之后,要将 node 的前一个节点的 waitStatus 设置为 Node.SIGNAL。signalAll 的代码也是类似。

3

总结

Q&A

Q: Condition 和 AQS 有什么关系?

A: Condition 是基于 AQS 实现的,Condition 的实现类 ConditionObject 是 AQS 的一个内部类,在里面共用了一部分 AQS 的逻辑。

Q: Condition 的实现原理是什么?

A: Condition 内部维护一个条件队列,在获取锁的情况下,线程调用 await,线程会被放置在条件队列中并被阻塞。直到调用 signal、signalAll 唤醒线程,此后线程唤醒,会放入到 AQS 的同步队列,参与争抢锁资源。

Q: Condition 的等待队列和 AQS 的同步队列有什么区别和联系?

A: Condition 的等待队列是单向链表,AQS 的是双向链表。二者之间并没有什么明确的联系。仅仅在节点从阻塞状态被唤醒后,会从等待队列挪到同步队列中。

结束语

本文主要是阅读 Condition 的相关代码,不过省略了线程中断等逻辑。有兴趣的小伙伴。可以更深入的研究相关的源码。

- -


历史文章 | 相关推荐

AQS 都看完了,Condition 原理可不能少!

AQS 都看完了,Condition 原理可不能少!

AQS 都看完了,Condition 原理可不能少!

作者:刘志航,一个宅宅的北漂程序员。

AQS 都看完了,Condition 原理可不能少!

" 阅读源码,记录笔记;享受生活,分享心得。"

AQS 都看完了,Condition 原理可不能少!

神秘代码 | d3ggOiBsaXV6aGloYW5nY29t

掘金|知乎|头条|InfoQ :liuzhihang

◆  ◆  ◆  ◆  ◆

AQS 都看完了,Condition 原理可不能少!

一键三连

本文分享自微信公众号 - 刘志航(liuzhihangs)。
如有侵权,请联系 support@oschina.cn 删除。
本文参与“OSC源创计划”,欢迎正在阅读的你也加入,一起分享。

点赞
收藏
评论区
推荐文章
blmius blmius
3年前
MySQL:[Err] 1292 - Incorrect datetime value: ‘0000-00-00 00:00:00‘ for column ‘CREATE_TIME‘ at row 1
文章目录问题用navicat导入数据时,报错:原因这是因为当前的MySQL不支持datetime为0的情况。解决修改sql\mode:sql\mode:SQLMode定义了MySQL应支持的SQL语法、数据校验等,这样可以更容易地在不同的环境中使用MySQL。全局s
皕杰报表之UUID
​在我们用皕杰报表工具设计填报报表时,如何在新增行里自动增加id呢?能新增整数排序id吗?目前可以在新增行里自动增加id,但只能用uuid函数增加UUID编码,不能新增整数排序id。uuid函数说明:获取一个UUID,可以在填报表中用来创建数据ID语法:uuid()或uuid(sep)参数说明:sep布尔值,生成的uuid中是否包含分隔符'',缺省为
待兔 待兔
12个月前
手写Java HashMap源码
HashMap的使用教程HashMap的使用教程HashMap的使用教程HashMap的使用教程HashMap的使用教程22
Jacquelyn38 Jacquelyn38
4年前
2020年前端实用代码段,为你的工作保驾护航
有空的时候,自己总结了几个代码段,在开发中也经常使用,谢谢。1、使用解构获取json数据let jsonData  id: 1,status: "OK",data: 'a', 'b';let  id, status, data: number   jsonData;console.log(id, status, number )
Wesley13 Wesley13
3年前
mysql设置时区
mysql设置时区mysql\_query("SETtime\_zone'8:00'")ordie('时区设置失败,请联系管理员!');中国在东8区所以加8方法二:selectcount(user\_id)asdevice,CONVERT\_TZ(FROM\_UNIXTIME(reg\_time),'08:00','0
Wesley13 Wesley13
3年前
00:Java简单了解
浅谈Java之概述Java是SUN(StanfordUniversityNetwork),斯坦福大学网络公司)1995年推出的一门高级编程语言。Java是一种面向Internet的编程语言。随着Java技术在web方面的不断成熟,已经成为Web应用程序的首选开发语言。Java是简单易学,完全面向对象,安全可靠,与平台无关的编程语言。
Stella981 Stella981
3年前
Django中Admin中的一些参数配置
设置在列表中显示的字段,id为django模型默认的主键list_display('id','name','sex','profession','email','qq','phone','status','create_time')设置在列表可编辑字段list_editable
Wesley13 Wesley13
3年前
MySQL部分从库上面因为大量的临时表tmp_table造成慢查询
背景描述Time:20190124T00:08:14.70572408:00User@Host:@Id:Schema:sentrymetaLast_errno:0Killed:0Query_time:0.315758Lock_
Python进阶者 Python进阶者
1年前
Excel中这日期老是出来00:00:00,怎么用Pandas把这个去除
大家好,我是皮皮。一、前言前几天在Python白银交流群【上海新年人】问了一个Pandas数据筛选的问题。问题如下:这日期老是出来00:00:00,怎么把这个去除。二、实现过程后来【论草莓如何成为冻干莓】给了一个思路和代码如下:pd.toexcel之前把这
美凌格栋栋酱 美凌格栋栋酱
5个月前
Oracle 分组与拼接字符串同时使用
SELECTT.,ROWNUMIDFROM(SELECTT.EMPLID,T.NAME,T.BU,T.REALDEPART,T.FORMATDATE,SUM(T.S0)S0,MAX(UPDATETIME)CREATETIME,LISTAGG(TOCHAR(