storm从入门到放弃教程(5)

Easter79
• 阅读 445

概述

     上一篇【 storm开发环境搭建 】 博文连接:https://my.oschina.net/u/2342969/blog/878765

本篇会深入理解Streams,欢迎同志(此同志非彼同志)们通过私信/评论等方式共同学习了解.

     Streams是storm中一个核心的概念,它是在分布式并行处理和创建的无限序列元组,Streams通过给流元组中字段命名来定义,默认情况下,元组可以包含整型,长整型,短整型,字节,字符串,布尔型,双精度浮点型,单精度浮点型,字节数组,也可以自定义序列化类型。

      下面共同学习一下 Tuple(元组)、OutputFieldsDeclarer、 元组中动态类型以及序列器

Tuple(元组)

    Tuple是storm中主要的数据结构,是storm中使用的基本单元、元组。元组是一个值列表,其中,每个值可以是任意类型。动态类型的元组不需要被定义,元组有类似 getInteger 和getString的帮助方法,无须手动转换结果类型。

     storm需要知道如何序列化所有的值,默认情况下,storm知道如何序列化简单类型,比如字符串、字节数组,如果想使用复杂的对象,则需要注册实现一个该类型的序列器。

     在storm中tuple接口集成了Iuple接口 ,实现类为TupleImpl。

OutputFieldsDeclarer

     tuple的数据结构类似于map的键值对,其中键定义在OutputFieldsDeclarer方法的Fields对象中。

通过以下例子,可以帮助大家更好的理解:

//例2-2 src/main/java/bolts/WordNormalizer.java
package bolts;
import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.IRichBolt;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
public class WordNormalizer implements IRichBolt {
    private OutputCollector collector;
    public void cleanup(){}
    /**
     * *bolt*从单词文件接收到文本行,并标准化它。
     * 文本行会全部转化成小写,并切分它,从中得到所有单词。
     */
    public void execute(Tuple input){
        String sentence = input.getString(0);
        String[] words = sentence.split(" ");
        for(String word : words){
            word = word.trim();
            if(!word.isEmpty()){
                word=word.toLowerCase();
                //发布这个单词
                List a = new ArrayList();
                a.add(input);
                collector.emit(a,new Values(word));
            }
        }
        //对元组做出应答
        collector.ack(input);
    }
    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
        this.collector=collector;
    }

    /**
     * 这个*bolt*只会发布“word”域
     */
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("word"));
    }

    @Override
    public Map<String, Object> getComponentConfiguration() {
        return null;
    }
}

    创建了发送一个字段(“word”)的Bolt,此时tuple的键为“word”,值为execute方法中发送的Values对象。

序列器

    本次介绍的storm0.6.0(含)及后续版本中如何使用序列器,storm在0.6.0之前使用不同的序列器,这里就不介绍老版本的。

     tuple可以由任意类型组合而成,因为storm是分布式的,所以它需要知道在task间如何序列化和反序列化数据的。

     storm使用Kryo进行序列化,Kryo是java开发中一个快速灵活序列器。默认情况下,storm可以序列化基础类型,比如字符串,字节,数组,ArrayList, HashMap, HashSet和 Clojure 集合类型,如果需要使用其他类型,需要自定义序列器。

动态类型

     在元组中没有对应类型的字段。在字段中放入对象和storm动态序列化数据,得到序列化接口前,我们了解一下为什么storm元组是动态类型。

      增加静态类型会大大增加storm API的复杂性, Hadoop  中,使用静态类型的键和值,在使用是需要大量的注释,对于hadoop API使用以及类型安全是一个不值得的负担,动态类型使用起来会很简单。

此外,storm 元组没有一个合理的方式使用静态类型,假如一个bolt订阅了多个流,那些流中元组会有不同类型传输在字段中。当一个bolt在execute方法接收元组,可以接收任何流的元组,就会有很多类型的元组。这样在一个bolt中,就需要为每个类型的tuple生命不同的方法订阅,显然,storm选择了简单方式,使用动态类型。

     最后,另外使用动态类型的理由是storm可以直接被 Clojure  和 JRuby  这类动态类型的语言使用。

自定义序列器

    综上所述,storm 使用Kryo 作为序列器。为了实现自定义序列器,就需要用Kryo注册一个新的序列器,

在Kryo的Github主页: https://github.com/EsotericSoftware/kryo,有更加详细的介绍,这里仅做一下简单介绍。

      增加自定义序列器,需要在拓扑配置中添加“ topology.kryo.register ”属性,它可以配置一组序列器列表,每个序列器可以选择一下两种方式之一:

  1. 用类名注册,在例子中,storm会使用 Kryo  的“FieldsSerializer”序列化类--这可能不是最好的方式,在Kryo文档中有很多种方式。
  2. 从一个类名映射到实现了“ com.esotericsoftware.kryo.Serializer ”的注册器。

例子如下:   

topology.kryo.register:
  - com.mycompany.CustomType1
  - com.mycompany.CustomType2: com.mycompany.serializer.CustomType2Serializer
  - com.mycompany.CustomType3

“com.mycompany.CustomType1“ 和“com.mycompany.CustomType3“ 使用“FieldsSerializer”序列化,反之,“com.mycompany.CustomType2“会使用”com.mycompany.serializer.CustomType2Serializer“ 序列化。

    storm提供了在拓扑配置中注册序列器的助手,Config类调用registerSerialization方法可以将一个序列器放入配置中。其中有一个高级设置“Config.TOPOLOGY_SKIP_MISSING_KRYO_REGISTRATIONS”,如果把它设置为true,storm将会忽略在类路径无有效代码的序列器,否则,storm找不到序列器,将会排除异常。当在集群中运行了很多使用了不同序列器的拓扑,想通过“storm.yaml”文件为所有拓扑配置好序列器,它就非常有用的。

