Storm结合kafka参数配置详解+代码示例(累计单词出现的次数)

Easter79
• 阅读 564

kafka参数配置详情:

public final BrokerHosts hosts;//设置kafka从哪里获取相关的配置信息 public final String topic;//从哪个topic开始消费 public final String clientId;//设置客户端标识 public int fetchSizeBytes = 1024 * 1024;//发给Kafka的每个FetchRequest中,用此指定想要的response中总的消息的大小 public int socketTimeoutMs = 10000;//设置的超时时间 public int fetchMaxWait = 10000;//设置的在broker无消息时的等待时间 public int bufferSizeBytes = 1024 * 1024;//SimpleConsumer所使用的SocketChannel的读缓冲区大小 public MultiScheme scheme = new RawMultiScheme();//设置从服务器读取的byte[]流反序列化方式 public boolean ignoreZkOffsets = false;//是否强制从Kafka中offset最小的开始读起 public long startOffsetTime = kafka.api.OffsetRequest.EarliestTime();//从哪里的offset开始读取消息,默认从消息的最前端开始,有两种方式可选 public long maxOffsetBehind = Long.MAX_VALUE;//KafkaSpout读取的进度与目标进度相差多少,相差太多,Spout会丢弃中间的消息 public boolean useStartOffsetTimeIfOffsetOutOfRange = true;//如果所请求的offset对应的消息在Kafka中不存在,是否使用startOffsetTime public int metricsTimeBucketSizeInSecs = 60;//多长时间统计一次消息

代码示例:注意事项(kafka是scala编写,所有依赖scala环境,一定要统一scala版本,本次使用的为scala2.10.1)

package kafka; import java.util.Arrays; import java.util.HashMap; import java.util.Iterator; import java.util.Map; import java.util.Map.Entry; import java.util.concurrent.atomic.AtomicInteger;

import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.storm.Config; import org.apache.storm.LocalCluster; import org.apache.storm.StormSubmitter; import org.apache.storm.generated.AlreadyAliveException; import org.apache.storm.generated.AuthorizationException; import org.apache.storm.generated.InvalidTopologyException; import org.apache.storm.kafka.*; import org.apache.storm.spout.SchemeAsMultiScheme; import org.apache.storm.task.OutputCollector; import org.apache.storm.task.TopologyContext; import org.apache.storm.topology.OutputFieldsDeclarer; import org.apache.storm.topology.TopologyBuilder; import org.apache.storm.topology.base.BaseRichBolt; import org.apache.storm.tuple.Fields; import org.apache.storm.tuple.Tuple; import org.apache.storm.tuple.Values;

/** * Created by shea on 2018/2/2. */ public class KafkaTopology2 {

public static class KafkaWordSplitter extends BaseRichBolt {

    private static final Log LOG = LogFactory.getLog(KafkaWordSplitter.class);
    private static final long serialVersionUID = 886149197481637894L;
    private OutputCollector collector;

    public void prepare(Map stormConf, TopologyContext context,
                        OutputCollector collector) {
        this.collector = collector;
    }

    public void execute(Tuple input) {
        String line = input.getString(0);
        LOG.info("RECV\[kafka -> splitter\] " + line);
        String\[\] words = line.split("\\\\s+");
        for(String word : words) {
            LOG.info("EMIT\[splitter -> counter\] " + word);
            collector.emit(input, new Values(word, 1));
        }
        collector.ack(input);
    }

    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("word", "count"));
    }

}

public static class WordCounter extends BaseRichBolt {

    private static final Log LOG = LogFactory.getLog(WordCounter.class);
    private static final long serialVersionUID = 886149197481637894L;
    private OutputCollector collector;
    private Map<String, AtomicInteger> counterMap;

    public void prepare(Map stormConf, TopologyContext context,
                        OutputCollector collector) {
        this.collector = collector;
        this.counterMap = new HashMap<String, AtomicInteger>();
    }

