A look at Storm's window trigger

永住金刚

This article takes a look at how Storm's window trigger works.

WindowTridentProcessor.prepare

storm-core-1.2.2-sources.jar!/org/apache/storm/trident/windowing/WindowTridentProcessor.java

    public void prepare(Map stormConf, TopologyContext context, TridentContext tridentContext) {
        this.topologyContext = context;
        List<TridentTuple.Factory> parents = tridentContext.getParentTupleFactories();
        if (parents.size() != 1) {
            throw new RuntimeException("Aggregation related operation can only have one parent");
        }

        Long maxTuplesCacheSize = getWindowTuplesCacheSize(stormConf);

        this.tridentContext = tridentContext;
        collector = new FreshCollector(tridentContext);
        projection = new TridentTupleView.ProjectionFactory(parents.get(0), inputFields);

        windowStore = windowStoreFactory.create(stormConf);
        windowTaskId = windowId + WindowsStore.KEY_SEPARATOR + topologyContext.getThisTaskId() + WindowsStore.KEY_SEPARATOR;
        windowTriggerInprocessId = getWindowTriggerInprocessIdPrefix(windowTaskId);

        tridentWindowManager = storeTuplesInStore ?
                new StoreBasedTridentWindowManager(windowConfig, windowTaskId, windowStore, aggregator, tridentContext.getDelegateCollector(), maxTuplesCacheSize, inputFields)
                : new InMemoryTridentWindowManager(windowConfig, windowTaskId, windowStore, aggregator, tridentContext.getDelegateCollector());

        tridentWindowManager.prepare();
    }
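  • This ends by calling tridentWindowManager.prepare(). Depending on storeTuplesInStore, the manager is either a StoreBasedTridentWindowManager or an InMemoryTridentWindowManager.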
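The following is a minimal, JDK-only sketch of the two decisions prepare makes: how windowTaskId is composed, and which manager is picked. The separator value "|" and the ManagerKind enum are assumptions made for illustration. The real code uses WindowsStore.KEY_SEPARATOR and instantiates the actual manager classes.

```java
// Hypothetical stand-in for the two TridentWindowManager implementations.
enum ManagerKind { STORE_BASED, IN_MEMORY }

final class PrepareSketch {
    // Assumption: plays the role of WindowsStore.KEY_SEPARATOR; "|" is an assumed value.
    static final String KEY_SEPARATOR = "|";

    // Same shape as in prepare(): windowId + SEP + taskId + SEP.
    static String windowTaskId(String windowId, int taskId) {
        return windowId + KEY_SEPARATOR + taskId + KEY_SEPARATOR;
    }

    // Mirrors the ternary on storeTuplesInStore in prepare().
    static ManagerKind chooseManager(boolean storeTuplesInStore) {
        return storeTuplesInStore ? ManagerKind.STORE_BASED : ManagerKind.IN_MEMORY;
    }
}
```

Under the assumed separator, windowTaskId("w-1", 7) yields "w-1|7|". This per-task prefix is what AbstractTridentWindowManager appends to TRIGGER_COUNT_PREFIX to build windowTriggerCountId.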

AbstractTridentWindowManager.prepare

storm-core-1.2.2-sources.jar!/org/apache/storm/trident/windowing/AbstractTridentWindowManager.java

    public AbstractTridentWindowManager(WindowConfig windowConfig, String windowTaskId, WindowsStore windowStore,
                                        Aggregator aggregator, BatchOutputCollector delegateCollector) {
        this.windowTaskId = windowTaskId;
        this.windowStore = windowStore;
        this.aggregator = aggregator;
        this.delegateCollector = delegateCollector;

        windowTriggerCountId = WindowTridentProcessor.TRIGGER_COUNT_PREFIX + windowTaskId;

        windowManager = new WindowManager<>(new TridentWindowLifeCycleListener());

        WindowStrategy<T> windowStrategy = windowConfig.getWindowStrategy();
        EvictionPolicy<T> evictionPolicy = windowStrategy.getEvictionPolicy();
        windowManager.setEvictionPolicy(evictionPolicy);
        triggerPolicy = windowStrategy.getTriggerPolicy(windowManager, evictionPolicy);
        windowManager.setTriggerPolicy(triggerPolicy);
    }

    public void prepare() {
        preInitialize();

        initialize();

        postInitialize();
    }

    private void postInitialize() {
        // start trigger once the initialization is done.
        triggerPolicy.start();
    }
  • In its constructor, AbstractTridentWindowManager obtains triggerPolicy by calling windowStrategy.getTriggerPolicy(windowManager, evictionPolicy). The prepare method then calls postInitialize, which invokes triggerPolicy.start().
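prepare is a template method with a fixed order. The trigger is started last, presumably so that no trigger fires against a half-initialized manager (the source comment says "start trigger once the initialization is done").

The sketch below models only that ordering. The class names are hypothetical, and the hooks just record their calls instead of reading or writing a store.

```java
import java.util.ArrayList;
import java.util.List;

// Minimal model of the prepare() template: initialization first, trigger start last.
abstract class ManagerSketch {
    final List<String> calls = new ArrayList<>();

    // Named after the hook in AbstractTridentWindowManager; here it only records the call.
    private void preInitialize() {
        calls.add("preInitialize");
    }

    // Subclasses (in-memory vs store-based in Storm) supply their own initialization.
    protected abstract void initialize();

    final void prepare() {
        preInitialize();
        initialize();
        postInitialize();
    }

    private void postInitialize() {
        // In Storm this is triggerPolicy.start(); the sketch only records it.
        calls.add("triggerPolicy.start");
    }
}

final class InMemoryManagerSketch extends ManagerSketch {
    @Override
    protected void initialize() {
        calls.add("initialize");
    }
}
```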

SlidingDurationWindowStrategy.getTriggerPolicy

storm-core-1.2.2-sources.jar!/org/apache/storm/trident/windowing/strategy/SlidingDurationWindowStrategy.java

    /**
     * Returns a {@code TriggerPolicy} which triggers for every configured sliding window duration.
     *
     * @param triggerHandler
     * @param evictionPolicy
     * @return
     */
    @Override
    public TriggerPolicy<T> getTriggerPolicy(TriggerHandler triggerHandler, EvictionPolicy<T> evictionPolicy) {
        return new TimeTriggerPolicy<>(windowConfig.getSlidingLength(), triggerHandler, evictionPolicy);
    }
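  • Taking SlidingDurationWindowStrategy as an example, it creates a TimeTriggerPolicy whose duration is windowConfig.getSlidingLength(). The triggerHandler is the WindowManager passed in by the AbstractTridentWindowManager constructor.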
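Below is a stripped-down model of the strategy/policy wiring, using simplified interfaces. TriggerHandlerSketch, TriggerPolicySketch and the other types are hypothetical, not Storm types.

It highlights the point worth noticing: the policy's period comes from the sliding length, not the window length. The handler is whatever the caller passes in, which is the WindowManager in Storm.

```java
// Simplified stand-ins for Storm's TriggerHandler / TriggerPolicy roles.
interface TriggerHandlerSketch {
    boolean onTrigger();
}

interface TriggerPolicySketch {
    void start();
}

// Keeps only the period (ms) and the callback target; scheduling is omitted in this sketch.
final class TimeTriggerPolicySketch implements TriggerPolicySketch {
    final long durationMs;
    final TriggerHandlerSketch handler;

    TimeTriggerPolicySketch(long durationMs, TriggerHandlerSketch handler) {
        this.durationMs = durationMs;
        this.handler = handler;
    }

    @Override
    public void start() {
        // Intentionally empty: this sketch only models which period and handler are chosen.
    }
}

final class SlidingDurationStrategySketch {
    final long windowLengthMs;
    final long slidingLengthMs;

    SlidingDurationStrategySketch(long windowLengthMs, long slidingLengthMs) {
        this.windowLengthMs = windowLengthMs;
        this.slidingLengthMs = slidingLengthMs;
    }

    // Like getTriggerPolicy: trigger every sliding length, calling back the given handler.
    TriggerPolicySketch getTriggerPolicy(TriggerHandlerSketch handler) {
        return new TimeTriggerPolicySketch(slidingLengthMs, handler);
    }
}
```

With a 10 s window sliding every 2 s, the resulting policy fires every 2000 ms.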

TimeTriggerPolicy.start

storm-core-1.2.2-sources.jar!/org/apache/storm/windowing/TimeTriggerPolicy.java

    public void start() {
        executorFuture = executor.scheduleAtFixedRate(newTriggerTask(), duration, duration, TimeUnit.MILLISECONDS);
    }

    private Runnable newTriggerTask() {
        return new Runnable() {
            @Override
            public void run() {
                // do not process current timestamp since tuples might arrive while the trigger is executing
                long now = System.currentTimeMillis() - 1;
                try {
                    /*
                     * set the current timestamp as the reference time for the eviction policy
                     * to evict the events
                     */
                    if (evictionPolicy != null) {
                        evictionPolicy.setContext(new DefaultEvictionContext(now, null, null, duration));
                    }
                    handler.onTrigger();
                } catch (Throwable th) {
                    LOG.error("handler.onTrigger failed ", th);
                    /*
                     * propagate it so that task gets canceled and the exception
                     * can be retrieved from executorFuture.get()
                     */
                    throw th;
                }
            }
        };
    }
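  • start schedules a fixed-rate task that first runs after one duration and then every duration milliseconds (windowConfig.getSlidingLength() here). The task's run method calls handler.onTrigger(), i.e. WindowManager.onTrigger(). If onTrigger throws, the exception is logged and rethrown, which cancels the scheduled task and makes the error retrievable from executorFuture.get().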
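The scheduling behavior relies on standard ScheduledExecutorService semantics. With scheduleAtFixedRate, if one run throws, later runs are suppressed and the future completes exceptionally.

The JDK-only sketch below reproduces that pattern. FixedRateTriggerSketch and its LongPredicate callback are hypothetical stand-ins for TimeTriggerPolicy and handler.onTrigger(), and a single-thread scheduler is used here just for simplicity.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.function.LongPredicate;

// JDK-only sketch of the TimeTriggerPolicy scheduling pattern (not Storm's class).
final class FixedRateTriggerSketch {
    private final ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
    private final long durationMs;
    // Stands in for handler.onTrigger(); receives the reference time used for eviction.
    private final LongPredicate onTrigger;
    private volatile ScheduledFuture<?> future;

    FixedRateTriggerSketch(long durationMs, LongPredicate onTrigger) {
        this.durationMs = durationMs;
        this.onTrigger = onTrigger;
    }

    void start() {
        // initialDelay == period: the first trigger fires one duration after start().
        future = executor.scheduleAtFixedRate(() -> {
            // As in Storm: exclude the current millisecond from the reference time.
            long now = System.currentTimeMillis() - 1;
            try {
                onTrigger.test(now);
            } catch (RuntimeException e) {
                System.err.println("onTrigger failed: " + e);
                // Rethrowing makes the executor suppress all later runs; the cause is
                // then available from future.get() wrapped in an ExecutionException.
                throw e;
            }
        }, durationMs, durationMs, TimeUnit.MILLISECONDS);
    }

    ScheduledFuture<?> future() {
        return future;
    }

    void shutdown() {
        executor.shutdownNow();
    }
}
```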

WindowManager.onTrigger

storm-core-1.2.2-sources.jar!/org/apache/storm/windowing/WindowManager.java

    /**
     * The callback invoked by the trigger policy.
     */
    @Override
    public boolean onTrigger() {
        List<Event<T>> windowEvents = null;
        List<T> expired = null;
        try {
            lock.lock();
            /*
             * scan the entire window to handle out of order events in
             * the case of time based windows.
             */
            windowEvents = scanEvents(true);
            expired = new ArrayList<>(expiredEvents);
            expiredEvents.clear();
        } finally {
            lock.unlock();
        }
        List<T> events = new ArrayList<>();
        List<T> newEvents = new ArrayList<>();
        for (Event<T> event : windowEvents) {
            events.add(event.get());
            if (!prevWindowEvents.contains(event)) {
                newEvents.add(event.get());
            }
        }
        prevWindowEvents.clear();
        if (!events.isEmpty()) {
            prevWindowEvents.addAll(windowEvents);
            LOG.debug("invoking windowLifecycleListener onActivation, [{}] events in window.", events.size());
            windowLifecycleListener.onActivation(events, newEvents, expired);
        } else {
            LOG.debug("No events in the window, skipping onActivation");
        }
        triggerPolicy.reset();
        return !events.isEmpty();
    }
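  • This calls windowLifecycleListener.onActivation(events, newEvents, expired), where windowLifecycleListener is AbstractTridentWindowManager's TridentWindowLifeCycleListener. Here events is the whole current window, and newEvents holds those events not present in the previous non-empty window.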
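The newEvents computation is a set difference between the current window and the previous non-empty one. The sketch below works on plain values.

It makes two assumptions. It relies on value equality, whereas Storm compares Event<T> objects. It also stores prevWindowEvents in a HashSet, which may differ from the real field type. WindowDiffSketch is a hypothetical name.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Sketch of how onTrigger splits the current window into all events and new events.
final class WindowDiffSketch<T> {
    private final Set<T> prevWindowEvents = new HashSet<>();

    /** Returns [events, newEvents], mirroring the loop in WindowManager.onTrigger. */
    List<List<T>> onTrigger(List<T> windowEvents) {
        List<T> events = new ArrayList<>();
        List<T> newEvents = new ArrayList<>();
        for (T event : windowEvents) {
            events.add(event);
            if (!prevWindowEvents.contains(event)) {
                newEvents.add(event);
            }
        }
        // Like the original: always clear, but only remember non-empty windows.
        prevWindowEvents.clear();
        if (!events.isEmpty()) {
            prevWindowEvents.addAll(windowEvents);
        }
        List<List<T>> result = new ArrayList<>();
        result.add(events);
        result.add(newEvents);
        return result;
    }
}
```

For windows [a, b] followed by [b, c], the second trigger reports events [b, c] and newEvents [c].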

TridentWindowLifeCycleListener.onActivation

storm-core-1.2.2-sources.jar!/org/apache/storm/trident/windowing/AbstractTridentWindowManager.java

    /**
     * Listener to receive any activation/expiry of windowing events and take further action on them.
     */
    class TridentWindowLifeCycleListener implements WindowLifecycleListener<T> {

        @Override
        public void onExpiry(List<T> expiredEvents) {
            LOG.debug("onExpiry is invoked");
            onTuplesExpired(expiredEvents);
        }

        @Override
        public void onActivation(List<T> events, List<T> newEvents, List<T> expired) {
            LOG.debug("onActivation is invoked with events size: [{}]", events.size());
            // trigger occurred, create an aggregation and keep them in store
            int currentTriggerId = triggerId.incrementAndGet();
            execAggregatorAndStoreResult(currentTriggerId, events);
        }
    }

    private void execAggregatorAndStoreResult(int currentTriggerId, List<T> tupleEvents) {
        List<TridentTuple> resultTuples = getTridentTuples(tupleEvents);

        // run aggregator to compute the result
        AccumulatedTuplesCollector collector = new AccumulatedTuplesCollector(delegateCollector);
        Object state = aggregator.init(currentTriggerId, collector);
        for (TridentTuple resultTuple : resultTuples) {
            aggregator.aggregate(state, resultTuple, collector);
        }
        aggregator.complete(state, collector);

        List<List<Object>> resultantAggregatedValue = collector.values;

        ArrayList<WindowsStore.Entry> entries = Lists.newArrayList(new WindowsStore.Entry(windowTriggerCountId, currentTriggerId + 1),
                new WindowsStore.Entry(WindowTridentProcessor.generateWindowTriggerKey(windowTaskId, currentTriggerId), resultantAggregatedValue));
        windowStore.putAll(entries);

        pendingTriggers.add(new TriggerResult(currentTriggerId, resultantAggregatedValue));
    }
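  • TridentWindowLifeCycleListener.onActivation increments triggerId and delegates the real work to execAggregatorAndStoreResult
  • execAggregatorAndStoreResult calls the aggregator's init, aggregate and complete methods in that order. It then writes two entries to windowStore: the trigger count (currentTriggerId + 1) and the aggregated result under the key for this trigger
  • Finally, it adds a TriggerResult to pendingTriggers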
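The following JDK-only sketch shows the Aggregator call order and the two store writes in isolation. AggregatorSketch is a simplified stand-in for Trident's Aggregator: it aggregates int values instead of TridentTuples and collects into a List instead of a TridentCollector.

The "tc|" and "tr|" key prefixes are hypothetical placeholders for TRIGGER_COUNT_PREFIX and generateWindowTriggerKey.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Simplified stand-in for the Aggregator lifecycle: init -> aggregate* -> complete.
interface AggregatorSketch<S> {
    S init(int batchId, List<List<Object>> out);
    void aggregate(S state, int value, List<List<Object>> out);
    void complete(S state, List<List<Object>> out);
}

// Example aggregator: sums the values and emits a single one-field tuple on complete().
final class SumAggregatorSketch implements AggregatorSketch<long[]> {
    @Override public long[] init(int batchId, List<List<Object>> out) { return new long[1]; }
    @Override public void aggregate(long[] state, int value, List<List<Object>> out) { state[0] += value; }
    @Override public void complete(long[] state, List<List<Object>> out) {
        List<Object> tuple = new ArrayList<>();
        tuple.add(state[0]);
        out.add(tuple);
    }
}

final class TriggerStoreSketch {
    final Map<String, Object> store = new LinkedHashMap<>(); // stands in for windowStore
    final List<Integer> pendingTriggers = new ArrayList<>(); // trigger ids only, not TriggerResult
    final String windowTaskId;

    TriggerStoreSketch(String windowTaskId) {
        this.windowTaskId = windowTaskId;
    }

    <S> List<List<Object>> execAggregatorAndStoreResult(int currentTriggerId, List<Integer> events,
                                                        AggregatorSketch<S> aggregator) {
        // Plays the role of AccumulatedTuplesCollector.values.
        List<List<Object>> collected = new ArrayList<>();
        S state = aggregator.init(currentTriggerId, collected);
        for (int e : events) {
            aggregator.aggregate(state, e, collected);
        }
        aggregator.complete(state, collected);
        // Same two writes as the original: trigger count (id + 1), then the per-trigger result.
        store.put("tc|" + windowTaskId, currentTriggerId + 1);
        store.put("tr|" + windowTaskId + currentTriggerId, collected);
        pendingTriggers.add(currentTriggerId);
        return collected;
    }
}
```

With events [1, 2, 3] and trigger id 1, SumAggregatorSketch emits one tuple [6], and the store holds 2 under the trigger-count key.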

Summary

  • When TimeTriggerPolicy.start is called, Storm schedules a periodic trigger task (the Runnable built by newTriggerTask()). For SlidingDurationWindowStrategy the period is windowConfig.getSlidingLength()
  • The trigger task periodically calls WindowManager.onTrigger, which in turn calls back windowLifecycleListener.onActivation
  • AbstractTridentWindowManager provides TridentWindowLifeCycleListener, whose onActivation mainly calls execAggregatorAndStoreResult. That method drives the Aggregator: first init, then aggregate for each tuple in resultTuples, and finally complete. This makes the invocation order of the Aggregator interface's methods easy to see
