RabbitMQ(一)

Stella981
• 阅读 677

系列说明

本系列主要讲解RabbitMQ,讲解其特性,例如消息持久化、消息TTL、消息的优先、延迟消息、消息可靠性、消费模式以及在Spring Boot中使用RabbitMQ,代码在我的Github上

RabbitMQ介绍

RabbitMQ使用Erlang语言开发基于AQMP协议的开源消息队列,RabbitMQ主要有以下特点:

  • 高可靠性: RabbitMQ依靠消息确认、持久化等实现高可靠性,但其吞吐量不太高
  • 高可用: RabbitMQ支持分布式部署,多个RabbitMQ服务器组成一个集群形成一个逻辑Broker
  • 多种协议支持: RabbitMQ基于AQMP协议,但是可以通过安装插件支持其它协议,例如STOMP、MQTT协议等
  • 多种客户端语言支持: RabbitMQ提供Java、C++等多种客户端语言支持
  • 管理页面: RabbitMQ提供Web管理页面以便可视化管理

AQMP

RabbitMQ基于AQMP协议开发的消息队列,AQMP协议在之前消息队列(一)中已经简单的介绍了,这里就简单的介绍一下:

RabbitMQ(一)

  • Broker: Broker指的是代理消息队列,是一个逻辑概念,指的是RabbitMQ服务器,其可以有多个Vritual Host组成。
  • Virtual Host: Vritual Host是一个虚拟概念,类似于权限控制组,一个Vritual Host可以有若干Exchange和Queue,权限控制的最小粒度是Virtual Host
  • Exchange: Exchange叫交换机,其可以多个Queue根据路由规则(Routing Key)绑定。Exchange接收生产者发送的消息,根据其类型(ExchangeType)和路由规则(Routing Key)把消息发送给队列。
  • Bingding: Binding联系Exchange和Queue
  • Connection: Connection在RabbitMQ中是一个客户端和Broker之间的TCP连接
  • Channel: Channel在RabbitMQ中叫做信道,有Connection创建,并且一个Connection可以创建多个Channel。在RabbitMQ必须通过Channel才能发送消息。之所以需要Channel,主要因为TCP连接过于昂贵。

需要注意的地方:

  • 如果消息发送到Exchange后,Exchange不能通过路由规则找到合适的队列,该消息将会被删除
  • RabbitMQ建议客户端线程之间不要共用Channel,而是共用Connection

RabbitMQ使用

RabbitMQ-Java官方提供了简单的使用教程,这里就简单的提一下,具体可见其网友翻译版本:RabbitMQ入门教程

这里展示的是RabbitMQ发送消息

public class Sender {

  private static final String EXCHANGE_NAME = "log";

  private static final String ROUTING_KEY = "info";

  private static final String MESSAGE = "hello world!";

  public static void main(String[] args) throws IOException, TimeoutException {
    ConnectionFactory factory = new ConnectionFactory();
    factory.setHost("47.100.20.186");
    factory.setPort(5672);
    factory.setUsername("guest");
    factory.setPassword("guest");
    // 通过连接工厂创建连接
    Connection connection = factory.newConnection();
    // 通过Connection创建Channel
    Channel channel = connection.createChannel();
    // 声明Exchange:名称及其类型,该操作同样是幂等的,如何声明对队列一样
    channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
    // 通过Channel向Exchange发送消息和Routing Key,并且配置了BasicProperties(消息属性)
    channel.basicPublish(EXCHANGE_NAME, ROUTING_KEY,
        MessageProperties.PERSISTENT_BASIC, MESSAGE.getBytes(StandardCharsets.UTF_8));
    // 关闭Channel和Connection
    channel.close();
    connection.close();
  }
}

这里展示使用RabbitMQ接收消息

public class Receiver {

  private static final String QUEUE_NAME = "log_info_queue";

  private static final String EXCHANGE_NAME = "log";

  private static final String ROUTING_KEY = "info";

