(微服务)分布式事务-最大努力交付 && 消息最终一致性方案

极客清韵
• 阅读 34478

本文对比 二阶段事务、最大努力交付以及消息最终一致性,并给出部分解决方案,最终一致性方案参考阿里RockMQ事务消息:http://blog.csdn.net/chunlong...
(微服务)分布式事务-最大努力交付 && 消息最终一致性方案

(微服务)分布式事务-最大努力交付 && 消息最终一致性方案

项目git地址:https://github.com/vvsuperman...

一 2阶段事务

分布式系统最终一致性有N种方案,比如2PC(2阶段事务) ,以及三段提交等等,但开销较大,实现起来复杂,比如2阶段事务为例,需要引入一个协调者(Coordinator)来统一掌控所有参与者(Participant)的操作结果

以开会为例:
甲乙丙丁四人要组织一个会议,需要确定会议时间,不妨设甲是协调者,乙丙丁是参与者。
投票阶段:
(1)甲发邮件给乙丙丁,周二十点开会是否有时间;
(2)甲回复有时间;
(3)乙回复有时间;
(4)丙迟迟不回复,此时对于这个活动,甲乙丙均处于阻塞状态,算法无法继续进行;
(5)丙回复有时间(或者没有时间);
提交阶段:
(1)协调者甲将收集到的结果反馈给乙丙丁(什么时候反馈,以及反馈结果如何,在此例中取决与丙的时间与决定);
(2)乙收到;
(3)丙收到;
(4)丁收到;
不仅要锁住参与者的所有资源,而且要锁住协调者资源,开销大。一句话总结就是:2PC效率很低,分布式事务很难做。

在对事实性要求没有那么高的情况下,可以用基于最大努力交付 && 消息队列以及消息存储来解决最终一致性。

二 消息最大努力交付

所谓最大努力交付,就是俺反正用最大努力做,能不能成功,不做完全保证
会涉及到三个模块

  1. 上游应用,发消息到 MQ 队列。
  2. 下游应用(例如短信服务、邮件服务),接受请求,并返回通知结果。
  3. 最大努力通知服务,监听消息队列,将消息存储到数据库中,并按照通知规则调用下游应用的发送通知接口。

具体流程如下

(微服务)分布式事务-最大努力交付 && 消息最终一致性方案

  1. 上游应用发送 MQ 消息到 MQ 组件内,消息内包含通知规则和通知地址
  2. 最大努力通知服务监听到 MQ 内的消息,解析通知规则并放入延时队列等待触发通知
  3. 最大努力通知服务调用下游的通知地址,如果调用成功,则该消息标记为通知成功,如果失败则在满足通知规则(例如 5 分钟发一次,共发送 10 次)的情况下重新放入延时队列等待下次触发。

最大努力通知服务表示在不影响主业务的情况下,尽可能地确保数据的一致性。它需要开发人员根据业务来指定通知规则,在满足通知规则的前提下,尽可能的确保数据的一致,以达到最大努力的目的。

实现上也比较简单,目前主流消息队列都有ack机制,当没收到ack的时候用规则做定时重发即可。
优点:实现简单
缺点:无补偿机制,不保证能够送达
实现要点: 保证消息发送失败之后能够和业务一起回滚;消息接受方保证冥等性;定时重发机制,采用一定的重发策略,例如说指数增长,据说阿里采用redis的zset来完成,参考https://zhuanlan.zhihu.com/p/...
消息进到zset后,DelayQ会通过timer触发(比如秒级),fork相应的消费线程去处理zset里ExecuteTime大于当前时间的消息。DelayQ拿到一条消息后,解析其中的callbackurl,并组装参数,push业务消息给Consumer.
Consumer返回处理成功,那么zrem Codis里的消息。如果处理失败,则计算其下次尝试时间,并更新其ExecuteTime.

三 可靠消息最终一致性方案

