SpringBoot集成RabbitMQ(死信队列)

蔡文姬
• 阅读 6537

介绍

死信队列:没有被及时消费的消息存放的队列,消息没有被及时消费有以下几点原因:
1.有消息被拒绝(basic.reject/ basic.nack)并且requeue=false
2.队列达到最大长度
3.消息TTL过期

场景

SpringBoot集成RabbitMQ(死信队列)

1.小时进入初始队列,等待30分钟后进入5分钟队列
2.消息等待5分钟后进入执行队列
3.执行失败后重新回到5分钟队列
4.失败5次后,消息进入2小时队列
5.消息等待2小时进入执行队列
6.失败5次后,将消息丢弃或做其他处理

使用

  • 安装MQ

使用docker方式安装,选择带mangement的版本

docker pull rabbitmq:management
docker run -d -p 5672:5672 -p 15672:15672 --name rabbitmq rabbitmq:management

访问 localhost: 15672,默认账号密码guest/guest

  • 项目配置

(1)创建springboot项目
(2)在application.properties配置文件中配置mq连接信息

spring.rabbitmq.host=localhost
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest

(3)队列配置

package com.df.ps.mq;

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
import org.springframework.amqp.rabbit.listener.adapter.MessageListenerAdapter;
import org.springframework.beans.factory.annotation.Autowire;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.HashMap;
import java.util.Map;

@Configuration
public class MqConfig {

    //time
    @Value("${spring.df.buffered.min:120}")
    private int springdfBufferedTime;

    @Value("${spring.df.high-buffered.min:5}")
    private int springdfHighBufferedTime;

    @Value("${spring.df.low-buffered.min:120}")
    private int springdfLowBufferedTime;

    // 30min Buffered Queue
    @Value("${spring.df.queue:spring-df-buffered-queue}")
    private String springdfBufferedQueue;

    @Value("${spring.df.topic:spring-df-buffered-topic}")
    private String springdfBufferedTopic;

    @Value("${spring.df.route:spring-df-buffered-route}")
    private String springdfBufferedRouteKey;

    // 5M Buffered Queue
    @Value("${spring.df.high-buffered.queue:spring-df-high-buffered-queue}")
    private String springdfHighBufferedQueue;

    @Value("${spring.df.high-buffered.topic:spring-df-high-buffered-topic}")
    private String springdfHighBufferedTopic;

    @Value("${spring.df.high-buffered.route:spring-df-high-buffered-route}")
    private String springdfHighBufferedRouteKey;

    // High Queue
    @Value("${spring.df.high.queue:spring-df-high-queue}")
    private String springdfHighQueue;

    @Value("${spring.df.high.topic:spring-df-high-topic}")
    private String springdfHighTopic;

    @Value("${spring.df.high.route:spring-df-high-route}")
    private String springdfHighRouteKey;

    // 2H Low Buffered Queue
    @Value("${spring.df.low-buffered.queue:spring-df-low-buffered-queue}")
    private String springdfLowBufferedQueue;

    @Value("${spring.df.low-buffered.topic:spring-df-low-buffered-topic}")
    private String springdfLowBufferedTopic;

    @Value("${spring.df.low-buffered.route:spring-df-low-buffered-route}")
    private String springdfLowBufferedRouteKey;

    // Low Queue
    @Value("${spring.df.low.queue:spring-df-low-queue}")
    private String springdfLowQueue;

    @Value("${spring.df.low.topic:spring-df-low-topic}")
    private String springdfLowTopic;

    @Value("${spring.df.low.route:spring-df-low-route}")
    private String springdfLowRouteKey;


    @Bean(autowire = Autowire.BY_NAME, value = "springdfBufferedQueue")
    Queue springdfBufferedQueue() {
        int bufferedTime = 1000 * 60 * springdfBufferedTime;
        return createBufferedQueue(springdfBufferedQueue, springdfHighBufferedTopic, springdfHighBufferedRouteKey, bufferedTime);
    }

    @Bean(autowire = Autowire.BY_NAME, value = "springdfHighBufferedQueue")
    Queue springdfHighBufferedQueue() {
        int highBufferedTime = 1000 * 60 * springdfHighBufferedTime;
        return createBufferedQueue(springdfHighBufferedQueue, springdfHighTopic, springdfHighRouteKey, highBufferedTime);
    }

