Java项目笔记之消息中间件

Wesley13
• 阅读 532

不点蓝字,我们哪来故事?

===

消息中间件-RocketMQ

=================

消息中间件简介

应用场景

Java项目笔记之消息中间件

详情网址:阿里云官网消息队列RocketMQ

  1. Java项目笔记之消息中间件

  2. Java项目笔记之消息中间件

  3. Java项目笔记之消息中间件

  4. Java项目笔记之消息中间件

  5. Java项目笔记之消息中间件

  6. Java项目笔记之消息中间件

异步解耦

需求:注册完后要发送短信和发送邮件去通知用户;

假设系统完成注册、发送短信、发送邮件需要花费150ms。

怎么去对这个系统做优化

注册完成后,开启两个线程,分别做发送短信和发送邮件的任务。此时花费的时间变成了100ms;

这样还不是最好的,因为发送短信和发送邮件失败会对核心功能注册产生影响。使用消息中间件,可以做到异步解耦,效率高也更安全。

削峰填谷

服务器的访问量在某一时刻非常高的时候可能会给服务器造成压力而宕机。此时使用消息中间件让服务器来正常的处理请求,消息中间件中允许请求的短暂积压。

分布式缓存同步/消息分发

为什么要使用消息队列?

回答:这个问题,咱只答三个最主要的应用场景(不可否认还有其他的,但是只答三个主要的),即以下六个字:解耦、异步、削峰

(1)解耦

传统模式:

Java项目笔记之消息中间件

传统模式的缺点:

  • 系统间耦合性太强,如上图所示,系统A在代码中直接调用系统B和系统C的代码,如果将来D系统接入,系统A还需要修改代码,过于麻烦!

中间件模式:

Java项目笔记之消息中间件

中间件模式的的优点:

  • 将消息写入消息队列,需要消息的系统自己从消息队列中订阅,从而系统A不需要做任何修改。
(2)异步

传统模式:

Java项目笔记之消息中间件

传统模式的缺点:

  • 一些非必要的业务逻辑以同步的方式运行,太耗费时间。

中间件模式:

Java项目笔记之消息中间件

中间件模式的的优点:

  • 将消息写入消息队列,非必要的业务逻辑以异步的方式运行,加快响应速度
(3)削峰

传统模式

Java项目笔记之消息中间件

传统模式的缺点:

  • 并发量大的时候,所有的请求直接怼到数据库,造成数据库连接异常

中间件模式:

Java项目笔记之消息中间件

中间件模式的的优点:

  • 系统A慢慢的按照数据库能处理的并发量,从消息队列中慢慢拉取消息。在生产中,这个短暂的高峰期积压是允许的。

使用了消息队列会有什么缺点?

分析:一个使用了MQ的项目,如果连这个问题都没有考虑过,就把MQ引进去了,那就给自己的项目带来了风险。我们引入一个技术,要对这个技术的弊端有充分的认识,才能做好预防。要记住,不要给公司挖坑!回答:回答也很容易,从以下两个个角度来答

  • 系统可用性降低:你想啊,本来其他系统只要运行好好的,那你的系统就是正常的。现在你非要加个消息队列进去,那消息队列挂了,你的系统不是呵呵了。因此,系统可用性降低;

  • 系统复杂性增加:要多考虑很多方面的问题,比如一致性问题、如何保证消息不被重复消费,如何保证保证消息可靠传输。因此,需要考虑的东西更多,系统复杂性增大。

但是,我们该用还是要用的。

消息队列如何选型?

消息中间件

特性

ActiveMQ

RabbitMQ

RocketMQ

kafka

开发语言

java

erlang

java

scala

单机吞吐量

万级

万级

10万级

100万级

时效性

ms级

us级

ms级

ms级以内

可用性

高(主从架构)

高(主从架构)

非常高(分布式架构)

非常高(分布式架构)

功能特性

成熟的产品,在很多公司得到应用;有较多的文档;各种协议支持较好

基于erlang开发,所以并发能力很强,性能极其好,延时很低;管理界面较丰富

MQ功能比较完备,扩展性佳

只支持主要的MQ功能,像一些消息查询,消息回溯等功能没有提供,毕竟是为大数据准备的,在大数据领域应用广。