此方案涉及 3 个模块:

  1. 上游应用,执行业务并发送 MQ 消息。
  2. 可靠消息服务和 MQ 消息组件,协调上下游消息的传递,并确保上下游数据的一致性。
  3. 下游应用,监听 MQ 的消息并执行自身业务。

(微服务)分布式事务-最大努力交付 && 消息最终一致性方案

第一阶段:上游应用执行业务并发送 MQ 消息

上游应用将本地业务执行和消息发送绑定在同一个本地事务中,保证要么本地操作成功并发送 MQ 消息,要么两步操作都失败并回滚。

上游应用和可靠消息之间的业务交互图如下:

(微服务)分布式事务-最大努力交付 && 消息最终一致性方案

  1. 上游应用发送待确认消息到可靠消息系统
  2. 可靠消息系统保存待确认消息并返回
  3. 上游应用执行本地业务
  4. 上游应用通知可靠消息系统确认业务已执行并发送消息。
  5. 可靠消息系统修改消息状态为发送状态并将消息投递到 MQ 中间件。

以上每一步都可能出现失败情况,分析一下这 5 步出现异常后上游业务和消息发送是否一致:

(微服务)分布式事务-最大努力交付 && 消息最终一致性方案

上游应用执行完成,下游应用尚未执行或执行失败时,此事务即处于 BASE 理论的 Soft State 状态。

第二阶段:下游应用监听 MQ 消息并执行业务

下游应用监听 MQ 消息并执行业务,并且将消息的消费结果通知可靠消息服务。

可靠消息的状态需要和下游应用的业务执行保持一致,可靠消息状态不是已完成时,确保下游应用未执行,可靠消息状态是已完成时,确保下游应用已执行。

下游应用和可靠消息服务之间的交互图如下:

(微服务)分布式事务-最大努力交付 && 消息最终一致性方案

  1. 下游应用监听 MQ 消息组件并获取消息
  2. 下游应用根据 MQ 消息体信息处理本地业务
  3. 下游应用向 MQ 组件自动发送 ACK 确认消息被消费
  4. 下游应用通知可靠消息系统消息被成功消费,可靠消息将该消息状态更改为已完成。

以上每一步都可能出现失败情况,分析一下这 4 步出现异常后下游业务和消息状态是否一致:

通过分析以上两个阶段可能失败的情况,为了确保上下游数据的最终一致性,在可靠消息系统中,需要开发 消息状态确认消息重发 两个功能以实现 BASE 理论的 Eventually Consistent 特性。

异常处理一:消息状态确认

可靠消息服务定时监听消息的状态,如果存在状态为待确认并且超时的消息,则表示上游应用和可靠消息交互中的步骤 4 或者 5 出现异常。

可靠消息则携带消息体内的信息向上游应用发起请求查询该业务是否已执行。上游应用提供一个可查询接口供可靠消息追溯业务执行状态,如果业务执行成功则更改消息状态为已发送,否则删除此消息确保数据一致。具体流程如下:

(微服务)分布式事务-最大努力交付 && 消息最终一致性方案

  1. 可靠消息查询超时的待确认状态的消息
  2. 向上游应用查询业务执行的情况
  3. 业务未执行,则删除该消息,保证业务和可靠消息服务的一致性。业务已执行,则修改消息状态为已发送,并发送消息到 MQ 组件。

异常处理二:消息重发

消息已发送则表示上游应用已经执行,接下来则确保下游应用也能正常执行。

可靠消息服务发现可靠消息服务中存在消息状态为已发送并且超时的消息,则表示可靠消息服务和下游应用中存在异常的步骤,无论哪个步骤出现异常,可靠消息服务都将此消息重新投递到 MQ 组件中供下游应用监听。

下游应用监听到此消息后,在保证幂等性的情况下重新执行业务并通知可靠消息服务此消息已经成功消费,最终确保上游应用、下游应用的数据最终一致性。具体流程如下:

