三.Flume事务和内部原理

字节逸风
• 阅读 4463

1.Flume 事务

Flume使用两个独立的事务分别负责从soucrce到channel,以及从channel到sink的事件传递。
在Source到Channel之间的叫put事务,在Channel到Sink之间的叫Take事务。
事务两个特性就是:成功了提交,失败了回滚。

1.1 put事务

从source到channel过程中,数据在flume中会被封装成Event对象,多个event被放到一个事务中,
然后把这个包含events的事务放到channel中。
三.Flume事务和内部原理

  • 1.事务开始的时候会调用一个doPut方法,doPut方法的会将这批数据batch data,也就是一批event放到putList中。

doPut传递的数据的大小可以通过参数bathchSize配置。
putList的大小则通过channel的参数transactionCapacity进行配置。

  • 2 当数据成功存放到putList之后,调用doCommit()方法,putList中所有的event进入channel()中,

    • 1)成功则清空putList.
    • 2) 不成功的情况

      • 从putList传输到channel过程出问题,在doCommit提交之后,事务在向channel放的过程中,遇到问题。

      sink那边取数据速度要比Source这边放数据速度慢,导致channel中的数据积压,这个时候就会造成putList中的数据放不进去。
      这时会进行事务的回滚操作,调用doRollback方法,doRollback方法会做两个事情:

        - 1、清空putList中的数据; 
        - 2、抛出channelException异常。

      当source捕捉到doRollback抛出的异常,就会把刚才的一批数据重新采集一下,采集完之后重新走事务的流程。

      • 在数据采集的过程中也有可能出现问题,同样是调用doRollback方法来对事务进行回滚。

1.2 take事务

三.Flume事务和内部原理

  • 1.事务开始时,调用doTake方法,将channel中的event提取到(剪切)takeList中,
  • 2.如果后面的sink是HDFS Sink,同时在写入HDFS的IO缓冲流中放一份event。
  • 3.当takeList中存放的Event达到约定数量(batchSize) ,就会调用doCommit方法:

    • 成功执行情况下:

      • 如果是HDFS Sink,那么手动调用IO流的flush方法,将IO流缓冲区的数据写入到HDFS磁盘中,同时清空takeList中的数据
    • 失败情况下:

      • 1.网络延迟等原因导致传输数据失败,

      调用doRollback方法来进行回滚,takeList中还有备份数据,所以将takeList中的数据原封不动地还给channel,这时候就完成了事务的回滚。

      • 2.如果takeList数据有一部分传输成功了,剩下的因为网络延迟传输失败了。

      同样会调用doRollback方法来进行回滚,它会把整个takeList中的数据返回给channel,然后继续进行数据的读写。
      如此一来,再次进行事务时候,就会存在数据重复的可能。

2.Flume内部原理

  • 1). Source采集数据

EventBuilder.withBody(body)将数据封装成Event对象,
getChannelProcessor().processEvent(event)将数据交给Channel Processor
通过源码可以看到,以avro source为例

 public Void append(AvroFlumeOGEvent evt) throws AvroRemoteException
  {
    .....

    Event event = EventBuilder.withBody(evt.getBody().array(), headers); // 将数据封装成Event对象,
    try {
      getChannelProcessor().processEvent(event); // 将数据交给Channel Processor
      this.counterGroup.incrementAndGet("rpc.events");
    } catch (ChannelException ex) {
      return null;
    }

    this.counterGroup.incrementAndGet("rpc.successful");
    return null;
  }

  • 2)Channel Processor将Event事件传递给拦截器链interceptorChain.intercept(event),然后将数据返回给Channel Processor。
  • 3)Channel Processor将拦截过滤之后的Event事件传递给Channel选择器(Channel Selector)),Channel Selector返回给Channel Processor写入event事件的Channel列表

其中Channel Selectors有两种类型:
  - 1.Replicating Channel Selector : 将source过来的events发往所有的channel(相当于复制多份,默认使用的channel selector)
  - 2.Multiplexing Channel Selector:可以指定source发过来的events发往的channel

  • 4)Channel Processor根据Channel选择器的选择结果,将Event事件写入相应的Channel

看下channel Processor源码
首先构造器中直接定义了selector和拦截器interceptorChain

  public ChannelProcessor(ChannelSelector selector)
  {
    this.selector = selector;
    this.interceptorChain = new InterceptorChain();
  }
 

然后在processEvent和processEventBatch(List<Event> events)

 public void processEvent(Event event)
  {
    event = this.interceptorChain.intercept(event); // 提交到拦截器链
    if (event == null) {
      return;
    }

    List requiredChannels = this.selector.getRequiredChannels(event); // 提交到channel 选择器
    for (Iterator localIterator = requiredChannels.iterator(); localIterator.hasNext(); ) { reqChannel = (Channel)localIterator.next();
      Transaction tx = reqChannel.getTransaction();
      Preconditions.checkNotNull(tx, "Transaction object must not be null");
      try {
        tx.begin();
        reqChannel.put(event);
        tx.commit();
      } catch (Throwable t) {
        tx.rollback();
        if ((t instanceof Error)) {
          LOG.error("Error while writing to required channel: " + reqChannel, t);
          throw ((Error)t);
        }if ((t instanceof ChannelException)) {
          throw ((ChannelException)t);
        }
        throw new ChannelException("Unable to put event on required channel: " + reqChannel, t);
      }
      finally
      {
        if (tx != null)
          tx.close();
      }
    }
    Channel reqChannel;
    List optionalChannels = this.selector.getOptionalChannels(event); 
    for (Channel optChannel : optionalChannels) {
      Transaction tx = null;
      try {
        tx = optChannel.getTransaction();
        tx.begin();

        optChannel.put(event); // 将event事件写入channel

        tx.commit();
      } catch (Throwable t) {
        tx.rollback();
        LOG.error("Unable to put event on optional channel: " + optChannel, t);
        if ((t instanceof Error))
          throw ((Error)t);
      }
      finally {
        if (tx != null)
          tx.close();
      }
    }
  }
}

  • 5)SinkProcessor启动sink,sink在channel中去轮询,取出channel中的event事件。
    SinkProcessor有三种,

    • DefaultSinkProcessor(默认的,内部无任何逻辑,只是单纯的调用sink)、
    • LoadBalancingSinkProcessor(负载均衡)、
    • FaioverSinkProcessor(容灾恢复)