  public static void main(String[] args) throws IOException, TimeoutException {
    ConnectionFactory factory = new ConnectionFactory();
    factory.setHost("47.100.20.186");
    factory.setPort(5672);
    factory.setUsername("guest");
    factory.setPassword("guest");
    // 通过连接工厂创建连接
    Connection connection = factory.newConnection();
    // 通过Connection创建Channel
    Channel channel = connection.createChannel();
    // 声明一个队列 -- 在RabbitMQ中,队列声明是幂等性的
    // 一个幂等操作的特点是其任意多次执行所产生的影响均与一次执行的影响相同
    // 也就是说,如果不存在,就创建,如果存在,不会对已经存在的队列产生任何影响
    // 但是如果声明时修改已存在队列的属性,则会抛出异常
    channel.queueDeclare(QUEUE_NAME, false, false , false, null);
    // 把Queue和Exchange通过Routing Key绑定
    channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, ROUTING_KEY);
    // 设置该消费者预读取消息数量:这里主要是考虑到慢消费的问题,这里使用PUSH模型,服务器推消息给客户端,
    // 可能会导致消息堆积,设置预读取数量后,服务器会发送指定数量消息后等待前面消息ACK后才会继续发送消息
    channel.basicQos(1);
    // 接收消息:这里使用自动ACK,当然也可以获取消息后手动ACK
    String s = channel.basicConsume(QUEUE_NAME, true, new DefaultConsumer(channel) {
      @Override
      public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body) throws IOException {
        String message = new String(body, StandardCharsets.UTF_8);
      }
    });
  }
}

RabbitMQ的消息传递模型

RabbitMQ主要通过ExchangeType来设置消息传递模型,主要有下面4种模型,其中Header模型用的少:

  • Direct模型
  • Fanout模型
  • Topic模型
  • Header模型

Direct模型

Direct模型顾名思义指的是直接连接,只有当消息中的Routing Key与Queue绑定到Exchange的Routing Key一致,才会转发消息给该Queue RabbitMQ(一)

Fanout模型

Fanout模型类似于订阅/发布模型,Exchange会把消息转发给所有绑定到该Exchange上的Queue RabbitMQ(一)

Topic模型

Topic模型类型与Servlet的URL匹配模型,其会匹配消息的Routing Key和Queue绑定到Exchange的Routing Key,使用通配符匹配。有#和*两种通配符,#代表0个或多个字符,*代表1个字符 RabbitMQ(一)

RabbitMQ的持久化

首先RabbitMQ的持久化是异步持久化模型,也就是说在特定情况下,可能造成消息丢失。比如在RabbitMQ Server回调RabbitMQ Producer Client的接口表明已经接收到该消息,但是由于是异步持久化可能还没有把消息持久化到磁盘中,这时候MQ-Server断电就会导致消息的丢失

RabbitMQ中消息的持久化需要保证Exchange、Queue、Message都进行持久化操作。需要注意的是:Exchange、Queue的声明时幂等的。幂等指说多次声明产生的结果都是一样,也就是说如果其不存在则创建,存在则返回且不会对其产生任何影响,但是如果声明已存在的队列,且其属性不同则会抛出异常。

Exchange的持久化

RabbitMQ声明Exchange有几种方法,但主要使用下面方法,其中第三个参数表示是否将该Exchange持久化

/**
 * Actively declare a non-autodelete exchange with no extra arguments
 * @param exchange the name of the exchange
 * @param type the exchange type
 * @param durable true if we are declaring a durable exchange (the exchange will survive a server restart)
 */
Exchange.DeclareOk exchangeDeclare(String exchange, String type, boolean durable) throws IOException;

Queue的持久化

RabbitMQ声明Queue与Exchange的方法类型,同样使用durable参数表示是否将该Queue进行持久化操作,下面是其中一个方法

/**
 * Declare a queue
 * @param queue the name of the queue
 * @param durable true if we are declaring a durable queue (the queue will survive a server restart)
 * @param exclusive true if we are declaring an exclusive queue (restricted to this connection)
 * @param autoDelete true if we are declaring an autodelete queue (server will delete it when no longer in use)
 * @param arguments other properties (construction arguments) for the queue
 */
Queue.DeclareOk queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments) throws IOException;

部分参数说明:

  • exclusive:表明该队列是否是排它队列。如果一个队列被声明为排他队列,该队列仅对首次申明它的连接可见,并在连接断开时自动删除。这里需要注意三点:1. 排他队列是基于连接可见的,同一连接的不同信道是可以同时访问同一连接创建的排他队列;2.“首次”,如果一个连接已经声明了一个排他队列,其他连接是不允许建立同名的排他队列的,这个与普通队列不同;3.即使该队列是持久化的,一旦连接关闭或者客户端退出,该排他队列都会被自动删除的,这种队列适用于一个客户端发送读取消息的应用场景。
  • autoDelete:表明该队列是否自动删除。自动删除,如果该队列没有任何订阅的消费者的话,该队列会被自动删除。这种队列适用于临时队列。

