Message Queue消息队列基本原理

Stella981
• 阅读 597

消息队列基本原理

📦 本文已归档到:「blog」

消息队列(Message Queue,简称 MQ)技术是分布式应用间交换信息的一种技术。

消息队列主要解决应用耦合,异步消息,流量削锋等问题,实现高性能,高可用,可伸缩和最终一致性架构。是大型分布式系统不可缺少的中间件。

注意:_为了简便,下文中除了文章标题,一律使用 MQ 简称_。

一、为何用 MQ

MQ 比较核心的优点有 3 个:解耦异步削峰

解耦

不同系统如果要建立通信,传统的做法是:调用接口。

如果需要和新的系统建立通信或删除已建立的通信,都需要修改代码,这种方案显然耦合度很高。

Message Queue消息队列基本原理

img

如果使用 MQ,系统间的通信只需要通过发布/订阅(Pub/Sub)模型即可,彼此没有直接联系,也就不需要相互感知,从而达到 解耦

Message Queue消息队列基本原理

img

异步

假设这样一个场景,用户向系统 A 发起请求,系统 A 处理计算只需要 10 ms,然后通知系统 BCD 写库,系统 BCD 写库耗时分别为:100ms、200ms、300ms。最终总耗时为:10+100ms+200ms+300ms=610ms。此外,加上请求和响应的网络传输时间,从用户角度看,可能要等待将近 1s 才能得到结果。

Message Queue消息队列基本原理

img

如果使用 MQ,系统 A 接到请求后,耗时 10ms 处理计算,然后向系统 BCD 连续发送消息,假设耗时 5ms。那么 这一过程的总耗时为 3ms + 5ms = 8ms,这相比于 610 ms,大大缩短了响应时间。至于系统 BCD 的写库操作,只要自行消费 MQ 后处理即可,用户无需关注。

Message Queue消息队列基本原理

img

削峰

假设某个系统读写数据库的稳定性能为每秒处理 1000 条数据。平常情况下,远远达不到这么大的处理量。假设,因为因为做活动,系统的瞬时请求量剧增,达到每秒 10000 个并发请求,数据库根本承受不了,可能直接就把数据库给整崩溃了,这样系统服务就不可用了。

Message Queue消息队列基本原理

img

如果使用 MQ,每秒写入 10000 条请求,但是系统 A 每秒只从 MQ 中消费 1000 条请求,然后写入数据库。这样,就不会超过数据库的承受能力,而是把请求积压在 MQ 中。只要高峰期一过,系统 A 就会很快把积压的消息给处理掉。

Message Queue消息队列基本原理

img

二、MQ 的问题

凡事有利有弊,使用 MQ 给系统带来很多好处,也会付出一定的代价。

它引入了以下问题:

  • 系统可用性降低 - 引入了 MQ 后,通信需要基于 MQ 完成,如果 MQ 宕机,则服务不可用。因此,MQ 要保证是高可用的,详情参考:MQ 的高可用

  • 系统复杂度提高 - 使用 MQ,需要关注一些新的问题:

  • 如何保证消息没有重复消费?

  • 如何处理消息丢失的问题?

  • 如何保证消息传递的顺序性?

  • 如何处理大量消息积压的问题?

  • 一致性问题 - 假设系统 A 处理完直接返回成功的结果给用户,用户认为请求成功。但如果此时,系统 BCD 中只要有任意一个写库失败,那么数据就不一致了。这种情况如何处理?

重复消费

如何保证消息不被重复消费如何保证消息消费的幂等性 是同一个问题。

必须先明确产生重复消费的原因,才能对症下药。

重复消费问题原因

重复消费问题通常不是 MQ 来处理,而是由开发来处理的。

以 Kafka 举例:Kafka 每个 Partition 都是一个有序的、不可变的记录序列,不断追加到结构化的提交日志中。Partition 中为每条记录分配一个连续的 id 号,称为偏移量(Offset),用于唯一标识 Partition 内的记录。

Message Queue消息队列基本原理

img

Kafka 的客户端和 Broker 都会保存 Offset。客户端消费消息后,每隔一段时间,就把已消费的 Offset 提交给 Kafka Broker,表示已消费。