消息中间件对比:

Java项目笔记之消息中间件

RocketMQ的核心概念

生产者Producer

负责生产消息,一般由业务系统负责生产消息。一个消息生产者会把业务应用系统里产生的消息发送到broker服务器。RocketMQ提供多种发送方式,同步发送、异步发送、顺序发送、单向发送。同步和异步方式均需要Broker返回确认信息,单向发送不需要。

消费者Consumer

负责消费消息,一般是后台系统负责异步消费。一个消息消费者会从Broker服务器拉取消息、并将其提供给应用程序。从用户应用的角度而言提供了两种消费形式:拉取式消费、推动式消费。

名字服务Name Server

名称服务充当路由消息的提供者。生产者或消费者能够通过名字服务查找各主题相应的Broker IP列表。多个Namesrv实例组成集群,但相互独立,没有信息交换。

代理服务器Broker Server

消息中转角色,负责存储消息、转发消息。代理服务器在RocketMQ系统中负责接收从生产者发送来的消息并存储、同时为消费者的拉取请求作准备。代理服务器也存储消息相关的元数据,包括消费者组、消费进度偏移和主题和队列消息等。

消息内容Message

消息系统所传输信息的物理载体,生产和消费数据的最小单位,每条消息必须属于一个主题。RocketMQ中每个消息拥有唯一的Message ID,且可以携带具有业务标识的Key。系统提供了通过Message ID和Key查询消息的功能。

消息主题Topic

表示一类消息的集合,每个主题包含若干条消息,每条消息只能属于一个主题,是RocketMQ进行消息订阅的基本单位。

标签Tag

为消息设置的标志,用于同一主题下区分不同类型的消息。来自同一业务单元的消息,可以根据不同业务目的在同一主题下设置不同标签。标签能够有效地保持代码的清晰度和连贯性,并优化RocketMQ提供的查询系统。消费者可以根据Tag实现对不同子主题的不同消费逻辑,实现更好的扩展性。

消息队列MessageQueue

对于每个Topic都可以设置一定数量的消息队列用来进行数据的读取

RocketMQ核心概念
消息中间件核心概念(会画)

Java项目笔记之消息中间件

Java项目笔记之消息中间件

RocketMQ环境搭建

下载RocketMQ

http://rocketmq.apache.org/

https://github.com/apache/rocketmq

  • bin:启动脚本,包括shell脚本和CMD脚本

  • conf:实例配置文件 ,包括broker配置文件、logback配置文件等

  • lib:依赖jar包,包括Netty、commons-lang、FastJSON等

window的安装配置

1 使用rocketmq-4.5.1.zip 解压到指定目录

2 需要配置环境变量ROCKETMQ_HOME,加path

3 修改broker的配置文件:broker.conf中加了最后两行(开启属性过滤)

Java项目笔记之消息中间件

4 启动nameserver:bin目录下cmd执行命令mqnamesrv.cmd

5 启动broker:bin目录下cmd执行命令mqbroker.cmd -c ../conf/broker.conf

6 启动管理控制台:jar包所在的目录下java -jar rocketmq-console-ng-1.0.1.jar

单机环境搭建

详细的配置在xmind文件中。

集群环境搭建

详细的配置在xmind文件中。

总体架构设计

两主两从:Java项目笔记之消息中间件

集群应用流程
  1. 启动NameServer,NameServer起来后监听端口9876,等待Broker、Producer、Consumer连上来,相当于一个路由控制中心。

  2. Broker启动,跟所有的NameServer保持长连接,定时(每隔30s)发送心跳包。心跳包中包含当前Broker信息(IP+端口等)以及存储所有Topic信息。注册成功后,NameServer集群中就有Topic跟Broker的映射关系。

  3. 创建Topic,创建Topic时需要指定该Topic要存储在哪些Broker上,也可以在发送消息时自动创建Topic。

  4. Producer发送消息,启动时先跟NameServer集群中的其中一台建立长连接,并从NameServer中获取当前发送的Topic存在哪些Broker上,轮询从队列列表中选择一个队列,然后与队列所在的Broker建立长连接从而向Broker发消息。

  5. Consumer跟Producer类似,跟其中一台NameServer建立长连接,获取当前订阅Topic存在哪些Broker上,然后直接跟Broker建立连接通道,开始消费消息。

