35 HDFS

lix_uan
• 阅读 1005

HDFS优缺点

优点

  • 高容错性:多副本
  • 适合处理大数据

缺点

  • 不适合低延时数据访问
  • 无法高效的对大量小文件进行存储:会占用NameNode大量的内存来存储文件目录和块信息
  • 支持数据append,不支持文件的随机修改

HDFS组成架构

NameNode(nn)

  • 管理HDFS名称空间
  • 配置副本策略
  • 管理数据块(Block)的映射信息
  • 处理客户端的读写请求

DataNode

  • 存储实际的数据块
  • 执行数据块的读写操作

Client客户端(API)

  • 将文件切分切分成一个一个的Block,然后进行上传
  • 与NameNode交互,获取文件的位置信息
  • 与DataNode交互,写入或读取数据
  • 通过一些命令来实现对HDFS的增删改查

Secondary NameNode

  • 并非NameNode的热备,当NameNode挂掉的时候,它并不能马上替换NameNode并提供服务
  • 辅助NameNode,分担其工作量
  • 定期合并FsimageEdits,并推送给NameNode
  • 在紧急情况下,可辅助恢复NameNode

HDFS块文件大小

  • 默认大小是128M

为什么块大小不能设置太小,也不能设置太大?

  • HDFS块的大小设置主要取决于磁盘的传输速率
  • 寻址时间为传输时间的1%时,为最佳状态
  • 块设置太小,会增加寻址时间
  • 块设置的太大,处理这块数据时,会非常慢

HDFS的Shell操作

上传

# moveFromLocal:从本地剪切到HDFS
hadoop fs -moveFromLocal README.txt /

# appendToFile:从本地追加到HDFS文件末尾
hadoop fs -appendToFile liubei.txt /sanguo/shuguo/kongming.txt

# put:等同于copyFromLocal,从本地拷贝到HDFS
hadoop fs -put ./liubei.txt /user/lixuan/test/

下载

# get:等同于copyTolocal,从HDFS拷贝到本地
hadoop fs -get /sanguo/shuguo/kongming.txt ./

