HBase原理--BulkLoad

CodeAdventurer
• 阅读 4173

在实际生产环境中,有这样一种场景:用户数据位于HDFS中,业务需要定期将这部分海量数据导入HBase系统,以执行随机查询更新操作。这种场景如果调用写入API进行处理,极有可能会给RegionServer带来较大的写入压力:

•引起RegionServer频繁flush,进而不断compact、split,影响集群稳定性。

•引起RegionServer频繁GC,影响集群稳定性。

•消耗大量CPU资源、带宽资源、内存资源以及IO资源,与其他业务产生资源竞争。

•在某些场景下,比如平均KV大小比较大的场景,会耗尽RegionServer的处理线程,导致集群阻塞。

鉴于存在上述问题,HBase提供了另一种将数据写入HBase集群的方法——BulkLoad。BulkLoad首先使用MapReduce将待写入集群数据转换为HFile文件,再直接将这些HFile文件加载到在线集群中。显然,BulkLoad方案没有将写请求发送给RegionServer处理,可以有效避免上述一系列问题。

BulkLoad核心流程

从HBase的视角来看,BulkLoad主要由两个阶段组成:

1)HFile生成阶段。这个阶段会运行一个MapReduce任务,MapReduce的mapper需要自己实现,将HDFS文件中的数据读出来组装成一个复合KV,其中Key是rowkey,Value可以是KeyValue对象、Put对象甚至Delete对象;MapReduce的reducer由HBase负责,通过方法HFileOutputFormat2.configureIncrementalLoad()进行配置,这个方法主要负责以下事项。

•根据表信息配置一个全局有序的partitioner。

•将partitioner文件上传到HDFS集群并写入DistributedCache。

•设置reduce task的个数为目标表Region的个数。

•设置输出key/value类满足HFileOutputFormat所规定的格式要求。

•根据类型设置reducer执行相应的排序(KeyValueSortReducer或者PutSortReducer)。

这个阶段会为每个Region生成一个对应的HFile文件。

2)HFile导入阶段。HFile准备就绪之后,就可以使用工具completebulkload将HFile加载到在线HBase集群。completebulkload工具主要负责以下工作。

•依次检查第一步生成的所有HFile文件,将每个文件映射到对应的Region。

•将HFile文件移动到对应Region所在的HDFS文件目录下。

•告知Region对应的RegionServer,加载HFile文件对外提供服务。

如果在BulkLoad的中间过程中Region发生了分裂,completebulkload工具会自动将对应的HFile文件按照新生成的Region边界切分成多个HFile文件,保证每个HFile都能与目标表当前的Region相对应。但这个过程需要读取HFile内容,因而并不高效。需要尽量减少HFile生成阶段和HFile导入阶段的延迟,最好能够在HFile生成之后立刻执行HFile导入。

基于BulkLoad两阶段的工作原理,BulkLoad的核心流程如图所示。
HBase原理--BulkLoad

BulkLoad基础案例

在hbase上创建一张表:
create 'test_log','ext'

执行BulkLoad代码:

import org.apache.hadoop.hbase.client.ConnectionFactory
import org.apache.hadoop.hbase.{HBaseConfiguration, KeyValue, TableName}
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapreduce.{HFileOutputFormat2, LoadIncrementalHFiles}
import org.apache.hadoop.hbase.util.Bytes
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession

object BulkLoad1 {
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setAppName("HbaseBulkLoad")

    val spark = SparkSession.builder
      .config(sparkConf)
      .getOrCreate()
    val sc = spark.sparkContext

    val datas = List(
      ("abc", ("ext", "type", "login")),
      ("ccc", ("ext", "type", "logout"))
    )

    val dataRdd = sc.parallelize(datas)

    val output = dataRdd.map {
      x => {
        val rowKey = Bytes.toBytes(x._1)
        val immutableRowKey = new ImmutableBytesWritable(rowKey)

        val colFam = x._2._1
        val colName = x._2._2
        val colValue = x._2._3

        val kv = new KeyValue(
          rowKey,
          Bytes.toBytes(colFam),
          Bytes.toBytes(colName),
          Bytes.toBytes(colValue.toString)
        )
        (immutableRowKey, kv)
      }
    }


    val hConf = HBaseConfiguration.create()
    hConf.addResource("hbase_site.xml")
    val hTableName = "test_log"
    hConf.set("hbase.mapreduce.hfileoutputformat.table.name",hTableName)
    val tableName = TableName.valueOf(hTableName)
    val conn = ConnectionFactory.createConnection(hConf)
    val table = conn.getTable(tableName)
    val regionLocator = conn.getRegionLocator(tableName)

    val hFileOutput = "/tmp/h_file"

    output.saveAsNewAPIHadoopFile(hFileOutput,
      classOf[ImmutableBytesWritable],
      classOf[KeyValue],
      classOf[HFileOutputFormat2],
      hConf)