    @Bean(autowire = Autowire.BY_NAME, value = "springdfHighQueue")
    Queue springdfHighQueue() {
        return new Queue(springdfHighQueue, true);
    }

    @Bean(autowire = Autowire.BY_NAME, value = "springdfLowBufferedQueue")
    Queue springdfLowBufferedQueue() {
        int lowBufferedTime = 1000 * 60 * springdfLowBufferedTime;
        return createBufferedQueue(springdfLowBufferedQueue, springdfLowTopic, springdfLowRouteKey, lowBufferedTime);
    }

    @Bean(autowire = Autowire.BY_NAME, value = "springdfLowQueue")
    Queue springdfLowQueue() {
        return new Queue(springdfLowQueue, true);
    }


    @Bean(autowire = Autowire.BY_NAME, value = "springdfBufferedTopic")
    TopicExchange springdfBufferedTopic() {
        return new TopicExchange(springdfBufferedTopic);
    }

    @Bean
    Binding springBuffereddf(Queue springdfBufferedQueue, TopicExchange springdfBufferedTopic) {
        return BindingBuilder.bind(springdfBufferedQueue).to(springdfBufferedTopic).with(springdfBufferedRouteKey);
    }


    @Bean(autowire = Autowire.BY_NAME, value = "springdfHighBufferedTopic")
    TopicExchange springdfHighBufferedTopic() {
        return new TopicExchange(springdfHighBufferedTopic);
    }

    @Bean
    Binding springHighBuffereddf(Queue springdfHighBufferedQueue, TopicExchange springdfHighBufferedTopic) {
        return BindingBuilder.bind(springdfHighBufferedQueue).to(springdfHighBufferedTopic).with(springdfHighBufferedRouteKey);
    }

    @Bean(autowire = Autowire.BY_NAME, value = "springdfHighTopic")
    TopicExchange springdfHighTopic() {
        return new TopicExchange(springdfHighTopic);
    }

    @Bean
    Binding springHighdf(Queue springdfHighQueue, TopicExchange springdfHighTopic) {
        return BindingBuilder.bind(springdfHighQueue).to(springdfHighTopic).with(springdfHighRouteKey);
    }

    @Bean(autowire = Autowire.BY_NAME, value = "springdfLowBufferedTopic")
    TopicExchange springdfLowBufferedTopic() {
        return new TopicExchange(springdfLowBufferedTopic);
    }

    @Bean
    Binding springLowBuffereddf(Queue springdfLowBufferedQueue, TopicExchange springdfLowBufferedTopic) {
        return BindingBuilder.bind(springdfLowBufferedQueue).to(springdfLowBufferedTopic).with(springdfLowBufferedRouteKey);
    }

    @Bean(autowire = Autowire.BY_NAME, value = "springdfLowTopic")
    TopicExchange springdfLowTopic() {
        return new TopicExchange(springdfLowTopic);
    }

    @Bean
    Binding springLowdf(Queue springdfLowQueue, TopicExchange springdfLowTopic) {
        return BindingBuilder.bind(springdfLowQueue).to(springdfLowTopic).with(springdfLowRouteKey);
    }


    @Bean
    SimpleMessageListenerContainer container(ConnectionFactory connectionFactory,
                                             MessageListenerAdapter listenerAdapter) {
        SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
        container.setConnectionFactory(connectionFactory);
        container.setQueueNames(springdfHighQueue, springdfLowQueue);
        container.setMessageListener(listenerAdapter);
        return container;
    }

    @Bean
    MessageListenerAdapter listenerAdapter(IntegrationReceiver receiver) {


        MessageListenerAdapter adapter = new MessageListenerAdapter(receiver);
        adapter.setDefaultListenerMethod("receive");
        Map<String, String> queueOrTagToMethodName = new HashMap<>();
        queueOrTagToMethodName.put(springdfHighQueue, "springdfHighReceive");
        queueOrTagToMethodName.put(springdfLowQueue, "springdfLowReceive");
        adapter.setQueueOrTagToMethodName(queueOrTagToMethodName);
        return adapter;

    }


