2020最新RabbitMQ图文教程通俗易懂版(下)【内附视频链接】

LogicLuminaryX
• 阅读 2339

RabbitMQ消息的事务机制

 在使用RabbitMQ的时候,我们可以通过消息持久化操作来解决因为服务器的异常奔溃导致的消息丢失,除此之外我们还会遇到一个问题,当消息的发布者在将消息发送出去之后,消息到底有没有正确到达broker代理服务器呢?如果不进行特殊配置的话,默认情况下发布操作是不会返回任何信息给生产者的,也就是默认情况下我们的生产者是不知道消息有没有正确到达broker的,如果在消息到达broker之前已经丢失的话,持久化操作也解决不了这个问题,因为消息根本就没到达代理服务器,你怎么进行持久化,那么这个问题该怎么解决呢?

RabbitMQ为我们提供了两种方式:

  • 通过AMQP事务机制实现,这也是AMQP协议层面提供的解决方案;
  • 通过将channel设置成confirm模式来实现;

AMQP事物机制控制

 RabbitMQ中与事务机制有关的方法有三个:txSelect(), txCommit()以及txRollback(), txSelect()用于将当前channel设置成transaction模式,txCommit()用于提交事务,txRollback()用于回滚事务,在通过txSelect()开启事务之后,我们便可以发布消息给broker代理服务器了,如果txCommit()提交成功了,则消息一定到达了broker了,如果在txCommit()执行之前broker异常崩溃或者由于其他原因抛出异常,这个时候我们便可以捕获异常通过txRollback()回滚事务。

SendTx.java

try {
    // 通过工厂创建连接
    connection = factory.newConnection();
    // 获取通道
    channel = connection.createChannel();
    // 开启事务
    channel.txSelect();
    // 声明队列
    channel.queueDeclare(QUEUE_NAME, false, false, false, null);
    // 创建消息
    String message = "Hello World!";
    // 将产生的消息放入队列
    channel.basicPublish("", QUEUE_NAME, null, message.getBytes("UTF-8"));
    System.out.println(" [x] Sent '" + message + "'");
    // 模拟程序异常
    int i = 1 / 0;
    // 提交事务
    channel.txCommit();
} catch (IOException | TimeoutException e) {
    e.printStackTrace();
    try {
        // 回滚事务
        channel.txRollback();
    } catch (IOException e1) {
        e1.printStackTrace();
    }
}

 事务确实能够解决producer与broker之间消息确认的问题,只有消息成功被broker接受,事务提交才能成功,否则我们便可以在捕获异常进行事务回滚操作同时进行消息重发,但是使用事务机制的话会降低RabbitMQ的性能,那么有没有更好的方法既能保障producer知道消息已经正确送到,又能基本上不带来性能上的损失呢?从AMQP协议的层面看是没有更好的方法,但是RabbitMQ提供了一个更好的方案,即将channel信道设置成confirm模式。

confirm确认模式

 通过AMQP协议层面为我们提供了事务机制解决了这个问题,但是采用事务机制实现会降低RabbitMQ的消息吞吐量,此时处理AMQP协议层面能够实现消息事物控制外,我们还有第二种方式即:Confirm模式。

Confirm确认模式原理

 生产者将信道设置成confirm模式,一旦信道进入confirm模式,所有在该信道上面发布的消息都会被指派一个唯一的ID(从1开始),一旦消息被投递到所有匹配的队列之后,broker就会发送一个确认给生产者(包含消息的唯一ID),这就使得生产者知道消息已经正确到达目的队列了,如果消息和队列是可持久化的,那么确认消息会将消息写入磁盘之后发出,broker回传给生产者的确认消息中deliver-tag域包含了确认消息的序列号,此外broker也可以设置basic.ack的multiple域,表示到这个序列号之前的所有消息都已经得到了处理。

 confirm模式最大的好处在于他是异步的,一旦发布一条消息,生产者应用程序就可以在等信道返回确认的同时继续发送下一条消息,当消息最终得到确认之后,生产者应用便可以通过回调方法来处理该确认消息,如果RabbitMQ因为自身内部错误导致消息丢失,就会发送一条nack消息,生产者应用程序同样可以在回调方法中处理该nack消息。

 在channel 被设置成 confirm 模式之后,所有被 publish 的后续消息都将被 confirm(即 ack) 或者被nack一次。但是没有对消息被 confirm 的快慢做任何保证,并且同一条消息不会既被 confirm又被nack 。

 注意:两种事物控制形式不能同时开启!

