我摊牌了!真正的灰度队列实现方案!全网你都搜不到!

智数玄铁引
• 阅读 1821

背景

目前,公司方面 RPC 调用如 Dubbo、Feign 已经能支持基于灰度的调用,但是 MQ 还没有支持灰度的能力,因此导致在测试和生产环境业务验证、消息隔离方面体验比较差,因此我们基于 RabbitMQ 和 Kafka 实现了消息灰度的能力。

灰度场景

大部分场景下 MQ 的灰度并不会像 RPC 那样那么严格,但是我们需要确认消费场景,即当灰度消费者不存在的情况下,消息是否应该由正常消费者去消费。

我摊牌了!真正的灰度队列实现方案!全网你都搜不到!

1. 灰度消息只由灰度节点消费

事实的情况是可能大家都想要这种严格意义上的消息灰度隔离策略,由此才证明是真正的消息灰度方案,但是这个方案需要考虑一些具体场景问题。

比如,有时候作为灰度节点的发送方,它的功能改动点并不是在 MQ 这块,但是它发送的消息却是灰度消息,而消息的消费方可能也未发生过功能变动,也不会有与之对应的灰度消费标识,这种情况下如果我们将灰度的消息进行丢弃的话,那么会造成最终的数据不完整。

2. 灰度消息可以由正常节点消费

因此,我们再考虑第二种方案,如果当灰度消费节点不存在时,消息会由正常节点消费,当存在灰度节点时,则由灰度节点消费,正常节点消费灰度消息只为了当灰度节点不存在时的兜底。

那么,这种场景仍然可能存在问题,比如当消费节点的消费逻辑发生改变时,由正常节点消费就可能造成业务上的错误。对于此问题我们可以默认认为如果消费方发生逻辑改变,则灰度节点大概率一定是存在的,如果一些异常情况导致的异常或者宕机的场景,仍然能通过人工或者告警判断出来,总之,这个问题认为不算是问题。

灰度方案

我们分别从 MQ 的自身特性和一些通用的处理方式出发,分别探讨 RabbitMQ 和 Kafka 的灰度实现方式。

常规方案:影子Queue/Topic

这个是现在实现 MQ 灰度最为常见的方案,为每一个Queue/Topic都建立一个与之对应的灰度Queue/Topic。

生产者层将要发送的消息进行Queue/Topic/RoutingKey的动态修改,让他发送到灰度或正常的Queue/Topic中。

而消费者层面只需要在应用启动时根据自身的灰度标记动态的切换到灰度Queue/Topic进行监听即可。

但是对于我们目前的系统现状而言,这个方案存在三个问题:

首先,由于我们目前系统测试环境的灰度标签是可以定制的,可能每一个功能上线都会有一个对应的灰度标识,这样带来的问题就是Queue/Topic的数量会随着灰度标识的增加而倍数性的增加。

而不管哪种MQ,过多的Queue/Topic都会对 MQ 本身造成一定处理能力下降。

另外,我们的灰度标签是可以根据启动的实例随意修改的,也就意味着对应的整套Queue/Topic也得跟着灰度的标识随意的创建。这样一来,人工手动跟着创建显然就不太现实,而生产环境中我们的Queue/Topic创建是需要走流程申请的,这又和我们的现状违背。

再者,即便我们能够根据生产者的灰度标识动态的创建Queue/Topic的话,那么至少也需要考虑在灰度生产者实例正常下线时将它创建的Queue/Topic进行销毁,如果异常的下线还需要人工的接入定期的进行Queue/Topic的清理工作。

最后,如果是针对 Kafka 或 RocketMQ,这种方案实行起来还比较简单,如果是对于RabbitMQ,这里又多了一层 Exchange 和 Queue 的绑定关系,不同的生产模式也需要去做各自的适配。

所以,为了在 RabbitMQ 和 Kafka 之间的一致性,我们决定不采用该方案来实现。

RabbitMQ

对于 RabbitMq,我们使用重新入队这个特性来实现灰度队列。

通过重新入队的这个特性,我们可以在生产者发送消息时将灰度的标识标记到消息头,发送时一并发出。

