Hadoop压缩

Stella981
• 阅读 586

一、Hadoop压缩简介

1、hadoop的3个阶段
    (1)分布式文件系统HDFS
    (2)分布式编程框架MapReduce
    (3)yarn框架

2、Hadoop数据压缩
    MR操作过程中进行大量数据传输。
    压缩技术能够有效的减少底层存储(HDFS)读写字节数。
    压缩提高了网络带宽和磁盘空间的效率。
    数据压缩能够有效的节省资源!
    压缩是mr程序的优化策略!
    通过压缩编码对mapper或者reducer数据传输进行数据的压缩,以减少磁盘IO。

3、压缩的基本原则
    1、运算密集型任务少用压缩
    2、IO密集型的任务,多用压缩

4、MR支持的压缩编码
    压缩格式 | hadoop是否自带? |文件拓展名 | 是否可以切分
    DEFAULT  |       是         | .deflate  |     否
    Gzip     |       是         | .gz       |     否
    bzip2    |       是         | .bz2      |     是
    LZO      |       否         | .lzo      |     是
    Snappy   |       否         | .snappy   |     否

5、编码/解码器
    DEFAULT | org.apache.hadoop.io.compress.DefaultCodeC
    Gzip    | org.apache.hadoop.io.compress.GzipCodeC
    bzip2   | org.apache.hadoop.io.compress.BZip2CodeC
    LZO     | com.hadoop.compression.lzo.LzoCodeC
    Snappy  | org.apache.hadoop.io.compress.SnappyCodeC

6、压缩性能
    压缩算法 | 原始文件大小 | 压缩文件大小| 压缩速度 | 解压速度
    gzip     | 8.3GB        |    1.8GB     |17,5MB/s  |58MB/s
    bzip2    | 8.3GB        |    1.1GB     |2.4MB/s   |9.5MB/s
    LZO      | 8.3gb        |    2.9GB     |49.3MB/s  |74.6MB/s

7、使用方式
    (1)map端输出压缩
        //开启map端的输出压缩
        conf.setBoolean("mapreduce.map.output.compress", true);
        //设置压缩方式
        //conf.setClass("mapreduce.map.output.compress.codec", DefaultCodec.class, CompressionCodec.class);
        conf.setClass("mapreduce.map.output.compress.codec",BZip2Codec.class, CompressionCodec.class);
    (2)reduce端输出压缩
        //开启reduce端的输出压缩
        FileOutputFormat.setCompressOutput(job, true);
        //设置压缩方式
        //FileOutputFormat.setOutputCompressorClass(job, DefaultCodec.class);
        //FileOutputFormat.setOutputCompressorClass(job, BZip2Codec.class);
        FileOutputFormat.setOutputCompressorClass(job, GzipCodec.class);

二、Hadoop压缩使用方式

1.Mapper类

package com.css.compress;

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class WordCountMapper extends Mapper<LongWritable, Text, Text, IntWritable>{

    // key 起始偏移量 value 数据 context 上下文
    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        // 1.读取数据
        String line = value.toString();
        // 2.切割 hello hunter
        String[] words = line.split(" ");
        // 3.循环的写到下一个阶段<hello,1><hunter,1>
        for (String w : words) {
            context.write(new Text(w), new IntWritable(1));
        }
    }
}

2.Reducer类

package com.css.compress;

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class WordCountReducer extends Reducer<Text, IntWritable, Text, IntWritable>{

    @Override
    protected void reduce(Text key, Iterable<IntWritable> values,
            Context context) throws IOException, InterruptedException {
        // 1.统计单词出现的次数
        int sum = 0;
        // 2.累加求和
        for (IntWritable count : values) {
            // 拿到值累加
            sum += count.get();
        }
        // 3.结果输出
        context.write(key, new IntWritable(sum));
    }    
}

3.Driver类

package com.css.compress;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.compress.BZip2Codec;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.DefaultCodec;
import org.apache.hadoop.io.compress.GzipCodec;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class WordCountDriver {
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        // 1.获取job信息
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);
        
        // 开启map端的输出压缩
        // conf.setBoolean("mapreduce.map.output.compress", true);
        // 设置压缩方式
        // conf.setClass("mapreduce.map.output.compress.codec", DefaultCodec.class, CompressionCodec.class);
        // conf.setClass("mapreduce.map.output.compress.codec", BZip2Codec.class, CompressionCodec.class);
        
        // 2.获取jar包
        job.setJarByClass(WordCountDriver.class);
        // 3.获取自定义的mapper与reducer类
        job.setMapperClass(WordCountMapper.class);
        job.setReducerClass(WordCountReducer.class);
        // 4.设置map输出的数据类型
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);
        // 5.设置reduce输出的数据类型(最终的数据类型)
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        
        // 开启reduce端的输出压缩
        FileOutputFormat.setCompressOutput(job, true);
        // 设置压缩方式
        // FileOutputFormat.setOutputCompressorClass(job, DefaultCodec.class);
        // FileOutputFormat.setOutputCompressorClass(job, BZip2Codec.class);
        FileOutputFormat.setOutputCompressorClass(job, GzipCodec.class);
        
        // 6.设置输入存在的路径与处理后的结果路径
        FileInputFormat.setInputPaths(job, new Path("c:/compress1031/in"));
        FileOutputFormat.setOutputPath(job, new Path("c:/compress1031/out2"));
        // 7.提交任务
        boolean rs = job.waitForCompletion(true);
        System.out.println(rs?0:1);
    }
}

4.输入文件words.txt

I love Beijing
I love China
Beijing is the capital of China

5.输出文件的名字分别如下

(1)
part-r-00000.bz2

(2)
part-r-00000.deflate

(3)
part-r-00000.gz

