RabbitMQ 如何实现延迟队列?

LeetCode刷题
• 阅读 641

延迟队列是指当消息被发送以后,并不是立即执行,而是等待特定的时间后,消费者才会执行该消息。
延迟队列的使用场景有以下几种:

  1. 未按时支付的订单,30 分钟过期之后取消订单。
  2. 给活跃度比较低的用户间隔 N 天之后推送消息,提高活跃度。
  3. 新注册会员的用户,等待几分钟之后发送欢迎邮件等。

    1.如何实现延迟队列?

    延迟队列有以下两种实现方式:

  4. 通过消息过期后进入死信交换器,再由交换器转发到延迟消费队列,实现延迟功能;
  5. 使用官方提供的延迟插件实现延迟功能。

早期,大部分公司都会采用第一种方式,而随着 RabbitMQ 3.5.7(2015 年底发布)的延迟插件的发布,因为其使用更简单、更方便,所以它现在才是大家普通会采用的,实现延迟队列的方式,所以本文也只讲第二种方式。

2.实现延迟队列

2.1 安装并启动延迟队列

2.1.1 下载延迟插件

https://github.com/rabbitmq/rabbitmq-delayed-message-exchange/releases

注意:需要根据你自己的 RabbitMQ 服务器端版本选择相同版本的延迟插件,可以在 RabbitMQ 控制台查看:
RabbitMQ 如何实现延迟队列?
RabbitMQ 如何实现延迟队列?

2.1.2 将插件放到插件目录

接下来,将上一步下载的插件放到 RabbitMQ 服务器安装目录,如果是 docker,使用一下命令复制:

docker cp 宿主机文件 容器名称或ID:容器目录

如下图所示:
RabbitMQ 如何实现延迟队列?
之后,进入 docker 容器,查看插件中是否包含延迟队列:

docker exec -it 容器名称或ID /bin/bash
rabbitmq-plugins list

如下图所示:
RabbitMQ 如何实现延迟队列?

2.1.3 启动插件

rabbitmq-plugins enable rabbitmq_delayed_message_exchange

如下图所示:
RabbitMQ 如何实现延迟队列?

2.1.4 重启RabbitMQ服务

安装完 RabbitMQ 插件之后,需要重启 RabbitMQ 服务才能生效。
如果使用的是 Docker,只需要重启 Docker 容器即可:

docker restart 容器名称或ID

如下图所示:
RabbitMQ 如何实现延迟队列?

2.1.5 验收结果

在 RabbitMQ 控制台查看,新建交换机时是否有延迟消息选项,如果有就说明延迟消息插件已经正常运行了,如下图所示:
RabbitMQ 如何实现延迟队列?

2.1.6 手动创建延迟交换器(可选)

此步骤可选(非必须),因为某些版本下通过程序创建延迟交换器可能会出错,如果出错了,手动创建延迟队列即可,如下图所示:
RabbitMQ 如何实现延迟队列?

2.2 编写延迟消息实现代码

2.2.1 配置交换器和队列

import org.springframework.context.annotation.Configuration;
import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;

/**
 * 延迟交换器和队列
 */
@Configuration
public class DelayedExchangeConfig {
    public static final String EXCHANGE_NAME = "myDelayedExchange";
    public static final String QUEUE_NAME = "delayed.queue";
    public static final String ROUTING_KEY = "delayed.routing.key";

    @Bean
    public CustomExchange delayedExchange() {
        return new CustomExchange(EXCHANGE_NAME,
                "x-delayed-message", // 消息类型
                true, // 是否持久化
                false); // 是否自动删除
    }

    @Bean
    public Queue delayedQueue() {
        return QueueBuilder.durable(QUEUE_NAME)
                .withArgument("x-delayed-type", "direct")
                .build();
    }

    @Bean
    public Binding delayedBinding(Queue delayedQueue,CustomExchange delayedExchange) {
        return BindingBuilder.bind(delayedQueue()).to(delayedExchange()).with(ROUTING_KEY).noargs();
    }
}

2.1.2 定义消息发送方法

import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;

@Component
public class DelayedMessageProducer {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @Scheduled(fixedDelay = 5000)
    public void sendDelayedMessage(String message) {
        rabbitTemplate.convertAndSend(DelayedExchangeConfig.EXCHANGE_NAME,
                DelayedExchangeConfig.ROUTING_KEY,
                message,
                messagePostProcessor -> {
                    messagePostProcessor.getMessageProperties().setDelay(10000); // 设置延迟时间,单位毫秒
                    return messagePostProcessor;
                });
    }
}

2.1.3 发送延迟消息

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;

@RestController
@RequestMapping("/delayed")
public class DelayedMessageController {
    @Autowired
    private DelayedMessageProducer delayedMessageProducer;

    @GetMapping("/send")
    public String sendDirectMessage(@RequestParam String message) {
        delayedMessageProducer.sendDelayedMessage(message);
        return "Delayed message sent to Exchange: " + message;
    }
}

2.1.4 接收延迟消息

import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;


@Component
public class DelayedMessageConsumer {

    @RabbitListener(queues = DelayedExchangeConfig.QUEUE_NAME)
    public void receiveDelayedMessage(String message) {
        System.out.println("Received delayed message: " + message);
    }
}
PS:获取本文延迟队列的实现 Demo,请加我:GG_Stone【备注:延迟队列】

小结