当消费者消费消息时,根据消费者自身标记决定要不要对消息进行消费,如果消费者本身不满足灰度消费规则,则把这条灰度消息进行Requeue处理。

这条消息经过轮询,最终会流转到灰度标识的消费者进行消费。

我摊牌了!真正的灰度队列实现方案!全网你都搜不到!

实现思路

  1. 生产者在发送消息之前获取到当前实例的灰度标记,对消息 Header 添加灰度标记
  2. 对消费者添加监听器,灰度节点消费根据灰度标记判断对灰度消息的消费,正常节点根据开关决定是否消费或者进行 Requeue

生产流程

生产者在启动时,我们通过自动装配,注册 RabbitTemplate 时setBeforePublishPostProcessors添加前置处理器,在发送消息前对消息的 Header 添加灰度标记。

我摊牌了!真正的灰度队列实现方案!全网你都搜不到!

我摊牌了!真正的灰度队列实现方案!全网你都搜不到!

我摊牌了!真正的灰度队列实现方案!全网你都搜不到!

消费流程

首先,在消费时通过监听SimpleMessageListenerContainer重写executeListener方法进行消息处理。

  1. 当灰度开关未打开,执行正常消费逻辑。
  2. 当灰度机器直接匹配到灰度消息时,那么直接消费即可。
  3. 通过监听 Eureka 本地缓存刷新的事件不停地刷新灰度实例的缓存,当正常节点消费灰度消息时,如果灰度实例不存在就可以直接消费。
  4. 如果存在灰度实例且正常节点消费到灰度消息,考虑两种可能,第一是正常的轮询到正常节点,第二是灰度节点prefetch_count达到阈值,阻塞队列已满,灰度消息在正常节点之间不停地轮询。为了解决第二个场景,添加了一层布隆过滤器,当再次匹配到同样的消息时,当前节点将休眠一段短暂的时间。
  5. 上述场景都未匹配到,那么执行 Requeue 操作。

我摊牌了!真正的灰度队列实现方案!全网你都搜不到!

我摊牌了!真正的灰度队列实现方案!全网你都搜不到!

Kafka

在 Kafka 的消费理念中有一层消费者组的概念,每个消费者都有一个对应的消费组。

当消息发布到主题后,只会被投递给订阅它的每个消费组中的一个消费者,两个消费组之间互不影响。

我摊牌了!真正的灰度队列实现方案!全网你都搜不到!

借助这个消费特性,可以将同一个消费组中的灰度消费者单独拎出来,做成一个特殊的消费组,这样每个消费组都会接收到同样的消息。

在正常的消费组中,遇到带有灰度标识的消息,我们只做空消费,不实际执行业务逻辑,在灰度消费组中的消费者,只处理匹配到灰度规则的消息,其它的消息做空消费。

实现思路

  1. 生产者生产灰度消息的时候在消息 Header 里面添加灰度标记
  2. 灰度消费者和正常消费者设置不同的GroupId
  3. 灰度消费者和正常消费者在拿到消息后判断有没有灰度标记,判断配置中心是否开启了消息灰度,如果开启了则进行灰度节点的消费,如果没开启则不消费

生产流程

生产者在启动的时候会去动态装配所有的拦截器,装配的方式为在 BeanPostProcessor 的后置处理器中获取到 KafkaTemplate 对象,把我们的拦截器的类的全限定名 set 进去 config 即可,这里可以支持不管用户自己创建的 Factory对象还是 KafkaTemplate 对象都能进行拦截器的装配。

我摊牌了!真正的灰度队列实现方案!全网你都搜不到!

我摊牌了!真正的灰度队列实现方案!全网你都搜不到!

我摊牌了!真正的灰度队列实现方案!全网你都搜不到!

消费流程

消费的时候也是一样,如果当前节点是灰度节点,那么就修改当前group.id为灰度,最后通过拦截器执行消费逻辑。

我摊牌了!真正的灰度队列实现方案!全网你都搜不到!

我摊牌了!真正的灰度队列实现方案!全网你都搜不到!

我摊牌了!真正的灰度队列实现方案!全网你都搜不到!