(微服务)分布式事务-最大努力交付 && 消息最终一致性方案

  1. 可靠消息服务定时查询状态为已发送并超时的消息
  2. 可靠消息将消息重新投递到 MQ 组件中
  3. 下游应用监听消息,在满足幂等性的条件下,重新执行业务。
  4. 下游应用通知可靠消息服务该消息已经成功消费。

通过消息状态确认和消息重发两个功能,可以确保上游应用、可靠消息服务和下游应用数据的最终一致性。

四 肉身实战Rabbitmq

我们在rabbitmq上肉身实战了一下可靠消息,rabbitmq的发送过程如下

  1. 发送消息到消息服务
  2. 消息队列将消息发送给监听
  3. 消息监听接受并处理消息

我们来看看可能发送异常的四种

1 直接无法到达消息服务

网络断了,抛出异常,业务直接回滚即可。如果出现connection closed错误,直接增加 connection数即可

connectionFactory.setChannelCacheSize(100);

2 消息已经到达服务器,但返回的时候出现异常

rabbitmq提供了确认ack机制,可以用来确认消息是否有返回。因此我们可以在发送前在db中(内存或关系型数据库)先存一下消息,如果ack异常则进行重发

    /**confirmcallback用来确认消息是否有送达消息队列*/     
   rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {
        if (!ack) {
            //try to resend msg
        } else {
            //delete msg in db
        }
    });
     /**若消息找不到对应的Exchange会先触发returncallback */
    rabbitTemplate.setReturnCallback((message, replyCode, replyText, tmpExchange, tmpRoutingKey) -> {
        try {
            Thread.sleep(Constants.ONE_SECOND);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        log.info("send message failed: " + replyCode + " " + replyText);
        rabbitTemplate.send(message);
    });
    

如果消息没有到exchange,则confirm回调,ack=false
如果消息到达exchange,则confirm回调,ack=true
但如果是找不到exchange,则会先触发returncallback

3 消息送达后,消息服务自己挂了

如果设置了消息持久化,那么ack= true是在消息持久化完成后,就是存到硬盘上之后再发送的,确保消息已经存在硬盘上,万一消息服务挂了,消息服务恢复是能够再重发消息

4 未送达消费者

消息服务收到消息后,消息会处于"UNACK"的状态,直到客户端确认消息

channel.basicQos(1); // accept only one unack-ed message at a time (see below)
final Consumer consumer = new DefaultConsumer(channel) {
  @Override
  public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
    String message = new String(body, "UTF-8");

    System.out.println(" [x] Received '" + message + "'");
    try {
      doWork(message);
    } finally {
       //确认收到消息
      channel.basicAck(envelope.getDeliveryTag(), false);
    }
  }
};
boolean autoAck = false;
channel.basicConsume(TASK_QUEUE_NAME, autoAck, consumer);

5 确认消息丢失

消息返回时假设确认消息丢失了,那么消息服务会重发消息。注意,如果你设置了autoAck= false,但又没应答 channel.baskAck也没有应答 channel.baskNack,那么会导致非常严重的错误:消息队列会被堵塞住,可参考http://blog.sina.com.cn/s/blo...,所以,无论如何都必须应答

6 消费者业务处理异常

消息监听接受消息并处理,假设抛异常了,第一阶段事物已经完成,如果要配置回滚则过于麻烦,即使做事务补偿也可能事务补偿失效的情况,所以这里可以做一个重复执行,比如guava的retry,设置一个指数时间来循环执行,如果n次后依然失败,发邮件、短信,用人肉来兜底。
参考:http://blog.csdn.net/reviveds...