Message的持久化

消息的持久化需要在生产者发送消息时设置消息属性,以表明该消息时持久化消息。下面是消息发送的一个API

/**
 * Publish a message.
 *
 * Publishing to a non-existent exchange will result in a channel-level
 * protocol exception, which closes the channel.
 *
 * @param exchange the exchange to publish the message to
 * @param routingKey the routing key
 * @param props other properties for the message - routing headers etc
 * @param body the message body
 */
void basicPublish(String exchange, String routingKey, BasicProperties props, byte[] body) throws IOException;

部分参数说明:

  • exchange:表示该消息发送到哪个Exchange
  • routingKey:表示该消息的Routing Key
  • props:表示该消息的属性
  • body:消息实体

BasicProperties定义如下:

public BasicProperties(
            String contentType,//消息类型如:text/plain
            String contentEncoding,//编码
            Map<String,Object> headers,
            Integer deliveryMode,//1:nonpersistent 2:persistent
            Integer priority,//优先级
            String correlationId,
            String replyTo,//反馈队列
            String expiration,//expiration到期时间
            String messageId,
            Date timestamp,
            String type,
            String userId,
            String appId,
            String clusterId)

其中RabbitMQ提供了属性实现已经更简单的配置消息属性:

/** Empty basic properties, with no fields set */
BasicProperties.MINIMAL_BASIC
/** Empty basic properties, with only deliveryMode set to 2 (persistent) */
BasicProperties.MINIMAL_PERSISTENT_BASIC
/** Content-type "application/octet-stream", deliveryMode 1 (nonpersistent), priority zero */
BasicProperties.BASIC
/** Content-type "application/octet-stream", deliveryMode 2 (persistent), priority zero */
BasicProperties.PERSISTENT_BASIC
/** Content-type "text/plain", deliveryMode 1 (nonpersistent), priority zero */
BasicProperties.TEXT_PLAIN
/** Content-type "text/plain", deliveryMode 2 (persistent), priority zero */
BasicProperties.PERSISTENT_TEXT_PLAIN

当然可以使用时编程自定义设置消息属性:

AMQP.BasicProperties.Builder builder = new Builder();
BasicProperties properties = builder
    .deliveryMode(2)
    .build();
channel.basicPublish(EXCHANGE_NAME, ROUTING_KEY, properties, MESSAGE.getBytes(StandardCharsets.UTF_8));

消息什么时候刷到磁盘(持久化)

写入文件前会有一个Buffer,大小为1M,数据在写入文件时,首先会写入到这个Buffer,如果Buffer已满,则会将Buffer写入到文件(未必刷到磁盘)。 有个固定的刷盘时间:25ms,也就是不管Buffer满不满,每个25ms,Buffer里的数据及未刷新到磁盘的文件内容必定会刷到磁盘。 每次消息写入后,如果没有后续写入请求,则会直接将已写入的消息刷到磁盘:使用Erlang的receive x after 0实现,只要进程的信箱里没有消息,则产生一个timeout消息,而timeout会触发刷盘操作。

TTL

TTL(Time To Live)表示存活时间。RabbitMQ中可以对Queue和Message设置TTL,以控制Queue和Message的存活时间。

Queue TTL

队列的存活时间指的是Queue在自动删除前可以处于未使用状态的时间。未使用状态指的是Queue上没有Consumer、Queue没有被重新声明。队列的存活时间在队列第一次声明时通过指定队列的属性"x-expires"指定,单位是毫秒,代码如下:

Map<String, Object> queueArgs = new HashMap<>();
// 设置1分钟过期
queueArgs.put("x-expires", 60000);
channel.queueDeclare("queue", false, false, false, queueArgs);

Message TTL

消息的存活时间指的是消息在队列中的存活时间,超过该时间消息将被删除或者不能传递给消费者。消息的存活时间可以通过设置每条消息的存活时间或者设置某条队列中的所以存活时间,当两者都有时,时间小的有效。

设置消息属性 针对每条消息可以在发送消息时设置消息属性

// 设置消息属性-TTL为30s
BasicProperties properties = new BasicProperties.Builder()
    .expiration("30000").build();
channel.basicPublish("exchange", "kanyuxia", properties,
    "hello".getBytes(StandardCharsets.UTF_8));