点赞
收藏
评论区
推荐文章
Stella981 Stella981
3年前
Redis事务,持久化,哨兵机制
1Redis事务基本事务指令Redis提供了一定的事务支持,可以保证一组操作原子执行不被打断,但是如果执行中出现错误,事务不能回滚,Redis未提供回滚支持。multi 开启事务exec 执行事务127.0.0.1:6379multiOK127.0.0.
Wesley13 Wesley13
3年前
MySQL数据库InnoDB存储引擎Log漫游(1)
作者:宋利兵来源:MySQL代码研究(mysqlcode)0、导读本文介绍了InnoDB引擎如何利用UndoLog和RedoLog来保证事务的原子性、持久性原理,以及InnoDB引擎实现UndoLog和RedoLog的基本思路。00–UndoLogUndoLog是为了实现事务的原子性,
Wesley13 Wesley13
3年前
mysql 事物四大特性和事物的四个隔离
1、事物四大特性(ACID)1.原子性(atomicity):一个事务必须视为一个不可分割的最小工作单元,整个事务中的所有操作要么全部提交成功,要么全部失败回滚,对于一个事务来说,不可能只执行其中的一部分操作,这就是事务的原子性。2.一致性(consistency):数据库总数从一个一致性的状态转换到另一个一致性的状态。3.隔离性
Wesley13 Wesley13
3年前
MySQL Transaction
分布式事务两阶段提交在分布式事务中,需要协调所有分布式原子事务参与者,并决定提交或回滚分布式事务,因此采用两阶段提交协议:第一阶段为请求阶段或表决阶段,事务协调者通知事务参与者准备提交或取消事务,然后进入表决过程,事务参与者将表决结果告知协调者是否同意提交事务;第二阶段是提交阶段,协调者收集到所有参与者的表决结果,当且仅当所有表决者都同意提交事务
Wesley13 Wesley13
3年前
MySql学习17
一.数据库事务的四大特性(ACID)如果一个数据库声称支持事务的操作,那么该数据库必须要具备以下四个特性:原子性(Atomicity):原子性是指事务包含的所有操作要么全部成功,要么全部失败回滚,这和前面两篇博客介绍事务的功能是一样的概念,因此事务的操作如果成功就必须要完全应用到数据库,如果操
Stella981 Stella981
3年前
RabbitMQ 如何保证消息的可靠性
一条消费成功被消费经历了生产者MQ消费者,因此在这三个步骤中都有可能造成消息丢失。一消息生产者没有把消息成功发送到MQ1.1事务机制AMQP协议提供了事务机制,在投递消息时开启事务支持,如果消息投递失败,则回滚事务。自定义事务管理器@Configuration
Easter79 Easter79
3年前
Spring事务原理一探
Spring事务原理一探概括来讲,事务是一个由有限操作集合组成的逻辑单元。事务操作包含两个目的,数据一致以及操作隔离。数据一致是指事务提交时保证事务内的所有操作都成功完成,并且更改永久生效;事务回滚时,保证能够恢复到事务执行之前的状态。操作隔离则是指多个同时执行的事务之间应该相互独立,互不影响。事务是一个比较广泛的概念,事务
Wesley13 Wesley13
3年前
MySQL常见问题
事务四大特性原子性:不可分割的操作单元,事务中所有操作,要么全部成功;要么撤回到执行事务之前的状态一致性:如果在执行事务之前数据库是一致的,那么在执行事务之后数据库也还是一致的;隔离性:事务操作之间彼此独立和透明互不影响。事务独立运行。这通常使用锁来实现。一个事务处理后的结果,影响了其他事务,那么其他事务会撤回。
Wesley13 Wesley13
3年前
Mysql事务,并发问题,锁机制
1、什么是事务事务是一条或多条数据库操作语句的组合,具备ACID,4个特点。原子性:要不全部成功,要不全部撤销隔离性:事务之间相互独立,互不干扰一致性:数据库正确地改变状态后,数据库的一致性约束没有被破坏持久性:事务的提交结果,将持久保存在数据库中2、事务并发会产生什么问题1)第一类丢失更新:在没有事务隔离的情况下,两个事务都同时
Spring事务实现原理
1、引言spring的springtx模块提供了对事务管理支持,使用spring事务可以让我们从复杂的事务处理中得到解脱,无需要去处理获得连接、关闭连接、事务提交和回滚等这些操作。spring事务有编程式事务和声明式事务两种实现方式。编程式事务是通过编写代
京东云开发者 京东云开发者
8个月前
Spring事务实现原理
作者:京东零售范锡军1、引言spring的springtx模块提供了对事务管理支持,使用spring事务可以让我们从复杂的事务处理中得到解脱,无需要去处理获得连接、关闭连接、事务提交和回滚等这些操作。spring事务有编程式事务和声明式事务两种实现方式。编
字节逸风
字节逸风
Lv1
四张机,鸳鸯织就欲双飞,可怜未老头先白;春波碧草,晓寒深处,相对浴红衣。
文章
5
粉丝
0
获赞
0