redis - 延迟队列

授权君
• 阅读 1387

之前有介绍几种延迟队列的对方,如java并发编程学习之DelayQueueActiveMQ - 延迟队列RabbitMQ - 延迟队列,对于延迟队列,我还是推荐用以上几种,这边只对redis如何实现延迟队列做了一个例子。
为了实现延迟队列,我们需要定义两个类型的数据:

  1. 队列,需要执行的任务,直接放入队列,线程通过队列的内容进行执行任务。
  2. 有序集合,通过成员的分数用来判断是否可以放入队列执行。我们往有序集合插入数据的时候,分数就是当前时间+延迟的时间,判断的时候,就可以通过当前时间和分数进行比较,如果当前时间大于分数,说明还没到执行的时候。如果小于等于分数,则放入队列执行。

示例

把任务加入到有序集合,如果分数为0,说明没有延迟,直接加入到队列中。如果分数不为0,说明有延迟,把当前时间加上延迟时间,作为分数存入有序集合中。

private static void add(double score) {
    SimpleDateFormat formatter = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
    String format = formatter.format(new Date());
    // 马上执行的任务
    if (score == 0) {
        JedisUtils.rpush(queue, format);
    } else {
        double date = System.currentTimeMillis() + score;
        JedisUtils.zadd(delay, date, format + "-" + score);
    }
}

队列处理任务。如果取到任务,直接执行,如果没有任务,则休眠10毫秒。

static String queue = "queue:";
static String delay = "delay:";