设置队列属性 通过设置队列中消息的TTL属性,然后传入该队列的所有消息都有该TTL属性

Map<String, Object> queueArgs = new HashMap<>();
queueArgs.put("x-message-ttl", 30000);
channel.queueDeclare("queue", false, false, false, queueArgs);

Reference

https://www.jianshu.com/p/64357bf35808?utm_campaign=maleskine&utm_content=note&utm_medium=seo_notes&utm_source=recommendation http://blog.csdn.net/wanbf123/article/details/78052419 http://blog.csdn.net/u013256816/article/details/60875666 http://blog.csdn.net/u013256816/article/details/54916011

点赞
收藏
评论区
推荐文章
blmius blmius
2年前
MySQL:[Err] 1292 - Incorrect datetime value: ‘0000-00-00 00:00:00‘ for column ‘CREATE_TIME‘ at row 1
文章目录问题用navicat导入数据时,报错:原因这是因为当前的MySQL不支持datetime为0的情况。解决修改sql\mode:sql\mode:SQLMode定义了MySQL应支持的SQL语法、数据校验等,这样可以更容易地在不同的环境中使用MySQL。全局s
Jacquelyn38 Jacquelyn38
2年前
2020年前端实用代码段,为你的工作保驾护航
有空的时候,自己总结了几个代码段,在开发中也经常使用,谢谢。1、使用解构获取json数据let jsonData  id: 1,status: "OK",data: 'a', 'b';let  id, status, data: number   jsonData;console.log(id, status, number )
皕杰报表之UUID
​在我们用皕杰报表工具设计填报报表时,如何在新增行里自动增加id呢?能新增整数排序id吗?目前可以在新增行里自动增加id,但只能用uuid函数增加UUID编码,不能新增整数排序id。uuid函数说明:获取一个UUID,可以在填报表中用来创建数据ID语法:uuid()或uuid(sep)参数说明:sep布尔值,生成的uuid中是否包含分隔符'',缺省为
Stella981 Stella981
2年前
KVM调整cpu和内存
一.修改kvm虚拟机的配置1、virsheditcentos7找到“memory”和“vcpu”标签,将<namecentos7</name<uuid2220a6d1a36a4fbb8523e078b3dfe795</uuid
Easter79 Easter79
2年前
Twitter的分布式自增ID算法snowflake (Java版)
概述分布式系统中,有一些需要使用全局唯一ID的场景,这种时候为了防止ID冲突可以使用36位的UUID,但是UUID有一些缺点,首先他相对比较长,另外UUID一般是无序的。有些时候我们希望能使用一种简单一些的ID,并且希望ID能够按照时间有序生成。而twitter的snowflake解决了这种需求,最初Twitter把存储系统从MySQL迁移
Wesley13 Wesley13
2年前
mysql设置时区
mysql设置时区mysql\_query("SETtime\_zone'8:00'")ordie('时区设置失败,请联系管理员!');中国在东8区所以加8方法二:selectcount(user\_id)asdevice,CONVERT\_TZ(FROM\_UNIXTIME(reg\_time),'08:00','0
Wesley13 Wesley13
2年前
00:Java简单了解
浅谈Java之概述Java是SUN(StanfordUniversityNetwork),斯坦福大学网络公司)1995年推出的一门高级编程语言。Java是一种面向Internet的编程语言。随着Java技术在web方面的不断成熟,已经成为Web应用程序的首选开发语言。Java是简单易学,完全面向对象,安全可靠,与平台无关的编程语言。
Stella981 Stella981
2年前
Django中Admin中的一些参数配置
设置在列表中显示的字段,id为django模型默认的主键list_display('id','name','sex','profession','email','qq','phone','status','create_time')设置在列表可编辑字段list_editable
Wesley13 Wesley13
2年前
MySQL部分从库上面因为大量的临时表tmp_table造成慢查询
背景描述Time:20190124T00:08:14.70572408:00User@Host:@Id:Schema:sentrymetaLast_errno:0Killed:0Query_time:0.315758Lock_
Python进阶者 Python进阶者
3个月前
Excel中这日期老是出来00:00:00,怎么用Pandas把这个去除
大家好,我是皮皮。一、前言前几天在Python白银交流群【上海新年人】问了一个Pandas数据筛选的问题。问题如下:这日期老是出来00:00:00,怎么把这个去除。二、实现过程后来【论草莓如何成为冻干莓】给了一个思路和代码如下:pd.toexcel之前把这