36 通过延迟队列来关闭超时支付订单
Diego38 34 1

1. 前言

上一节我们学习了优先队列,队列内部的元素可以排序,假如内部元素的排序维度是执行时间,那么就产生了新一种类型的队列–延迟队列。

由于优先队列的可排序性,使得延迟队列成为可能,本节我们学习延迟队列的分类和实现原理。

2. 延迟队列的分类和实现原理

延迟队列实际上是按照执行时间正序的优先队列,越早执行的任务元素优先执行。JDK内部主要有两种延迟队列,都属于线程安全的阻塞队列。

  • DelayQueue

    • 适用于延迟场景的并发安全的延迟队列
    • 基于PriorityQueue实现,元素是Delayed类型,支持获取倒计时间,通过Lock+Condition保证并发安全。
public class DelayQueue<E extends Delayed> extends AbstractQueue<E>
   implements BlockingQueue<E> {
public interface Delayed extends Comparable<Delayed> {

  /**
   * Returns the remaining delay associated with this object, in the
   * given time unit.
   *
   * @param unit the time unit
   * @return the remaining delay; zero or negative values indicate
   * that the delay has already elapsed
   */
  long getDelay(TimeUnit unit);
}

image

  • PriorityQueue是非线程安全的,要实现现在安全DelayQueue在操作入队出队时,需要针对内置的PriorityQueue进行加锁访问,基于Condition做到了队列空时阻塞,以及任务执行的阻塞等待。 以下是take操作的伪代码:

    public E take() throws InterruptedException {
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            for (;;) {
    
                if (队列为空时)
                    //进入Condition等待
                    available.await();
                else {
                    //得到第一个元素的距离现在的时间差
                    long delay = first.getDelay(NANOSECONDS);
                    //一共需要等待delay纳秒
                    available.awaitNanos(delay);
                }
            }
        } finally {
            lock.unlock();
        }
    }
  • DelayedWorkQueue

  • JDK中最常见的定时器ScheduledThreadPoolExecutor并没有急于前面的DelayQueue来实现,而是通过ScheduledThreadPoolExecutor内部类DelayedWorkQueue来实现的延迟队列。

  • 可调度的线程池ScheduledThreadPoolExecutor的内部类阻塞+优先级+线程安全

  • 相比DelayQueue的Delayed对象,DelayedWorkQueue内部接收的是Runnable对象, 通过Lock + condition + 内置的堆排数组实现。 总体来说,两个延迟队列的实现大同小异,接下来我们看一个实际案例。

3. 通过延迟队列来关闭超时支付订单

我们在支付外卖、电商订单时,往往看到一个倒计时,表示我们订单需要在15分钟内支付,否则就会关闭订单,在未了解延迟队列之前,我们会起一个线程,每隔一分钟或几秒钟去检测订单是否超时,这种基于轮询的实现是很消耗系统资源的,因为大部分时间内,在0-14分钟内订单都不会超时,只是在第15分钟结束时订单才会超时。

有了延迟队列,关闭超时订单可以基于延迟队列来实现。代码如下:

public class CloseOrderTest {

    private static DelayQueue<FoodOrder> queue = new DelayQueue<>();

    static class FoodOrder implements Delayed {
        private Long orderId;
        private Long expireTime;
        private Boolean isPayed;
        //未来多久执行
        @Override
        public long getDelay(TimeUnit unit) {
            return unit.convert(TimeUnit.MILLISECONDS.toNanos(expireTime) - System.nanoTime() , NANOSECONDS);
        }
        @Override
        public int compareTo(Delayed o) {
            return expireTime >  o.getDelay(TimeUnit.NANOSECONDS) ? 1 : -1;
        }
        public Long getOrderId() {
            return orderId;
        }
        public void setOrderId(Long orderId) {
            this.orderId = orderId;
        }
        public Long getExpireTime() {
            return expireTime;
        }
        public void setExpireTime(Long expireTime) {
            this.expireTime = expireTime;
        }
        public void setIsPayed(Boolean payed){
            this.isPayed = payed;
        }
        public Boolean getIsPayed() {
            return isPayed;
        }
    }

    public static void main(String[] args) {
        //下订单
        new Thread(() ->{
            for (int i =0 ; i < 1000 ; i++) {
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                FoodOrder foodOrder = new FoodOrder();
                foodOrder.setOrderId(Long.valueOf(i));
                foodOrder.setIsPayed(false);
                //5秒后过期
                foodOrder.setExpireTime(5 * 1000L);
                queue.offer(foodOrder);
            }
        }).start();

        //过期订单
        new Thread(() ->{
            while (true){
                try {
                    FoodOrder item = queue.take();
                    System.out.println("订单:" + item.getOrderId() + "已过期");
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }).start();
    }
}

输出如下:

订单:0已过期
订单:1已过期
订单:2已过期
订单:3已过期

以上代码基于DelayQueue实现,内部元素实现了Delayed接口,Delayed接口有两个需要实现的方法分别是getDelay和compareTo。

getDelay在DelayQueue返回的是以纳秒为单位的距离现在的延迟执行时间,而compareTo是需要定义队列内排序顺序,Delayed接口定义有些繁琐了,按理说getDelay就足够了。

另外基于ScheduledThreadPoolExecutor也可以实现,目前我们还没学到线程池那一章,先以DelayQueue实现。

4. 总结

本章我们学习了延迟队列的分类和实现原理,想要完全理解这些内容,还需要大家自己一定要动手练习下。下一章开始,我们将学习线程池相关内容,线程池是作为并发编程最核心的知识点,也是面试时候的一大热点,我们一起把线程池相关的问题进行系统的梳理。

预览图
评论区

索引目录