在这个过程中,如果客户端应用消费消息后,因为宕机、重启等情况而没有提交已消费的 Offset 。当系统恢复后,会继续消费消息,由于 Offset 未提交,就会出现重复消费的问题。

Message Queue消息队列基本原理

img

重复消费解决方案

应对重复消费问题,就要通过幂等性来解决。

这个问题可以从业务层面来解决。

MQ 重复消费不可怕,可怕的是没有应对机制,可以借鉴的思路有:

  • 如果是写数据库,可以先根据主键查询,判断数据是否已存在,存在则更新,不存在则插入;

  • 如果是写 Redis,set 操作,由于天然具有幂等性,大可放心;

  • 如果是根据消息做较复杂的逻辑处理,可以在消息中加入全局唯一 ID,例如:订单 ID 等。在客户端存储中(Mysql、Redis 等)保存已消费消息的 ID。一旦接受到新消息,先判断消息中的 ID 是否在已消费消息 ID 表中存在,存在则不再处理,不存在则处理。

消息丢失

如何处理消息丢失的问题如何保证消息不被重复消费 是同一个问题。关注点有:

  • Kafka Server 丢失数据

  • 消费方丢失数据

  • 生产方丢失数据

消费方丢失数据

唯一可能导致消费方丢失数据的情况,就是:消费方设置了自动提交 Offset。设置了自动提交 Offset,接受到消息后就会自动提交 Offset 给 Kafka ,Kafka 就认为消息已被消费。如果此时,消费方尚未来得及处理消息就挂了,那么消息就丢了。

解决方法就是:消费方关闭自动提交 Offset,处理完消息后手动提交 Offset。但这种情况下可能会出现重复消费的情形,需要自行保证幂等性。

Kafka 丢失数据

当 Kafka 某个 Broker 宕机,需要重新选举 Partition 的 Leader。若此时其他的 Follower 尚未同步 Leader 的数据,那么新选某个 Follower 为 Leader 后,就丢失了部分数据。

为此,一般要求至少设置 4 个参数:

  • 给 Topic 设置 replication.factor 参数 - 这个值必须大于 1,要求每个 Partition 必须有至少 2 个副本。

  • 在 Kafka 服务端设置 min.insync.replicas 参数 - 这个值必须大于 1,这是要求一个 Leader 需要和至少一个 Follower 保持通信,这样才能确保 Leader 挂了还有替补。

  • 在 Producer 端设置 acks=all - 这意味着:要求每条数据,必须是写入所有 replica 之后,才能认为写入成功了

  • 在 Producer 端设置 retries=MAX(很大很大很大的一个值,无限次重试的意思) - 这意味着要求一旦写入失败,就无限重试,卡在这里了。

生产方丢失数据

如果按照上述的思路设置了 acks=all,生产方一定不会丢数据。

要求是,你的 Leader 接收到消息,所有的 Follower 都同步到了消息之后,才认为本生产消息成功了。如果未满足这个条件,生产者会自动不断的重试,重试无限次。

消息的顺序性

以 Kafka 为例

要保证 MQ 的顺序性,势必要付出一定的代价,所以实施方案前,要先明确业务场景是不是有必要保证消息的顺序性。只有那些明确对消息处理顺序有要求的业务场景才值得去保证消息顺序性。

方案一

一个 Topic,一个 Partition,一个 Consumer,内部单线程消费,单线程吞吐量太低,一般不会用这个。

方案二

  • 写入数据到 Partition 时指定一个全局唯一的 ID,例如订单 ID。发送方保证相同 ID 的消息有序的发送到同一个 Partition。

  • 基于上一点,消费方从 Kafka Partition 中消费消息时,此刻一定是顺序的。但如果消费方式以并发方式消费消息,顺序就可能会被打乱。为此,还有做到以下几点:

  • 消费方维护 N 个缓存队列,具有相同 ID 的数据都写入同一个队列中;

  • 创建 N 个线程,每个线程只负责从指定的一个队列中取数据。

Message Queue消息队列基本原理

img

消息积压

假设一个 MQ 消费者可以一秒处理 1000 条消息,三个 MQ 消费者可以一秒处理 3000 条消息,那么一分钟的处理量是 18 万条。如果 MQ 中积压了几百万到上千万的数据,即使消费者恢复了,也需要大概很长的时间才能恢复过来。

