HBase 架构和 Java Api

Stella981
• 阅读 497

HBase 架构

HBase是Hadoop的数据库,能够对大数据提供随机、实时读写访问。他是开源的,分布式的,多版本的,面向列的,存储模型。

在讲解的时候我首先给大家讲解一下HBase的整体结构,如下图

HBase 架构和 Java Api

HBase Master是服务器负责管理所有的HRegion服务器,HBase Master并不存储HBase服务器的任何数据,HBase逻辑上的表可能会划分为多个HRegion,然后存储在HRegion Server群中,HBase Master Server中存储的是从数据到HRegion Server的映射。

一台机器只能运行一个HRegion服务器,数据的操作会记录在Hlog中,在读取数据时候,HRegion会先访问Hmemcache缓存,如果 缓存中没有数据才回到Hstore中上找,每一个列都会有一个Hstore集合,每个Hstore集合包含了很多具体的HstoreFile文件,这些文件是B树结构的,方便快速读取。

HBase数据物理视图如下:

HBase 架构和 Java Api

Ø  Timestamp: 时间戳,每次数据操作对应的时间戳,可以看作是数据的version number****Ø  Row Key: 行键,Table的主键,Table中的记录按照Row Key排序

Ø  Column Family:列簇,Table在水平方向有一个或者多个Column Family组成,一个Column Family中可以由任意多个Column组成,即Column Family支持动态扩展,无需预先定义Column的数量以及类型,所有Column均以二进制格式存储,用户需要自行进行类型转换。

Java来操作HBase数据

如果hbase shell已经设置在环境变量中的话,可以直接输入以下命令进入
 hbase shell
0)查看基本信息
 version  
 status  
 whoami
表的管理

1)查看有哪些表  
list  

2)查结构     describe 'my_test'
查数据内容:  scan 'my_test',{LIMIT => 5}

获取某个key下所有的列簇:列:

get 'test_USERPERHOURINFO_SPARK','20170228#20170228112348'

ps: "LIMIT" 大写

封装一个Java 操作HBase 1.1.7 的Api,代码如下:

/**
 * Created by eric on 2016/11/24
 * 基于 HBase 1.1.7版本 HBase集群 的 java api 封装
 */

package com.bw.util;

    import java.io.File;
    import java.io.FileInputStream;
    import java.io.FileNotFoundException;
    import java.io.IOException;
    import java.io.InputStream;
    import java.util.*;
    
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.Cell;
    import org.apache.hadoop.hbase.CellUtil;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.HColumnDescriptor;
    import org.apache.hadoop.hbase.HTableDescriptor;
    import org.apache.hadoop.hbase.TableName;
    import org.apache.hadoop.hbase.client.*;
    import org.apache.hadoop.hbase.io.compress.Compression;
    import org.apache.hadoop.hbase.util.Bytes;

class HBaseUtil {
    private static Configuration conf = null;
    /**
     * 初始化配置 静态初始化块
     */
    static {
        InputStream propInStream = null;
        try {
            propInStream = new FileInputStream(new File("./src/main/resources/config/config.properties"));
        } catch (FileNotFoundException e1) {
            // TODO Auto-generated catch block
            e1.printStackTrace();
        }
        Properties prop = new Properties();
        //load(InputStream inStream)方法从.properties属性文件对应的文件输入流中,加载属性列表到Properties类对象
        try{
            prop.load(propInStream);
        }catch(IOException e){
            System.err.println(e.getMessage());
        }finally{
            try {
                propInStream.close();
            } catch (IOException e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
            }
        }
        //Hbase读取配置文件中的内容
        //prop.getProperty方法是分别是获取属性信息。
        Configuration HBASE_CONFIG = new Configuration();
        HBASE_CONFIG.set("hbase.zookeeper.property.clientPort",prop.getProperty("hbase.zookeeper.property.clientPort"));
        HBASE_CONFIG.set("hbase.zookeeper.quorum",prop.getProperty("hbase.zookeeper.quorum"));
        HBASE_CONFIG.set("hbase.master.port",prop.getProperty("hbase.master.port"));
        HBASE_CONFIG.set("zookeeper.znode.parent",prop.getProperty("hbase.zookeeper.znode.parent"));
        conf = HBaseConfiguration.create(HBASE_CONFIG);
    }


    /**
     * 创建数据库表
     */

