Spark2Streaming读Kerberos环境的Kafka并写数据到HBase

Stella981
• 阅读 559

温馨提示:要看高清无码套图,请使用手机打开并单击图片放大查看。

Fayson的github:

https://github.com/fayson/cdhproject

提示:代码块部分可以左右滑动查看噢

1.文档编写目的


在前面的文章Fayson介绍了在Kerberos环境下《Spark2Streaming读Kerberos环境的Kafka并写数据到Kudu》,本篇文章Fayson主要介绍如何使用Spark2Streaming访问Kerberos环境的Kafka并将接收到的Kafka数据写入HBase,在介绍本篇文章前,你可能需要知道:

如何在CDH集群启用Kerberos

如何通过Cloudera Manager为Kafka启用Kerberos及使用

示例架构图如下:

Spark2Streaming读Kerberos环境的Kafka并写数据到HBase

示例详细流程图如下:

Spark2Streaming读Kerberos环境的Kafka并写数据到HBase

  • 内容概述:

1.环境准备

2.Spark2Streaming示例开发

3.示例运行

4.总结

  • 测试环境:

1.CM5.14.3/CDH5.14.2

2.CDK2.2.0(Apache Kafka0.10.2)

3.SPARK2.2.0

4.操作系统版本为Redhat7.3

5.采用root用户进行操作

6.集群已启用Kerberos

2.环境准备


1.准备访问Kafka的Keytab文件,使用xst命令导出keytab文件

[root@cdh01 ~]# kadmin.local Authenticating as principal hbase/admin@FAYSON.COM with password.kadmin.local:  xst -norandkey -k fayson.keytab fayson@FAYSON.COM

(可左右滑动)

Spark2Streaming读Kerberos环境的Kafka并写数据到HBase

使用klist命令检查导出的keytab文件是否正确

[root@cdh01 ~]# klist -ek fayson.keytab

Spark2Streaming读Kerberos环境的Kafka并写数据到HBase

2. 准备jaas.cof文件内容如下:

KafkaClient {  com.sun.security.auth.module.Krb5LoginModule required  useKeyTab=true  keyTab="/data/disk1/0286-kafka-shell/conf/fayson.keytab"  principal="fayson@FAYSON.COM";};Client {  com.sun.security.auth.module.Krb5LoginModule required  useKeyTab=true  storeKey=true  keyTab="/data/disk1/0286-kafka-shell/conf/fayson.keytab"  principal="fayson@FAYSON.COM";};

(可左右滑动)

Spark2Streaming读Kerberos环境的Kafka并写数据到HBase

将fayson.keytab和jaas.conf文件拷贝至集群的所有节点统一的/data/disk1/0286-kafka-shell/conf目录下。

3. 准备向Kerberos环境发送数据的脚本,关于脚本这里就不在过多的介绍前面很多文章都有介绍,具体可以参考Fayson的GitHub:

https://github.com/fayson/cdhproject/tree/master/kafkademo/0286-kafka-shell

Spark2Streaming读Kerberos环境的Kafka并写数据到HBase

根据需要将conf下面的配置文件修改为自己集群的环境即可,发送至Kafka的JSON数据示例如下:

{   "occupation": "生产工作、运输工作和部分体力劳动者",   "address": "台东东二路16号-8-8",   "city": "长治",   "marriage": "1",   "sex": "1",   "name": "仲淑兰",   "mobile_phone_num": "13607268580",   "bank_name": "广州银行31",   "id": "510105197906185179",   "child_num": "1",   "fix_phone_num": "15004170180"}

(可左右滑动)

4.登录CM进入SPARK2服务的配置项将spark_kafka_version的kafka版本修改为0.10

Spark2Streaming读Kerberos环境的Kafka并写数据到HBase

5.通过CM下载HBase客户端配置文件

Spark2Streaming读Kerberos环境的Kafka并写数据到HBase

6.将Spark2访问HBase的依赖包添加到集群的/opt/cloudera/parcels/SPARK2/lib/spark2/jars目录下,依赖的jar包如下:

hbase-client-1.2.0-cdh5.14.2.jarhbase-common-1.2.0-cdh5.14.2.jarhbase-protocol-1.2.0-cdh5.14.2.jarhtrace-core-3.2.0-incubating.jar

(可左右滑动)

注意:需要将依赖包拷贝至集群所有节点。

