RabbitMQ - 延迟队列

Dart
• 阅读 2159

rabbitmq的延迟队列,我们可以通过死信交换器来实现。
生产者发送消息,定义2秒后消息过期,消息就会进入死信交换器,最后到死信队列。

// 定义队列的名称
public final static String QUEUE_NAME = "queue.scheduler";
// 定义交换器的名称
public final static String EXCHANGE_NAME = "exchange.scheduler";
// 定义路由的名称
public final static String ROUTE_NAME = "route.scheduler";
// 定义死信队列的名称
public final static String DLX_QUEUE_NAME = "scheduler.queue.name";
// 定义死信交换器的名称
public final static String DLX_EXCHANGE_NAME = "scheduler.exchange.name";

public static void main(String[] args) throws IOException, TimeoutException {
    // 声明一个连接工厂
    ConnectionFactory factory = new ConnectionFactory();
    // 创建一个与rabbitmq服务器的连接
    // 创建一个Channel
    try (Connection connection = factory.newConnection();
         Channel channel = connection.createChannel()) {
        // 定义交换器
        channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT, false, false, null);
        Map<String, Object> arguments = new HashMap<String, Object>();
        arguments.put("x-dead-letter-exchange", DLX_EXCHANGE_NAME);
        arguments.put("x-message-ttl", 2000);
        // 定义队列
        channel.queueDeclare(QUEUE_NAME, false, false, false, arguments);
        // 绑定队列
        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, ROUTE_NAME);
        // 定义死信交换器
        channel.exchangeDeclare(DLX_EXCHANGE_NAME, BuiltinExchangeType.DIRECT, false, false, null);
        // 定义死信队列
        channel.queueDeclare(DLX_QUEUE_NAME, false, false, false, null);
        // 绑定死信队列
        channel.queueBind(DLX_QUEUE_NAME, DLX_EXCHANGE_NAME, ROUTE_NAME);
        // 发送消息
        SimpleDateFormat df = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
        channel.basicPublish(EXCHANGE_NAME, ROUTE_NAME, true, null, df.format(new Date()).getBytes());
    }
}

消费者,从私信队列获取消息,可以得到延迟后的消息。

public static void main(String[] args) throws IOException, TimeoutException {
    // 声明一个连接工厂
    ConnectionFactory factory = new ConnectionFactory();
    // 创建一个与rabbitmq服务器的连接
    Connection connection = factory.newConnection();
    // 创建一个Channel
    Channel channel = connection.createChannel();
    System.out.println("Waiting for messages.");
    // 异步回调处理
    DeliverCallback deliverCallback = (consumerTag, delivery) -> {
        String message = new String(delivery.getBody(), "UTF-8");
        SimpleDateFormat df = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
        System.out.println(df.format(new Date()) + " Received '" + message + "'");
    };
    // 接收消息
    channel.basicConsume(ProducerScheduler.DLX_QUEUE_NAME, true, deliverCallback, consumerTag -> {
    });
}

运行结果如下,达到了延迟队列的效果。除此之外,还可以用启用延迟插件。
RabbitMQ - 延迟队列

点赞
收藏
评论区
推荐文章
Wesley13 Wesley13
4年前
RabbitMQ学习总结(7)——Spring整合RabbitMQ实例
1.RabbitMQ简介RabbitMQ是流行的开源消息队列系统,用erlang语言开发。RabbitMQ是AMQP(高级消息队列协议)的标准实现。 官网:http://www.rabbitmq.com/(https://www.oschina.net/action/GoToLink?urlhttp%3A%2F%2Fwww.rabbi
Stella981 Stella981
4年前
Spring Boot(十四)RabbitMQ延迟队列
一、前言延迟队列的使用场景:1.未按时支付的订单,30分钟过期之后取消订单;2.给活跃度比较低的用户间隔N天之后推送消息,提高活跃度;3.过1分钟给新注册会员的用户,发送注册邮件等。实现延迟队列的方式有两种:1.通过消息过期后进入死信交换器,再由交换器转发到延迟消费队列,实现延迟功能;2.使用rabbitmqdelayed
Stella981 Stella981
4年前
RabbitMQ_消息队列基本使用_2
简介RabbitMQ:接受消息再传递消息,可以视为一个“邮局”。发送者和接受者通过队列来进行交互,队列的大小可以视为无限的,多个发送者可以发生给一个队列,多个接收者也可以从一个队列中接受消息。pika&使用rabbitmq使用的协议是amqp,用于python的推荐客户端是pikapipinstallpika
Stella981 Stella981
4年前
RabbitMQ如何通过持久化保证消息99.99%不丢失?
1\.本篇概要要解决该问题,就要用到RabbitMQ中持久化的概念,所谓持久化,就是RabbitMQ会将内存中的数据(Exchange交换器,Queue队列,Message消息)固化到磁盘,以防异常情况发生时,数据丢失。其中,RabblitMQ的持久化分为三个部分:1.交换器(Exchange
Stella981 Stella981
4年前
RabbitMQ 消息中间件搭建详解
1.RabbitMQ简介消息中间件也可以称消息队列,是指用高效可靠的消息传递机制进行与平台无关的数据交流,并基于数据通信来进行分布式系统的集成。通过提供消息传递和消息队列模型,可以在分布式环境下扩展进程的通信。RabbitMQ是使用Erlang语言开发的开源消息队列系统,基于AMQP协议来实现。AMQP的主要特征是面向消息、队列、路由(包
Stella981 Stella981
4年前
RabbitMQ系列三 (深入消息队列)
消息持久化是RabbitMQ最为人津津乐道的特性之一,RabbitMQ能够在付出最小的性能代价的基础上实现消息的持久化,最大的奥秘就在于RabbitMQ多层消息队列的设计上。下面,本文就从MessageQueue的设计和消息在MessageQueue的生命周期两个方面全面介绍 RabbitMQ的消息队列。RabbitMQ完全实现
Stella981 Stella981
4年前
RabbitMQ 基础概念介绍
AMQP消息模型RabbitMQ是基于AMQP(高级消息队列协议)的一个开源实现,其内部实际也是AMQP的基本概念。AMQP的消息发送流程有如下几个步骤:1.消息生产者(producer)将消息发布到Exchange中;2.Exchange根据队列的绑定关系将消息分发到不同的队列(Queue
Stella981 Stella981
4年前
RabbitMQ —— 延迟队列
RabbitMQ实现延迟队列一:在队列上设置TTLPublishdelaysync.exchangedelay.5m.queue(延迟队列)delay.exchangetest.queue(正常队列)Consumer//延迟队列startMap<String,Object
Stella981 Stella981
4年前
RabbitMQ 三种方式的TTL
TTL说明RabbitMQ支持三种方式PerQueueMessageTTL(为进入队列的每一条消息设置一个TTL)QueueTTL(队列的TTL,如果在设置的TTL时间内,没有消费者连接,没有消息发送,RabbitMQ会默认其是将要抛弃不用的,会考虑在TTL到期后删除掉该队列)PerM
Stella981 Stella981
4年前
RocketMQ查询死信队列中的消息内容【实战笔记】
说明RocketMQ中当重试消息超过最大重试次数(默认16次),会被发送到%DLQ%开头的死信队列,默认死信队列为只写权限。在有些情况下,想看看死信队列里的内容。1.更改死信队列权限bin/mqadminupdateTopicPermcClusterBt%DLQ%onlin
Dart
Dart
Lv1
九月九日眺山川,归心归望积风烟。
文章
4
粉丝
0
获赞
0