    public void execute(Tuple input) {
        String word = input.getString(0);
        int count = input.getInteger(1);
        LOG.info("RECV\[splitter -> counter\] " + word + " : " + count);
        AtomicInteger ai = this.counterMap.get(word);
        if(ai ==null ) {
            ai = new AtomicInteger();
            this.counterMap.put(word, ai);
        }
        ai.addAndGet(count);
        collector.ack(input);
        LOG.info("CHECK statistics map: " + this.counterMap);
    }

    @Override
    public void cleanup() {
        LOG.info("The final result:");
        Iterator<Entry<String, AtomicInteger>> iter = this.counterMap.entrySet().iterator();
        while(iter.hasNext()) {
            Entry<String, AtomicInteger> entry = iter.next();
            LOG.info(entry.getKey() + "\\t:\\t" + entry.getValue().get());
        }

    }

    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("word", "count"));
    }
}

public static void main(String\[\] args) throws AlreadyAliveException, InvalidTopologyException, InterruptedException,AuthorizationException {
    String zks = "data1:2181,data2:2181,data3:2181";
    //String topic = "my-replicated-topic5";
    String topic = "test";
    String zkRoot = "/storm"; // default zookeeper root configuration for storm
    String id = "word";

    BrokerHosts brokerHosts = new ZkHosts(zks);
    SpoutConfig spoutConf = new SpoutConfig(brokerHosts, topic, zkRoot, id);
    spoutConf.scheme = new SchemeAsMultiScheme(new StringScheme());
    //spoutConf.forceFromStart = false;//该配置是指,如果该Topology因故障停止处理,下次正常运行时是否从Spout对应数据源Kafka//中的该订阅Topic的起始位置开始读取,如果forceFromStart=true,则之前处理过的Tuple还要重新处理一遍,否则会从上次处理的位置//继续处理,保证Kafka中的Topic数据不被重复处理,是在数据源的位置进行状态记录
    spoutConf.zkServers = Arrays.asList(new String\[\] {"data1", "data2", "data3"});
    spoutConf.zkPort = 2181;

    TopologyBuilder builder = new TopologyBuilder();
    builder.setSpout("kafka-reader", new KafkaSpout(spoutConf), 5); // Kafka我们创建了一个5分区的Topic,这里并行度设置为5
    builder.setBolt("word-splitter", new KafkaWordSplitter(), 2).shuffleGrouping("kafka-reader");
    builder.setBolt("word-counter", new WordCounter()).fieldsGrouping("word-splitter", new Fields("word"));

    Config conf = new Config();

    String name = MyKafkaTopology.class.getSimpleName();
    if (args !=null  && args.length > 0) {
        // Nimbus host name passed from command line
        conf.put(Config.NIMBUS\_HOST, args\[0\]);
        conf.setNumWorkers(3);
        StormSubmitter.submitTopologyWithProgressBar(name, conf, builder.createTopology());
    } else {
        conf.setMaxTaskParallelism(3);
        LocalCluster cluster = new LocalCluster();
        cluster.submitTopology(name, conf, builder.createTopology());
        Thread.sleep(60000);
        cluster.shutdown();
    }
}

}