3.Spark2Streaming示例开发


1.使用maven创建scala语言的spark2demo工程,pom.xml依赖如下

<dependency>    <groupId>org.apache.hbase</groupId>    <artifactId>hbase-client</artifactId>    <version>1.2.0-cdh5.14.2</version></dependency>

(可左右滑动)

具体需要的依赖包,可以参考Fayson前面的文章《Spark2Streaming读Kerberos环境的Kafka并写数据到Kudu

2.添加访问HBase的集群配置信息hdfs-site.xml/core-stie.xml/hbase-site.xml文件

Spark2Streaming读Kerberos环境的Kafka并写数据到HBase

3.在resources下创建0289.properties配置文件,内容如下:

kafka.brokers=cdh02.fayson.com:9092,cdh03.fayson.com:9092,cdh04.fayson.com:9092kafka.topics=kafka_hbase_topicprincipal.account=fayson@FAYSON.COMkeytab.filepath=/data/disk1/spark2streaming-kafka-hbase/conf/fayson.keytab

(可左右滑动)

4.创建HBaseUtils.scala类,主要用于创建HBase的Connection

package utilsimport java.io.Fileimport java.security.PrivilegedActionimport org.apache.hadoop.hbase.{HBaseConfiguration}import org.apache.hadoop.hbase.client.{Connection, ConnectionFactory}import org.apache.hadoop.security.UserGroupInformation/**  * package: utils  * describe: 访问Kerberos环境下的HBase  * creat_user: Fayson   * email: htechinfo@163.com  * creat_date: 2018/6/25  * creat_time: 下午10:46  * 公众号:Hadoop实操  */object HBaseUtil {  /**    * HBase 配置文件路径    * @param confPath    * @return    */  def getHBaseConn(confPath: String, principal: String, keytabPath: String): Connection = {    val configuration = HBaseConfiguration.create    val coreFile = new File(confPath + File.separator + "core-site.xml")    if(!coreFile.exists()) {      val in = HBaseUtil.getClass.getClassLoader.getResourceAsStream("hbase-conf/core-site.xml")      configuration.addResource(in)    }    val hdfsFile = new File(confPath + File.separator + "hdfs-site.xml")    if(!hdfsFile.exists()) {      val in = HBaseUtil.getClass.getClassLoader.getResourceAsStream("hbase-conf/hdfs-site.xml")      configuration.addResource(in)    }    val hbaseFile = new File(confPath + File.separator + "hbase-site.xml")    if(!hbaseFile.exists()) {      val in = HBaseUtil.getClass.getClassLoader.getResourceAsStream("hbase-conf/hbase-site.xml")      configuration.addResource(in)    }    UserGroupInformation.setConfiguration(configuration)    UserGroupInformation.loginUserFromKeytab(principal, keytabPath)    val loginUser = UserGroupInformation.getLoginUser    loginUser.doAs(new PrivilegedAction[Connection] {      override def run(): Connection = ConnectionFactory.createConnection(configuration)    })  }}

(可左右滑动)

5.创建Kafka2Spark2HBase.scala文件,内容如下:

package com.cloudera.streamingimport java.io.{File, FileInputStream}import java.util.Propertiesimport org.apache.commons.lang.StringUtilsimport org.apache.hadoop.hbase.TableNameimport org.apache.hadoop.hbase.client.Putimport org.apache.hadoop.hbase.util.Bytesimport org.apache.kafka.common.serialization.StringDeserializerimport org.apache.log4j.{Level, Logger}import org.apache.spark.SparkConfimport org.apache.spark.sql.SparkSessionimport org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies}import org.apache.spark.streaming.{Seconds, StreamingContext}import utils.HBaseUtilimport scala.util.Tryimport scala.util.parsing.json.JSON/**  * package: com.cloudera.streaming  * describe: Kerberos环境中Spark2Streaming应用实时读取Kafka数据,解析后存入HBase  * 使用spark2-submit的方式提交作业  * spark2-submit --class com.cloudera.streaming.Kafka2Spark2Hbase \    --master yarn \    --deploy-mode client \    --executor-memory 2g \    --executor-cores 2 \    --driver-memory 2g \    --num-executors 2 \    --queue default  \    --principal fayson@FAYSON.COM \    --keytab /data/disk1/spark2streaming-kafka-hbase/conf/fayson.keytab \    --files "/data/disk1/spark2streaming-kafka-hbase/conf/jaas.conf#jaas.conf" \    --driver-java-options "-Djava.security.auth.login.config=/data/disk1/spark2streaming-kafka-hbase/conf/jaas.conf" \    --conf "spark.executor.extraJavaOptions=-Djava.security.auth.login.config=/data/disk1/spark2streaming-kafka-hbase/conf/jaas.conf" \    spark2-demo-1.0-SNAPSHOT.jar  * creat_user: Fayson   * email: htechinfo@163.com  * creat_date: 2018/6/25  * creat_time: 下午10:40  * 公众号:Hadoop实操  */object Kafka2Spark2Hbase {  Logger.getLogger("com").setLevel(Level.ERROR) //设置日志级别  var confPath: String = System.getProperty("user.dir") + File.separator + "conf"  def main(args: Array[String]): Unit = {    //加载配置文件    val properties = new Properties()    val file = new File(confPath + File.separator + "0288.properties")    if(!file.exists()) {      val in = Kafka2Spark2Kudu.getClass.getClassLoader.getResourceAsStream("0289.properties")      properties.load(in);    } else {      properties.load(new FileInputStream(confPath))    }    val brokers = properties.getProperty("kafka.brokers")    val topics = properties.getProperty("kafka.topics")    val principal = properties.getProperty("principal.account")    val keytabFilePath = properties.getProperty("keytab.filepath")    println("kafka.brokers:" + brokers)    println("kafka.topics:" + topics)    if(StringUtils.isEmpty(brokers)|| StringUtils.isEmpty(topics) || StringUtils.isEmpty(principal) || StringUtils.isEmpty(keytabFilePath)) {      println("未配置Kafka和Kerberos信息")      System.exit(0)    }    val topicsSet = topics.split(",").toSet    val spark = SparkSession.builder().appName("Kafka2Spark2HBase-kerberos").config(new SparkConf()).getOrCreate()    val ssc = new StreamingContext(spark.sparkContext, Seconds(5)) //设置Spark时间窗口,每5s处理一次    val kafkaParams = Map[String, Object]("bootstrap.servers" -> brokers      , "auto.offset.reset" -> "latest"      , "security.protocol" -> "SASL_PLAINTEXT"      , "sasl.kerberos.service.name" -> "kafka"      , "key.deserializer" -> classOf[StringDeserializer]      , "value.deserializer" -> classOf[StringDeserializer]      , "group.id" -> "testgroup"    )    val dStream = KafkaUtils.createDirectStream[String, String](ssc,      LocationStrategies.PreferConsistent,      ConsumerStrategies.Subscribe[String, String](topicsSet, kafkaParams))    dStream.foreachRDD(rdd => {      rdd.foreachPartition(partitionRecords => {        val connection = HBaseUtil.getHBaseConn(confPath, principal, keytabFilePath) // 获取Hbase连接        partitionRecords.foreach(line => {          //将Kafka的每一条消息解析为JSON格式数据          val jsonObj =  JSON.parseFull(line.value())          println(line.value())          val map:Map[String,Any] = jsonObj.get.asInstanceOf[Map[String, Any]]          val rowkey = map.get("id").get.asInstanceOf[String]          val name = map.get("name").get.asInstanceOf[String]          val sex = map.get("sex").get.asInstanceOf[String]          val city = map.get("city").get.asInstanceOf[String]          val occupation = map.get("occupation").get.asInstanceOf[String]          val mobile_phone_num = map.get("mobile_phone_num").get.asInstanceOf[String]          val fix_phone_num = map.get("fix_phone_num").get.asInstanceOf[String]          val bank_name = map.get("bank_name").get.asInstanceOf[String]          val address = map.get("address").get.asInstanceOf[String]          val marriage = map.get("marriage").get.asInstanceOf[String]          val child_num = map.get("child_num").get.asInstanceOf[String]          val tableName = TableName.valueOf("user_info")          val table = connection.getTable(tableName)          val put = new Put(Bytes.toBytes(rowkey))          put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("name"), Bytes.toBytes(name))          put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("sex"), Bytes.toBytes(sex))          put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("city"), Bytes.toBytes(city))          put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("occupation"), Bytes.toBytes(occupation))          put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("mobile_phone_num"), Bytes.toBytes(mobile_phone_num))          put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("fix_phone_num"), Bytes.toBytes(fix_phone_num))          put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("bank_name"), Bytes.toBytes(bank_name))          put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("address"), Bytes.toBytes(address))          put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("marriage"), Bytes.toBytes(marriage))          put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("child_num"), Bytes.toBytes(child_num))          Try(table.put(put)).getOrElse(table.close())//将数据写入HBase,若出错关闭table          table.close()//分区数据写入HBase后关闭连接        })        connection.close()      })    })    ssc.start()    ssc.awaitTermination()  }}

(可左右滑动)

6.使用mvn命令编译工程,注意由于是scala工程编译时mvn命令要加scala:compile

mvn clean scala:compile package7.将编译好的spark2-demo-1.0-SNAPSHOT.jar包上传至服务器

Spark2Streaming读Kerberos环境的Kafka并写数据到HBase

7.将编译好的spark2-demo-1.0-SNAPSHOT.jar包上传至服务器

Spark2Streaming读Kerberos环境的Kafka并写数据到HBase

将Spark2应用的配置文件放在conf目录下,内容如下:

Spark2Streaming读Kerberos环境的Kafka并写数据到HBase

0289.properties配置文件内容如下:

Spark2Streaming读Kerberos环境的Kafka并写数据到HBase

jaas.conf文件内容如下:

Spark2Streaming读Kerberos环境的Kafka并写数据到HBase

将spark2streaming-kafka-hbase目录拷贝至集群的所有节点

4.示例运行


1.使用spark2-submit命令向集群提交Spark2Streaming作业

spark2-submit --class com.cloudera.streaming.Kafka2Spark2Hbase \--master yarn \--deploy-mode client \--executor-memory 2g \--executor-cores 2 \--driver-memory 2g \--num-executors 2 \--queue default  \--principal fayson@FAYSON.COM \--keytab /data/disk1/spark2streaming-kafka-hbase/conf/fayson.keytab \--files "/data/disk1/spark2streaming-kafka-hbase/conf/jaas.conf#jaas.conf" \--driver-java-options "-Djava.security.auth.login.config=/data/disk1/spark2streaming-kafka-hbase/conf/jaas.conf" \--conf "spark.executor.extraJavaOptions=-Djava.security.auth.login.config=/data/disk1/spark2streaming-kafka-hbase/conf/jaas.conf" \spark2-demo-1.0-SNAPSHOT.jar

(可左右滑动)

通过CM查看作业是否提交成功

Spark2Streaming读Kerberos环境的Kafka并写数据到HBase

Spark2的UI界面

Spark2Streaming读Kerberos环境的Kafka并写数据到HBase

2.运行脚本向Kafka的Kafka_hbase_topic生产消息

Spark2Streaming读Kerberos环境的Kafka并写数据到HBase

3.使用hbase shell命令查看数据是否入库成功

Spark2Streaming读Kerberos环境的Kafka并写数据到HBase

5.总结


1.本示例中SparkStreaming读取Kerberos环境的Kafka集群,使用的是spark-streaming-kafka0.10.0版本的依赖包,在Spark中提供两个的另外一个版本的为0.8.0版本,在选择依赖包时需要注意与Spark版本的兼容性问题,具体可以参考官网地址:

http://spark.apache.org/docs/2.2.0/streaming-kafka-integration.html

(可左右滑动)

2.在前面的文章Fayson也有介绍Java访问Kerberos环境的Kafka,需要使用到jaas.conf文件,这里的jaas.conf文件Fayson通过spark2-submit的方式指定,注意我们的jaas.conf文件及keytab需要在集群的所有节点存在,因为Driver和Executor是随机在集群的节点上启动的。

3.同样在scala代码中访问Kafka是也一样需要添加Kerberos相关的配置security.protocol和sasl.kerberos.service.name参数。

4.Spark2默认的kafka版本为0.9需要通过CM将默认的Kafka版本修改为0.10

5.注意在0289.properties配置文件中,指定了keytab文件的绝对路径,如果指定的为相对路径可能会出现Kerberos认证失败。

6.在访问Kerberos环境的HBase,需要加载HBase的客户端配置文件,因为在访问HBase时需要使用Hadoop的UserGroupInformation对象登录Kerberos账号,为了方便直接将三个配置文件加载。

GitHub地址如下:

https://github.com/fayson/cdhproject/blob/master/spark2demo/src/main/scala/com/cloudera/streaming/Kafka2Spark2Hbase.scala

https://github.com/fayson/cdhproject/blob/master/spark2demo/src/main/scala/utils/HBaseUtil.scala

本文分享自微信公众号 - Hadoop实操(gh_c4c535955d0f)。
如有侵权,请联系 support@oschina.cn 删除。
本文参与“OSC源创计划”,欢迎正在阅读的你也加入,一起分享。

点赞
收藏
评论区
推荐文章
blmius blmius
2年前
MySQL:[Err] 1292 - Incorrect datetime value: ‘0000-00-00 00:00:00‘ for column ‘CREATE_TIME‘ at row 1
文章目录问题用navicat导入数据时,报错:原因这是因为当前的MySQL不支持datetime为0的情况。解决修改sql\mode:sql\mode:SQLMode定义了MySQL应支持的SQL语法、数据校验等,这样可以更容易地在不同的环境中使用MySQL。全局s
Jacquelyn38 Jacquelyn38
3年前
2020年前端实用代码段,为你的工作保驾护航
有空的时候,自己总结了几个代码段,在开发中也经常使用,谢谢。1、使用解构获取json数据let jsonData  id: 1,status: "OK",data: 'a', 'b';let  id, status, data: number   jsonData;console.log(id, status, number )
皕杰报表之UUID
​在我们用皕杰报表工具设计填报报表时,如何在新增行里自动增加id呢?能新增整数排序id吗?目前可以在新增行里自动增加id,但只能用uuid函数增加UUID编码,不能新增整数排序id。uuid函数说明:获取一个UUID,可以在填报表中用来创建数据ID语法:uuid()或uuid(sep)参数说明:sep布尔值,生成的uuid中是否包含分隔符'',缺省为
Stella981 Stella981
2年前
Python3通过JDBC访问非Kerberos环境的Impala
温馨提示:如果使用电脑查看图片不清晰,可以使用手机打开文章单击文中的图片放大查看高清原图。Fayson的github:https://github.com/fayson/cdhproject提示:代码块部分可以左右滑动查看噢1.文档编写目的在前面Fayson介绍了在
Wesley13 Wesley13
2年前
mysql设置时区
mysql设置时区mysql\_query("SETtime\_zone'8:00'")ordie('时区设置失败,请联系管理员!');中国在东8区所以加8方法二:selectcount(user\_id)asdevice,CONVERT\_TZ(FROM\_UNIXTIME(reg\_time),'08:00','0
Wesley13 Wesley13
2年前
00:Java简单了解
浅谈Java之概述Java是SUN(StanfordUniversityNetwork),斯坦福大学网络公司)1995年推出的一门高级编程语言。Java是一种面向Internet的编程语言。随着Java技术在web方面的不断成熟,已经成为Web应用程序的首选开发语言。Java是简单易学,完全面向对象,安全可靠,与平台无关的编程语言。
Stella981 Stella981
2年前
Spark2Streaming读Kerberos环境的Kafka并写数据到HDFS
温馨提示:如果使用电脑查看图片不清晰,可以使用手机打开文章单击文中的图片放大查看高清原图。Fayson的github:https://github.com/fayson/cdhproject提示:代码块部分可以左右滑动查看噢1.文档编写目的在前面的文章Fayson介
Stella981 Stella981
2年前
Impala的Short
温馨提示:要看高清无码套图,请使用手机打开并单击图片放大查看。Fayson的github:https://github.com/fayson/cdhproject提示:代码块部分可以左右滑动查看噢1.HDFS的ShortCircuitLocalReads我们知道读取HDF
Wesley13 Wesley13
2年前
MySQL部分从库上面因为大量的临时表tmp_table造成慢查询
背景描述Time:20190124T00:08:14.70572408:00User@Host:@Id:Schema:sentrymetaLast_errno:0Killed:0Query_time:0.315758Lock_
Python进阶者 Python进阶者
4个月前
Excel中这日期老是出来00:00:00,怎么用Pandas把这个去除
大家好,我是皮皮。一、前言前几天在Python白银交流群【上海新年人】问了一个Pandas数据筛选的问题。问题如下:这日期老是出来00:00:00,怎么把这个去除。二、实现过程后来【论草莓如何成为冻干莓】给了一个思路和代码如下:pd.toexcel之前把这