    private Queue createBufferedQueue(String queueName, String topic, String routeKey, int bufferedTime) {
        Map<String, Object> args = new HashMap<>();
        args.put("x-dead-letter-exchange", topic);
        args.put("x-dead-letter-routing-key", routeKey);
        args.put("x-message-ttl", bufferedTime);
        // 是否持久化
        boolean durable = true;
        // 仅创建者可以使用的私有队列,断开后自动删除
        boolean exclusive = false;
        // 当所有消费客户端连接断开后,是否自动删除队列
        boolean autoDelete = false;

        return new Queue(queueName, durable, exclusive, autoDelete, args);
    }
}
  • 消费者配置
package com.df.ps.mq;

import com.fasterxml.jackson.databind.ObjectMapper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;

import java.util.Map;

public class MqReceiver {

    private static Logger logger = LoggerFactory.getLogger(MqReceiver.class);

    @Value("${high-retry:5}")
    private int highRetry;

    @Value("${low-retry:5}")
    private int lowRetry;

    @Value("${spring.df.high-buffered.topic:spring-df-high-buffered-topic}")
    private String springdfHighBufferedTopic;

    @Value("${spring.df.high-buffered.route:spring-df-high-buffered-route}")
    private String springdfHighBufferedRouteKey;

    @Value("${spring.df.low-buffered.topic:spring-df-low-buffered-topic}")
    private String springdfLowBufferedTopic;

    @Value("${spring.df.low-buffered.route:spring-df-low-buffered-route}")
    private String springdfLowBufferedRouteKey;

    private final RabbitTemplate rabbitTemplate;
    @Autowired
    public MqReceiver(RabbitTemplate rabbitTemplate) {
        this.rabbitTemplate = rabbitTemplate;
    }

    public void receive(Object message) {
        if (logger.isInfoEnabled()) {
            logger.info("default receiver: " + message);
        }
    }

    /**
     * 消息从初始队列进入5分钟的高速缓冲队列
     * @param message
     */
    public void highReceiver(Object message){
        ObjectMapper mapper = new ObjectMapper();
        Map msg = mapper.convertValue(message, Map.class);

        try{
            logger.info("这里做消息处理...");
        }catch (Exception e){
            int times = msg.get("times") == null ? 0 : (int) msg.get("times");
            if (times < highRetry) {
                msg.put("times", times + 1);
                rabbitTemplate.convertAndSend(springdfHighBufferedTopic,springdfHighBufferedRouteKey,message);
            } else {
                msg.put("times", 0);
                rabbitTemplate.convertAndSend(springdfLowBufferedTopic,springdfLowBufferedRouteKey,message);
            }
        }
    }