    public static void createTable(String tableName, String[] columnFamilys) throws IOException {
        // 建立一个数据库的连接
        Connection conn = ConnectionFactory.createConnection(conf);
        // 创建一个数据库管理员
        HBaseAdmin hAdmin = (HBaseAdmin) conn.getAdmin();
        try {
            if (hAdmin.tableExists(tableName)) {
                System.out.println(tableName + "表已存在");
                conn.close();
                System.exit(0);
            } else {
                // 新建一个表描述
                HTableDescriptor tableDesc = new HTableDescriptor(TableName.valueOf(tableName));
                // 在表描述里添加列族
                for (String columnFamily : columnFamilys) {
                    HColumnDescriptor hColumnDescriptor = new HColumnDescriptor(columnFamily);
                    hColumnDescriptor.setCompressionType(Compression.Algorithm.SNAPPY);//设置压缩类型
                    tableDesc.addFamily(hColumnDescriptor);
                }
                // 根据配置好的表描述建表
                hAdmin.createTable(tableDesc);
                System.out.println("创建" + tableName + "表成功");
            }
            conn.close();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    /**
     * 插入或者更新单条数据
     * @param tableName 表名
     * @param rowKey 行键
     * @param family 列簇
     * @param qualifier 限定符(列键名)
     * @param value 列键值
     * @return true: 插入成功; false: 插入失败
     * @throws IOException
     */
    public static boolean saveData(String tableName, String rowKey, String family, String qualifier,
                                   String value) throws IOException{
        try {
            Connection conn = ConnectionFactory.createConnection(conf);
            HTable hTable = (HTable) conn.getTable(TableName.valueOf(tableName));
            Put put = new Put(Bytes.toBytes(rowKey));//参数是 行键

            put.addColumn(
                    Bytes.toBytes(family),
                    Bytes.toBytes(qualifier),
                    Bytes.toBytes(value)
            );

            hTable.put(put);

            hTable.close();
            conn.close();
            return true;
        } catch (IOException e){
            e.printStackTrace();
        }
        return  false;
    }


    /**
     * 读取一个限定符的值
     * @param tableName
     * @param rowKey
     * @param family
     * @param qualifier
     * @return
     * @throws IOException
     */
    public static String getValueByQualifier(String tableName, String rowKey, String family, String qualifier)
    throws IOException{
       try {
           Connection conn = ConnectionFactory.createConnection(conf);
           HTable hTable = (HTable) conn.getTable(TableName.valueOf(tableName));
           Get get = new Get(Bytes.toBytes(rowKey));
           get.addColumn(Bytes.toBytes(family), Bytes.toBytes(qualifier));
           Result res = hTable.get(get);

           hTable.close();
           conn.close();

           return Bytes.toString(CellUtil.cloneValue(res.listCells().get(0)));
       } catch (IOException e) {
           e.printStackTrace();
       }

        return null;
    }

    /**
     * 获取一个列簇的一个限定符的值
     * @param tableName
     * @param rowKey
     * @param family
     * @return
     * @throws IOException
     */
    public static Map<String, String> getValueByFamily (String tableName, String rowKey, String family)
            throws IOException{
        Map<String, String> result = null;
       try {
           Connection conn = ConnectionFactory.createConnection(conf);
           HTable hTable = (HTable) conn.getTable(TableName.valueOf(tableName));
           Get get = new Get(Bytes.toBytes(rowKey));
           get.addFamily(Bytes.toBytes(family));

           Result res = hTable.get(get);
           List<Cell> cs = res.listCells();
           result = cs.size() > 0 ? new HashMap<String, String>() : result;

           for (Cell cell : cs) {
               result.put(
                       Bytes.toString(CellUtil.cloneFamily(cell)),
                       Bytes.toString(CellUtil.cloneValue(cell))
               );
           }
           hTable.close();
           conn.close();
       } catch (IOException e) {
            e.printStackTrace();
       }

        return result;
    }

    /**
     *获取一个列簇的所有限定符的值
     * @param tableName
     * @param rowKey
     * @return
     * @throws IOException
     */
    public static Map<String, Map<String, String>> getValueByFamilyAll(String tableName, String rowKey)
            throws IOException{
        Map<String, Map<String, String>> results = null ;

        try {
            Connection conn = ConnectionFactory.createConnection(conf);
            HTable hTable = (HTable) conn.getTable(TableName.valueOf(tableName));
            Get get = new Get(Bytes.toBytes(rowKey));

            Result res = hTable.get(get);
            List<Cell> cs = res.listCells();
            results = cs.size() > 0 ? new HashMap<String, Map<String, String>> () : results;

            for (Cell cell : cs) {
                String familyName = Bytes.toString(CellUtil.cloneFamily(cell));
                if (results.get(familyName) == null)
                {
                    results.put(familyName, new HashMap<String,  String> ());
                }
                results.get(familyName).put(
                        Bytes.toString(CellUtil.cloneQualifier(cell)),
                        Bytes.toString(CellUtil.cloneValue(cell))
                );
            }

            hTable.close();
            conn.close();
        } catch (IOException e) {
            e.printStackTrace();
        }

        return results;
    }

    /**
     * 删除一个限定符
     * @param tableName
     * @param rowKey
     * @param family
     * @param qualifier
     * @return
     */
    public static boolean del(String tableName, String rowKey, String family, String qualifier) {
        try {
            Connection conn = ConnectionFactory.createConnection(conf);
            HTable hTable = (HTable) conn.getTable(TableName.valueOf(tableName));
            Delete del = new Delete(Bytes.toBytes(rowKey));

            if (qualifier != null) {
                del.addColumn(Bytes.toBytes(family), Bytes.toBytes(qualifier));
            } else if (family != null) {
                del.addFamily(Bytes.toBytes(family));
            }
            hTable.delete(del);
            hTable.close();
            conn.close();

            return true;
        } catch (IOException e) {
            e.printStackTrace();
        }

        return false;
    }

    /**
     * 删除一行
     * @param tableName
     * @param rowKey
     * @return
     */
    public static boolean delRowKey(String tableName, String rowKey) {
        return del(tableName, rowKey, null, null);
    }

    /**
     * 删除一行下的一个列簇
     * @param tableName
     * @param rowKey
     * @param family
     * @return
     */
    public static boolean delFamily(String tableName, String rowKey, String family) {
        return del(tableName, rowKey, family, null);
    }

    /**
     * 通过rowkey获取一条数据
     */
    public static void getRow(String tableName, String rowKey) throws IOException {
        // 建立一个数据库的连接
        Connection conn = ConnectionFactory.createConnection(conf);
        // 获取表
        HTable table = (HTable) conn.getTable(TableName.valueOf(tableName));
        // 通过rowkey创建一个get对象
        Get get = new Get(Bytes.toBytes(rowKey));
        // 输出结果
        Result result = table.get(get);
        for (Cell cell : result.rawCells()) {
            System.out.println(
                    "行键:" + new String(CellUtil.cloneRow(cell)) + "\t" +
                            "列族:" + new String(CellUtil.cloneFamily(cell)) + "\t" +
                            "列名:" + new String(CellUtil.cloneQualifier(cell)) + "\t" +
                            "值:" + new String(CellUtil.cloneValue(cell)) + "\t" +
                            "时间戳:" + cell.getTimestamp());
        }
        // 关闭资源
        table.close();
        conn.close();
    }

    /**
     * 全表扫描
     */
    public static void scanTable(String tableName) throws IOException {
        // 建立一个数据库的连接
        Connection conn = ConnectionFactory.createConnection(conf);
        // 获取表
        HTable table = (HTable) conn.getTable(TableName.valueOf(tableName));
        // 创建一个扫描对象
        Scan scan = new Scan();
        // 扫描全表输出结果
        ResultScanner results = table.getScanner(scan);
        for (Result result : results) {
            for (Cell cell : result.rawCells()) {
                System.out.println(
                        "行键:" + new String(CellUtil.cloneRow(cell)) + "\t" +
                                "列族:" + new String(CellUtil.cloneFamily(cell)) + "\t" +
                                "列名:" + new String(CellUtil.cloneQualifier(cell)) + "\t" +
                                "值:" + new String(CellUtil.cloneValue(cell)) + "\t" +
                                "时间戳:" + cell.getTimestamp());
            }
        }
        // 关闭资源
        results.close();
        table.close();
        conn.close();
    }

    /**
     * 删除多条数据
     */
    public static void delRows(String tableName, String[] rows) throws IOException {
        // 建立一个数据库的连接
        Connection conn = ConnectionFactory.createConnection(conf);
        // 获取表
        HTable table = (HTable) conn.getTable(TableName.valueOf(tableName));
        // 删除多条数据
        List<Delete> list = new ArrayList<Delete>();
        for (String row : rows) {
            Delete delete = new Delete(Bytes.toBytes(row));
            list.add(delete);
        }
        table.delete(list);
        // 关闭资源
        table.close();
        conn.close();
    }

    /**
     * 删除数据库表
     */
    public static boolean deleteTable(String tableName) throws IOException {
        // 建立一个数据库的连接
        Connection conn = ConnectionFactory.createConnection(conf);
        // 创建一个数据库管理员
        HBaseAdmin hAdmin = (HBaseAdmin) conn.getAdmin();
        if (hAdmin.tableExists(tableName)) {
            // 失效表
            hAdmin.disableTable(tableName);
            // 删除表
            hAdmin.deleteTable(tableName);
            conn.close();
            return true;
        } else {
            conn.close();
            return false;
        }
    }

}
点赞
收藏
评论区
推荐文章
blmius blmius
2年前
MySQL:[Err] 1292 - Incorrect datetime value: ‘0000-00-00 00:00:00‘ for column ‘CREATE_TIME‘ at row 1
文章目录问题用navicat导入数据时,报错:原因这是因为当前的MySQL不支持datetime为0的情况。解决修改sql\mode:sql\mode:SQLMode定义了MySQL应支持的SQL语法、数据校验等,这样可以更容易地在不同的环境中使用MySQL。全局s
Jacquelyn38 Jacquelyn38
2年前
2020年前端实用代码段,为你的工作保驾护航
有空的时候,自己总结了几个代码段,在开发中也经常使用,谢谢。1、使用解构获取json数据let jsonData  id: 1,status: "OK",data: 'a', 'b';let  id, status, data: number   jsonData;console.log(id, status, number )
皕杰报表之UUID
​在我们用皕杰报表工具设计填报报表时,如何在新增行里自动增加id呢?能新增整数排序id吗?目前可以在新增行里自动增加id,但只能用uuid函数增加UUID编码,不能新增整数排序id。uuid函数说明:获取一个UUID,可以在填报表中用来创建数据ID语法:uuid()或uuid(sep)参数说明:sep布尔值,生成的uuid中是否包含分隔符'',缺省为
Easter79 Easter79
2年前
Twitter的分布式自增ID算法snowflake (Java版)
概述分布式系统中,有一些需要使用全局唯一ID的场景,这种时候为了防止ID冲突可以使用36位的UUID,但是UUID有一些缺点,首先他相对比较长,另外UUID一般是无序的。有些时候我们希望能使用一种简单一些的ID,并且希望ID能够按照时间有序生成。而twitter的snowflake解决了这种需求,最初Twitter把存储系统从MySQL迁移
Wesley13 Wesley13
2年前
mysql设置时区
mysql设置时区mysql\_query("SETtime\_zone'8:00'")ordie('时区设置失败,请联系管理员!');中国在东8区所以加8方法二:selectcount(user\_id)asdevice,CONVERT\_TZ(FROM\_UNIXTIME(reg\_time),'08:00','0
Stella981 Stella981
2年前
HBase启动失败
如果在hbase的shell中输入了status报错,hbase(main):001:0statusERROR:org.apache.hadoop.hbase.ipc.ServerNotRunningYetException:Serverisnotrunningyetatorg.apache.ha
Wesley13 Wesley13
2年前
00:Java简单了解
浅谈Java之概述Java是SUN(StanfordUniversityNetwork),斯坦福大学网络公司)1995年推出的一门高级编程语言。Java是一种面向Internet的编程语言。随着Java技术在web方面的不断成熟,已经成为Web应用程序的首选开发语言。Java是简单易学,完全面向对象,安全可靠,与平台无关的编程语言。
Stella981 Stella981
2年前
Django中Admin中的一些参数配置
设置在列表中显示的字段,id为django模型默认的主键list_display('id','name','sex','profession','email','qq','phone','status','create_time')设置在列表可编辑字段list_editable
Wesley13 Wesley13
2年前
MySQL部分从库上面因为大量的临时表tmp_table造成慢查询
背景描述Time:20190124T00:08:14.70572408:00User@Host:@Id:Schema:sentrymetaLast_errno:0Killed:0Query_time:0.315758Lock_
Python进阶者 Python进阶者
3个月前
Excel中这日期老是出来00:00:00,怎么用Pandas把这个去除
大家好,我是皮皮。一、前言前几天在Python白银交流群【上海新年人】问了一个Pandas数据筛选的问题。问题如下:这日期老是出来00:00:00,怎么把这个去除。二、实现过程后来【论草莓如何成为冻干莓】给了一个思路和代码如下:pd.toexcel之前把这