实现 RabbitMQ 延迟队列目前主流的实现方式,是采用官方提供的延迟插件来实现。而延迟插件需要先下载插件、然后配置并重启 RabbitMQ 服务,之后就可以通过编写代码的方式实现延迟队列了。

本文已收录到我的面试小站 www.javacn.site,其中包含的内容有:Redis、JVM、并发、并发、MySQL、Spring、Spring MVC、Spring Boot、Spring Cloud、MyBatis、设计模式、消息队列等模块。
点赞
收藏
评论区
推荐文章
一种异步延迟队列的实现方式
目前系统中有很多需要用到延时处理的功能:支付超时取消、排队超时、短信、微信等提醒延迟发送、token刷新、会员卡过期等等。通过延时处理,极大的节省系统的资源,不必轮询数据库处理任务。今天,就来介绍一种异步延迟队列的实现方式
zdd小小菜鸟 zdd小小菜鸟
2年前
RabbitMQ面试
RabbitMQ面试1.RabbitMQ的使用场景有哪些?tex抢购活动,削峰填谷,防止系统崩塌。延迟信息处理,比如10分钟之后给下单未付款的用户发送邮件提醒。解耦系统,对于新增的功能可以单独写模块扩展,比如用
Stella981 Stella981
3年前
Spring Boot(十四)RabbitMQ延迟队列
一、前言延迟队列的使用场景:1.未按时支付的订单,30分钟过期之后取消订单;2.给活跃度比较低的用户间隔N天之后推送消息,提高活跃度;3.过1分钟给新注册会员的用户,发送注册邮件等。实现延迟队列的方式有两种:1.通过消息过期后进入死信交换器,再由交换器转发到延迟消费队列,实现延迟功能;2.使用rabbitmqdelayed
Stella981 Stella981
3年前
RabbitMQ如何高效的消费消息
在上篇介绍了如何简单的发送一个消息队列之后,我们本篇来看下RabbitMQ的另外一种模式,工作队列。什么是工作队列我们上篇文章说的是,一个生产者生产了消息被一个消费者消费了,如下图!(https://usergoldcdn.xitu.io/2020/5/15/1721768c1b303014?w1824&h55
Stella981 Stella981
3年前
RabbitMQ如何保证队列里的消息99.99%被消费?
1\.本篇概要其实,还有1种场景需要考虑:当消费者接收到消息后,还没处理完业务逻辑,消费者挂掉了,那消息也算丢失了?,比如用户下单,订单中心发送了1个消息到RabbitMQ里的队列,积分中心收到这个消息,准备给这个下单的用户增加20积分,但积分还没增加成功呢,积分中心自己挂掉了,导致数据出现问题。那么如何解
Wesley13 Wesley13
3年前
JMS消息的概念解释
1、默认生产者消息是持久的:会存数据库\消费者的持久:createDurableSubscriber是指消费者能收到所有它订阅时间点之后的消息,即使消费者注册后关闭,当它重启就能收到注册时间点之后所有的消息;即当此消费用户ID(AAA)在producer发送之前就已经注册,那么此id能收到producer发送的所有消息,如果是在produce
Easter79 Easter79
3年前
SpringBoot+RabbitMQ+Redis实现商品秒杀
业务分析一般而言,商品秒杀大概可以拆分成以下几步:1.用户校验校验是否多次抢单,保证每个商品每个用户只能秒杀一次2.下单订单信息进入消息队列,等待消费3.减少库存消费订单消息,减少商品库存,增加订单记录4.付款十五分钟内完成支付,修改支付状态创建表goods\_info商品库存表
Stella981 Stella981
3年前
Spring Boot与RabbitMQ结合实现延迟队列的示例
背景何为延迟队列?顾名思义,延迟队列就是进入该队列的消息会被延迟消费的队列。而一般的队列,消息一旦入队了之后就会被消费者马上消费。场景一:在订单系统中,一个用户下单之后通常有30分钟的时间进行支付,如果30分钟之内没有支付成功,那么这个订单将进行一场处理。这是就可以使用延时队列将订单信息发送到延时队列。场景二:用户希望通过手机远程遥控
Stella981 Stella981
3年前
RabbitMQ队列延迟
RabbitMQ队列延迟1\.场景:“订单下单成功后,15分钟未支付自动取消”1.传统处理超时订单采取定时任务轮训数据库订单,并且批量处理。其弊端也是显而易见的;对服务器、数据库性会有很大的要求,并且当处理大量订单起来会很力不从心,而且实时性也不是特别好。当然传统的手法还可以再
Stella981 Stella981
3年前
SpringBoot+RabbitMQ+Redis实现商品秒杀
业务分析一般而言,商品秒杀大概可以拆分成以下几步:1.用户校验校验是否多次抢单,保证每个商品每个用户只能秒杀一次2.下单订单信息进入消息队列,等待消费3.减少库存消费订单消息,减少商品库存,增加订单记录4.付款十五分钟内完成支付,修改支付状态创建表goods\_info商品库存表
Stella981 Stella981
3年前
RabbitMQ延迟消息发送
为什么使用延迟消息?不同于同步消息,有些业务场景下希望可以实现延迟一定时间再消费消息。典型的场景有微信、支付宝等第三方支付回调接口,会在用户支付后3秒、5秒、30秒等等时间后向应用服务器发送回调请求,确保应用服务器可以正确收到消息。那有些朋友就会说了,把需要定时处理的数据存到数据库中用定时任务就可以实现,为什么还弄个异步消息。增加后台维护