对于产线环境来说,漫长的等待是不可接受的,所以面临这种窘境时,只能临时紧急扩容以应对了,具体操作步骤和思路如下:

  • 先修复 Consumer 的问题,确保其恢复消费速度,然后将现有 Consumer 都停掉。

  • 新建一个 Topic,Partition 是原来的 10 倍,临时建立好原先 10 倍的 Queue 数量。

  • 然后写一个临时的分发数据的 Consumer 程序,这个程序部署上去消费积压的数据,消费之后不做耗时的处理,直接均匀轮询写入临时建立好的 10 倍数量的 Queue。

  • 接着临时征用 10 倍的机器来部署 Consumer ,每一批 Consumer 消费一个临时 Queue 的数据。这种做法相当于是临时将 Queue 资源和 Consumer 资源扩大 10 倍,以正常的 10 倍速度来消费数据。

  • 等快速消费完积压数据之后,得恢复原先部署的架构重新用原先的 consumer 机器来消费消息。

三、MQ 的高可用

不同 MQ 实现高可用的原理各不相同。因为 Kafka 比较具有代表性,所以这里以 Kafka 为例。

Kafka 的高可用

Kafka 的核心概念

了解 Kafka,必须先了解 Kafka 的核心概念:

  • Broker - Kafka 集群包含一个或多个节点,这种节点被称为 Broker。

  • Topic - 每条发布到 Kafka 集群的消息都有一个类别,这个类别被称为 Topic。(不同 Topic 的消息是物理隔离的;同一个 Topic 的消息保存在一个或多个 Broker 上,但用户只需指定消息的 Topic 即可生产或消费数据而不必关心数据存于何处)。对于每一个 Topic, Kafka 集群都会维持一个分区日志。

  • Partition - 了提高 Kafka 的吞吐率,每个 Topic 包含一个或多个 Partition,每个 Partition 在物理上对应一个文件夹,该文件夹下存储这个 Partition 的所有消息和索引文件。

  • Kafka 日志的分区(Partition)分布在 Kafka 集群的节点上。每个节点在处理数据和请求时,共享这些分区。每一个分区都会在已配置的节点上进行备份,确保容错性。

Message Queue消息队列基本原理

img

Kafka 的副本机制

Kafka 是如何实现高可用的呢?

Kafka 在 0.8 以前的版本中,如果一个 Broker 宕机了,其上面的 Partition 都不能用了,这自然不是高可用的。

为了实现高可用,Kafka 引入了复制功能。

简单来说,就是副本机制( Replicate )。

每个 Partition 都有一个 Leader,零个或多个 Follower。Leader 和 Follower 都是 Broker,每个 Broker 都会成为某些分区的 Leader 和某些分区的 Follower,因此集群的负载是平衡的。

  • Leader 处理一切对 Partition (分区)的读写请求

  • 而 Follower 只需被动的同步 Leader 上的数据

同一个 Topic 的不同 Partition 会分布在多个 Broker 上,而且一个 Partition 还会在其他的 Broker 上面进行备份,Producer 在发布消息到某个 Partition 时,先找到该 Partition 的 Leader,然后向这个 Leader 推送消息;每个 Follower 都从 Leader 拉取消息,拉取消息成功之后,向 Leader 发送一个 ACK 确认。

Message Queue消息队列基本原理

img

FAQ

问:为什么让 Leader 处理一切对对 Partition (分区)的读写请求?

答:因为如果允许所有 Broker 都可以处理读写请求,就可能产生数据一致性问题。

Kafka 选举 Leader

由上文可知,Partition 在多个 Broker 上存在副本。

如果某个 Follower 宕机,啥事儿没有,正常工作。

如果 Leader 宕机了,会从 Follower 中重新选举一个新的 Leader。

MQ 的通信模式