Confirm确认机制代码实现

 实现生产者confirm 机制有三种方式:

  • 普通confirm模式:每发送一条消息后,调用waitForConfirms()方法,等待服务器端confirm。实际上是一种串行confirm了。
  • 批量confirm模式:每发送一批消息后,调用waitForConfirmsOrDie()方法,等待服务器端confirm。
  • 异步confirm模式:提供一个回调方法,服务端confirm了一条或者多条消息后Client端会回调这个方法。

同步Confirm

SendConfirmSync.java

try {
    // 通过工厂创建连接
    connection = factory.newConnection();
    // 获取通道
    channel = connection.createChannel();
    // 开启confirm确认模式
    channel.confirmSelect();
    // 声明队列
    channel.queueDeclare(QUEUE_NAME, false, false, false, null);
    // 创建消息
    String message = "Hello World!";
    // 将产生的消息放入队列
    channel.basicPublish("", QUEUE_NAME, null, message.getBytes("UTF-8"));
    System.out.println(" [x] Sent '" + message + "'");
    // 确认消息是否发送成功-单条
    if (channel.waitForConfirms())
        System.out.println("消息发送成功!");
    else
        System.out.println("消息发送失败!");
    // 确认消息是否发送成功-多条
    // 直到所有消息都确认,只要有一个未确认就会IOException
    channel.waitForConfirmsOrDie();
    System.out.println("消息发送成功!");
}

 以上代码可以看出,使用同步的方式需要等所有的消息发送成功以后才会执行后面代码,只要有一个消息未被确认就会抛出IO异常。解决办法可以使用异步确认。

异步confirm

 异步confirm模式的编程实现最复杂,Channel对象提供的ConfirmListener()回调方法只包含deliveryTag(当前Chanel发出的消息序号),我们需要自己为每一个Channel维护一个unconfirm的消息序号集合,每publish一条数据,集合中元素加1,每回调一次handleAck方法,unconfirm集合删掉相应的一条(multiple=false)或多条(multiple=true)记录。从程序运行效率上看,这个unconfirm集合最好采用有序集合SortedSet存储结构。实际上,waitForConfirms()方法也是通过SortedSet维护消息序号的。

SendConfirmAsync.java

package com.xxxx.confirm.async.send;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.ConfirmListener;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;
import java.util.Collections;
import java.util.SortedSet;
import java.util.TreeSet;
import java.util.concurrent.TimeoutException;

/**
 * 信道确认模式-异步-生产者
 */
public class Send {

    // 队列名称
    public static final String QUEUE_NAME = "confirm_async";

