25 AQS的左膀右臂——LockSupport、Condition
Diego38 75 1

1. 前言

前面我们学习了队列同步器 AQS 的实现,AQS 要完成线程排队等待的过程,线程休眠和唤醒需要借助 LockSupport 工具来完成;等待通知机制借助 Condition 协助来完成,本节我们就学习这两个类是如何协助 AQS 工作的。

2. LockSupport 的 API 介绍和使用

2.1 LockSupport 的 API 介绍

LockSupport 主要提供了阻塞休眠和唤醒的方法,底层是依赖 Unsafe 类做到的:

  • void park()、 void park(Object blocker) park 的英文意思是停车,停车的线程代表着车辆。 调用 park 会阻塞休眠当前的线程,在阻塞阶段,当前线程不消耗 CPU。通过其他线程调用 unpark (Thread) 来被唤醒,获取通过中断也能将当前线程唤醒。

有一点要注意,如果提前调用了 unpark (Thread), 目标线程调用 park 时不会进入休眠,而是直接被唤醒。

park (Object) 是 JDK1.6 新引入的方法,它可以像 Synchronized 关键字一样,在做线程 dump 时能看到阻塞对象。 通过 getBlocker (Thread) 可以得到线程的阻塞对象,在监控场景比较有用。

  • void parkNanos(long nanos)、void parkNanos(Object blocker, long nanos) 相比 park () 和 park (Object blocker),增加了超时返回的时间,单位是纳秒。

  • void parkUntil(long deadline) 阻塞线程直到 deadline 时间,deadline 是毫秒时间戳,该方法使用场景较少。

  • void unpark(Thread) 唤醒处于阻塞休眠状态的线程

2.2 LockSupport 的使用

接下来我们看一个案例:

public class ParkTest {

    static Object blocker = new Object();

    static Thread park = new Thread(() -> {
        System.out.println("进入阻塞休眠");
        LockSupport.park(blocker); //1
        System.out.println("退出阻塞休眠");
    });

    static Thread unpark= new Thread(() -> {
        //保证park线程进入休眠
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        //唤醒park线程
        LockSupport.unpark(park); //2
    });

    public static void main(String[] args) {
        park.start();
        unpark.start();
    }
}

unpark 线程在 park 线程调用 park 进入休眠后,调用 unpark,park 线程才打印 “退出阻塞休眠”。执行 jps 命令打印出 pid,然后执行 jstack dump 出线程,会发现 park 线程堆栈中输出了阻塞对象。

...

"Thread-0" #11 prio=5 os_prio=31 tid=0x00007f9e350cf800 nid=0xa903 waiting on condition [0x0000700001814000]
   java.lang.Thread.State: WAITING (parking)
    at sun.misc.Unsafe.park(Native Method)
    - parking to wait for  <0x000000076ad2a3d0> (a java.lang.Object)// 这里打印出了阻塞对象object的内存地址
    at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)

..

我们将代码改下,删除 unpark 线程 sleep 代码,将 sleep 代码粘贴到 park 线程前部。

public class ParkTest {

    static Object blocker = new Object();

    static Thread park = new Thread(() -> {
        //保证unpark线程提前唤醒
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println("进入阻塞休眠");
        LockSupport.park(blocker); //1
        System.out.println("退出阻塞休眠");
    });

    static Thread unpark= new Thread(() -> {
        //唤醒park线程
        LockSupport.unpark(park); //2
    });

    public static void main(String[] args) {
        park.start();
        unpark.start();
    }
}

执行后,会发现线程几乎同时打印 “进入阻塞休眠” 以及 “退出阻塞休眠”,park 方法并没有让 park 线程进入休眠。这是因为 unpark 线程先调用 unpark 方法,park 线程后续调用的首次 park 方法 将直接被唤醒。

3. Condition 的 API 介绍和使用

3.1 Condition 的 API 介绍

Condition 相比原生 Synchronize 锁的等待通知机制,相当于锁对象的 object,只不过方法由 wait/notify/notifyAll 变成了 await/signal/signalAll。Condition 对象要和处于独占模式的 Lock 锁结合起来使用,Condition 接口的实现类只有一个即 ConditionObject,来自于 AQS。