点赞
收藏
评论区
推荐文章
blmius blmius
4年前
MySQL:[Err] 1292 - Incorrect datetime value: ‘0000-00-00 00:00:00‘ for column ‘CREATE_TIME‘ at row 1
文章目录问题用navicat导入数据时,报错:原因这是因为当前的MySQL不支持datetime为0的情况。解决修改sql\mode:sql\mode:SQLMode定义了MySQL应支持的SQL语法、数据校验等,这样可以更容易地在不同的环境中使用MySQL。全局s
Oracle 分组与拼接字符串同时使用
SELECTT.,ROWNUMIDFROM(SELECTT.EMPLID,T.NAME,T.BU,T.REALDEPART,T.FORMATDATE,SUM(T.S0)S0,MAX(UPDATETIME)CREATETIME,LISTAGG(TOCHAR(
Wesley13 Wesley13
4年前
MySQL部分从库上面因为大量的临时表tmp_table造成慢查询
背景描述Time:20190124T00:08:14.70572408:00User@Host:@Id:Schema:sentrymetaLast_errno:0Killed:0Query_time:0.315758Lock_
皕杰报表之UUID
​在我们用皕杰报表工具设计填报报表时,如何在新增行里自动增加id呢?能新增整数排序id吗?目前可以在新增行里自动增加id,但只能用uuid函数增加UUID编码,不能新增整数排序id。uuid函数说明:获取一个UUID,可以在填报表中用来创建数据ID语法:uuid()或uuid(sep)参数说明:sep布尔值,生成的uuid中是否包含分隔符'',缺省为
Wesley13 Wesley13
4年前
mysql设置时区
mysql设置时区mysql\_query("SETtime\_zone'8:00'")ordie('时区设置失败,请联系管理员!');中国在东8区所以加8方法二:selectcount(user\_id)asdevice,CONVERT\_TZ(FROM\_UNIXTIME(reg\_time),'08:00','0
Wesley13 Wesley13
4年前
MySQL数据库InnoDB存储引擎Log漫游(1)
作者:宋利兵来源:MySQL代码研究(mysqlcode)0、导读本文介绍了InnoDB引擎如何利用UndoLog和RedoLog来保证事务的原子性、持久性原理,以及InnoDB引擎实现UndoLog和RedoLog的基本思路。00–UndoLogUndoLog是为了实现事务的原子性,
Wesley13 Wesley13
4年前
PHP创建多级树型结构
<!lang:php<?php$areaarray(array('id'1,'pid'0,'name''中国'),array('id'5,'pid'0,'name''美国'),array('id'2,'pid'1,'name''吉林'),array('id'4,'pid'2,'n
Wesley13 Wesley13
4年前
Java日期时间API系列36
  十二时辰,古代劳动人民把一昼夜划分成十二个时段,每一个时段叫一个时辰。二十四小时和十二时辰对照表:时辰时间24时制子时深夜11:00凌晨01:0023:0001:00丑时上午01:00上午03:0001:0003:00寅时上午03:00上午0
Wesley13 Wesley13
4年前
00:Java简单了解
浅谈Java之概述Java是SUN(StanfordUniversityNetwork),斯坦福大学网络公司)1995年推出的一门高级编程语言。Java是一种面向Internet的编程语言。随着Java技术在web方面的不断成熟,已经成为Web应用程序的首选开发语言。Java是简单易学,完全面向对象,安全可靠,与平台无关的编程语言。
线上SQL超时场景分析-MySQL超时之间隙锁 | 京东物流技术团队
前言之前遇到过一个由MySQL间隙锁引发线上sql执行超时的场景,记录一下。背景说明分布式事务消息表:业务上使用消息表的方式,依赖本地事务,实现了一套分布式事务方案消息表名:mqmessages数据量:3000多万索引:createtime和statuss
Python进阶者 Python进阶者
2年前
Excel中这日期老是出来00:00:00,怎么用Pandas把这个去除
大家好,我是皮皮。一、前言前几天在Python白银交流群【上海新年人】问了一个Pandas数据筛选的问题。问题如下:这日期老是出来00:00:00,怎么把这个去除。二、实现过程后来【论草莓如何成为冻干莓】给了一个思路和代码如下:pd.toexcel之前把这