RabbitMQ 线上事故!慌的一批,脑袋一片空白……

Stella981
• 阅读 275

RabbitMQ 线上事故!慌的一批,脑袋一片空白……

Java技术栈

www.javastack.cn

关注阅读更多优质文章

[

](https://www.oschina.net/action/GoToLink?url=https%3A%2F%2Fmp.weixin.qq.com%2Fs%3F__biz%3DMzI3ODcxMzQzMw%3D%3D%26mid%3D2247486559%26idx%3D2%26sn%3D0eebf45617fb7be5727712e22aac7fb6%26scene%3D21%23wechat_redirect)

RabbitMQ 线上事故!慌的一批,脑袋一片空白……

授权转载自公众号:不一样的科技宅

前言

那天我和同事一起吃完晚饭回公司加班,然后就群里就有人@我说xxx商户说收不到推送,一开始觉得没啥。

我第一反应是不是没注册上,就让客服通知商户,重新登录下试试。这边打开推送的后台进行检查。后面反应收不到推送的越来越多,我就知道这事情不简单。

事故经过

由于大量商户反应收不到推送,我第一反应是不是推送系统挂了,导致没有进行推送。于是让运维老哥检查推送系统各节点的情况,发现都正常。

于是打开RabbitMQ的管控台看了一下,人都蒙了。已经有几万条消息处于ready状态,还有几百条unacked的消息。

我以为推送服务和MQ连接断开了,导致无法推送消息,于是让运维重启推送服务,将所有的推送服务重启完,发现unacked的消息全部变成ready,但是没过多久又有几百条unacked的消息了,这个就很明显了能消费,没有进行ack呀。

当时我以为是网络问题,导致mq无法接收到ack,让运维老哥检查了一下,发现网络没问题。现在看是真的是傻,网络有问题连接都连不上。由于确定的是无法ack造成的,立马将ack模式由原来的manual 改成auto紧急发布。将所有的节点升级好以后,发现推送正常了。

你以为这就结束了其实并没有,没过多久发现有一台MQ服务出现异常,由于生产采用了镜像队列,立即将这台有问题的MQ从集群中移除。直接进行重置,然后加入回集群。这事情算是告一段落了。此时已经接近24:00了。

RabbitMQ 线上事故!慌的一批,脑袋一片空白……

时间来到第二天上午10:00,运维那边又出现报警了,说推送系统有台机器,磁盘快被写满了,并且占用率很高。

我的乖乖从昨晚到现在写了快40G的日志,一看报错信息瞬间就明白问题出在哪里了。麻溜的把bug修了紧急发布。

吐槽一波公司的ELK,压根就没有收集到这个报错信息,导致我没有及时发现。

RabbitMQ 线上事故!慌的一批,脑袋一片空白……

事故重现-队列阻塞

MQ配置

spring:     # 消息队列   rabbitmq:       host: 10.0.0.53       username: guest       password: guest       virtual-host: local       port: 5672       # 消息发送确认     publisher-confirm-type: correlated       # 开启发送失败退回     publisher-returns: true       listener:         simple:           # 消费端最小并发数         concurrency: 1           # 消费端最大并发数         max-concurrency: 5           # 一次请求中预处理的消息数量         prefetch: 2           # 手动应答         acknowledge-mode: manual

问题代码

[@RabbitListener(queues = ORDER_QUEUE)   public void receiveOrder(@Payload String encryptOrderDto,                                         @Headers Map<String,Object> headers,                                         Channel channel) throws Exception {       // 解密和解析       String decryptOrderDto = EncryptUtil.decryptByAes(encryptOrderDto);       OrderDto orderDto = JSON.parseObject(decryptOrderDto, OrderDto.class);          try {           // 模拟推送           pushMsg(orderDto);       }catch (Exception e){           log.error("推送失败-错误信息:{},消息内容:{}", e.getLocalizedMessage(), JSON.toJSONString(orderDto));       }finally {           // 消息签收           channel.basicAck((Long) headers.get(AmqpHeaders.DELIVERY_TAG),false);       }      }](https://www.oschina.net/action/GoToLink?url=http%3A%2F%2Fmp.weixin.qq.com%2Fs%3F__biz%3DMzI3ODcxMzQzMw%3D%3D%26mid%3D2247493133%26idx%3D2%26sn%3D19a6cbb8eff82a9bdfe00a7868723f7e%26chksm%3Deb50633bdc27ea2d964b921b9a9bcc3ddab97cebc6ed53211cdc3b0aa15fc09902e6071fac8d%26scene%3D21%23wechat_redirect)

看起来好像没啥问题。由于和交易系统约定好,订单数据需要先转换json串,然后再使用AES进行加密,所以这边需要,先进行解密然后在进行解析。才能得到订单数据。

为了防止消息丢失,交易系统做了失败重发机制,防止消息丢失,不巧的是重发的时候没有对订单数据进行加密。这就导致推送系统,在解密的时候出异常,从而无法进行ack

默默的吐槽一句:人在家中坐,锅从天上来。

模拟推送

推送代码

发送3条正常的消息

curl http://localhost:8080/sendMsg/3

发送1条错误的消息

curl http://localhost:8080/sendErrorMsg/1

再发送3条正常的消息

curl http://localhost:8080/sendMsg/3

RabbitMQ 线上事故!慌的一批,脑袋一片空白……

观察日志发下,虽然有报错,但是还能正常进行推送。但是RabbitMQ已经出现了一条unacked的消息。非常强悍的 RabbitMQ 总结,这篇推荐看下。关注公众号Java技术栈阅读更多往期消息队列干货。

RabbitMQ 线上事故!慌的一批,脑袋一片空白……

继续发送1条错误的消息

curl http://localhost:8080/sendErrorMsg/1

再发送3条正常的消息

curl http://localhost:8080/sendMsg/3

这个时候你会发现控制台报错,当然错误信息是解密失败,但是正常的消息却没有被消费,这个时候其实队列已经阻塞了。

RabbitMQ 线上事故!慌的一批,脑袋一片空白……

[RabbitMQ](https://www.oschina.net/action/GoToLink?url=http%3A%2F%2Fmp.weixin.qq.com%2Fs%3F__biz%3DMzI3ODcxMzQzMw%3D%3D%26mid%3D2247494139%26idx%3D2%26sn%3Da33f1781bf8ee8094827118a4ffe394d%26chksm%3Deb506ccddc27e5db0191cce12d753b683de89f2cf285410b514fed9bfa46f5a9b1f4b817968f%26scene%3D21%23wechat_redirect)管控台也可以看到,刚刚发送的的3条消息处于ready状态。这个时候就如果一直有消息进入,都会堆积在队里里面无法被消费。

再发送3条正常的消息

curl http://localhost:8080/sendMsg/3

RabbitMQ 线上事故!慌的一批,脑袋一片空白……

分析原因

上面说了是由于没有进行ack导致队里阻塞。那么问题来了,这是为什么呢?其实这是[RabbitMQ](https://www.oschina.net/action/GoToLink?url=http%3A%2F%2Fmp.weixin.qq.com%2Fs%3F__biz%3DMzI3ODcxMzQzMw%3D%3D%26mid%3D2247494139%26idx%3D2%26sn%3Da33f1781bf8ee8094827118a4ffe394d%26chksm%3Deb506ccddc27e5db0191cce12d753b683de89f2cf285410b514fed9bfa46f5a9b1f4b817968f%26scene%3D21%23wechat_redirect)的一种保护机制。防止当消息激增的时候,海量的消息进入consumer而引发consumer宕机。

RabbitMQ提供了一种QOS(服务质量保证)功能,即在非自动确认的消息的前提下,限制信道上的消费者所能保持的最大未确认的数量。可以通过设置PrefetchCount实现。

举例说明:可以理解为在consumer前面加了一个缓冲容器,容器能容纳最大的消息数量就是PrefetchCount

如果容器没有满[RabbitMQ](https://www.oschina.net/action/GoToLink?url=http%3A%2F%2Fmp.weixin.qq.com%2Fs%3F__biz%3DMzI3ODcxMzQzMw%3D%3D%26mid%3D2247494139%26idx%3D2%26sn%3Da33f1781bf8ee8094827118a4ffe394d%26chksm%3Deb506ccddc27e5db0191cce12d753b683de89f2cf285410b514fed9bfa46f5a9b1f4b817968f%26scene%3D21%23wechat_redirect)就会将消息投递到容器内,如果满了就不投递了。当consumer对消息进行ack以后就会将此消息移除,从而放入新的消息。

listener:     simple:       # 消费端最小并发数     concurrency: 1       # 消费端最大并发数     max-concurrency: 5       # 一次处理的消息数量     prefetch: 2       # 手动应答     acknowledge-mode: manual

prefetch参数就是PrefetchCount

通过上面的配置发现prefetch我只配置了2,并且concurrency配置的只有1,所以当我发送了2条错误消息以后,由于解密失败这2条消息一直没有被ack。将缓冲区沾满了,这个时候[RabbitMQ](https://www.oschina.net/action/GoToLink?url=http%3A%2F%2Fmp.weixin.qq.com%2Fs%3F__biz%3DMzI3ODcxMzQzMw%3D%3D%26mid%3D2247494139%26idx%3D2%26sn%3Da33f1781bf8ee8094827118a4ffe394d%26chksm%3Deb506ccddc27e5db0191cce12d753b683de89f2cf285410b514fed9bfa46f5a9b1f4b817968f%26scene%3D21%23wechat_redirect)认为这个consumer已经没有消费能力了就不继续给它推送消息了,所以就造成了队列阻塞。

判断队列是否有阻塞的风险。

ack模式为manual,并且线上出现了unacked消息,这个时候不用慌。由于QOS是限制信道channel上的消费者所能保持的最大未确认的数量。所以允许出现unacked的数量可以通过channelCount * prefetchCount * 节点数量 得出。

channlCount就是由concurrency,max-concurrency决定的。

  • min = concurrency * prefetch * 节点数量

  • max = max-concurrency * prefetch * 节点数量

由此可以的出结论

  • unacked_msg_count < min 队列不会阻塞。但需要及时处理unacked的消息。

  • unacked_msg_count >= min 可能会出现堵塞。

  • unacked_msg_count >= max 队列一定阻塞。

这里需要好好理解一下。

处理方法

其实处理的方法很简单,将解密和解析的方法放入try catch中就解决了这样不管解密正常与否,消息都会被签收。如果出错将会输出错误日志,让开发人员进行处理了。

对于这个就需要有日志监控系统,来及时告警了。

@RabbitListener(queues = ORDER_QUEUE)   public void receiveOrder(@Payload String encryptOrderDto,                                         @Headers Map<String,Object> headers,                                         Channel channel) throws Exception {       try {              // 解密和解析           String decryptOrderDto = EncryptUtil.decryptByAes(encryptOrderDto);           OrderDto orderDto = JSON.parseObject(decryptOrderDto, OrderDto.class);              // 模拟推送           pushMsg(orderDto);       }catch (Exception e){           log.error("推送失败-错误信息:{},消息内容:{}", e.getLocalizedMessage(), encryptOrderDto);       }finally {           // 消息签收           channel.basicAck((Long) headers.get(AmqpHeaders.DELIVERY_TAG),false);       }      }

注意的点

unacked的消息在consumer切断连接后(重启),会自动回到队头。

事故重现-磁盘占用飙升

一开始我不知道代码有问题,就是以为单纯的没有进行ack所以将ack模式改成auto自动,紧急升级了,这样不管正常与否,消息都会被签收,所以在当时确实是解决了问题。

其实现在回想起来是非常危险的操作的,将ack模式改成auto自动,这样会使QOS不生效。会出现大量消息涌入consumer从而造成consumer宕机,可以是因为当时在晚上,交易比较少,并且推送系统有多个节点,才没出现问题。

问题代码

[@RabbitListener(queues = ORDER_QUEUE)   public void receiveOrder(@Payload String encryptOrderDto,                                         @Headers Map<String,Object> headers,                                         Channel channel) throws Exception {       // 解密和解析       String decryptOrderDto = EncryptUtil.decryptByAes(encryptOrderDto);       OrderDto orderDto = JSON.parseObject(decryptOrderDto, OrderDto.class);          try {              // 模拟推送           pushMsg(orderDto);       }catch (Exception e){           log.error("推送失败-错误信息:{},消息内容:{}", e.getLocalizedMessage(), encryptOrderDto);       }finally {           // 消息签收           channel.basicAck((Long) headers.get(AmqpHeaders.DELIVERY_TAG),false);       }      }](https://www.oschina.net/action/GoToLink?url=http%3A%2F%2Fmp.weixin.qq.com%2Fs%3F__biz%3DMzI3ODcxMzQzMw%3D%3D%26mid%3D2247494139%26idx%3D2%26sn%3Da33f1781bf8ee8094827118a4ffe394d%26chksm%3Deb506ccddc27e5db0191cce12d753b683de89f2cf285410b514fed9bfa46f5a9b1f4b817968f%26scene%3D21%23wechat_redirect)

配置文件

listener:     simple:       # 消费端最小并发数     concurrency: 1       # 消费端最大并发数     max-concurrency: 5       # 一次处理的消息数量     prefetch: 2       # 手动应答     acknowledge-mode: auto

由于当时不知道交易系统的重发机制,重发时没有对订单数据加密的bug,所以还是会发出少量有误的消息。

发送1条错误的消息

curl http://localhost:8080/sendErrorMsg/1

RabbitMQ 线上事故!慌的一批,脑袋一片空白……

RabbitMQ 线上事故!慌的一批,脑袋一片空白……

原因

[RabbitMQ](https://www.oschina.net/action/GoToLink?url=http%3A%2F%2Fmp.weixin.qq.com%2Fs%3F__biz%3DMzI3ODcxMzQzMw%3D%3D%26mid%3D2247494139%26idx%3D2%26sn%3Da33f1781bf8ee8094827118a4ffe394d%26chksm%3Deb506ccddc27e5db0191cce12d753b683de89f2cf285410b514fed9bfa46f5a9b1f4b817968f%26scene%3D21%23wechat_redirect)消息监听程序异常时,consumer会向rabbitmq server发送Basic.Reject,表示消息拒绝接受,由于Spring默认requeue-rejected配置为true,消息会重新入队,然后rabbitmq server重新投递。就相当于死循环了,所以控制台在疯狂刷错误日志造成磁盘利用率飙升的原因。

解决方法

default-requeue-rejected: false即可。

总结

  • 个人建议,生产环境不建议使用自动ack,这样会QOS无法生效。

  • 在使用手动ack的时候,需要非常注意消息签收。

  • 其实在将有问题的MQ重置时,是将错误的消息给清除才没有问题了,相当于是消息丢失了。

try {       // 业务逻辑。   }catch (Exception e){       // 输出错误日志。   }finally {       // 消息签收。   }

结尾

如果有人告诉你遇到线上事故不要慌,除非是超级大佬久经沙场。否则就是瞎扯淡,你让他来试试,看看他会不会大脑一片空白,直冒汗。

如果觉得对你有帮助,可以多多评论,多多点赞哦,也可以随手点个关注哦,谢谢。

RabbitMQ 线上事故!慌的一批,脑袋一片空白……

RabbitMQ 线上事故!慌的一批,脑袋一片空白……

RabbitMQ 线上事故!慌的一批,脑袋一片空白……

RabbitMQ 线上事故!慌的一批,脑袋一片空白……

RabbitMQ 线上事故!慌的一批,脑袋一片空白……

RabbitMQ 线上事故!慌的一批,脑袋一片空白……

RabbitMQ 线上事故!慌的一批,脑袋一片空白……

关注Java技术栈看更多干货

RabbitMQ 线上事故!慌的一批,脑袋一片空白……

RabbitMQ 线上事故!慌的一批,脑袋一片空白……

戳原文,获取精选面试题!

本文分享自微信公众号 - Java技术栈(javastack)。
如有侵权,请联系 support@oschina.cn 删除。
本文参与“OSC源创计划”,欢迎正在阅读的你也加入,一起分享。

点赞
收藏
评论区
推荐文章
blmius blmius
2年前
MySQL:[Err] 1292 - Incorrect datetime value: ‘0000-00-00 00:00:00‘ for column ‘CREATE_TIME‘ at row 1
文章目录问题用navicat导入数据时,报错:原因这是因为当前的MySQL不支持datetime为0的情况。解决修改sql\mode:sql\mode:SQLMode定义了MySQL应支持的SQL语法、数据校验等,这样可以更容易地在不同的环境中使用MySQL。全局s
Jacquelyn38 Jacquelyn38
2年前
2020年前端实用代码段,为你的工作保驾护航
有空的时候,自己总结了几个代码段,在开发中也经常使用,谢谢。1、使用解构获取json数据let jsonData  id: 1,status: "OK",data: 'a', 'b';let  id, status, data: number   jsonData;console.log(id, status, number )
皕杰报表之UUID
​在我们用皕杰报表工具设计填报报表时,如何在新增行里自动增加id呢?能新增整数排序id吗?目前可以在新增行里自动增加id,但只能用uuid函数增加UUID编码,不能新增整数排序id。uuid函数说明:获取一个UUID,可以在填报表中用来创建数据ID语法:uuid()或uuid(sep)参数说明:sep布尔值,生成的uuid中是否包含分隔符'',缺省为
Wesley13 Wesley13
2年前
MySQL 的慢 SQL 怎么优化?
!(https://oscimg.oschina.net/oscnet/7b00ec583b5e42cc80e8c56c6556c082.jpg)Java技术栈www.javastack.cn关注阅读更多优质文章(https://www.oschina.net/action/GoToLink?urlhttp
Stella981 Stella981
2年前
Android So动态加载 优雅实现与原理分析
背景:漫品Android客户端集成适配转换功能(基于目标识别(So库35M)和人脸识别库(5M)),导致apk体积50M左右,为优化客户端体验,决定实现So文件动态加载.!(https://oscimg.oschina.net/oscnet/00d1ff90e4b34869664fef59e3ec3fdd20b.png)点击上方“蓝字”关注我
Wesley13 Wesley13
2年前
mysql设置时区
mysql设置时区mysql\_query("SETtime\_zone'8:00'")ordie('时区设置失败,请联系管理员!');中国在东8区所以加8方法二:selectcount(user\_id)asdevice,CONVERT\_TZ(FROM\_UNIXTIME(reg\_time),'08:00','0
Wesley13 Wesley13
2年前
00:Java简单了解
浅谈Java之概述Java是SUN(StanfordUniversityNetwork),斯坦福大学网络公司)1995年推出的一门高级编程语言。Java是一种面向Internet的编程语言。随着Java技术在web方面的不断成熟,已经成为Web应用程序的首选开发语言。Java是简单易学,完全面向对象,安全可靠,与平台无关的编程语言。
Stella981 Stella981
2年前
Django中Admin中的一些参数配置
设置在列表中显示的字段,id为django模型默认的主键list_display('id','name','sex','profession','email','qq','phone','status','create_time')设置在列表可编辑字段list_editable
Stella981 Stella981
2年前
Spring Boot 2.1.6 发布了!
!(https://oscimg.oschina.net/oscnet/e5aaab7a5b9f4aa7a944b00aff253ed2.jpg)Java技术栈www.javastack.cn优秀的Java技术公众号(https://www.oschina.net/action/GoToLink?urlhttps%3
Wesley13 Wesley13
2年前
MySQL部分从库上面因为大量的临时表tmp_table造成慢查询
背景描述Time:20190124T00:08:14.70572408:00User@Host:@Id:Schema:sentrymetaLast_errno:0Killed:0Query_time:0.315758Lock_
Python进阶者 Python进阶者
4个月前
Excel中这日期老是出来00:00:00,怎么用Pandas把这个去除
大家好,我是皮皮。一、前言前几天在Python白银交流群【上海新年人】问了一个Pandas数据筛选的问题。问题如下:这日期老是出来00:00:00,怎么把这个去除。二、实现过程后来【论草莓如何成为冻干莓】给了一个思路和代码如下:pd.toexcel之前把这