监控平台搭建

看Xmind进行操作。

核心基础使用

创建生产者消费者;

添加依赖:

 <dependency>

发送和消费消息:

生产者:

 /**

消费者:

 /**

发送消息方式

  1. 同步发送 sync

    发送消息采用同步模式,这种方式只有在消息完全发送完成之后才返回结果,此方式存在需要同步等待发送结果的时间代价。

    这种方式具有内部重试机制,即在主动声明本次消息发送失败之前,内部实现将重试一定次数,默认为2次(DefaultMQProducer#getRetryTimesWhenSendFailed)。发送的结果存在同一个消息可能被多次发送给给broker,这里需要应用的开发者自己在消费端处理幂等性问题。

  2. 异步发送async

    发送消息采用异步发送模式,消息发送后立刻返回,当消息完全完成发送后,会调用回调函数sendCallback来告知发送者本次发送是成功或者失败。异步模式通常用于响应时间敏感业务场景,即承受不了同步发送消息时等待返回的耗时代价。

    同同步发送一样,异步模式也在内部实现了重试机制,默认次数为2次(DefaultMQProducer#getRetryTimesWhenSendAsyncFailed})。发送的结果同样存在同一个消息可能被多次发送给给broker,需要应用的开发者自己在消费端处理幂等性问题。

  3. 一次性发送one-way

    采用one-way发送模式发送消息的时候,发送端发送完消息后会立即返回,不会等待来自broker的ack来告知本次消息发送是否完全完成发送。这种方式吞吐量很大,但是存在消息丢失的风险,所以其适用于不重要的消息发送,比如日志收集。

选择:

  • 当发送的消息不重要时,采用one-way方式,以提高吞吐量;

  • 当发送的消息很重要是,且对响应时间不敏感的时候采用sync方式;

  • 当发送的消息很重要,且对响应时间非常敏感的时候采用async方式;

  1. 同步消息(默认的)sync

    生产者

    /**
    
    消费者都是差不多的;
  2. 异步消息async

    生产者

     /**
    
    消费者相同
  3. 一次性消息one-way

    生产者

     /**
    

消费模式

     //BROADCASTING:广播模式  会给所有的消费者发一份

集群消费是用的最广泛的一种消费模式,在集群消费模式下,同一条消息,只能被group中的任意一个消费者消费,这个概念很重要,这是与广播消费的最明显区别。

MessageModel.CLUSTERING   多个消费者处理的是相同的业务,如订单处理;多个消费者分担一个消费者的压力, 一个消息只会给一个消费者消费

广播模式

广播消费,类似于ActiveMQ中的发布订阅模式,消息会发给Consume Group中的每一个消费者进行消费。由于广播模式下要求一条消息需要投递到一个消费组下面所有的消费者实例,所以也就没有消息被分摊消费的说法。

MessageModel.BROADCASTING  需要对同一个消息进行不同处理的时候, 比如对同一个消息, 需要同时发送短信和发送邮件, 一个消息会发送给所有的消费者

测试

测试的时候可以使用之前的消费者代码;

消费者:

 /**

消费方式

推送消费push

push方式里,consumer把轮询过程封装了,并注册MessageListener监听器,取到消息后,唤醒MessageListener的consumeMessage()来消费,对用户而言,感觉消息是被推送过来的。

优点

实时性高,但增加服务端负载,消费端能力不同,如果push的速度过快,消费端会出现很多问题

拉取消费pull

pull方式里,取消息的过程需要用户自己写,首先通过打算消费的Topic拿到MessageQueue的集合,遍历MessageQueue集合,然后针对每个MessageQueue批量取消息,一次取完后,记录该队列下一次要取的开始offset,直到取完了,再换另一个MessageQueue。

局限

消费者从server端拉消息,主动权在消费端,可控性好,但是时间间隔不好设置,间隔太短,则空请求会多,浪费资源,间隔太长,则消息不能及时处理

测试:

生产者相同;

消费者:

/**

/**

延时消息

需求:订单支付的时限,30分钟未支付的,就改变状态,清理订单消息所占的空间。

比如电商里,提交了一个订单就可以发送一个延时消息,1h后去检查这个订单的状态,如果还是未付款就取消订单释放库存。

延时消息的使用限制现在RocketMq并不支持任意时间的延时,需要设置几个固定的延时等级,从1s到2h分别对应着等级1到18private String messageDelayLevel = "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h";消息消费失败会进入延时消息队列,消息发送时间与设置的延时等级和重试次数有关,详见代码SendMessageProcessor.java

测试:

生产者:

 /**

消费者相同;

消息过滤

在大多数情况下,TAG是一个简单而有用的设计,其可以来选择您想要的消息。例如:

DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("wolfcode-consumer");

consumer.subscribe("TOPIC", "TAGA || TAGB || TAGC");

消费者将接收包含TAGA或TAGB或TAGC的消息。但是限制是一个消息只能有一个标签,这对于复杂的场景可能不起作用。在这种情况下,可以使用SQL表达式筛选消息。SQL特性可以通过发送消息时的属性来进行计算。在RocketMQ定义的语法下,可以实现一些简单的逻辑。下面是一个例子:

Tag标签过滤
SQL92过滤
基本语法

RocketMQ只定义了一些基本语法来支持这个特性。你也可以很容易地扩展它。

  • 数值比较,比如:>,>=,<,<=,BETWEEN,=;

  • 字符比较,比如:=,<>,IN;

  • IS NULL 或者 IS NOT NULL;

  • 逻辑符号 AND,OR,NOT;

常量支持类型为:

  • 数值,比如:123,3.1415;

  • 字符,比如:**'abc',必须用单引号包裹起来;**

  • NULL,特殊的常量

  • 布尔值,TRUEFALSE

只有使用push模式的消费者才能用使用SQL92标准的sql语句,接口如下:public void subscribe(finalString topic, final MessageSelector messageSelector)

注意: 在使用SQL过滤的时候, 需要配置参数enablePropertyFilter=true

生产者
/**
消费者
 /**

顺序消息(了解)

消息有序指的是可以按照消息的发送顺序来消费(FIFO)。RocketMQ可以严格的保证消息有序,可以分为分区有序或者全局有序。

顺序消费的原理解析,在默认的情况下消息发送会采取Round Robin轮询方式把消息发送到不同的queue(分区队列);而消费消息的时候从多个queue上拉取消息,这种情况发送和消费是不能保证顺序。但是如果控制发送的顺序消息只依次发送到同一个queue中,消费的时候只从这个queue上依次拉取,则就保证了顺序。当发送和消费参与的queue只有一个,则是全局有序;如果多个queue参与,则为分区有序,即相对每个queue,消息都是有序的。

测试代码在项目中,或许xmind中。

SpringBoot集成rocketMQ

分别创建两个生产者和消费者项目。

添加依赖:
         <dependency>
配置信息:

生产者

 rocketmq.name-server=127.0.0.1:9876

消费者

 rocketmq.name-server=127.0.0.1:9876
实现代码:

包括了:

  • 生产消息的类型:同步sync、异步async、一次性one-way

  • 消费模式:BROADCASTING:广播模式  会给所有的消费者发一份;CLUSTERING: 集群模式 只会有一个消费者消费信息 默认值。

  • 延时消息:使用原生的Producer对象和使用API;

  • 设置消息标签、key、自定义属性;

  • 消息过滤;

生产者:

 @RestController

消费者:

 /**



 /**



 /**

java学途

只分享有用的Java技术资料

扫描二维码关注公众号

Java项目笔记之消息中间件 

笔记|学习资料|面试笔试题|经验分享 

如有任何需求或问题欢迎骚扰。微信号:JL2020aini

或扫描下方二维码添加小编微信

Java项目笔记之消息中间件 

小伙砸,欢迎再看分享给其他小伙伴!共同进步!

本文分享自微信公众号 - java学途(javaxty)。
如有侵权,请联系 support@oschina.cn 删除。
本文参与“OSC源创计划”,欢迎正在阅读的你也加入,一起分享。

点赞
收藏
评论区
推荐文章
blmius blmius
2年前
MySQL:[Err] 1292 - Incorrect datetime value: ‘0000-00-00 00:00:00‘ for column ‘CREATE_TIME‘ at row 1
文章目录问题用navicat导入数据时,报错:原因这是因为当前的MySQL不支持datetime为0的情况。解决修改sql\mode:sql\mode:SQLMode定义了MySQL应支持的SQL语法、数据校验等,这样可以更容易地在不同的环境中使用MySQL。全局s
Jacquelyn38 Jacquelyn38
2年前
2020年前端实用代码段,为你的工作保驾护航
有空的时候,自己总结了几个代码段,在开发中也经常使用,谢谢。1、使用解构获取json数据let jsonData  id: 1,status: "OK",data: 'a', 'b';let  id, status, data: number   jsonData;console.log(id, status, number )
皕杰报表之UUID
​在我们用皕杰报表工具设计填报报表时,如何在新增行里自动增加id呢?能新增整数排序id吗?目前可以在新增行里自动增加id,但只能用uuid函数增加UUID编码,不能新增整数排序id。uuid函数说明:获取一个UUID,可以在填报表中用来创建数据ID语法:uuid()或uuid(sep)参数说明:sep布尔值,生成的uuid中是否包含分隔符'',缺省为
Wesley13 Wesley13
2年前
Java获得今日零时零分零秒的时间(Date型)
publicDatezeroTime()throwsParseException{    DatetimenewDate();    SimpleDateFormatsimpnewSimpleDateFormat("yyyyMMdd00:00:00");    SimpleDateFormatsimp2newS
Stella981 Stella981
2年前
KVM调整cpu和内存
一.修改kvm虚拟机的配置1、virsheditcentos7找到“memory”和“vcpu”标签,将<namecentos7</name<uuid2220a6d1a36a4fbb8523e078b3dfe795</uuid
Easter79 Easter79
2年前
Twitter的分布式自增ID算法snowflake (Java版)
概述分布式系统中,有一些需要使用全局唯一ID的场景,这种时候为了防止ID冲突可以使用36位的UUID,但是UUID有一些缺点,首先他相对比较长,另外UUID一般是无序的。有些时候我们希望能使用一种简单一些的ID,并且希望ID能够按照时间有序生成。而twitter的snowflake解决了这种需求,最初Twitter把存储系统从MySQL迁移
Wesley13 Wesley13
2年前
mysql设置时区
mysql设置时区mysql\_query("SETtime\_zone'8:00'")ordie('时区设置失败,请联系管理员!');中国在东8区所以加8方法二:selectcount(user\_id)asdevice,CONVERT\_TZ(FROM\_UNIXTIME(reg\_time),'08:00','0
Wesley13 Wesley13
2年前
00:Java简单了解
浅谈Java之概述Java是SUN(StanfordUniversityNetwork),斯坦福大学网络公司)1995年推出的一门高级编程语言。Java是一种面向Internet的编程语言。随着Java技术在web方面的不断成熟,已经成为Web应用程序的首选开发语言。Java是简单易学,完全面向对象,安全可靠,与平台无关的编程语言。
Stella981 Stella981
2年前
Django中Admin中的一些参数配置
设置在列表中显示的字段,id为django模型默认的主键list_display('id','name','sex','profession','email','qq','phone','status','create_time')设置在列表可编辑字段list_editable
Wesley13 Wesley13
2年前
MySQL部分从库上面因为大量的临时表tmp_table造成慢查询
背景描述Time:20190124T00:08:14.70572408:00User@Host:@Id:Schema:sentrymetaLast_errno:0Killed:0Query_time:0.315758Lock_
Python进阶者 Python进阶者
3个月前
Excel中这日期老是出来00:00:00,怎么用Pandas把这个去除
大家好,我是皮皮。一、前言前几天在Python白银交流群【上海新年人】问了一个Pandas数据筛选的问题。问题如下:这日期老是出来00:00:00,怎么把这个去除。二、实现过程后来【论草莓如何成为冻干莓】给了一个思路和代码如下:pd.toexcel之前把这