# getmerge:合并下载多个文件
hadoop fs -getmerge /user/lixuan/test/* ./zaiyiqi.txt

HDFS直接操作

# ls:显示目录信息
# mkdir:在HDFS上创建目录
# cat:显示文件内容
# chgrp、chmod、chown

# cp:从HDFS的一个路径拷贝到HDFS的另一个路径
hadoop fs -cp /sanguo/shuguo/kongming.txt /zhuge.txt

# mv:在HDFS目录中移动文件
hadoop fs -mv /zhuge.txt /sanguo/shuguo/

# tail:显示一个文件的末尾1kb数据
hadoop fs -tail /sanguo/shuguo/kongming.txt

# rm:删除文件或文件夹
# du:统计文件夹的大小信息
hadoop fs -du -s -h /user/lixuan/test
2.7 K  /user/lixuan/test

hadoop fs -du  -h /user/lixuan/test
1.3 K  /user/lixuan/test/README.txt
15     /user/lixuan/test/jinlian.txt
1.4 K  /user/lixuan/test/zaiyiqi.txt

# setrep:设置HDFS文件副本数量
# 设置的副本数只是记录在NameNode元数据中
# 是否真的有这么多副本,还得得看DataNode的数量
# 目前只有3台设备,最多也就3个副本
hadoop fs -setrep 10 /sanguo/shuguo/kongming.txt

HDFS的API操作

环境准备

  • 配置Windows环境变量

  • 导入Maven依赖

    <dependencies>
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>4.12</version>
        </dependency>
        <dependency>
            <groupId>org.apache.logging.log4j</groupId>
            <artifactId>log4j-slf4j-impl</artifactId>
            <version>2.12.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>3.1.3</version>
        </dependency>
    </dependencies>
  • 在resources目录下新建log4j2.xml,添加如下配置

    <?xml version="1.0" encoding="UTF-8"?>
    <Configuration status="error" strict="true" name="XMLConfig">
        <Appenders>
            <!-- 类型名为Console,名称为必须属性 -->
            <Appender type="Console" name="STDOUT">
                <!-- 布局为PatternLayout的方式,
                输出样式为[INFO] [2018-01-22 17:34:01][org.test.Console]I'm here -->
                <Layout type="PatternLayout"
                        pattern="[%p] [%d{yyyy-MM-dd HH:mm:ss}][%c{10}]%m%n" />
            </Appender>
    
        </Appenders>
    
        <Loggers>
            <!-- 可加性为false -->
            <Logger name="test" level="info" additivity="false">
                <AppenderRef ref="STDOUT" />
            </Logger>
    
            <!-- root loggerConfig设置 -->
            <Root level="info">
                <AppenderRef ref="STDOUT" />
            </Root>
        </Loggers>
    </Configuration>

创建HdfsClient类

public class HdfsClient{    
@Test
public void testMkdirs() throws IOException, InterruptedException, URISyntaxException{

        // 1 获取文件系统
        Configuration configuration = new Configuration();
        // 配置在集群上运行
        // configuration.set("fs.defaultFS", "hdfs://hadoop102:9820");
        // FileSystem fs = FileSystem.get(configuration);

        FileSystem fs = FileSystem.get(new URI("hdfs://node01:9820"), configuration, "lixuan");

        // 2 创建目录
        fs.mkdirs(new Path("/1108/daxian/banzhang"));

        // 3 关闭资源
        fs.close();
    }
}

35 HDFS

文件上传(测试参数优先级)

@Test
public void testCopyFromLocalFile() throws IOException, InterruptedException, URISyntaxException {

        // 1 获取文件系统
        Configuration configuration = new Configuration();
        configuration.set("dfs.replication", "2");
        FileSystem fs = FileSystem.get(new URI("hdfs://node01:8020"), configuration, "lixuan");

        // 2 上传文件
        fs.copyFromLocalFile(new Path("c:/banzhang.txt"), new Path("/banzhang.txt"));

        // 3 关闭资源
        fs.close();

        System.out.println("over");
}
  • 将hdfs-site.xml拷贝到项目的根目录下

    <?xml version="1.0" encoding="UTF-8"?>
    <?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
    
    <configuration>
        <property>
            <name>dfs.replication</name>
             <value>1</value>
        </property>
    </configuration>
  • 参数的优先级

    • 客户端代码中设置的值 > ClassPath下用户自定义的配置文件 > 服务器的自定义配置(xxx-site.xml) > 服务器的默认配置(xxx-default.xml)

文件下载

@Test
public void testCopyToLocalFile() throws IOException, InterruptedException, URISyntaxException{

        // 1 获取文件系统
        Configuration configuration = new Configuration();
        FileSystem fs = FileSystem.get(new URI("hdfs://node01:9820"), configuration, "lixuan");

        // 2 执行下载操作
        // boolean delSrc 指是否将原文件删除
        // Path src 指要下载的文件路径
        // Path dst 指将文件下载到的路径
        // boolean useRawLocalFileSystem 是否开启文件校验
        fs.copyToLocalFile(false, new Path("/banzhang.txt"), new Path("c:/banhua.txt"), true);

        // 3 关闭资源
        fs.close();
}

删除文件和目录

@Test
public void testDelete() throws IOException, InterruptedException, URISyntaxException{

    // 1 获取文件系统
    Configuration configuration = new Configuration();
    FileSystem fs = FileSystem.get(new URI("hdfs://node01:9820"), configuration, "lixuan");

    // 2 执行删除
    fs.delete(new Path("/0508/"), true);

    // 3 关闭资源
    fs.close();
}

文件更名和移动

@Test
public void testRename() throws IOException, InterruptedException, URISyntaxException{

    // 1 获取文件系统
    Configuration configuration = new Configuration();
    FileSystem fs = FileSystem.get(new URI("hdfs://node01:9820"), configuration, "lixuan"); 

    // 2 修改文件名称
    fs.rename(new Path("/banzhang.txt"), new Path("/banhua.txt"));

    // 3 关闭资源
    fs.close();
}

查看文件详情

@Test
public void testListFiles() throws IOException, InterruptedException, URISyntaxException{

    // 1获取文件系统
    Configuration configuration = new Configuration();
    FileSystem fs = FileSystem.get(new URI("hdfs://node01:9820"), configuration, "lixuan"); 

    // 2 获取文件详情
    RemoteIterator<LocatedFileStatus> listFiles = fs.listFiles(new Path("/"), true);

    while(listFiles.hasNext()){
        LocatedFileStatus status = listFiles.next();

        // 输出详情
        // 文件名称
        System.out.println(status.getPath().getName());
        // 长度
        System.out.println(status.getLen());
        // 权限
        System.out.println(status.getPermission());
        // 分组
        System.out.println(status.getGroup());

        // 获取存储的块信息
        BlockLocation[] blockLocations = status.getBlockLocations();

        for (BlockLocation blockLocation : blockLocations) {

            // 获取块存储的主机节点
            String[] hosts = blockLocation.getHosts();

            for (String host : hosts) {
                System.out.println(host);
            }
        }
    }

// 3 关闭资源
fs.close();
}

文件和文件夹判断

@Test
public void testListStatus() throws IOException, InterruptedException, URISyntaxException{

    // 1 获取文件配置信息
    Configuration configuration = new Configuration();
    FileSystem fs = FileSystem.get(new URI("hdfs://node01:9820"), configuration, "lixuan");

    // 2 判断是文件还是文件夹
    FileStatus[] listStatus = fs.listStatus(new Path("/"));

    for (FileStatus fileStatus : listStatus) {

        // 如果是文件
        if (fileStatus.isFile()) {
                System.out.println("f:"+fileStatus.getPath().getName());
            }else {
                System.out.println("d:"+fileStatus.getPath().getName());
            }
        }

    // 3 关闭资源
    fs.close();
}

HDFS的数据流

HDFS写数据流程

35 HDFS

  • 客户端向NameNode请求上传文件,NameNode检查目标文件,父目录是否存在
  • NameNode返回是否可以上传
  • 客户端请求第一个Block上传到哪几个DataNode服务器上
    • NameNode返回3个节点,分别为dn1,dn2,dn3
    • 客户端请求向dn1上传数据,dn1收到请求,调用dn2,dn2调用dn3,通信管道建立完成
    • dn1、dn2、dn3逐级应答客户端
    • 客户端开始往dn1上传第一个Block
      • 先从磁盘读取数据,放到一个本地内存缓存
      • 以Packet为单位,dn1收到一个Packet就会传给dn2,dn2再传给dn3
      • dn1每传一个Packet会放入一个应答队列等待应答
  • 当一个Block传输完成之后,客户端再次请求NameNode上传第二个Block到哪几个服务器上

网络拓扑-节点距离计算

  • 在HDFS写数据的过程中,NameNode会选择距离最近的DataNode节点 35 HDFS

机架感知-副本存储节点选择

35 HDFS

HDFS读数据的流程

35 HDFS

  • 客户端向NameNode请求下载文件,NameNode通过查询元数据,找到文件块所在的DataNode地址
  • 根据就近原则,随机挑选一台DataNode,请求读取数据
  • DataNode开始传输数据给客户端
    • 从磁盘读取数据流,以Packet为单位来做数据校验
    • 先在本地缓存,然后写入目标文件

NameNode和Secondary NameNode

nn,2nn工作机制

35 HDFS

第一阶段:NameNode启动

  • 第一次启动NameNode格式化后,创建Fsimage和Edits文件
    • 如果不是第一次启动,直接加载镜像文件和编辑日志到内存
  • 客户端对元数据进行增删改的请求
  • NameNode记录操作日志,更新滚动日志
  • NameNode在内存中对元数据进行增删改

第二阶段:Secondary NameNode工作

  • Secondary NameNode询问NameNode是否需要CheckPoint
  • Secondary NameNode请求执行CheckPoint
  • NameNode滚动正在写的Edits日志
  • 将滚动前的编辑日志和镜像文件拷贝到Secondary NameNode
  • Secondary NameNode加载编辑日志和镜像文件到内存,并合并
  • 生成新的镜像文件fsimage.chkpoint
  • 拷贝fsimage.chkpoint到NameNode
  • NameNode将fsimage.chkpoint重新命名成fsimage

Fsimage和Edits解析

  • Fsimage文件
    • HDFS文件系统元数据的一个永久性的检查点,其中包含HDFS文件系统的所有目录和文件inode的序列化信息
  • Edits文件
    • 存放HDF文件系统的所有更新操作
  • 每次NameNode启动的时候都会将Fsimage文件读入内存,加载Edits里面的更新操作,保证内存中的元数据是最新的、同步的

CheckPoint时间设置

  • 通常情况下,Secondary NameNode每隔一小时执行一次CheckPoint
  • 一分钟检查一次操作次数,当操作次数达到100万时,Secondary NameNode也会执行一次CheckPoint

NameNode故障处理

  • 将Secondary NameNode中的数据拷贝到NameNode存储数据的目录

    scp -r lixuan@node03:/opt/module/hadoop-3.1.3/data/dfs/namesecondary/* ./name/

集群安全模式

  • NameNode刚启动时,会处在安全模式,NameNode的文件系统对于客户端来说是只读的
  • 如果满足“最小副本条件”,NameNode会在30秒之后就退出安全模式
  • 在启动一个刚刚格式化的HDFS集群时,因为系统中还没有任何块,所以NameNode不会进入安全模式

DataNode

DataNode工作机制

35 HDFS

  • 一个数据块在DataNode上以文件形式存储在磁盘上,包括两个文件
    • 一个是文件本身
    • 一个是元数据(包括数据块的长度、块数据的校验和、时间戳)
  • DataNode启动后向NameNode注册,通过后,每隔1小时向NameNode上报所有块的信息
  • 每3秒向NameNode发送一次心跳
    • 心跳返回结果带有NameNode给DataNode的命令
    • 如果10分钟没有收到某个DataNode的心跳,则认为该节点不可用
  • 集群运行中可以安全加入和退出一些机器

环境准备

  • 将node03克隆一台node04
  • 修改ip地址和主机名称
  • 删除原来HDFS文件系统中的data和logs
  • source /etc/profile

服役新数据节点

  • 改一下workers文件,改一下脚本

  • 直接启动DataNode,即可关联到集群

  • 如果数据不均衡,可以用命令实现集群的再平衡

    ./start-balancer.sh

退役旧数据节点

  • 白名单和workers文件内容一致

  • 黑名单用于在集群运行过程中退役DataNode节点

    touch whitelist
    
    node01
    node02
    node03
    node04
  • hdfs-site.xml中添加配置

    <!-- 白名单 -->
    <property>
    <name>dfs.hosts</name>
    <value>/opt/module/hadoop-3.1.3/etc/hadoop/whitelist</value>
    </property>
    <!-- 黑名单 -->
    <property>
    <name>dfs.hosts.exclude</name>
    <value>/opt/module/hadoop-3.1.3/etc/hadoop/blacklist</value>
    </property>
  • 如果要退役node04节点

    touch blacklist
    
    node04
  • 分发改动的文件

点赞
收藏
评论区
推荐文章

暂无数据

lix_uan
lix_uan
Lv1
学无止境,即刻前行
文章
7
粉丝
7
获赞
0