Apache Beam访问HDFS

傅巽
• 阅读 2515

一、直接访问

1.引入HDFS的相关jar包:

    <dependency>
    <groupId>org.apache.beam</groupId>
    <artifactId>beam-sdks-java-io-hadoop-file-system</artifactId>
    <version>2.1.0</version>
    </dependency>

2.使用HadoopFileSystemOptions代替PipelineOptions

public interface WordCountOptions extends HadoopFileSystemOptions {
    @Description("input file")
    @Default.String("hdfs://localhost:9000/tmp/words2")
    String getInputFile();
    void setInputFile(String in);

    @Description("output")
    @Default.String("hdfs://localhost:9000/tmp/hdfsWordCount")
    String getOutput();
    void setOutput(String out);
}

3.给Options指定HDFS配置

    Configuration conf=new Configuration();
    conf.set("fs.default.name", "hdfs://localhost:9000");
    HDFSOption options= PipelineOptionsFactory.fromArgs(args).withValidation()
            .as(HDFSOption.class);
    options.setHdfsConfiguration(ImmutableList.of(conf));
    

4.与访问本地文件一样访问HDFS文件

    Pipeline p = Pipeline.create(options);
    Data = p.apply("Read from HDFS",
            TextIO.read().from(options.getInputFile()));
            

实际测试中发现本地runner(如Direct, Flink Local, Spark Local...)能够成功读写HDFS,但是集群模式下(如Flink Cluster, Spark Cluster...)读写HDFS失败,原因未知。

二、通过HBase访问

除了直接读写HDFS的数据,还可以通过HBase来进行读写。
1.添加相关jar包

    <!--hbase-->
    <dependency>
        <groupId>org.apache.beam</groupId>
        <artifactId>beam-sdks-java-io-hbase</artifactId>
        <version>${beam.verson}</version>
    </dependency>

2.设置HBase连接信息

    Configuration conf = new Configuration();
    conf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 1);
    conf.setStrings("hbase.master.hostname", "localhost");
    conf.setStrings("hbase.regionserver.hostname", "localhost");

3.使用上述的conf读HBase数据

    pipe
            //指定配置和表名
            .apply("Read from HBase",
                    HBaseIO.read().withConfiguration(conf).withTableId("test_tb"))
                       
            .apply(ParDo.of(new DoFn<Result, String>() {
                @ProcessElement
                public void processElement(ProcessContext c) {
                 //读到的数据是HBase API中定义的Result格式,需要按照HBase官方说明进行剥取 
                    Result result = c.element();
                    String rowkey = Bytes.toString(result.getRow());
                    System.out.println("row key: ");

                    for(Cell cell : result.listCells())
                    {
                        System.out.println("qualifier:"+Bytes.toString(CellUtil.cloneQualifier(cell)));
                        System.out.println("value:"+Bytes.toString(CellUtil.cloneValue(cell)));
                    }

                    c.output(rowkey);
                }
            }));

4.写入到HBase

    //写入前需要将string数据封装为Hbase数据格式mutation
    .apply(ParDo.of(new DoFn<String, Mutation>() {
        @ProcessElement
        public void processElement(ProcessContext context) {
            byte[] qual = Bytes.toBytes("qual");
            byte[] cf = Bytes.toBytes("cf");
            byte[] row = Bytes.toBytes("kafka");
            byte[] val = Bytes.toBytes(context.element());
            final Charset UTF_8 = Charset.forName("UTF-8");
            Mutation mutation = new Put(row).addColumn(cf, qual, val);
            context.output(mutation);
        }

    }))
    .apply("write to Hbase",
            HBaseIO.write()
                    .withConfiguration(conf)
                    .withTableId("test_tb"));

经测试,无论本地runner还是集群runner都能成功读写。
但是发现一个问题,使用mvn exec:java进行调试成功,而使用shade插件打包成jar运行却一直报错,说Mutation没有指定coder,beam论坛上求助后得到的回复是maven-shade-plugin版本太旧,需要更新到3.0.0以上版本,但我改了3.0的版本之后还是一样的错误。后来添加了ServicesResourceTransformer才解决。

<transformers>
    <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
</transformers>
点赞
收藏
评论区
推荐文章
Wesley13 Wesley13
3年前
MySQL部分从库上面因为大量的临时表tmp_table造成慢查询
背景描述Time:20190124T00:08:14.70572408:00User@Host:@Id:Schema:sentrymetaLast_errno:0Killed:0Query_time:0.315758Lock_
美凌格栋栋酱 美凌格栋栋酱
6个月前
Oracle 分组与拼接字符串同时使用
SELECTT.,ROWNUMIDFROM(SELECTT.EMPLID,T.NAME,T.BU,T.REALDEPART,T.FORMATDATE,SUM(T.S0)S0,MAX(UPDATETIME)CREATETIME,LISTAGG(TOCHAR(
Wesley13 Wesley13
3年前
JAVA API 连接 HA(High Available) Hadoop集群
使用JAVAAPI连接HDFS时我们需要使用NameNode的地址,开启HA后,两个NameNode可能会主备切换,如果连接的那台主机NameNode挂掉了,连接就会失败.HDFS提供了nameservices的方式进行访问,这样只要有一个NameNode活着,都可以正常访问.HDFSNameNodeHA
Stella981 Stella981
3年前
ASMSupport教程4.7 生成关系运算符
<p在java中,关系运算符是很常用的,分别是&gt;,,&lt;,&gt;,&lt;,!这六种,我们按照惯例看看我们需要生成的代码:</p<divid"scid:9D7513F9C04C4721824A2B34F0212519:dfec0f1ca2ec4ebabc9b91c161fbfa47"class"wlWri
Wesley13 Wesley13
3年前
FLV文件格式
1.        FLV文件对齐方式FLV文件以大端对齐方式存放多字节整型。如存放数字无符号16位的数字300(0x012C),那么在FLV文件中存放的顺序是:|0x01|0x2C|。如果是无符号32位数字300(0x0000012C),那么在FLV文件中的存放顺序是:|0x00|0x00|0x00|0x01|0x2C。2.  
Wesley13 Wesley13
3年前
HDFS知识学习
HDFS设计前提与目标1.硬件错误是常态而不是异常。HDFS被设计为运行在普通硬件上,所以硬件故障时正常的,HDFS可能由成百上千的服务器节点构成,每个服务器节点上都存储着文件系统的部分数据,而HDFS的每个组件随时都有可能出现故障。因此,错误检测并快速自动恢复是HDFS的最核心的设计目标。2.流式数据访问。运行在HDFS上的应用主要是
Easter79 Easter79
3年前
Springmvc 发送邮件功能
1、引入相关jar包<dependency<groupIdorg.apache.velocity</groupId<artifactIdvelocity</artifactId<version1.7</version</dependency
可莉 可莉
3年前
10.Spark之RDD及编程接口
1.起点HelloWorld    valscnewSparkContext("spark://...","HelloWorld","SPARK\_HOME路径","APP\_JAR路径")    valfilesc.textFile("hdfs:///root/Log")   
Stella981 Stella981
3年前
Hive在SQL标准权限模式下创建UDF失败的问题排查
环境:CDH5.16Hive1.1.0已开启KerberosHive授权使用SQLStandardsBasedAuthorization模式(以下简称SSBA模式)症状表现:在编译好UDF的jar包之后,上传到HDFS目录。hdfs dfsmkdi
Wesley13 Wesley13
3年前
JavaWeb 调用接口
JavaWeb 如何调用接口CreateTime2018年4月2日19:04:29Author:Marydon1.所需jar包!(https://oscimg.oschina.net/oscnet/0f139