Condition 的核心 API 介绍:

  • void await() throws InterruptedException、void awaitUninterruptibly() 前者 await 方法是响应中断的等待方法,后者 awaitUninterruptibly 法是不响应中断的等待方法。执行 await 方法会触发当前线程释放锁并进入条件队列。 通过其他线程执行 signal 或 signalAll 会将处于条件队列的目标线程转移至同步队列,并且唤醒目标线程。唤醒目标线程的另一种方式是对目标线程执行中断 (调用 interrupt 方法)

  • long awaitNanos(long nanosTimeout) throws InterruptedException、 boolean await(long time, TimeUnit unit) throws InterruptedException 两个超时等待方法都是支持响应中断,超时后,会自行被唤醒进入条件队列等待锁的释放。

  • void signal() 拥有锁的线程唤醒一个处于条件队列等待的线程

  • void signalAll() 拥有锁的线程唤醒所有等待在 Condition 上的线程。

3.2 Condition 的使用

Condition 对象需要从独占锁中派生创建,与 Synchronized 原生锁实现的等待通知机制不同的是,一个 Lock 锁对象支持多次创建 Condition 实例,意味着 lock 锁对应的同步队列可以关联多个条件队列。

通过一个场景题来熟悉下 Condition 的使用:编写一个线程安全的 list,容量 100,支持新增和删除头部元素。

public class ConditionTest {

    public static class ThreadSafeList {

        private Lock lock = new ReentrantLock();
        private Condition unFullCon = lock.newCondition();
        private Condition unEmptyCon = lock.newCondition();

        private List<Long> list = new ArrayList<>();
        private Integer capacity = 100;

        public Long get(int index) {
            lock.lock();
            try {
                return list.get(index);
            } finally {
                lock.unlock();
            }
        }

        public void  add(Long item) throws InterruptedException {
            lock.lock();
            try {
                //1. 列表满就一直等待
                while (list.size() >= capacity) {
                    unFullCon.await();
                }
                //2. 添加到末尾
                list.add(item);
                //3. 唤醒等待非空条件的线程
                unEmptyCon.signal();
            }finally {
                lock.unlock();
            }
        }

        public void  remove() throws InterruptedException {
            lock.lock();
            try {
                //4. 列表为空就一直等待
                while (list.size() == 0) {
                    unEmptyCon.await();
                }
                //5. 移除第一个元素
                list.remove(0);
                //6. 唤醒等待非满的线程
                unFullCon.signal();
            }finally {
                lock.unlock();
            }
        }

        public String toString() {
            lock.lock();
            try {
                return list.toString();
            } finally {
                lock.unlock();
            }
        }

    }

    public static void main(String[] args) throws InterruptedException {
        ThreadSafeList safeList = new ThreadSafeList();
        Thread addThread = new Thread(() -> {
            for (int i = 0; i < 1000 ; i ++ ){
                try {
                    safeList.add((long) i);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        });
        Thread removeThread = new Thread( () -> {
            for (int i = 0; i < 900 ; i ++ ){
                try {
                    safeList.remove();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        });
        addThread.start();
        removeThread.start();
        Thread.sleep(10000);
        System.out.println(safeList.toString());
    }
}

上述代码,由 Lock 锁新建两个 Condition,一个代表非满条件,一个代表非空条件。 ThreadSafeList 的 add 和 remove 方法流程可以通过以下图形来表示: image

  1. add 线程获取锁后首先判断非满条件,满足非满条件才会添加元素,唤醒处在非空条件队列的 remove 线程,否则会释放锁跳出加锁区域进入非满条件队列等待唤醒
  2. remove 线程获取锁是要判断非空条件,满足非空条件才会删除元素,否则会释放锁跳出加锁区域进入非空条件等待队列;删除完元素后会唤醒非满队列的第一个元素
  3. 处于非满条件队列的 add 线程会重新进入同步队列等待锁的释放,进入加锁区域重复 1 的流程。
  4. 同理,处在非空条件队列的 remove 线程在收到唤醒后,进入同步队列重新获取锁,然后继续 2 的流程。这也解释了,为什么判断非空非满条件要进行 while 循环,因为每次只需 await 方法返回实际上都是重新进入加锁区域,需要对当前条件重新进行判断。

    4. 总结

    LockSupport 类和 Condition 接口是 AQS 的左膀右臂,熟悉这两个类的 API 和使用,对后续的学习帮助很大的,这两个类在后续要讲的并发组件源码中出现频率比较高。
预览图
评论区

索引目录