Running Storm with Docker, plus a wordcount example

智数探秘

This article gives a brief walkthrough of running Storm with Docker and of using Storm from a Spring Boot application.

docker-compose

version: '2'
services:
  zookeeper:
    image: zookeeper ## 3.4.10
    container_name: zookeeper
    restart: always
    ports:
      - 2181:2181

  nimbus:
    image: storm ## 1.1.1
    container_name: nimbus
    command: storm nimbus
    depends_on:
      - zookeeper
    links:
      - zookeeper
    restart: always
    ports:
      - 6627:6627

  supervisor:
    image: storm
    container_name: supervisor
    command: storm supervisor
    depends_on:
      - nimbus
      - zookeeper
    links:
      - nimbus
      - zookeeper
    restart: always

  ui:
    image: storm
    container_name: stormui
    command: storm ui
    depends_on:
      - nimbus
      - zookeeper
    links:
      - nimbus
      - zookeeper
    restart: always
    ports:
      - 8080:8080

Bring the stack up with `docker-compose up -d`, then open 192.168.99.100:8080 to see the Storm UI (192.168.99.100 is the Docker Machine address here; substitute your own Docker host's IP).

Wordcount example

TestWordSpout

import java.util.HashMap;
import java.util.Map;
import java.util.Random;

import org.apache.storm.Config;
import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichSpout;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Values;
import org.apache.storm.utils.Utils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class TestWordSpout extends BaseRichSpout {
    public static Logger LOG = LoggerFactory.getLogger(TestWordSpout.class);
    boolean _isDistributed;
    SpoutOutputCollector _collector;

    public TestWordSpout() {
        this(true);
    }

    public TestWordSpout(boolean isDistributed) {
        _isDistributed = isDistributed;
    }
        
    public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
        _collector = collector;
    }
    
    public void close() {
        
    }
        
    public void nextTuple() {
        Utils.sleep(100);
        final String[] words = new String[] {"nathan", "mike", "jackson", "golda", "bertels"};
        final Random rand = new Random();
        final String word = words[rand.nextInt(words.length)];
        _collector.emit(new Values(word));
    }
    
    public void ack(Object msgId) {

    }

    public void fail(Object msgId) {
        
    }
    
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("word"));
    }

    @Override
    public Map<String, Object> getComponentConfiguration() {
        if(!_isDistributed) {
            Map<String, Object> ret = new HashMap<String, Object>();
            ret.put(Config.TOPOLOGY_MAX_TASK_PARALLELISM, 1);
            return ret;
        } else {
            return null;
        }
    }    
}
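The random word selection inside `nextTuple` can be tried in isolation without a Storm cluster. A minimal stdlib-only sketch (the class name and the seed are illustrative, not part of the original code):

```java
import java.util.Arrays;
import java.util.List;
import java.util.Random;

public class SpoutWordPick {
    // Mirrors the selection step of TestWordSpout.nextTuple.
    static String pick(Random rand) {
        String[] words = {"nathan", "mike", "jackson", "golda", "bertels"};
        return words[rand.nextInt(words.length)];
    }

    public static void main(String[] args) {
        // A fixed seed makes the emitted sequence reproducible when testing.
        Random rand = new Random(42);
        String word = pick(rand);
        List<String> allowed = Arrays.asList("nathan", "mike", "jackson", "golda", "bertels");
        System.out.println(allowed.contains(word)); // prints true
    }
}
```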

WordCountBolt

public class WordCountBolt extends BaseBasicBolt {

    Map<String, Integer> counts = new HashMap<String, Integer>();

    @Override
    public void execute(Tuple tuple, BasicOutputCollector collector) {
        String word = tuple.getString(0);
        Integer count = counts.get(word);
        if (count == null) count = 0;
        count++;
        counts.put(word, count);
        collector.emit(new Values(word, count));
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("word", "count"));
    }
}
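The null-check-and-increment in `execute` is plain map bookkeeping and can be exercised outside Storm. A minimal sketch (class and method names are illustrative):

```java
import java.util.HashMap;
import java.util.Map;

public class WordCountSketch {
    // Same counting step as WordCountBolt.execute, without the Storm plumbing.
    static Map<String, Integer> count(String[] words) {
        Map<String, Integer> counts = new HashMap<>();
        for (String word : words) {
            // Equivalent to the bolt's null check followed by count++.
            counts.merge(word, 1, Integer::sum);
        }
        return counts;
    }

    public static void main(String[] args) {
        Map<String, Integer> counts = count(new String[]{"mike", "nathan", "mike"});
        System.out.println(counts.get("mike")); // prints 2
    }
}
```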

PrintBolt

public class PrintBolt extends BaseBasicBolt {

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {

    }

    @Override
    public void execute(Tuple tuple, BasicOutputCollector collector) {
        String first = tuple.getString(0);
        int second = tuple.getInteger(1);
        System.out.println(first + "," + second);
    }
}

Running locally

@SpringBootApplication
public class StormDemoApplication implements CommandLineRunner{

    public static void main(String[] args) {
        SpringApplication app = new SpringApplication(StormDemoApplication.class);
        app.setWebEnvironment(false);
        app.run(args);
    }

    @Override
    public void run(String... args) throws Exception {

        TopologyBuilder builder = new TopologyBuilder();
        // spout parallelism hint of 10
        builder.setSpout("spout", new TestWordSpout(), 10);
        builder.setBolt("count", new WordCountBolt(), 5).fieldsGrouping("spout", new Fields("word"));
        builder.setBolt("print", new PrintBolt(), 1).shuffleGrouping("count");

        String topologyName = "DemoTopology";
        Config conf = new Config();
        conf.setDebug(true);

        // remote submission: package first with mvn clean package -Dmaven.test.skip=true
//        StormSubmitter.submitTopologyWithProgressBar(topologyName, conf, builder.createTopology());
        try {
            LocalCluster cluster = new LocalCluster();
            cluster.submitTopology(topologyName, conf, builder.createTopology());
            Thread.sleep(60 * 1000);
            cluster.shutdown();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
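The `fieldsGrouping("spout", new Fields("word"))` call routes every tuple carrying the same word to the same `WordCountBolt` task, which is why each task's in-memory map stays consistent. A rough stdlib-only sketch of that routing rule (Storm's real implementation differs in detail; names here are illustrative):

```java
public class FieldsGroupingSketch {
    // Hash the grouping field and take it modulo the number of target tasks,
    // so equal words always land on the same task.
    static int targetTask(String word, int numTasks) {
        return Math.abs(word.hashCode() % numTasks);
    }

    public static void main(String[] args) {
        // The same word always routes to the same of the 5 bolt tasks.
        System.out.println(targetTask("mike", 5) == targetTask("mike", 5)); // prints true
    }
}
```

By contrast, the `shuffleGrouping("count")` on `PrintBolt` distributes tuples evenly, since printing does not need per-key state.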

Remote submission

Switch to the remote submission call, then build the jar:

        // remote submission: mvn clean package -Dmaven.test.skip=true
        StormSubmitter.submitTopologyWithProgressBar(topologyName, conf, builder.createTopology());

Remote submission code

@Test
public void remoteSubmit() throws InvalidTopologyException, AuthorizationException, AlreadyAliveException {

    Config conf = new Config();
    conf.put(Config.NIMBUS_SEEDS, Arrays.asList("192.168.99.100")); // nimbus host(s), e.g. 192.168.10.1
    conf.put(Config.NIMBUS_THRIFT_PORT, 6627); // nimbus thrift port, default 6627
    conf.put(Config.STORM_ZOOKEEPER_SERVERS, Arrays.asList("192.168.99.100")); // zookeeper hosts; the list may hold several
    conf.put(Config.STORM_ZOOKEEPER_PORT, 2181); // zookeeper port, default 2181
    conf.setDebug(true);
    conf.setNumWorkers(1);

    TopologyBuilder builder = new TopologyBuilder();
    // spout parallelism hint of 10
    builder.setSpout("spout", new TestWordSpout(), 10);
    builder.setBolt("count", new WordCountBolt(), 5).fieldsGrouping("spout", new Fields("word"));
    builder.setBolt("print", new PrintBolt(), 1).shuffleGrouping("count");

    String topologyName = "DemoTopology";

    // Crucial step: StormSubmitter always uploads the topology jar to nimbus.
    // If no jar path is given explicitly, Storm reads System.getProperty("storm.jar");
    // without it the submission fails.
    System.setProperty("storm.jar", "/Users/downloads/storm-demo-0.0.1-SNAPSHOT.jar");
    StormSubmitter.submitTopology(topologyName, conf, builder.createTopology());
}
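The `storm.jar` lookup used above is an ordinary JVM system property, so the mechanism is easy to see in isolation. A minimal sketch (the path below is illustrative, not the one from this project):

```java
public class StormJarPropertySketch {
    public static void main(String[] args) {
        // StormSubmitter resolves the jar to upload via the "storm.jar"
        // system property when no jar path is passed explicitly.
        System.setProperty("storm.jar", "/tmp/storm-demo-0.0.1-SNAPSHOT.jar");
        System.out.println(System.getProperty("storm.jar")); // prints /tmp/storm-demo-0.0.1-SNAPSHOT.jar
    }
}
```

In practice the property must point at the shaded/fat jar produced by `mvn clean package`, since that archive is what nimbus distributes to the supervisors.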

Clojars repository issue

Edit ~/.m2/settings.xml:

    <mirrors>
        <mirror>
            <id>nexus-aliyun</id>
            <mirrorOf>*,!Clojars</mirrorOf>
            <name>Nexus aliyun</name>
            <url>http://maven.aliyun.com/nexus/content/groups/public</url>
        </mirror>
    </mirrors>

    <!-- register Clojars itself, e.g. inside a profile's <repositories> or in the project pom -->
    <repository>
        <id>Clojars</id>
        <name>Clojars Repository</name>
        <url>http://clojars.org/repo/</url>
        <releases><enabled>true</enabled></releases>
        <snapshots><enabled>true</enabled></snapshots>
    </repository>