JAVA序列器

     当storm遇到一个没有序列器注册的类型,它可能会使用java序列器,如果此类型也无法被java序列器序列化,storm就会报出一个错误。

     需要注意的是,无论在CPU消耗方面还是序列化对象的大小, java序列器都是非常耗费资源的。故在生产上使用拓扑的话,强烈建议使用自定义序列器。java序列器在那里,是为了容易设计新的原型。

     通过设置“Config.TOPOLOGY_FALL_BACK_ON_JAVA_SERIALIZATION”项为false,可以关闭java序列器。

特定组件序列化注册

     Storm 0.7.0  可以设置特殊组件配置,当然,如果一个组件定义一个序列化,这个序列器需要对其他bolt可用,否则其他bolt将无法接收那个组件的消息。

     一个拓扑被提交,一组序列器会被拓扑选择为所有组件发送消息使用,这是通过特殊组件序列化注册信息与普通组件序列化注册信息合并实现。当为同一个类注册了多个序列化时,序列器会任意选择一个。

     如果两个特定组件序列器有冲突,则会强制一个特定的类做序列器,只需在拓扑特定配置定义想要的序列器,拓扑特定配置优先于序列器注册的特定组件。

点赞
收藏
评论区
推荐文章
blmius blmius
2年前
MySQL:[Err] 1292 - Incorrect datetime value: ‘0000-00-00 00:00:00‘ for column ‘CREATE_TIME‘ at row 1
文章目录问题用navicat导入数据时,报错:原因这是因为当前的MySQL不支持datetime为0的情况。解决修改sql\mode:sql\mode:SQLMode定义了MySQL应支持的SQL语法、数据校验等,这样可以更容易地在不同的环境中使用MySQL。全局s
Jacquelyn38 Jacquelyn38
2年前
2020年前端实用代码段,为你的工作保驾护航
有空的时候,自己总结了几个代码段,在开发中也经常使用,谢谢。1、使用解构获取json数据let jsonData  id: 1,status: "OK",data: 'a', 'b';let  id, status, data: number   jsonData;console.log(id, status, number )
皕杰报表之UUID
​在我们用皕杰报表工具设计填报报表时,如何在新增行里自动增加id呢?能新增整数排序id吗?目前可以在新增行里自动增加id,但只能用uuid函数增加UUID编码,不能新增整数排序id。uuid函数说明:获取一个UUID,可以在填报表中用来创建数据ID语法:uuid()或uuid(sep)参数说明:sep布尔值,生成的uuid中是否包含分隔符'',缺省为
Easter79 Easter79
2年前
storm从入门到放弃教程(4)
概述    上一篇博文https://my.oschina.net/u/2342969/blog/878084详细讲解在生产和本地如何运行拓扑,本篇就对storm开发环境的搭建进行详细讲解,欢迎同志(此同志非彼同志)们通过私信/评论等方式共同学习了解.总述     搭建一个开发环境,步骤很简单:1.下载storm包
Stella981 Stella981
2年前
Python之time模块的时间戳、时间字符串格式化与转换
Python处理时间和时间戳的内置模块就有time,和datetime两个,本文先说time模块。关于时间戳的几个概念时间戳,根据1970年1月1日00:00:00开始按秒计算的偏移量。时间元组(struct_time),包含9个元素。 time.struct_time(tm_y
Wesley13 Wesley13
2年前
mysql设置时区
mysql设置时区mysql\_query("SETtime\_zone'8:00'")ordie('时区设置失败,请联系管理员!');中国在东8区所以加8方法二:selectcount(user\_id)asdevice,CONVERT\_TZ(FROM\_UNIXTIME(reg\_time),'08:00','0
Wesley13 Wesley13
2年前
00:Java简单了解
浅谈Java之概述Java是SUN(StanfordUniversityNetwork),斯坦福大学网络公司)1995年推出的一门高级编程语言。Java是一种面向Internet的编程语言。随着Java技术在web方面的不断成熟,已经成为Web应用程序的首选开发语言。Java是简单易学,完全面向对象,安全可靠,与平台无关的编程语言。
Stella981 Stella981
2年前
Django中Admin中的一些参数配置
设置在列表中显示的字段,id为django模型默认的主键list_display('id','name','sex','profession','email','qq','phone','status','create_time')设置在列表可编辑字段list_editable
Wesley13 Wesley13
2年前
MySQL部分从库上面因为大量的临时表tmp_table造成慢查询
背景描述Time:20190124T00:08:14.70572408:00User@Host:@Id:Schema:sentrymetaLast_errno:0Killed:0Query_time:0.315758Lock_
Python进阶者 Python进阶者
4个月前
Excel中这日期老是出来00:00:00,怎么用Pandas把这个去除
大家好,我是皮皮。一、前言前几天在Python白银交流群【上海新年人】问了一个Pandas数据筛选的问题。问题如下:这日期老是出来00:00:00,怎么把这个去除。二、实现过程后来【论草莓如何成为冻干莓】给了一个思路和代码如下:pd.toexcel之前把这
Easter79
Easter79
Lv1
今生可爱与温柔,每一样都不能少。
文章
2.8k
粉丝
5
获赞
1.2k