MQ 可驻留在内存或磁盘上,队列存储消息直到它们被应用程序读取。通过 MQ,应用程序可独立地执行,它们不需要知道彼此的位置,不需要等待接收程序接收此消息。在分布式计算环境中,为了集成分布式应用,开发者需要对异构网络环境下的分布式应用提供有效的通信手段。为了管理需要共享的信息,对应用提供公共的信息交换机制是重要的。

  • 点对点 - 点对点方式是最为传统和常见的通讯方式,它支持一对一、一对多、多对多、多对一等多种配置方式,支持树状、网状等多种拓扑结构。

  • 多点广播 - MQ 适用于不同类型的应用。其中重要的,也是正在发展中的是"多点广播"应用,即能够将消息发送到多个目标站点 (Destination List)。可以使用一条 MQ 指令将单一消息发送到多个目标站点,并确保为每一站点可靠地提供信息。MQ 不仅提供了多点广播的功能,而且还拥有智能消息分发功能,在将一条消息发送到同一系统上的多个用户时,MQ 将消息的一个复制版本和该系统上接收者的名单发送到目标 MQ 系统。目标 MQ 系统在本地复制这些消息,并将它们发送到名单上的队列,从而尽可能减少网络的传输量。

  • 发布/订阅 (Publish/Subscribe) - 发布/订阅模式使消息的分发可以突破目的队列地理位置的限制,使消息按照特定的主题甚至内容进行分发,用户或应用程序可以根据主题或内容接收到所需要的消息。发布/订阅模式使得发送者和接收者之间的耦合关系变得更为松散,发送者不必关心接收者的目的地址,而接收者也不必关心消息的发送地址,而只是根据消息的主题进行消息的收发。

  • 集群 (Cluster) - 为了简化点对点通讯模式中的系统配置,MQ 提供 Cluster(集群) 的解决方案。集群类似于一个域 (Domain),集群内部的队列管理器之间通讯时,不需要两两之间建立消息通道,而是采用集群 (Cluster) 通道与其它成员通讯,从而大大简化了系统配置。此外,集群中的队列管理器之间能够自动进行负载均衡,当某一队列管理器出现故障时,其它队列管理器可以接管它的工作,从而大大提高系统的高可靠性。

一般的业务系统要引入 MQ,最早大家都用 ActiveMQ,但是现在确实大家用的不多了,没经过大规模吞吐量场景的验证,社区也不是很活跃,所以大家还是算了吧,我个人不推荐用这个了;

  • 后来大家开始用 RabbitMQ,但是确实 erlang 语言阻止了大量的 Java 工程师去深入研究和掌控它,对公司而言,几乎处于不可控的状态,但是确实人家是开源的,比较稳定的支持,活跃度也高;

  • 不过现在确实越来越多的公司会去用 RocketMQ,确实很不错,毕竟是阿里出品,但社区可能有突然黄掉的风险(目前 RocketMQ 已捐给 Apache,但 GitHub 上的活跃度其实不算高)对自己公司技术实力有绝对自信的,推荐用 RocketMQ,否则回去老老实实用 RabbitMQ 吧,人家有活跃的开源社区,绝对不会黄。

  • 所以中小型公司,技术实力较为一般,技术挑战不是特别高,用 RabbitMQ 是不错的选择;大型公司,基础架构研发实力较强,用 RocketMQ 是很好的选择。

  • 如果是大数据领域的实时计算、日志采集等场景,用 Kafka 是业内标准的,绝对没问题,社区活跃度很高,绝对不会黄,何况几乎是全世界这个领域的事实性规范。

JMS

谈 MQ 就不得不提一下 JMS 。

JMS(JAVA Message Service,java 消息服务)API 是一个消息服务的标准/规范,允许应用程序组件基于 JavaEE 平台创建、发送、接收和读取消息。它使分布式通信耦合度更低,消息服务更加可靠以及异步性。

在 EJB 架构中,有消息 bean 可以无缝的与 JM 消息服务集成。在 J2EE 架构模式中,有消息服务者模式,用于实现消息与应用直接的解耦。

消息模型

在 JMS 标准中,有两种消息模型:

  • P2P(Point to Point)

  • Pub/Sub(Publish/Subscribe)

P2P 模式

Message Queue消息队列基本原理

P2P 模式包含三个角色:MQ(Queue),发送者(Sender),接收者(Receiver)。每个消息都被发送到一个特定的队列,接收者从队列中获取消息。队列保留着消息,直到他们被消费或超时。