/** * 获取数据,处理数据,发送数据 * ack机制即, spout发送的每一条消息,

 在规定的时间内,spout收到Acker的ack响应,即认为该tuple 被后续bolt成功处理  在规定的时间内,没有收到Acker的ack响应tuple,就触发fail动作,即认为该tuple处理失败,  或者收到Acker发送的fail响应tuple,也认为失败,触发fail动作

另外Ack机制还常用于限流作用: 为了避免spout发送数据太快,而bolt处理太慢,常常设置pending数, 当spout有等于或超过pending数的tuple没有收到ack或fail响应时,跳过执行nextTuple, 从而限制spout发送数据。

网络上另外对kafka配置的解释-----摘自https://www.cnblogs.com/devos/p/4335302.html

public final BrokerHosts hosts; //用以获取Kafka broker和partition的信息 public final String topic;//从哪个topic读取消息 public final String clientId; // SimpleConsumer所用的client id

public int fetchSizeBytes = 1024 \* 1024; //发给Kafka的每个FetchRequest中,用此指定想要的response中总的消息的大小
public int socketTimeoutMs = 10000;//与Kafka broker的连接的socket超时时间
public int fetchMaxWait = 10000;   //当服务器没有新消息时,消费者会等待这些时间
public int bufferSizeBytes = 1024 \* 1024;//SimpleConsumer所使用的SocketChannel的读缓冲区大小
public MultiScheme scheme = new RawMultiScheme();//从Kafka中取出的byte\[\],该如何反序列化
public boolean forceFromStart = false;//是否强制从Kafka中offset最小的开始读起
public long startOffsetTime = kafka.api.OffsetRequest.EarliestTime();//从何时的offset时间开始读,默认为最旧的offset
public long maxOffsetBehind = Long.MAX\_VALUE;//KafkaSpout读取的进度与目标进度相差多少,相差太多,Spout会丢弃中间的消息 

   public boolean useStartOffsetTimeIfOffsetOutOfRange = true;//如果所请求的offset对应的消息在Kafka中不存在,是否使用startOffsetTime    public int metricsTimeBucketSizeInSecs = 60;//多长时间统计一次metrics

对Zookeeper的使用

KafkaSpout的配置中有两个地方可以用到Zookeeper

  1. 用Zookeeper来记录KafkaSpout的处理进度,在topology重新提交或者task重启后继续之前的处理进度。在SpoutConfig中的zkServers, zkPort和zkRoot与此相关。如果zkServer和zkPort没有设置,那么KafkaSpout会使用Storm集群所用的Zookeeper记录这些信息。
  2. 用Zookeeper来获取Kafka中一个topic的所有partition,和每个partition的leader。这需要实现BrokerHosts的子类ZkHosts.但是,这个Zookeepr是可选的。如果使用BrokerHosts的另一个子类StaticHosts,把partition和leader的对应关系硬编码,则不需要Zookeeper来提供此功能。KafkaSpout会从Kafka集群使用的Zookeeper中提取partition和leader的对应关系。而且:
    • 如果使用StatisHosts,那么KafkaSpout会使用StaticCoordinator,这个coordinator不能响应partition leader的变化。
    • 如果使用ZkHosts,那么KafkaSpout会使用ZkCoordinator, 当其refresh()方法被调用后,这个cooridnator会检查发生leader变更的partition,并为之生成新的PartitionManager.从而能够在leader变更后,继续读取消息。

影响初始读取进度的配置项

在一个topology上线后,它从哪个offset开始读取消息呢?有一些配置项对此有影响:

  1. SpoutConfig中的id字段。如果想要一个topology从另一个topology之前的处理进度继续处理,它们需要有相同的id。
  2. KafkaConfig的forceFromStart字段。如果此字段设为true, 那么它一个topology上线后,它会忽略之前相同id的topology的进度,并且从Kafka中最早的消息开始处理。
  3. KafkaConfig的startOffsetTime字段。默认为kafka.api.OffsetRequest.EarliestTime()开始读,也就是从Kafka中最早的消息开始处理。也可以设成kafka.api.OffsetRequest.LatestOffset,也就是最早的消息开始读。也可以自己指定具体的值。
  4. KafkaConfig的maxOffsetBehind字段。这个字段对于KafkaSpout的多个处理流程都有影响。当提交一个新topology时,如果没有forceFromStart, 当KafkaSpout对某个partition的处理进度落后startOffsetTime对应的offset多于此值时,KafkaSpout会丢弃中间的消息,从而强制赶上目标进度.比如,如果startOffsetTime设成了lastestTime,那么如果进度落后超过maxOffsetBehind,KafkaSpout会直接从latestTime对应的offset开始处理。如果设成了froceFromStart,则在提交新任务时,始终会从EarliestTime开始读。
  5. KafkaSpout的userStartOffsetTimeIfOffsetOutOfRange字段。如果设成true,那么当fetch消息时出错,且FetchResponse显示的出错原因是OFFSET_OUT_OF_RANGE,那么就会尝试从KafkaSpout指定的startOffsetTime对应的消息开始读。例如,如果有一批消息因为超过了保存期限被Kafka删除,并且zk里记录的消息在这批被删除的消息里。如果KafkaSpout试图从zk的记录继续读,那么就会出现OFFSET_OUT_OF_RANGE的错误,从而触发这个配置。

实际上maxOffsetBehind有时候有点名不符实。当startOffsetTime为A, zk里的进度为B, A - B > maxOffsetBehind时,应该从A - maxOffsetBehind除开始读或许更好一些,而不是直接跳到startOffsetTime。此处的逻辑参见PartitionManager的实现。

点赞
收藏
评论区
推荐文章
blmius blmius
2年前
MySQL:[Err] 1292 - Incorrect datetime value: ‘0000-00-00 00:00:00‘ for column ‘CREATE_TIME‘ at row 1
文章目录问题用navicat导入数据时,报错:原因这是因为当前的MySQL不支持datetime为0的情况。解决修改sql\mode:sql\mode:SQLMode定义了MySQL应支持的SQL语法、数据校验等,这样可以更容易地在不同的环境中使用MySQL。全局s
Jacquelyn38 Jacquelyn38
2年前
2020年前端实用代码段,为你的工作保驾护航
有空的时候,自己总结了几个代码段,在开发中也经常使用,谢谢。1、使用解构获取json数据let jsonData  id: 1,status: "OK",data: 'a', 'b';let  id, status, data: number   jsonData;console.log(id, status, number )
皕杰报表之UUID
​在我们用皕杰报表工具设计填报报表时,如何在新增行里自动增加id呢?能新增整数排序id吗?目前可以在新增行里自动增加id,但只能用uuid函数增加UUID编码,不能新增整数排序id。uuid函数说明:获取一个UUID,可以在填报表中用来创建数据ID语法:uuid()或uuid(sep)参数说明:sep布尔值,生成的uuid中是否包含分隔符'',缺省为
Wesley13 Wesley13
2年前
Java获得今日零时零分零秒的时间(Date型)
publicDatezeroTime()throwsParseException{    DatetimenewDate();    SimpleDateFormatsimpnewSimpleDateFormat("yyyyMMdd00:00:00");    SimpleDateFormatsimp2newS
Stella981 Stella981
2年前
KVM调整cpu和内存
一.修改kvm虚拟机的配置1、virsheditcentos7找到“memory”和“vcpu”标签,将<namecentos7</name<uuid2220a6d1a36a4fbb8523e078b3dfe795</uuid
Wesley13 Wesley13
2年前
mysql设置时区
mysql设置时区mysql\_query("SETtime\_zone'8:00'")ordie('时区设置失败,请联系管理员!');中国在东8区所以加8方法二:selectcount(user\_id)asdevice,CONVERT\_TZ(FROM\_UNIXTIME(reg\_time),'08:00','0
Wesley13 Wesley13
2年前
00:Java简单了解
浅谈Java之概述Java是SUN(StanfordUniversityNetwork),斯坦福大学网络公司)1995年推出的一门高级编程语言。Java是一种面向Internet的编程语言。随着Java技术在web方面的不断成熟,已经成为Web应用程序的首选开发语言。Java是简单易学,完全面向对象,安全可靠,与平台无关的编程语言。
Stella981 Stella981
2年前
Django中Admin中的一些参数配置
设置在列表中显示的字段,id为django模型默认的主键list_display('id','name','sex','profession','email','qq','phone','status','create_time')设置在列表可编辑字段list_editable
Wesley13 Wesley13
2年前
MySQL部分从库上面因为大量的临时表tmp_table造成慢查询
背景描述Time:20190124T00:08:14.70572408:00User@Host:@Id:Schema:sentrymetaLast_errno:0Killed:0Query_time:0.315758Lock_
Python进阶者 Python进阶者
3个月前
Excel中这日期老是出来00:00:00,怎么用Pandas把这个去除
大家好,我是皮皮。一、前言前几天在Python白银交流群【上海新年人】问了一个Pandas数据筛选的问题。问题如下:这日期老是出来00:00:00,怎么把这个去除。二、实现过程后来【论草莓如何成为冻干莓】给了一个思路和代码如下:pd.toexcel之前把这
Easter79
Easter79
Lv1
今生可爱与温柔,每一样都不能少。
文章
2.8k
粉丝
5
获赞
1.2k