    public static void main(String[] args) {
        // 定义连接工厂
        ConnectionFactory factory = new ConnectionFactory();
        factory.setPort(5672);
        factory.setHost("192.168.10.100");
        factory.setUsername("shop");
        factory.setPassword("shop");
        factory.setVirtualHost("/shop");

        Connection connection = null;
        Channel channel = null;
        try {
            // 维护信息发送回执deliveryTag
            final SortedSet<Long> confirmSet=Collections.synchronizedSortedSet(new TreeSet<Long>());
            // 创建连接
            connection = factory.newConnection();
            // 获取通道
            channel = connection.createChannel();
            // 开启confirm确认模式
            channel.confirmSelect();
            // 声明队列
            channel.queueDeclare(QUEUE_NAME, false, false, false, null);
            // 添加channel 监听
            channel.addConfirmListener(new ConfirmListener() {
                // 已确认
                @Override
                public void handleAck(long deliveryTag, boolean multiple) throws IOException {
                    // multiple=true已确认多条 false已确认单条
                    if (multiple) {
                        System.out.println("handleAck--success-->multiple" + deliveryTag);
                        // 清除前 deliveryTag 项标识id
                        confirmSet.headSet(deliveryTag + 1L).clear();
                    } else {
                        System.out.println("handleAck--success-->single" + deliveryTag);
                        confirmSet.remove(deliveryTag);
                    }
                }

                // 未确认
                @Override
                public void handleNack(long deliveryTag, boolean multiple) throws IOException {
                    // multiple=true未确认多条 false未确认单条
                    if (multiple) {
                        System.out.println("handleNack--failed-->multiple-->" + deliveryTag);
                        // 清除前 deliveryTag 项标识id
                        confirmSet.headSet(deliveryTag + 1L).clear();
                    } else {
                        System.out.println("handleNack--failed-->single" + deliveryTag);
                        confirmSet.remove(deliveryTag);
                    }
                }
            });
            // 循环发送消息演示消息确认
            while (true) {
                // 创建消息
                String message = "Hello World!";
                // 获取unconfirm的消息序号deliveryTag
                Long seqNo = channel.getNextPublishSeqNo();
                channel.basicPublish("", QUEUE_NAME, null, message.getBytes("utf-8"));
                // 将消息序号deliveryTag添加至SortedSet
                confirmSet.add(seqNo);
            }
        } catch (IOException | TimeoutException e) {
            e.printStackTrace();
        } finally {
            try {
                // 关闭通道
                if (null != channel && channel.isOpen())
                    channel.close();
                // 关闭连接
                if (null != connection && connection.isOpen())
                    connection.close();
            } catch (TimeoutException e) {
                e.printStackTrace();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }
}

异步模式的优点就是执行效率高,不需要等待消息执行完,只需要监听消息即可。

Spring集成RabbitMQ

官网:https://spring.io/projects/sp...

为什么使用spring AMQP?

  • 基于Spring之上社区活跃
  • 对AMQP协议进行了高度的封装
  • 极大的简化了RabbitMQ的操作
  • 易用性、可扩展

创建聚合项目

创建父项目spring-rabbitmq

2020最新RabbitMQ图文教程通俗易懂版(下)【内附视频链接】

2020最新RabbitMQ图文教程通俗易懂版(下)【内附视频链接】

2020最新RabbitMQ图文教程通俗易懂版(下)【内附视频链接】

2020最新RabbitMQ图文教程通俗易懂版(下)【内附视频链接】

创建rabbitmq-provider

鼠标右键spring-rabbitmq项目new -> Module

2020最新RabbitMQ图文教程通俗易懂版(下)【内附视频链接】

2020最新RabbitMQ图文教程通俗易懂版(下)【内附视频链接】

2020最新RabbitMQ图文教程通俗易懂版(下)【内附视频链接】

2020最新RabbitMQ图文教程通俗易懂版(下)【内附视频链接】

创建rabbitmq-consumer

鼠标右键spring-rabbitmq项目new -> Module

2020最新RabbitMQ图文教程通俗易懂版(下)【内附视频链接】

2020最新RabbitMQ图文教程通俗易懂版(下)【内附视频链接】

2020最新RabbitMQ图文教程通俗易懂版(下)【内附视频链接】

2020最新RabbitMQ图文教程通俗易懂版(下)【内附视频链接】

父项目依赖

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.1.7.RELEASE</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>

    <modules>
        <module>rabbitmq-provider</module>
        <module>rabbitmq-consumer</module>
    </modules>

    <groupId>com.xxxx</groupId>
    <artifactId>spring-rabbitmq</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <packaging>pom</packaging>
    <name>srping-rabbitmq</name>
    <description>Demo project for Spring Boot</description>

    <properties>
        <java.version>1.8</java.version>
    </properties>

</project>

编写生产者

pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>com.xxxx</groupId>
        <artifactId>spring-rabbitmq</artifactId>
        <version>0.0.1-SNAPSHOT</version>
    </parent>
    <groupId>com.xxxx</groupId>
    <artifactId>rabbitmq-provider</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>rabbitmq-provider</name>
    <description>Demo project for Spring Boot</description>

    <properties>
        <java.version>1.8</java.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
        </dependency>
    </dependencies>
</project>

application.yml

spring:
  rabbitmq:
    host: 127.0.0.1
    port: 5672
    username: shop
    password: shop
    virtual-host: /shop
server:
  port: 8081

RabbitmqConfig.java

package com.xxxx.rabbitmqprovider.config;

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class RabbitmqConfig {
    /**
     * 申明队列
     * @return
     */
    @Bean
    public Queue queue(){
        return new Queue("topics");
    }

    /**
     * 申明交换机(主题模式)
     * @return
     */
    @Bean
    public TopicExchange topicExchange(){
        return new TopicExchange("topicExchange");
    }

    /**
     * 将队列绑定到交换机
     * @return
     */
    @Bean
    public Binding binding(){
        return BindingBuilder.bind(queue()).to(topicExchange()).with("topic.msg");
    }
}

Send.java

package com.xxxx.rabbitmqprovider.send;

import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

@Component
public class Sender {
    @Autowired
    private RabbitTemplate rabbitTemplate;

    public void send() {
        String message = "Hello World!";
        /**
         * 第一个参数:交换机名称
         * 第二个参数:路由key名称
         * 第三个参数:发送的消息
         */
        rabbitTemplate.convertAndSend("topicExchange", "topic.msg", message);
        System.out.println("发送:" + message);
    }
}

RabbitmqProviderTestApplication.java

package com.xxxx.rabbitmqprovider;

import com.xxxx.rabbitmqprovider.send.Sender;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;

@RunWith(SpringRunner.class)
@SpringBootTest(classes = RabbitmqProviderApplication.class)
public class  {
    @Autowired
    private Sender sender;

    @Test
    public void testSend(){
        sender.send();
    }
}

编写消费者

pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>com.xxxx</groupId>
        <artifactId>spring-rabbitmq</artifactId>
        <version>0.0.1-SNAPSHOT</version>
    </parent>

    <groupId>com.xxxx</groupId>
    <artifactId>rabbitmq-consumer</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>rabbitmq-consumer</name>
    <description>Demo project for Spring Boot</description>

    <properties>
        <java.version>1.8</java.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
        </dependency>
    </dependencies>
</project>

application.yml

spring:
  rabbitmq:
    host: 127.0.0.1
    port: 5672
    username: shop
    password: shop
    virtual-host: /shop
server:
  port: 8082

Consumer.java

package com.xxxx.rabbitmqconsumer.revc;

import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
//监听队列
@RabbitListener(queues = "topics")
public class Consumer {
    //表示接收消息后的处理方法
    @RabbitHandler
    public void recv(String message){
        System.out.println("接收消息:"+message);
    }
}

测试

2020最新RabbitMQ图文教程通俗易懂版(下)【内附视频链接】

2020最新RabbitMQ图文教程通俗易懂版(下)【内附视频链接】

总结

当然这是官网最简单的例子,以后如果项目是基于配置来做的话要掌握以下:

  1. pom中引用jar
  2. 先配置rabbitmq的配置

    1. 先配置ConnectionFactory
    2. 配置RabbitAmdmin
  3. 配置RabbitTemplate这里通常在配置一个Message Convert使用JSON进行数据格式的传输
  4. 配置Exchange
  5. 配置Queue
  6. 配置一个消息处理的bean或者通过Spring扫描,这个Bean最后继承MessageListener 来处理JSON数据
  7. 配置Listener Container

去看视频了解

点赞
收藏
评论区
推荐文章
blmius blmius
4年前
MySQL:[Err] 1292 - Incorrect datetime value: ‘0000-00-00 00:00:00‘ for column ‘CREATE_TIME‘ at row 1
文章目录问题用navicat导入数据时,报错:原因这是因为当前的MySQL不支持datetime为0的情况。解决修改sql\mode:sql\mode:SQLMode定义了MySQL应支持的SQL语法、数据校验等,这样可以更容易地在不同的环境中使用MySQL。全局s
Wesley13 Wesley13
3年前
MySQL部分从库上面因为大量的临时表tmp_table造成慢查询
背景描述Time:20190124T00:08:14.70572408:00User@Host:@Id:Schema:sentrymetaLast_errno:0Killed:0Query_time:0.315758Lock_
美凌格栋栋酱 美凌格栋栋酱
7个月前
Oracle 分组与拼接字符串同时使用
SELECTT.,ROWNUMIDFROM(SELECTT.EMPLID,T.NAME,T.BU,T.REALDEPART,T.FORMATDATE,SUM(T.S0)S0,MAX(UPDATETIME)CREATETIME,LISTAGG(TOCHAR(
Stella981 Stella981
3年前
RabbitMQ之消息确认机制(事务+Confirm)
概述在使用RabbitMQ的时候,我们可以通过消息持久化操作来解决因为服务器的异常奔溃导致的消息丢失,除此之外我们还会遇到一个问题,当消息的发布者在将消息发送出去之后,消息到底有没有正确到达broker代理服务器呢?如果不进行特殊配置的话,默认情况下发布操作是不会返回任何信息给生产者的,也就是默认情况下我们的生产者是不知道消息有没有
Stella981 Stella981
3年前
RabbitMQ系列三 (深入消息队列)
消息持久化是RabbitMQ最为人津津乐道的特性之一,RabbitMQ能够在付出最小的性能代价的基础上实现消息的持久化,最大的奥秘就在于RabbitMQ多层消息队列的设计上。下面,本文就从MessageQueue的设计和消息在MessageQueue的生命周期两个方面全面介绍 RabbitMQ的消息队列。RabbitMQ完全实现
Stella981 Stella981
3年前
RabbitMQ小技巧
导读在使用RabbitMQ消息中间件时,因为消息的投递是异步的,默认情况下,RabbitMQ会删除那些无法路由的消息。为了能够检出消息是否顺利投递到队列,我们需要相应的处理机制。今天就来验证一下相关的验证机制。!RabbitMQ小技巧确定消息投递情况RabbitMQ小技巧确定消息投递情况(https://imgblog.csdnim
Stella981 Stella981
3年前
RabbitMQ学习:RabbitMQ的基本概念及RabbitMQ使用场景(二)
1、RabbitMQ的基本概念RabbitMQ是一种消息中间件,用于处理来自客户端的异步消息。服务端将要发送的消息放入到队列池中。接收端可以根据RabbitMQ配置的转发机制接收服务端发来的消息。RabbitMQ依据指定的转发规则进行消息的转发、缓冲和持久化操作,主要用在多服务器间或单服务器的子系统间进行通信,是分布式系统
Stella981 Stella981
3年前
RabbitMQ学习:安装RabbitMQ及RabbitMQ的初步配置(一)
RabbitMQ基础含义RabbitMQ是一种消息中间件,用于处理来自客户端的异步消息。服务端将要发送的消息放入到队列池中。接收端可以根据RabbitMQ配置的转发机制接收服务端发来的消息。RabbitMQ依据指定的转发规则进行消息的转发、缓冲和持久化操作,主要用在多服务器间或单服务器的子系统间进行通信,是分布式系统标准的配置。
Wesley13 Wesley13
3年前
JMS消息的概念解释
1、默认生产者消息是持久的:会存数据库\消费者的持久:createDurableSubscriber是指消费者能收到所有它订阅时间点之后的消息,即使消费者注册后关闭,当它重启就能收到注册时间点之后所有的消息;即当此消费用户ID(AAA)在producer发送之前就已经注册,那么此id能收到producer发送的所有消息,如果是在produce
Stella981 Stella981
3年前
RabbitMq学习(二)RabbitMQ的消息确认机制
一.为什么有消息确认机制在RabbitMq中,一个消息从产生到最终的消息接受,中间大致会有三个环节,首先是消息到达交换机、然后是消息通过交换机到达队列,最后消费者消费绑定的队列消息。 但是在这个过程中,如果出现网络或者系统的异常,就会导致消息不能被正常消费。如果不能正常消费消息,会造成两方面的问题。 1.1在服务端消息到
融云IM即时通讯 融云IM即时通讯
9个月前
融云IM干货丨IM服务消息推送,客户端版本更新后,如何确保消息不丢失?
确保客户端版本更新后消息不丢失,可以采取以下几种策略:消息持久化:确保消息被存储在可靠的存储介质中,如数据库或磁盘,这样即使客户端或服务端发生故障,消息也不会丢失。对于RabbitMQ等消息队列,需要开启持久化机制,将消息持久化到硬盘上,即使服务重启也能从