    /**
     * 消息从5分钟缓冲队列进入2小时缓冲队列
     * @param message
     */
    public void lowReceiver(Object message){
        ObjectMapper mapper = new ObjectMapper();
        Map msg = mapper.convertValue(message, Map.class);
        
        try {
            logger.info("这里做消息处理...");
        }catch (Exception e){
            int times = msg.get("times") == null ? 0 : (int) msg.get("times");
            if (times < lowRetry) {
                rabbitTemplate.convertAndSend(springdfLowBufferedTopic,springdfLowBufferedRouteKey,message);
            }else{
                logger.info("消息无法被消费...");
            }
        } 
    }
}
点赞
收藏
评论区
推荐文章
Wesley13 Wesley13
3年前
MySQL部分从库上面因为大量的临时表tmp_table造成慢查询
背景描述Time:20190124T00:08:14.70572408:00User@Host:@Id:Schema:sentrymetaLast_errno:0Killed:0Query_time:0.315758Lock_
美凌格栋栋酱 美凌格栋栋酱
7个月前
Oracle 分组与拼接字符串同时使用
SELECTT.,ROWNUMIDFROM(SELECTT.EMPLID,T.NAME,T.BU,T.REALDEPART,T.FORMATDATE,SUM(T.S0)S0,MAX(UPDATETIME)CREATETIME,LISTAGG(TOCHAR(
Stella981 Stella981
3年前
Spring Boot(十四)RabbitMQ延迟队列
一、前言延迟队列的使用场景:1.未按时支付的订单,30分钟过期之后取消订单;2.给活跃度比较低的用户间隔N天之后推送消息,提高活跃度;3.过1分钟给新注册会员的用户,发送注册邮件等。实现延迟队列的方式有两种:1.通过消息过期后进入死信交换器,再由交换器转发到延迟消费队列,实现延迟功能;2.使用rabbitmqdelayed
Stella981 Stella981
3年前
RabbitMQ如何高效的消费消息
在上篇介绍了如何简单的发送一个消息队列之后,我们本篇来看下RabbitMQ的另外一种模式,工作队列。什么是工作队列我们上篇文章说的是,一个生产者生产了消息被一个消费者消费了,如下图!(https://usergoldcdn.xitu.io/2020/5/15/1721768c1b303014?w1824&h55
Stella981 Stella981
3年前
Spring Boot(七):RabbitMQ 详解
一、RabbitMQ简介RabbitMQ即一个消息队列,主要是用来实现应用程序的异步和解耦,同时也能起到消息缓冲,消息分发的作用。消息中间件在互联网公司的使用中越来越多,消息中间件最主要的作用是解耦,中间件最标准的用法是生产者生产消息传送到队列,消费者从队列中拿取消息并处理,生产者不用关心是谁来消费,消费者不用关心谁在生产消息,从而达到解耦的
Wesley13 Wesley13
3年前
3.rabbitmq
rabbitmq发布订阅模式模型组成一个消费者Producer,一个交换机Exchange,多个消息队列Queue,多个消费者Consumer一个生产者,多个消费者,每一个消费者都有自己的一个队列,生产者没有将消息直接发送到队列,而是发送到了交换机,每个队列绑定交换机,生产者发送
Stella981 Stella981
3年前
RabbitMQ 的核心概念,看了必懂!
作者:海向出处:cnblogs.com/haixiang/p/10853467.htmlRabbitMQ特点RabbitMQ相较于其他消息队列,有一系列防止消息丢失的措施,拥有强悍的高可用性能,它的吞吐量可能没有其他消息队列大,但是其消息的保障性出类拔萃,被广泛用于金融类业务。AMQP协议A
Wesley13 Wesley13
3年前
JMS介绍
JMS消息传送模型    点对点消息传送模型  在点对点消息传送模型中,应用程序由消息队列,发送者,接收者组成。每一个消息发送给一个特殊的消息队列,该队列保存了所有发送给它的消息(除了被接收者消费掉的和过期的消息)。点对点消息模型有一些特性,如下:每个消息只有一个接收者;消息发送者和接收者并没有时间依
Stella981 Stella981
3年前
RabbitMQ 三种方式的TTL
TTL说明RabbitMQ支持三种方式PerQueueMessageTTL(为进入队列的每一条消息设置一个TTL)QueueTTL(队列的TTL,如果在设置的TTL时间内,没有消费者连接,没有消息发送,RabbitMQ会默认其是将要抛弃不用的,会考虑在TTL到期后删除掉该队列)PerM
Stella981 Stella981
3年前
RabbitMq学习(二)RabbitMQ的消息确认机制
一.为什么有消息确认机制在RabbitMq中,一个消息从产生到最终的消息接受,中间大致会有三个环节,首先是消息到达交换机、然后是消息通过交换机到达队列,最后消费者消费绑定的队列消息。 但是在这个过程中,如果出现网络或者系统的异常,就会导致消息不能被正常消费。如果不能正常消费消息,会造成两方面的问题。 1.1在服务端消息到
Stella981 Stella981
3年前
RocketMQ查询死信队列中的消息内容【实战笔记】
说明RocketMQ中当重试消息超过最大重试次数(默认16次),会被发送到%DLQ%开头的死信队列,默认死信队列为只写权限。在有些情况下,想看看死信队列里的内容。1.更改死信队列权限bin/mqadminupdateTopicPermcClusterBt%DLQ%onlin
蔡文姬
蔡文姬
Lv1
白日放歌须纵酒,青春作伴好还乡。
文章
4
粉丝
0
获赞
0