三、自定义压缩工具

1.自定义压缩工具类

package com.css.compress;

import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.CompressionOutputStream;
import org.apache.hadoop.util.ReflectionUtils;

public class TestCompress {
    public static void main(String[] args) throws ClassNotFoundException, IOException {
        compress("c:/compress1031/intest/test.txt","org.apache.hadoop.io.compress.DefaultCodec");
        compress("c:/compress1031/intest/test.txt","org.apache.hadoop.io.compress.BZip2Codec");
        compress("c:/compress1031/intest/test.txt","org.apache.hadoop.io.compress.GzipCodec");
    }
    
    // 测试压缩方法
    private static void compress(String fileName, String method) throws ClassNotFoundException, IOException{
        // 1.获取输入流
        FileInputStream fis = new FileInputStream(new File(fileName));
        Class<?> cName = Class.forName(method);
        CompressionCodec codec = (CompressionCodec) ReflectionUtils.newInstance(cName, new Configuration());
        // 2.输出流
        FileOutputStream fos = new FileOutputStream(new File(fileName + codec.getDefaultExtension()));
        // 3.创建压缩输出流
        CompressionOutputStream cos = codec.createOutputStream(fos);
        // 4.流的对拷
        IOUtils.copyBytes(fis, cos, 1024*1024*2, false);
        // 5.关闭资源
        fis.close();
        cos.close();
        fos.close();
    }
}

2.输入文件名

test.txt

3.输出文件名

(1)
test.txt.deflate

(2)
test.txt.bz2

(3)
test.txt.gz
点赞
收藏
评论区
推荐文章
blmius blmius
2年前
MySQL:[Err] 1292 - Incorrect datetime value: ‘0000-00-00 00:00:00‘ for column ‘CREATE_TIME‘ at row 1
文章目录问题用navicat导入数据时,报错:原因这是因为当前的MySQL不支持datetime为0的情况。解决修改sql\mode:sql\mode:SQLMode定义了MySQL应支持的SQL语法、数据校验等,这样可以更容易地在不同的环境中使用MySQL。全局s
Wesley13 Wesley13
2年前
java将前端的json数组字符串转换为列表
记录下在前端通过ajax提交了一个json数组的字符串,在后端如何转换为列表。前端数据转化与请求varcontracts{id:'1',name:'yanggb合同1'},{id:'2',name:'yanggb合同2'},{id:'3',name:'yang
Stella981 Stella981
2年前
Hadoop2.7.3完全分布式集群安装过程
需要安装的软件Hadoop包含HDFS集群和YARN集群。部署Hadoop就是部署HDFS和YARN集群。机器数量、角色4台。NameNode1台、DataNode3台主机名IP角色amaster192.168.37.143NameNode:9000ResourceManag
Stella981 Stella981
2年前
Hadoop 新 MapReduce 框架 Yarn 详解
HadoopMapReduceV2(Yarn)框架简介原HadoopMapReduce框架的问题对于业界的大数据存储及分布式处理系统来说,Hadoop是耳熟能详的卓越开源分布式文件存储及处理框架,对于Hadoop框架的介绍在此不再累述,读者可参考Hadoop官方简介(https://www.oschina.net/action
Stella981 Stella981
2年前
Hadoop是一种开源的适合大数据的分布式存储和处理的平台
“Hadoop能做什么?”,概括如下:  1)搜索引擎:这也正是DougCutting设计Hadoop的初衷,为了针对大规模的网页快速建立索引;  2)大数据存储:利用Hadoop的分布式存储能力,例如数据备份、数据仓库等;  3)大数据处理:利用Hadoop的分布式处理能力,例如数据挖掘、数据分析等;  4)
Stella981 Stella981
2年前
MapReduce编程模型和计算框架架构原理
Hadoop解决大规模数据分布式计算的方案是MapReduce。MapReduce既是一个编程模型,又是一个计算框架。也就是说,开发人员必须基于MapReduce编程模型进行编程开发,然后将程序通过MapReduce计算框架分发到Hadoop集群中运行。我们先看一下作为编程模型的MapReduce。MapReduce编程模型
Stella981 Stella981
2年前
Spark源码剖析
4. Hadoop相关配置及Executor环境变量的设置4.1 Hadoop相关配置信息默认情况下,Spark使用HDFS作为分布式文件系统,所以需要获取Hadoop相关配置信息的代码如下:!(https://oscimg.oschina.net/
Stella981 Stella981
2年前
Hadoop之Mapreduce详解
1、什么是Mapreduce   Mapreduce是一个分布式运算程序的编程框架,是用户开发“基于hadoop的数据分析应用”的核心框架;Mapreduce核心功能是将用户编写的业务逻辑代码和自带默认组件整合成一个完整的分布式运算程序,并发运行在一个hadoop集群上;2、Mapreduce框架结构及核心运行机制
Stella981 Stella981
2年前
Hadoop技术原理总结
Hadoop技术原理总结1、Hadoop运行原理Hadoop是一个开源的可运行于大规模集群上的分布式并行编程框架,其最核心的设计包括:MapReduce和HDFS。基于Hadoop,你可以轻松地编写可处理海量数据的分布式并行程序,并将其运行于由成百上千个结点组成的大规模计算机集群上。基于MapReduce计算模型编写分布式并行程序相对简单,
Wesley13 Wesley13
2年前
MAPREDUCER学习笔记
MAPREDUCE基本原理      一,概念理解  1,Mapreduce是一个分布式运算程序的编程架构,相对于HDFS来说就是客户端。其核心功能就是将用户编写的业务逻辑代码和自带默认组件整合成一个完整的分布式运算程序,并运行在一个hadoop集群上。  2,基本整体架构:MEAppMaster,MapTask,R