    val bulkLoader = new LoadIncrementalHFiles(hConf)

    bulkLoader.doBulkLoad(new Path(hFileOutput),conn.getAdmin,table,regionLocator)
  }

}

提交spark执行:

spark-submit \
--master yarn \
--conf spark.yarn.tokens.hbase.enabled=true \
--deploy-mode client \
--class BulkLoad1
--executor-memory 512m
--driver-memory 512m
--total-executor-cores 2
/home/hadoop/hadoop-2.8.5/files/Spark_study.jar

在hbase shell上查看:

scan 'test_log'
HBase原理--BulkLoad

点赞
收藏
评论区
推荐文章
blmius blmius
4年前
MySQL:[Err] 1292 - Incorrect datetime value: ‘0000-00-00 00:00:00‘ for column ‘CREATE_TIME‘ at row 1
文章目录问题用navicat导入数据时,报错:原因这是因为当前的MySQL不支持datetime为0的情况。解决修改sql\mode:sql\mode:SQLMode定义了MySQL应支持的SQL语法、数据校验等,这样可以更容易地在不同的环境中使用MySQL。全局s
Oracle 分组与拼接字符串同时使用
SELECTT.,ROWNUMIDFROM(SELECTT.EMPLID,T.NAME,T.BU,T.REALDEPART,T.FORMATDATE,SUM(T.S0)S0,MAX(UPDATETIME)CREATETIME,LISTAGG(TOCHAR(
待兔 待兔
1年前
手写Java HashMap源码
HashMap的使用教程HashMap的使用教程HashMap的使用教程HashMap的使用教程HashMap的使用教程22
Stella981 Stella981
4年前
Hive 数据导入HBase的2种方法详解
最近经常被问到这个问题,所以简单写一下总结。Hive数据导入到HBase基本有2个方案:  1、HBase中建表,然后Hive中建一个外部表,这样当Hive中写入数据后,HBase中也会同时更新  2、MapReduce读取Hive数据,然后写入(API或者Bulkload)到HBase1、Hive外部表创
Stella981 Stella981
4年前
Hbase表两种数据备份方法
Hbase表两种数据备份方法导入和导出示例本文将提供两种备份方法——1)基于Hbase提供的类对hbase中某张表进行备份2)基于Hbasesnapshot数据快速备份方法场合:由于线上和测试环境是分离的,无法在测试环境访问线上库,所以需要将线上的hbase表导出一部分到测试环境中的hbase表,这就是本文的由来。
Easter79 Easter79
4年前
Twitter的分布式自增ID算法snowflake (Java版)
概述分布式系统中,有一些需要使用全局唯一ID的场景,这种时候为了防止ID冲突可以使用36位的UUID,但是UUID有一些缺点,首先他相对比较长,另外UUID一般是无序的。有些时候我们希望能使用一种简单一些的ID,并且希望ID能够按照时间有序生成。而twitter的snowflake解决了这种需求,最初Twitter把存储系统从MySQL迁移
Stella981 Stella981
4年前
HBase启动失败
如果在hbase的shell中输入了status报错,hbase(main):001:0statusERROR:org.apache.hadoop.hbase.ipc.ServerNotRunningYetException:Serverisnotrunningyetatorg.apache.ha
Stella981 Stella981
4年前
Hbase FAQ热门问答小集合
这个问答是根据云栖社区上对HBase的FAQ中整理出来的。问:Hbase大量写入很慢,一个列族,每个200多列,一秒写30000条数据,使用mutate添加数据,clientbuffer缓存大小为10M,四台测试机,128G内存,分配60G给Hbase,该怎么优化?答:可以使用bulkload方式写入,通过mr程序生产hfile
Stella981 Stella981
4年前
HBase应该如何优化?
1HBase高可用在HBase中Hmaster负责监控RegionServer的生命周期,均衡RegionServer的负载,如果Hmaster挂掉了,那么整个HBase集群将陷入不健康的状态,此时的工作状态并不会维持太久。所以需要配置hbase的高可用2预分区
Stella981 Stella981
4年前
Hbase基础篇
hbase存储:HBase存储数据其底层使用的是HDFS来作为存储介质,HBase的每一张表对应的HDFS目录上的一个文件夹,文件夹名以HBase表进行命名(如果没有使用命名空间,则默认在default目录下),在表文件夹下存放在若干个Region命名的文件夹,Region文件夹中的每个列簇也是用文件夹进行存储的,每个列簇中存储就是实际的数据,以HF
HBase Sync功能导致HBase入库性能下降
本文分享自天翼云开发者社区《》,作者:5m问题背景与现象HBase入库慢,regionserver日志中大量打印slowsync。原因分析1.对比正常写入时间段监控,检查HBase服务整体CPU、内存以及NameNodeRPC在异常时间段是否增加;2.检查