static class QueueThread implements Runnable {
    @Override
    public void run() {
        while (true) {
            String res = JedisUtils.lpop(queue);
            if (null != res) {
                SimpleDateFormat formatter = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
                System.out.println(formatter.format(new Date()) + "---" + "处理消息:" + res);
            } else {
                //暂时没有消息,就休眠10毫秒
                try {
                    TimeUnit.MILLISECONDS.sleep(10);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
    }
}

延迟任务处理。在while循环中 ,先根据小于当前时间的分数取有序集合的数据,如果有数据,说明存在马上执行的任务,把任务从有序集合移除,并加入到队列中。

static class DelayThread implements Runnable {

    @Override
    public void run() {
        while (true) {
            long now = System.currentTimeMillis();
            Set<String> set = JedisUtils.zrangeByScore(delay, Double.MIN_VALUE, now);
            // 如果有可以运行的,则从有序集合移除,并放入队列
            if (set.size() > 0) {
                Iterator<String> iterator = set.iterator();
                while (iterator.hasNext()) {
                    String next = iterator.next();
                    JedisUtils.zrem(delay, next);
                    JedisUtils.rpush(queue, next);
                }
            } else {
                // 没有内容,则休眠10毫秒
                try {
                    TimeUnit.MILLISECONDS.sleep(10);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
    }
}

测试例子,

@Test
public void testDelayQueue() throws InterruptedException {
    new Thread(new QueueThread()).start();
    new Thread(new DelayThread()).start();
    add(2000);
    add(3000);
    add(0);
    add(4000);
    add(5000);
    add(6000);
    TimeUnit.SECONDS.sleep(10);
}

运行结果如下:

2020-09-24 22:36:59---处理消息:2020-09-24 22:36:59
2020-09-24 22:37:01---处理消息:2020-09-24 22:36:59-2000.0
2020-09-24 22:37:02---处理消息:2020-09-24 22:36:59-3000.0
2020-09-24 22:37:03---处理消息:2020-09-24 22:36:59-4000.0
2020-09-24 22:37:04---处理消息:2020-09-24 22:36:59-5000.0
2020-09-24 22:37:05---处理消息:2020-09-24 22:36:59-6000.0
点赞
收藏
评论区
推荐文章
一种异步延迟队列的实现方式
目前系统中有很多需要用到延时处理的功能:支付超时取消、排队超时、短信、微信等提醒延迟发送、token刷新、会员卡过期等等。通过延时处理,极大的节省系统的资源,不必轮询数据库处理任务。今天,就来介绍一种异步延迟队列的实现方式
Stella981 Stella981
3年前
Spring Boot(十四)RabbitMQ延迟队列
一、前言延迟队列的使用场景:1.未按时支付的订单,30分钟过期之后取消订单;2.给活跃度比较低的用户间隔N天之后推送消息,提高活跃度;3.过1分钟给新注册会员的用户,发送注册邮件等。实现延迟队列的方式有两种:1.通过消息过期后进入死信交换器,再由交换器转发到延迟消费队列,实现延迟功能;2.使用rabbitmqdelayed
Stella981 Stella981
3年前
BlockingQueue介绍
几种类型的BlockingQueueArrayBlockingQueue:一个由数组结构组成的有界阻塞队列。LinkedBlockingQueue:一个由链表结构组成的有界阻塞队列。PriorityBlockingQueue:一个支持优先级排序的无界阻塞队列。DelayQueue:一个使用优先级队列实现的无界阻塞队列。Synchro
Wesley13 Wesley13
3年前
IM 消息服务架构
IM消息架构主要有1、消息redis缓存队列及用户信息memcache2、消息的数据落地(入库mysql)3、消息的发送4、离线消息服务5、过期消息服务消息redis缓存队列服务端落地队列1.客户端通过HTTPS
Stella981 Stella981
3年前
RocketMQ安装部署
一、简介RocketMQRocektMQ是阿里巴巴在2012年开源的一个纯java、分布式、队列模型的第三代消息中间件,不仅在传统高频交易链路有着低延迟的出色表现,在实时计算等大数据领域也有着不错的吞吐。2016年11月11号,双十一大促见证了RocketMQ低延迟存储架构的成功试水,99.996%的延迟落在了10ms以内,极个别由于
Stella981 Stella981
3年前
RabbitMQ —— 延迟队列
RabbitMQ实现延迟队列一:在队列上设置TTLPublishdelaysync.exchangedelay.5m.queue(延迟队列)delay.exchangetest.queue(正常队列)Consumer//延迟队列startMap<String,Object
Wesley13 Wesley13
3年前
Java并发新构件之DelayQueue
    DelayQueue主要用于放置实现了Delay接口的对象,其中的对象只能在其时刻到期时才能从队列中取走。这种队列是有序的,即队头的延迟到期时间最短。如果没有任何延迟到期,那就不会有任何头元素,并且poll()将返回null(正因为这样,你不能将null放置到这种队列中)    下面是一个示例,其中的Delayed对象自身就是任务,而Delaye
Stella981 Stella981
3年前
Rabbitmq 延迟队列实现定时任务,实战
点击上方“Java专栏”,选择“置顶或者星标”第一时间阅读精彩文章!1、☞程序员进阶必备资源免费送「21种技术方向!」点击查看☜(https://www.oschina.net/action/GoToLink?urlhttps%3A%2F%2Fmp.weixin.qq.com%2Fs%3F__biz%3DMzU5
Stella981 Stella981
3年前
Spring Boot与RabbitMQ结合实现延迟队列的示例
背景何为延迟队列?顾名思义,延迟队列就是进入该队列的消息会被延迟消费的队列。而一般的队列,消息一旦入队了之后就会被消费者马上消费。场景一:在订单系统中,一个用户下单之后通常有30分钟的时间进行支付,如果30分钟之内没有支付成功,那么这个订单将进行一场处理。这是就可以使用延时队列将订单信息发送到延时队列。场景二:用户希望通过手机远程遥控
Stella981 Stella981
3年前
RabbitMQ队列延迟
RabbitMQ队列延迟1\.场景:“订单下单成功后,15分钟未支付自动取消”1.传统处理超时订单采取定时任务轮训数据库订单,并且批量处理。其弊端也是显而易见的;对服务器、数据库性会有很大的要求,并且当处理大量订单起来会很力不从心,而且实时性也不是特别好。当然传统的手法还可以再
京东云开发者 京东云开发者
11个月前
【京东云新品发布月刊】2024年7月产品动态
京东云7月产品动态:1.【消息队列RocketMQ】新品上线消息队列RocketMQ是京东云基于ApacheRocketMQ打造的一款低延迟、高并发、高可用、高可靠的分布式消息队列服务。支持开源客户端零改造接入,同时具备计算存储分离,灵活扩缩容的优势。能够