P2P 的特点

  • 每个消息只有一个消费者(Consumer)(即一旦被消费,消息就不再在 MQ 中)

  • 发送者和接收者之间在时间上没有依赖性,也就是说当发送者发送了消息之后,不管接收者有没有正在运行,它不会影响到消息被发送到队列

  • 接收者在成功接收消息之后需向队列应答成功

如果希望发送的每个消息都会被成功处理的话,那么需要 P2P 模式。

Pub/sub 模式

Message Queue消息队列基本原理

包含三个角色主题(Topic),发布者(Publisher),订阅者(Subscriber) 。多个发布者将消息发送到 Topic,系统将这些消息传递给多个订阅者。

Pub/Sub 的特点

  • 每个消息可以有多个消费者

  • 发布者和订阅者之间有时间上的依赖性。针对某个主题(Topic)的订阅者,它必须创建一个订阅者之后,才能消费发布者的消息。

  • 为了消费消息,订阅者必须保持运行的状态。

为了缓和这样严格的时间相关性,JMS 允许订阅者创建一个可持久化的订阅。这样,即使订阅者没有被激活(运行),它也能接收到发布者的消息。

如果希望发送的消息可以不被做任何处理、或者只被一个消息者处理、或者可以被多个消费者处理的话,那么可以采用 Pub/Sub 模型。

消息消费

在 JMS 中,消息的产生和消费都是异步的。对于消费来说,JMS 的消息者可以通过两种方式来消费消息。

  • 同步 - 订阅者或接收者通过 receive 方法来接收消息,receive 方法在接收到消息之前(或超时之前)将一直阻塞;

  • 异步 - 订阅者或接收者可以注册为一个消息监听器。当消息到达之后,系统自动调用监听器的 onMessage 方法。

JNDI - Java 命名和目录接口,是一种标准的 Java 命名系统接口。可以在网络上查找和访问服务。通过指定一个资源名称,该名称对应于数据库或命名服务中的一个记录,同时返回资源连接建立所必须的信息。

JNDI 在 JMS 中起到查找和访问发送目标或消息来源的作用。

JMS 编程模型

ConnectionFactory

创建 Connection 对象的工厂,针对两种不同的 jms 消息模型,分别有 QueueConnectionFactory 和 TopicConnectionFactory 两种。可以通过 JNDI 来查找 ConnectionFactory 对象。

Destination

Destination 的意思是消息生产者的消息发送目标或者说消息消费者的消息来源。对于消息生产者来说,它的 Destination 是某个队列(Queue)或某个主题(Topic);对于消息消费者来说,它的 Destination 也是某个队列或主题(即消息来源)。

所以,Destination 实际上就是两种类型的对象:Queue、Topic。可以通过 JNDI 来查找 Destination。

(3) Connection

Connection 表示在客户端和 JMS 系统之间建立的链接(对 TCP/IP socket 的包装)。Connection 可以产生一个或多个 Session。跟 ConnectionFactory 一样,Connection 也有两种类型:QueueConnection 和 TopicConnection。

(4) Session

Session 是操作消息的接口。可以通过 session 创建生产者、消费者、消息等。Session 提供了事务的功能。当需要使用 session 发送/接收多个消息时,可以将这些发送/接收动作放到一个事务中。同样,也分 QueueSession 和 TopicSession。

消息的生产者

消息生产者由 Session 创建,并用于将消息发送到 Destination。同样,消息生产者分两种类型:QueueSender 和 TopicPublisher。可以调用消息生产者的方法(send 或 publish 方法)发送消息。

消息消费者

消息消费者由 Session 创建,用于接收被发送到 Destination 的消息。两种类型:QueueReceiver 和 TopicSubscriber。可分别通过 session 的 createReceiver(Queue)或 createSubscriber(Topic)来创建。当然,也可以 session 的 creatDurableSubscriber 方法来创建持久化的订阅者。

MessageListener

消息监听器。如果注册了消息监听器,一旦消息到达,将自动调用监听器的 onMessage 方法。EJB 中的 MDB(Message-Driven Bean)就是一种 MessageListener。

深入学习 JMS 对掌握 JAVA 架构,EJB 架构有很好的帮助,消息中间件也是大型分布式系统必须的组件。本次分享主要做全局性介绍,具体的深入需要大家学习,实践,总结,领会。