点赞
收藏
评论区
推荐文章
blmius blmius
4年前
MySQL:[Err] 1292 - Incorrect datetime value: ‘0000-00-00 00:00:00‘ for column ‘CREATE_TIME‘ at row 1
文章目录问题用navicat导入数据时,报错:原因这是因为当前的MySQL不支持datetime为0的情况。解决修改sql\mode:sql\mode:SQLMode定义了MySQL应支持的SQL语法、数据校验等,这样可以更容易地在不同的环境中使用MySQL。全局s
Oracle 分组与拼接字符串同时使用
SELECTT.,ROWNUMIDFROM(SELECTT.EMPLID,T.NAME,T.BU,T.REALDEPART,T.FORMATDATE,SUM(T.S0)S0,MAX(UPDATETIME)CREATETIME,LISTAGG(TOCHAR(
Wesley13 Wesley13
4年前
MySQL部分从库上面因为大量的临时表tmp_table造成慢查询
背景描述Time:20190124T00:08:14.70572408:00User@Host:@Id:Schema:sentrymetaLast_errno:0Killed:0Query_time:0.315758Lock_
皕杰报表之UUID
​在我们用皕杰报表工具设计填报报表时,如何在新增行里自动增加id呢?能新增整数排序id吗?目前可以在新增行里自动增加id,但只能用uuid函数增加UUID编码,不能新增整数排序id。uuid函数说明:获取一个UUID,可以在填报表中用来创建数据ID语法:uuid()或uuid(sep)参数说明:sep布尔值,生成的uuid中是否包含分隔符'',缺省为
Stella981 Stella981
4年前
Nepxion Discovery 5.5.0 发布
!(https://oscimg.oschina.net/oscnet/f81c043194ef4732880459d00c1a720e.png)发布日志功能更新:增加基于Opentracing调用链的支持,目前支持UberJaeger,实现在SpringCloudGateway、Zuul和服务上的灰度
Stella981 Stella981
4年前
Nginx + lua +[memcached,redis]
精品案例1、Nginxluamemcached,redis实现网站灰度发布2、分库分表/基于Leaf组件实现的全球唯一ID(非UUID)3、Redis独立数据监控,实现订单超时操作/MQ死信操作SelectPollEpollReactor模型4、分布式任务调试Quartz应用
Easter79 Easter79
4年前
Twitter的分布式自增ID算法snowflake (Java版)
概述分布式系统中,有一些需要使用全局唯一ID的场景,这种时候为了防止ID冲突可以使用36位的UUID,但是UUID有一些缺点,首先他相对比较长,另外UUID一般是无序的。有些时候我们希望能使用一种简单一些的ID,并且希望ID能够按照时间有序生成。而twitter的snowflake解决了这种需求,最初Twitter把存储系统从MySQL迁移
Wesley13 Wesley13
4年前
mysql设置时区
mysql设置时区mysql\_query("SETtime\_zone'8:00'")ordie('时区设置失败,请联系管理员!');中国在东8区所以加8方法二:selectcount(user\_id)asdevice,CONVERT\_TZ(FROM\_UNIXTIME(reg\_time),'08:00','0
Wesley13 Wesley13
4年前
00:Java简单了解
浅谈Java之概述Java是SUN(StanfordUniversityNetwork),斯坦福大学网络公司)1995年推出的一门高级编程语言。Java是一种面向Internet的编程语言。随着Java技术在web方面的不断成熟,已经成为Web应用程序的首选开发语言。Java是简单易学,完全面向对象,安全可靠,与平台无关的编程语言。
Stella981 Stella981
4年前
Django中Admin中的一些参数配置
设置在列表中显示的字段,id为django模型默认的主键list_display('id','name','sex','profession','email','qq','phone','status','create_time')设置在列表可编辑字段list_editable
Python进阶者 Python进阶者
2年前
Excel中这日期老是出来00:00:00,怎么用Pandas把这个去除
大家好,我是皮皮。一、前言前几天在Python白银交流群【上海新年人】问了一个Pandas数据筛选的问题。问题如下:这日期老是出来00:00:00,怎么把这个去除。二、实现过程后来【论草莓如何成为冻干莓】给了一个思路和代码如下:pd.toexcel之前把这
智数玄铁引
智数玄铁引
Lv1
运气是计划之外的东西。
文章
3
粉丝
0
获赞
0