42 ScheduledThreadPoolExecutor的实现
Diego38 15 1

1. 前言

前面讲了两节定制版的ThreadPoolExecutor,其中支持线程内有序执行的OrderedThreadPoolExecutor接收OrderedRunnable对象,而本节我们介绍的ScheduledThreadPoolExecutor也是一种定制化的ThreadPoolExecutor。

2. ScheduledThreadPoolExecutor的使用

在电商下订单时,经常遇到下完订单之后,支付之前,会有一个倒计时,通常是15分钟,15分钟内支付完成,才会通知卖家发货,假设15内未支付,订单会自动取消。

在了解ScheduledThreadPoolExecutor之前,我们可能通过进行Sleep一段时间,去数据查询轮询。比较耗费资源,查询到为支付,还需要继续刚才的过程。

上一章我们举了这个场景通过延迟队列来实现,本节我们通过ScheduledThreadPoolExecutor再实现一遍。

public class ScheduledThreadPoolExecutorTest {

    /**
     * 订单对象
     */
    public static class OrderItem {
        private Long orderId;
        private boolean isPay;
        private boolean isExpired;

        public boolean isExpired() {
            return isExpired;
        }

        public void setExpired(boolean expired) {
            isExpired = expired;
        }


        public boolean isPay() {
            return isPay;
        }

        public void setPay(boolean pay) {
            isPay = pay;
        }

        public Long getOrderId() {
            return orderId;
        }

        public void setOrderId(Long orderId) {
            this.orderId = orderId;
        }

    }

   public static void main(String[] args) throws InterruptedException {
        ScheduledThreadPoolExecutor scheduledThreadPoolExecutor = new ScheduledThreadPoolExecutor(10);

        for (int i = 0; i < 100; i++) {
            OrderItem orderItem = new OrderItem();
            orderItem.setOrderId((long) i);
            ScheduledFuture<?> schedule = scheduledThreadPoolExecutor.schedule(() -> {
                if (orderItem.isPay() || orderItem.isExpired()) {
                    return;
                }
                orderItem.setExpired(true);
                System.out.println("订单因超时未支付而关闭" + orderItem.getOrderId());

            }, 60, TimeUnit.SECONDS);
            Thread.sleep(1000);
            long delay = schedule.getDelay(TimeUnit.SECONDS);
            System.out.println("订单任务延迟秒数"  + delay);
            // 3的倍数订单,我们不关闭,通过取消任务来实现
            if ( i % 3 == 0) {
                schedule.cancel(true);
            }

            //只支付orderId为偶数的订单
            if (i % 2 == 0) {
                orderItem.setPay(true);
            }
        }

    }
}

输出如下:

订单因超时未支付而关闭11
订单任务延迟秒数58
订单任务延迟秒数58
订单因超时未支付而关闭13

如代码所示,偶数订单支付后,未被关闭,非偶数且非3倍数订单在一分钟后执行了的关闭。

3. ScheduledThreadPoolExecutor的实现

ScheduledThreadPoolExecutor有几个常用的方法,需要掌握

3.1 ScheduledThreadPoolExecutor的核心API

  • schedule(Callable callable, long delay, TimeUnit unit) 将callable任务延迟delay时间后执行,单位unit,该任务只执行一次,返回ScheduledFuture。

  • schedule(Runnable command, long delay, TimeUnit unit) 将command任务延迟delay时间后执行,单位unit,该任务只执行一次,返回ScheduledFuture。

  • scheduleAtFixedRate(Runnable command,long initialDelay, long period,TimeUnit unit) 将command任务每隔period时间定时执行,初始延迟时间为initialDelay,单位unit,该任务会多次执行,返回ScheduledFuture,如果ScheduledFuture.cancel(), 任务会停止执行。

  • scheduleWithFixedDelay(Runnable command,long initialDelay, long delay,TimeUnit unit) 将command任务延迟delay时间后定时执行,初始延迟时间为initialDelay,单位unit,该任务会多次执行,返回ScheduledFuture,如果ScheduledFuture.cancel(), 任务会停止执行。

以上描述不足以说明scheduleAtFixedRate和scheduleWithFixedDelay的区别, 接下来以图示说明: image

  • scheduleWithFixedDelay delayed时间固定,执行时间无论怎么变换,都要有固定的间隔时间
  • scheduleWithFixedRate period时间固定,执行时间小于period,那么下次执行会有差值的间隔时间,执行时间大于period,下次执行将立即执行

在实际场景scheduleWithFixedDelay比较多,这样做任务的执行频率相对平均,对系统压力负载友好。

scheduleAtFixedRate和scheduleWithFixedDelay都是多次执行,如何做到的呢?

原生线程池内执行的任务通过FutureTask进行包装,而ScheduledThreadPoolExecutor内执行的任务是通过ScheduledFutureTask包装,其他的run方法如下:

public void run() {
            boolean periodic = isPeriodic();
            if (!canRunInCurrentRunState(periodic))
                cancel(false);
            else if (!periodic)
                ScheduledFutureTask.super.run();
            else if (ScheduledFutureTask.super.runAndReset()) {
                setNextRunTime();
                reExecutePeriodic(outerTask);
            }
        }

上述代码表示,当发现任务是periodic即多次执行,会重新放到队列中,等待下次执行。

原生线程池在submit后返回FutureTask,而ScheduledThreadPoolExecutor在schedule后会返回ScheduledFutureTask,ScheduledFutureTask中有一个取消cancel方法,可以让延迟队列里的任务失效。

ScheduledFutureTask有两个核心变量

  • 任务的执行时间time
  • 任务的加入队列时间sequenceNumber

在延迟队列里做比较时,先比较time,哪个早执行,哪个排前面,如果time相等,则比较sequenceNumber,哪个小哪个排前面。

ScheduledThreadPoolExecutor内部关联的队列是一个无界对接,因为设置最大线程数也就没有了意义,因为队列永远达不到满的状态。

ScheduledThreadPoolExecutor的核心原理不在于它可以执行延迟任务,延迟任务是由delayWorkerQueue来完成的,上一章我们讲延迟队列时已经了解延迟队列的实现。delayWorkerQueue内部元素也是按执行时间有序的。

ScheduledThreadPoolExecutor的核心原理在将延迟队列、线程池结合了起来,如下图所示,并且支持了定时任务的高级API–scheduleAtFixedRate和scheduleWithFixedDelay。 image

4. 总结

ScheduledThreadPoolExecutor场景定时任务工具组件,是借助将延迟队列和线程池整合来实现的。

预览图
评论区

索引目录