参考资料

  • 大型网站架构系列:分布式 MQ(一)

  • 大型网站架构系列:MQ(二)

  • 分布式开放 MQ(RocketMQ)的原理与实践

  • 阿里 RocketMQ 优势对比

  • advanced-java 之 MQ

本文分享自微信公众号 - java宝典(java_bible)。
如有侵权,请联系 support@oschina.cn 删除。
本文参与“OSC源创计划”,欢迎正在阅读的你也加入,一起分享。

点赞
收藏
评论区
推荐文章
blmius blmius
2年前
MySQL:[Err] 1292 - Incorrect datetime value: ‘0000-00-00 00:00:00‘ for column ‘CREATE_TIME‘ at row 1
文章目录问题用navicat导入数据时,报错:原因这是因为当前的MySQL不支持datetime为0的情况。解决修改sql\mode:sql\mode:SQLMode定义了MySQL应支持的SQL语法、数据校验等,这样可以更容易地在不同的环境中使用MySQL。全局s
Jacquelyn38 Jacquelyn38
2年前
2020年前端实用代码段,为你的工作保驾护航
有空的时候,自己总结了几个代码段,在开发中也经常使用,谢谢。1、使用解构获取json数据let jsonData  id: 1,status: "OK",data: 'a', 'b';let  id, status, data: number   jsonData;console.log(id, status, number )
皕杰报表之UUID
​在我们用皕杰报表工具设计填报报表时,如何在新增行里自动增加id呢?能新增整数排序id吗?目前可以在新增行里自动增加id,但只能用uuid函数增加UUID编码,不能新增整数排序id。uuid函数说明:获取一个UUID,可以在填报表中用来创建数据ID语法:uuid()或uuid(sep)参数说明:sep布尔值,生成的uuid中是否包含分隔符'',缺省为
Stella981 Stella981
2年前
KVM调整cpu和内存
一.修改kvm虚拟机的配置1、virsheditcentos7找到“memory”和“vcpu”标签,将<namecentos7</name<uuid2220a6d1a36a4fbb8523e078b3dfe795</uuid
Easter79 Easter79
2年前
Twitter的分布式自增ID算法snowflake (Java版)
概述分布式系统中,有一些需要使用全局唯一ID的场景,这种时候为了防止ID冲突可以使用36位的UUID,但是UUID有一些缺点,首先他相对比较长,另外UUID一般是无序的。有些时候我们希望能使用一种简单一些的ID,并且希望ID能够按照时间有序生成。而twitter的snowflake解决了这种需求,最初Twitter把存储系统从MySQL迁移
Wesley13 Wesley13
2年前
mysql设置时区
mysql设置时区mysql\_query("SETtime\_zone'8:00'")ordie('时区设置失败,请联系管理员!');中国在东8区所以加8方法二:selectcount(user\_id)asdevice,CONVERT\_TZ(FROM\_UNIXTIME(reg\_time),'08:00','0
Wesley13 Wesley13
2年前
00:Java简单了解
浅谈Java之概述Java是SUN(StanfordUniversityNetwork),斯坦福大学网络公司)1995年推出的一门高级编程语言。Java是一种面向Internet的编程语言。随着Java技术在web方面的不断成熟,已经成为Web应用程序的首选开发语言。Java是简单易学,完全面向对象,安全可靠,与平台无关的编程语言。
Stella981 Stella981
2年前
Django中Admin中的一些参数配置
设置在列表中显示的字段,id为django模型默认的主键list_display('id','name','sex','profession','email','qq','phone','status','create_time')设置在列表可编辑字段list_editable
Wesley13 Wesley13
2年前
MySQL部分从库上面因为大量的临时表tmp_table造成慢查询
背景描述Time:20190124T00:08:14.70572408:00User@Host:@Id:Schema:sentrymetaLast_errno:0Killed:0Query_time:0.315758Lock_
Python进阶者 Python进阶者
3个月前
Excel中这日期老是出来00:00:00,怎么用Pandas把这个去除
大家好,我是皮皮。一、前言前几天在Python白银交流群【上海新年人】问了一个Pandas数据筛选的问题。问题如下:这日期老是出来00:00:00,怎么把这个去除。二、实现过程后来【论草莓如何成为冻干莓】给了一个思路和代码如下:pd.toexcel之前把这