36 Mapreduce
lix_uan

MapReduce Overview

Advantages and Disadvantages of MapReduce

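Advantages

  • Good scalability: computing capacity can be expanded simply by adding machines
  • High fault tolerance: if a node fails, its tasks are automatically rerun on another node

Disadvantages

  • Not good at real-time computation: it cannot return results within milliseconds or seconds the way an interactive database query can
  • Not good at stream processing: its input data set is static, whereas streaming input arrives continuously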

The Core Programming Idea of MapReduce

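The core idea, as it is usually described, is that a MapReduce program has a Map phase, whose MapTasks run fully in parallel and independently of each other, and a Reduce phase, whose ReduceTasks are also independent of each other but consume the output of all MapTasks. A minimal single-process sketch of that flow in plain Java (no Hadoop needed), assuming word count over whitespace-separated words; MiniMapReduce and its methods are hypothetical names, and the TreeMap only stands in for the sorted grouping that the Shuffle performs:

```java
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical single-process model of the two MapReduce phases (no Hadoop involved).
class MiniMapReduce {

    // Map phase: each line is handled independently and emits (word, 1) pairs.
    static List<Map.Entry<String, Integer>> map(String line) {
        List<Map.Entry<String, Integer>> out = new ArrayList<>();
        for (String word : line.trim().split("\\s+")) {
            if (!word.isEmpty()) {
                out.add(new AbstractMap.SimpleEntry<>(word, 1));
            }
        }
        return out;
    }

    // Reduce phase: aggregates every value that shares one key.
    static int reduce(String key, List<Integer> values) {
        int sum = 0;
        for (int value : values) {
            sum += value;
        }
        return sum;
    }

    static Map<String, Integer> run(List<String> lines) {
        // Stand-in for the Shuffle: group map output by key, sorted by key.
        Map<String, List<Integer>> grouped = new TreeMap<>();
        for (String line : lines) {
            for (Map.Entry<String, Integer> kv : map(line)) {
                grouped.computeIfAbsent(kv.getKey(), k -> new ArrayList<>()).add(kv.getValue());
            }
        }
        // Each key group is reduced independently of the others.
        Map<String, Integer> result = new TreeMap<>();
        for (Map.Entry<String, List<Integer>> group : grouped.entrySet()) {
            result.put(group.getKey(), reduce(group.getKey(), group.getValue()));
        }
        return result;
    }

    public static void main(String[] args) {
        System.out.println(run(Arrays.asList("hello world", "hello hadoop")));
    }
}
```

Running main prints the aggregated counts in key order, because the grouping step sorts keys the way the Shuffle does before reduce() sees them.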

MapReduce Processes

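  • AppMaster: schedules the whole program and coordinates its state
  • MapTask: handles the entire data-processing flow of the Map phase
  • ReduceTask: handles the entire data-processing flow of the Reduce phase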

WordCount Hands-on Case

Add the dependencies to pom.xml

<dependencies>
    <dependency>
        <groupId>junit</groupId>
        <artifactId>junit</artifactId>
        <version>4.12</version>
    </dependency>
    <dependency>
        <groupId>org.apache.logging.log4j</groupId>
        <artifactId>log4j-slf4j-impl</artifactId>
        <version>2.12.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-client</artifactId>
        <version>3.1.3</version>
    </dependency>
</dependencies>

<build>
    <plugins>
        <plugin>
            <artifactId>maven-compiler-plugin</artifactId>
            <version>3.8.0</version>
            <configuration>
                <source>1.8</source>
                <target>1.8</target>
            </configuration>
        </plugin>
        <plugin>
            <artifactId>maven-assembly-plugin</artifactId>
            <configuration>
                <descriptorRefs>
                    <descriptorRef>jar-with-dependencies</descriptorRef>
                </descriptorRefs>
            </configuration>
            <executions>
                <execution>
                    <id>make-assembly</id>
                    <phase>package</phase>
                    <goals>
                        <goal>single</goal>
                    </goals>
                </execution>
            </executions>
        </plugin>
    </plugins>
</build>

Create log4j2.xml in the src/main/resources directory

<?xml version="1.0" encoding="UTF-8"?>
<Configuration status="error" strict="true" name="XMLConfig">
    <Appenders>
        <!-- Appender of type Console; the name attribute is required -->
        <Appender type="Console" name="STDOUT">
            <Layout type="PatternLayout"
                    pattern="[%p] [%d{yyyy-MM-dd HH:mm:ss}][%c{10}]%m%n" />
        </Appender>

    </Appenders>

    <Loggers>
        <!-- additivity="false": events handled by this logger are not passed on to the root logger as well -->
        <Logger name="test" level="info" additivity="false">
            <AppenderRef ref="STDOUT" />
        </Logger>

        <!-- Root LoggerConfig settings -->
        <Root level="info">
            <AppenderRef ref="STDOUT" />
        </Root>
    </Loggers>

</Configuration>

Write the Mapper class

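import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class WordcountMapper extends Mapper<LongWritable, Text, Text, IntWritable> {

    // Reused across map() calls instead of creating two new objects per word
    Text k = new Text();
    IntWritable v = new IntWritable(1);

    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {

        // 1 Get one line (the key is the line's byte offset in the file)
        String line = value.toString();

        // 2 Split the line into words
        String[] words = line.split(" ");

        // 3 Emit (word, 1) for every word
        for (String word : words) {
            k.set(word);
            context.write(k, v);
        }
    }
}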

Write the Reducer class

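import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class WordcountReducer extends Reducer<Text, IntWritable, Text, IntWritable> {

    // Reused output value
    IntWritable v = new IntWritable();

    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {

        // 1 Sum up the counts for this word
        int sum = 0;
        for (IntWritable count : values) {
            sum += count.get();
        }

        // 2 Emit (word, total count)
        v.set(sum);
        context.write(key, v);
    }
}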

Write the Driver class

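import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class WordcountDriver {

    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {

        // 1 Get the configuration and create the Job
        Configuration configuration = new Configuration();

        Job job = Job.getInstance(configuration);

        // 2 Set the jar to load, located through this Driver class
        job.setJarByClass(WordcountDriver.class);

        // 3 Set the Mapper and Reducer classes
        job.setMapperClass(WordcountMapper.class);
        job.setReducerClass(WordcountReducer.class);

        // 4 Set the kv types of the Map output
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);

        // 5 Set the kv types of the final output
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        // 6 Set the input and output paths (taken from the command line)
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        // 7 Submit the job and wait for it to finish
        boolean result = job.waitForCompletion(true);

        System.exit(result ? 0 : 1);
    }
}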

Package the project and run it on Hadoop

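hadoop jar <jar file name> <fully qualified main class name> <input path> <output path>

# The input path is a directory; the files in it are read as input
# The output path must not exist yet, otherwise the job fails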

Hadoop Serialization

Serialization Overview

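  • Serialization: converting in-memory objects into byte sequences so they can be stored on disk (persisted) or transmitted over the network
  • Deserialization: the reverse, converting received byte sequences (or data persisted on disk) back into in-memory objects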
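The pattern that Hadoop's Writable interface relies on can be tried without a cluster: write() pushes fields to a DataOutput, and readFields() pulls them back from a DataInput in the same order. FlowRecord below is a hypothetical plain-JDK stand-in (it does not implement org.apache.hadoop.io.Writable) that mirrors the FlowBean used in the case later in this section and round-trips one record through a byte array:

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInput;
import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.IOException;

// Hypothetical stand-in for a Hadoop Writable: same write/readFields shape,
// but JDK-only, so it runs without Hadoop on the classpath.
class FlowRecord {
    long upFlow;
    long downFlow;
    long sumFlow;

    FlowRecord() { } // no-arg constructor, as Hadoop needs for deserialization

    FlowRecord(long upFlow, long downFlow) {
        this.upFlow = upFlow;
        this.downFlow = downFlow;
        this.sumFlow = upFlow + downFlow;
    }

    // Serialization: fields are written in a fixed order.
    void write(DataOutput out) throws IOException {
        out.writeLong(upFlow);
        out.writeLong(downFlow);
        out.writeLong(sumFlow);
    }

    // Deserialization: fields must be read back in exactly the same order.
    void readFields(DataInput in) throws IOException {
        upFlow = in.readLong();
        downFlow = in.readLong();
        sumFlow = in.readLong();
    }

    static byte[] toBytes(FlowRecord record) throws IOException {
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(buffer)) {
            record.write(out);
        }
        return buffer.toByteArray();
    }

    static FlowRecord fromBytes(byte[] bytes) throws IOException {
        FlowRecord record = new FlowRecord(); // created empty, then filled, like Hadoop does
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(bytes))) {
            record.readFields(in);
        }
        return record;
    }

    public static void main(String[] args) throws IOException {
        byte[] bytes = toBytes(new FlowRecord(100, 200));
        FlowRecord copy = fromBytes(bytes);
        System.out.println(bytes.length + " bytes -> " + copy.upFlow + "\t" + copy.downFlow + "\t" + copy.sumFlow);
    }
}
```

Three long fields serialize to 24 bytes (8 bytes each) with no field names or type tags, which is why swapping two readLong() calls in readFields() would silently mix up the values instead of failing.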

Common Serialization Types

Java type    Hadoop Writable type
boolean      BooleanWritable
byte         ByteWritable
int          IntWritable
float        FloatWritable
long         LongWritable
double       DoubleWritable
String       Text
Map          MapWritable
array        ArrayWritable
null         NullWritable

Serialization Hands-on Case

Case Analysis


Write the Bean for flow statistics

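import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.Writable;

// 1 Implement the Writable interface
public class FlowBean implements Writable {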

    private long upFlow;
    private long downFlow;
    private long sumFlow;

    // 2 Deserialization calls the no-argument constructor via reflection, so it must exist
    public FlowBean() {
        super();
    }

    public FlowBean(long upFlow, long downFlow) {
        super();
        this.upFlow = upFlow;
        this.downFlow = downFlow;
        this.sumFlow = upFlow + downFlow;
    }

    // 3 Serialization method
    @Override
    public void write(DataOutput out) throws IOException {
        out.writeLong(upFlow);
        out.writeLong(downFlow);
        out.writeLong(sumFlow);
    }

    // 4 Deserialization method
    // 5 Fields must be read in exactly the same order in which write() wrote them
    @Override
    public void readFields(DataInput in) throws IOException {
        this.upFlow = in.readLong();
        this.downFlow = in.readLong();
        this.sumFlow = in.readLong();
    }

    // 6 toString() determines how the bean is written to the text output
    @Override
    public String toString() {
        return upFlow + "\t" + downFlow + "\t" + sumFlow;
    }

    public long getUpFlow() {
        return upFlow;
    }

    public void setUpFlow(long upFlow) {
        this.upFlow = upFlow;
    }

    public long getDownFlow() {
        return downFlow;
    }

    public void setDownFlow(long downFlow) {
        this.downFlow = downFlow;
    }

    public long getSumFlow() {
        return sumFlow;
    }

    public void setSumFlow(long sumFlow) {
        this.sumFlow = sumFlow;
    }
}

Write the Mapper class

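import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class FlowCountMapper extends Mapper<LongWritable, Text, Text, FlowBean> {

    // Reused across map() calls
    FlowBean v = new FlowBean();
    Text k = new Text();

    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {

        // 1 Get one line
        String line = value.toString();

        // 2 Split it into tab-separated fields
        String[] fields = line.split("\t");

        // 3 Fill in the output objects
        // The phone number is the 2nd field
        String phoneNum = fields[1];

        // Up flow and down flow are the 3rd-to-last and 2nd-to-last fields
        long upFlow = Long.parseLong(fields[fields.length - 3]);
        long downFlow = Long.parseLong(fields[fields.length - 2]);

        k.set(phoneNum);
        v.setUpFlow(upFlow);
        v.setDownFlow(downFlow);
        v.setSumFlow(upFlow + downFlow);

        // 4 Emit
        context.write(k, v);
    }
}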

Write the Reducer class

import java.io.IOException;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class FlowCountReducer extends Reducer<Text, FlowBean, Text, FlowBean> {

    @Override
    protected void reduce(Text key, Iterable<FlowBean> values, Context context) throws IOException, InterruptedException {

        long sumUpFlow = 0;
        long sumDownFlow = 0;

        // 1 Iterate over all beans of this phone number and accumulate up flow and down flow separately
        for (FlowBean flowBean : values) {
            sumUpFlow += flowBean.getUpFlow();
            sumDownFlow += flowBean.getDownFlow();
        }

        // 2 The two-argument constructor computes the total flow (up + down)
        FlowBean result = new FlowBean(sumUpFlow, sumDownFlow);

        // 3 Emit (phone number, aggregated flow)
        context.write(key, result);
    }
}

Write the Driver class

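import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class FlowsumDriver {

    public static void main(String[] args) throws IllegalArgumentException, IOException, ClassNotFoundException, InterruptedException {

        // 1 Get the configuration and a Job instance
        Configuration configuration = new Configuration();
        Job job = Job.getInstance(configuration);

        // 2 Locate this program's jar through the Driver class
        job.setJarByClass(FlowsumDriver.class);

        // 3 Specify the Mapper/Reducer classes used by this job
        job.setMapperClass(FlowCountMapper.class);
        job.setReducerClass(FlowCountReducer.class);

        // 4 Specify the kv types of the Mapper output
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(FlowBean.class);

        // 5 Specify the kv types of the final output
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(FlowBean.class);

        // 6 Specify the directory of the job's input files and the output directory
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        // 7 Submit the job configuration and the jar containing its classes to YARN, and wait for completion
        boolean result = job.waitForCompletion(true);
        System.exit(result ? 0 : 1);
    }
}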

MapReduce Framework Principles

Data Splits and How MapTask Parallelism Is Determined


FileInputFormat Splitting Mechanism

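  • By default, the split size equals the Block size
  • Splitting does not consider the data set as a whole; each file is split separately
  • Each split is processed by one MapTask, so the number of splits determines the number of MapTasks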
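FileInputFormat computes the split size as max(minSize, min(maxSize, blockSize)), where minSize and maxSize come from mapreduce.input.fileinputformat.split.minsize (effectively 1 by default) and mapreduce.input.fileinputformat.split.maxsize (Long.MAX_VALUE by default), which is why the split size equals the block size unless these are changed. When cutting a file, it only starts another full split while the remaining bytes are more than 1.1 times the split size, so a slightly oversized tail does not become a tiny extra split. A minimal plain-Java model of that loop for one file (SplitPlanner is a hypothetical name; host and locality information are ignored):

```java
import java.util.ArrayList;
import java.util.List;

// Simplified model of how FileInputFormat cuts ONE file into splits.
// Each file is split on its own; the real code also records block hosts per split.
class SplitPlanner {
    static final double SPLIT_SLOP = 1.1; // 10% slack before another split is cut

    // Same formula as FileInputFormat.computeSplitSize.
    static long computeSplitSize(long blockSize, long minSize, long maxSize) {
        return Math.max(minSize, Math.min(maxSize, blockSize));
    }

    // Returns the length of every split for a file of the given size.
    static List<Long> splitLengths(long fileSize, long splitSize) {
        List<Long> lengths = new ArrayList<>();
        long remaining = fileSize;
        // Keep cutting full splits while the remainder exceeds 1.1x the split size.
        while ((double) remaining / splitSize > SPLIT_SLOP) {
            lengths.add(splitSize);
            remaining -= splitSize;
        }
        // Whatever is left (at most 1.1x the split size) becomes the last split.
        if (remaining != 0) {
            lengths.add(remaining);
        }
        return lengths;
    }

    public static void main(String[] args) {
        long mb = 1024L * 1024L;
        long splitSize = computeSplitSize(128 * mb, 1, Long.MAX_VALUE); // defaults -> block size
        for (long fileMb : new long[] {300, 129, 10}) {
            List<Long> splits = splitLengths(fileMb * mb, splitSize);
            StringBuilder sizes = new StringBuilder();
            for (long length : splits) {
                sizes.append(length / mb).append("MB ");
            }
            System.out.println(fileMb + "MB file -> " + splits.size() + " split(s): " + sizes.toString().trim());
        }
    }
}
```

With a 128 MB block size, a 300 MB file yields three splits (128, 128 and 44 MB), while a 129 MB file stays a single split because 129/128 is below 1.1.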

CombineTextInputFormat

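  • If there are many small files, each file produces at least one split and therefore its own MapTask, which makes processing extremely inefficient. CombineTextInputFormat addresses this by logically packing several small files into one split, so that a single MapTask processes them together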
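In the Driver, CombineTextInputFormat is enabled with job.setInputFormatClass(CombineTextInputFormat.class), and the maximum virtual-storage size is set with CombineTextInputFormat.setMaxInputSplitSize(job, 4194304) (4 MB is only an example value). How it builds splits is usually taught as two steps: virtual storage (a file no larger than the maximum stays one block; a file larger than the maximum but at most twice it is cut into two equal halves; a larger file has maximum-sized blocks cut off first), then splitting (virtual blocks are accumulated in order until they reach the maximum, which closes one split). The sketch below models only this textbook rule, with hypothetical names and made-up file sizes; Hadoop's actual CombineFileInputFormat also groups blocks by node and rack, which is ignored here:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical model of the two-step CombineTextInputFormat rule as commonly taught.
// Sizes are plain numbers (KB here); node/rack grouping of the real implementation is ignored.
class CombineSplitModel {

    // Step 1, "virtual storage": cut one file into virtual blocks.
    static List<Long> virtualBlocks(long fileSize, long max) {
        List<Long> blocks = new ArrayList<>();
        long remaining = fileSize;
        // More than twice the max: cut off a max-sized block and look at the rest again.
        while (remaining > 2 * max) {
            blocks.add(max);
            remaining -= max;
        }
        if (remaining > max) {
            // Between max and 2 * max: split into two (roughly) equal halves.
            long half = remaining / 2;
            blocks.add(half);
            blocks.add(remaining - half);
        } else if (remaining > 0) {
            // At most max: the rest stays a single virtual block.
            blocks.add(remaining);
        }
        return blocks;
    }

    // Step 2, splitting: accumulate virtual blocks in order until the total reaches max.
    static List<Long> combine(List<Long> fileSizes, long max) {
        List<Long> splits = new ArrayList<>();
        long current = 0;
        for (long size : fileSizes) {
            for (long block : virtualBlocks(size, max)) {
                current += block;
                if (current >= max) {
                    splits.add(current);
                    current = 0;
                }
            }
        }
        // Leftover blocks that never reached max still form a final split.
        if (current > 0) {
            splits.add(current);
        }
        return splits;
    }

    public static void main(String[] args) {
        // Four small files of roughly 1.7, 5.1, 3.4 and 6.8 MB, with a 4096 KB maximum.
        List<Long> fileSizesKb = Arrays.asList(1700L, 5100L, 3400L, 6800L);
        System.out.println("Split sizes (KB): " + combine(fileSizesKb, 4096L));
    }
}
```

For the four files in main the model produces three splits of 4250, 5950 and 6800 KB, so three MapTasks would run instead of four (one per file).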

MapReduce Detailed Workflow


The Shuffle Process in Detail

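  • The MapTask collects the kv pairs output by map() and puts them into an in-memory buffer
  • The buffer is repeatedly spilled to files on the local disk, so there may be several spill files
  • The spill files are merged into one large file
  • During both spilling and merging, the Partitioner is called to assign partitions, and the data is sorted by key
  • Each ReduceTask fetches the data of its own partition number from every MapTask machine
  • A ReduceTask thus receives result files of the same partition from different MapTasks and merges them again (merge sort)
  • Once they are merged into one large file, the Shuffle process is over
  • Note: the size of the Shuffle buffer (mapreduce.task.io.sort.mb, 100 MB by default) affects MapReduce performance. In principle, a larger buffer means fewer disk I/O operations and therefore faster execution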
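The Shuffle steps above can be modeled in a few lines of plain Java. This is a deliberately simplified sketch, assuming a single MapTask, word keys with value 1, and a buffer that holds only 4 records (the real buffer spills when it is filled to mapreduce.map.sort.spill.percent, 80% by default). ShuffleModel and its methods are hypothetical; partition() uses the same formula as Hadoop's default HashPartitioner. Each spill is sorted by (partition, key), the spills are merged with a k-way merge, and each "ReduceTask" then reads only its own partition, whose records arrive already sorted:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.PriorityQueue;

// Hypothetical, heavily simplified model of the Shuffle for a single MapTask.
class ShuffleModel {

    // One map output record, tagged with the partition it belongs to.
    static final class Rec {
        final int partition;
        final String key;
        final int value;

        Rec(int partition, String key, int value) {
            this.partition = partition;
            this.key = key;
            this.value = value;
        }
    }

    // Spill order: by partition first, then by key inside each partition.
    static final Comparator<Rec> ORDER =
            Comparator.comparingInt((Rec r) -> r.partition).thenComparing(r -> r.key);

    // Same formula as Hadoop's default HashPartitioner.
    static int partition(String key, int numReduceTasks) {
        return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
    }

    // Map side: collect records in a small buffer; sort and spill it whenever it is full.
    static List<List<Rec>> mapSide(List<String> words, int numReduceTasks, int bufferCapacity) {
        List<List<Rec>> spills = new ArrayList<>();
        List<Rec> buffer = new ArrayList<>();
        for (String word : words) {
            buffer.add(new Rec(partition(word, numReduceTasks), word, 1));
            if (buffer.size() == bufferCapacity) {
                buffer.sort(ORDER);
                spills.add(buffer);
                buffer = new ArrayList<>();
            }
        }
        if (!buffer.isEmpty()) { // final flush of whatever is left in the buffer
            buffer.sort(ORDER);
            spills.add(buffer);
        }
        return spills;
    }

    // Merge the sorted spill files into one sorted file (k-way merge).
    static List<Rec> merge(List<List<Rec>> runs) {
        // Heap entries are {run index, position in run}, ordered by the record they point to.
        PriorityQueue<int[]> heap = new PriorityQueue<>(
                (a, b) -> ORDER.compare(runs.get(a[0]).get(a[1]), runs.get(b[0]).get(b[1])));
        for (int i = 0; i < runs.size(); i++) {
            if (!runs.get(i).isEmpty()) {
                heap.add(new int[] {i, 0});
            }
        }
        List<Rec> merged = new ArrayList<>();
        while (!heap.isEmpty()) {
            int[] top = heap.poll();
            List<Rec> run = runs.get(top[0]);
            merged.add(run.get(top[1]));
            if (top[1] + 1 < run.size()) {
                heap.add(new int[] {top[0], top[1] + 1});
            }
        }
        return merged;
    }

    // Reduce side: fetch only this ReduceTask's partition and sum the values per key.
    static Map<String, Integer> reduceSide(List<Rec> mapOutput, int partition) {
        Map<String, Integer> result = new LinkedHashMap<>(); // keeps the sorted key order
        for (Rec r : mapOutput) {
            if (r.partition == partition) {
                result.merge(r.key, r.value, Integer::sum);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        List<List<Rec>> spills = mapSide(Arrays.asList("b", "a", "c", "a", "b", "a"), 2, 4);
        List<Rec> merged = merge(spills);
        System.out.println("spill files: " + spills.size());
        for (int p = 0; p < 2; p++) {
            System.out.println("ReduceTask " + p + " -> " + reduceSide(merged, p));
        }
    }
}
```

The combiner and the network fetch are left out on purpose; the point is that sorting happens per spill and merging preserves that order, so the reduce side only has to read sequentially.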

Shuffle Mechanism Diagram


Partition Hands-on Case

Case Analysis


Write the Partitioner class

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;

public class ProvincePartitioner extends Partitioner<Text, FlowBean> {

    @Override
    public int getPartition(Text key, FlowBean value, int numPartitions) {

        // 1 Get the first three digits of the phone number
        String preNum = key.toString().substring(0, 3);

        // Numbers with any other prefix go to partition 4
        int partition = 4;

        // 2 Decide which province (partition) the prefix belongs to
        if ("136".equals(preNum)) {
            partition = 0;
        } else if ("137".equals(preNum)) {
            partition = 1;
        } else if ("138".equals(preNum)) {
            partition = 2;
        } else if ("139".equals(preNum)) {
            partition = 3;
        }

        return partition;
    }
}
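The branching in getPartition can be checked locally on plain String keys. PartitionDemo is a hypothetical helper that reproduces ProvincePartitioner's logic without Hadoop types; hashPartition shows the formula of Hadoop's default HashPartitioner, which is what a job uses when no custom partitioner is set. The phone numbers in main are made up:

```java
// Hypothetical plain-Java check of the partitioning logic (no Hadoop types involved).
class PartitionDemo {

    // Same branching as ProvincePartitioner.getPartition, applied to a phone number string.
    static int provincePartition(String phoneNum) {
        String preNum = phoneNum.substring(0, 3);
        switch (preNum) {
            case "136": return 0;
            case "137": return 1;
            case "138": return 2;
            case "139": return 3;
            default:    return 4; // every other prefix
        }
    }

    // Default HashPartitioner formula: clear the sign bit, then take the hash modulo the ReduceTask count.
    static int hashPartition(Object key, int numReduceTasks) {
        return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
    }

    public static void main(String[] args) {
        // Made-up phone numbers, for illustration only.
        String[] phones = {"13612345678", "13712345678", "13812345678", "13912345678", "15012345678"};
        for (String phone : phones) {
            System.out.println(phone + " -> partition " + provincePartition(phone)
                    + " (HashPartitioner with 5 reducers: " + hashPartition(phone, 5) + ")");
        }
    }
}
```

The custom partitioner groups records by a business rule (the prefix), whereas the hash formula only spreads keys evenly; both must return a value between 0 and the number of ReduceTasks minus 1.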

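Add the custom partitioner and the number of ReduceTasks to the Driver. ProvincePartitioner returns partition numbers 0 to 4, so 5 ReduceTasks are configured. With more ReduceTasks, the extra output files are simply empty; with 2 to 4, records that fall into a missing partition make the job fail with an "Illegal partition" error; with exactly 1, the custom partitioner is bypassed and everything is written to a single output file.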

job.setPartitionerClass(ProvincePartitioner.class);

job.setNumReduceTasks(5);

WritableComparable Sorting Case

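In a sorting case, the bean becomes the map output key, because the Shuffle sorts map output by key; in Hadoop the bean would implement WritableComparable<FlowBean> (write(), readFields() and compareTo()), and unless a custom sort comparator is set, compareTo() defines the final order. Since the original case description is not reproduced here, the sketch assumes the common variant of sorting flow records by total flow in descending order. SortableFlow is a hypothetical plain-JDK class that shows only the comparison logic, with made-up records:

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical plain-JDK stand-in: a Hadoop version would implement WritableComparable<FlowBean>
// and also provide write()/readFields(); only the ordering logic is shown here.
class SortableFlow implements Comparable<SortableFlow> {
    final String phoneNum;
    final long upFlow;
    final long downFlow;
    final long sumFlow;

    SortableFlow(String phoneNum, long upFlow, long downFlow) {
        this.phoneNum = phoneNum;
        this.upFlow = upFlow;
        this.downFlow = downFlow;
        this.sumFlow = upFlow + downFlow;
    }

    // Descending by total flow: the record with the larger sumFlow sorts first.
    @Override
    public int compareTo(SortableFlow other) {
        return Long.compare(other.sumFlow, this.sumFlow);
    }

    @Override
    public String toString() {
        return phoneNum + "\t" + upFlow + "\t" + downFlow + "\t" + sumFlow;
    }

    public static void main(String[] args) {
        // Made-up records for illustration.
        List<SortableFlow> records = new ArrayList<>();
        records.add(new SortableFlow("13612345678", 100, 200));
        records.add(new SortableFlow("13712345678", 1000, 500));
        records.add(new SortableFlow("13812345678", 50, 20));
        // Locally, Collections.sort applies the same compareTo() the Shuffle would use for keys.
        Collections.sort(records);
        records.forEach(System.out::println);
    }
}
```

Swapping the arguments of Long.compare is the simplest way to get descending order; a secondary comparison (for example on upFlow) could be added to break ties between equal totals.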

Combiner Case

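A Combiner is a class extending Reducer that runs on each MapTask's local output before the Shuffle, so less data crosses the network. It may only be used when it cannot change the final result, and its output kv types must match the Reducer's input kv types; for WordCount, the existing reducer can be reused with job.setCombinerClass(WordcountReducer.class). The plain-Java sketch below (hypothetical names, made-up values) shows both the saving for a sum and why an average must not be combined this way:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical plain-Java illustration of what a Combiner does and when it is safe.
class CombinerDemo {

    // WordCount combiner logic: the Reducer's summing, applied to ONE MapTask's output.
    static Map<String, Integer> combine(List<String> mapOutputKeys) {
        Map<String, Integer> local = new TreeMap<>();
        for (String key : mapOutputKeys) {
            local.merge(key, 1, Integer::sum); // each map output record is (key, 1)
        }
        return local;
    }

    static double average(List<Double> values) {
        double sum = 0;
        for (double v : values) {
            sum += v;
        }
        return sum / values.size();
    }

    public static void main(String[] args) {
        List<String> mapTaskOutput = Arrays.asList("a", "b", "a", "a");
        Map<String, Integer> combined = combine(mapTaskOutput);
        System.out.println("records sent to Shuffle: " + mapTaskOutput.size() + " -> " + combined.size() + " " + combined);

        // Averaging is NOT a safe combiner: an average of partial averages differs from the true average.
        List<Double> mapTask1 = Arrays.asList(3.0, 5.0, 7.0);
        List<Double> mapTask2 = Arrays.asList(2.0, 6.0);
        List<Double> all = new ArrayList<>(mapTask1);
        all.addAll(mapTask2);
        double averageOfAverages = average(Arrays.asList(average(mapTask1), average(mapTask2)));
        System.out.println("average of averages = " + averageOfAverages + ", true average = " + average(all));
    }
}
```

Summing is associative and commutative, so partial sums can be summed again; the two partial averages (5.0 and 4.0) average to 4.5, while the true average of all five values is 4.6.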

Map Join

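  • Map Join is suited to scenarios where one table is very small and the other is very large: the small table is cached in the memory of every MapTask, so the join happens on the map side without a Reduce phase, which also avoids data skew in the Reduce phase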

Case Requirements


MapJoinDriver

// Load the cached data (the small table)
job.addCacheFile(new URI("hdfs://node01/cache/pd.txt"));
// A map-side join needs no Reduce phase, so set the number of ReduceTasks to 0
job.setNumReduceTasks(0);

Read the cached file in the setup() method of MapJoinMapper

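import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.URI;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;

import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class MapJoinMapper extends Mapper<LongWritable, Text, Text, NullWritable> {

    // Small table pd.txt held in memory: pid -> pname
    private final Map<String, String> pdMap = new HashMap<>();
    // Reused output key
    private final Text text = new Text();

    // Cache the pd data into pdMap before the task starts processing records
    @Override
    protected void setup(Context context) throws IOException, InterruptedException {
        // Get the small table pd.txt from the cache files
        URI[] cacheFiles = context.getCacheFiles();
        Path path = new Path(cacheFiles[0]);
        // Get the file system object and open an input stream
        FileSystem fs = FileSystem.get(context.getConfiguration());
        FSDataInputStream fis = fs.open(path);
        // Wrap the stream in a reader so it can be read line by line
        BufferedReader reader = new BufferedReader(new InputStreamReader(fis, StandardCharsets.UTF_8));
        // Read and process line by line, stopping at the end of the file or at an empty line
        String line;
        while ((line = reader.readLine()) != null && !line.isEmpty()) {
            // Split one line, e.g. "01\t小米" (pid, pname)
            String[] split = line.split("\t");
            pdMap.put(split[0], split[1]);
        }
        // Close the stream
        IOUtils.closeStream(reader);
    }

    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        // Read one line of the big table, e.g. "1001\t01\t1" (the 2nd field is the pid)
        String[] fields = value.toString().split("\t");
        // Look up the pname for this line's pid in pdMap
        String pname = pdMap.get(fields[1]);
        // Replace the pid with the pname
        text.set(fields[0] + "\t" + pname + "\t" + fields[2]);
        // Emit
        context.write(text, NullWritable.get());
    }
}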
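Stripped of the Hadoop I/O, the map-side join is a hash lookup: setup() builds a HashMap from the small table once per MapTask, and each big-table record is joined by looking up its pid. A plain-Java sketch with hypothetical names and made-up sample rows; unlike the mapper above, it skips blank lines instead of stopping at them, and it writes a hypothetical "NULL" placeholder when a pid is missing (pdMap.get() would yield the string "null" there):

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical plain-Java version of the map-side join logic (no Hadoop I/O).
class MapJoinDemo {

    // Roughly what setup() does: load the small table (pid \t pname) into memory once.
    static Map<String, String> loadSmallTable(List<String> lines) {
        Map<String, String> pdMap = new HashMap<>();
        for (String line : lines) {
            if (line.isEmpty()) {
                continue; // skip blank lines
            }
            String[] split = line.split("\t");
            pdMap.put(split[0], split[1]);
        }
        return pdMap;
    }

    // What map() does for one big-table line: replace the pid (2nd field) with the pname.
    static String joinLine(String bigTableLine, Map<String, String> pdMap) {
        String[] fields = bigTableLine.split("\t");
        // "NULL" is a hypothetical placeholder for a pid missing from the small table.
        String pname = pdMap.getOrDefault(fields[1], "NULL");
        return fields[0] + "\t" + pname + "\t" + fields[2];
    }

    public static void main(String[] args) {
        // Made-up sample rows.
        Map<String, String> pdMap = loadSmallTable(Arrays.asList("01\tXiaomi", "02\tHuawei"));
        List<String> joined = new ArrayList<>();
        for (String line : Arrays.asList("1001\t01\t1", "1002\t02\t2", "1003\t03\t3")) {
            joined.add(joinLine(line, pdMap));
        }
        joined.forEach(System.out::println);
    }
}
```

Because every MapTask holds its own copy of the small table, memory use grows with the small table's size; that is the reason this approach is reserved for one small and one large table.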

Yarn Resource Scheduler

Yarn Architecture


Yarn Working Mechanism

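  • The MR program is submitted on the node where the client runs
  • YarnRunner applies to the ResourceManager (RM) for an Application
  • The RM returns the application's resource path to YarnRunner
  • The program submits the resources it needs to run to HDFS
  • Once the resources have been submitted, it applies to run the AppMaster
  • The RM turns the user's request into a Task
  • One of the NodeManagers picks up the Task
  • That NodeManager creates a Container and starts the AppMaster in it
  • The Container copies the resources from HDFS to the local node
  • The AppMaster applies to the RM for resources to run MapTasks
  • The RM assigns the MapTasks to two other NodeManagers, which each pick up a task and create a container
  • MR sends the program startup script to the two NodeManagers that received tasks; each of them starts a MapTask, which partitions and sorts the data
  • After all MapTasks have finished, the AppMaster applies to the RM for containers to run the ReduceTasks
  • Each ReduceTask fetches the data of its partition from the MapTasks
  • When the program has finished, MR asks the RM to unregister it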

Capacity Scheduler


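  • Capacity guarantee
    • Administrators can set a guaranteed minimum and an upper usage limit for each queue; all applications submitted to a queue share that queue's resources
  • Flexibility
    • If a queue has idle resources, they can be temporarily lent to queues that need more
    • As soon as new applications are submitted to that queue, the resources borrowed by other queues are returned to it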
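The two properties above can be made concrete with a toy allocation model. It is a simplified sketch, not YARN's algorithm: shares are percentages of the cluster, each queue first receives its demand up to its guaranteed capacity, and leftover capacity is then lent, in queue order, to queues that still want more, up to their maximum-capacity. User limits, container granularity, and the way YARN actually reclaims lent resources (as containers finish, or through preemption if it is enabled) are all ignored. CapacityModel and its fields are hypothetical names; the queue values are taken from the configuration below, with the hive maximum assumed to be 100%:

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, simplified model of Capacity Scheduler guarantees and elasticity.
// Shares are percentages of the whole cluster; user limits, container sizes and preemption are ignored.
class CapacityModel {

    static final class Queue {
        final String name;
        final double capacity;    // guaranteed share, like ...<queue>.capacity
        final double maxCapacity; // upper limit, like ...<queue>.maximum-capacity

        Queue(String name, double capacity, double maxCapacity) {
            this.name = name;
            this.capacity = capacity;
            this.maxCapacity = maxCapacity;
        }
    }

    // Returns the share each queue receives for the given demand (both in % of the cluster).
    static Map<String, Double> allocate(List<Queue> queues, Map<String, Double> demand) {
        Map<String, Double> alloc = new LinkedHashMap<>();
        double free = 100.0;
        // 1) Capacity guarantee: each queue first gets what it needs, up to its guaranteed capacity.
        for (Queue q : queues) {
            double granted = Math.min(demand.getOrDefault(q.name, 0.0), q.capacity);
            alloc.put(q.name, granted);
            free -= granted;
        }
        // 2) Elasticity: idle capacity is lent (in queue order, a simplification)
        //    to queues that still need more, up to their maximum-capacity.
        for (Queue q : queues) {
            double wanted = Math.min(demand.getOrDefault(q.name, 0.0), q.maxCapacity) - alloc.get(q.name);
            double extra = Math.min(wanted, free);
            if (extra > 0) {
                alloc.put(q.name, alloc.get(q.name) + extra);
                free -= extra;
            }
        }
        return alloc;
    }

    public static void main(String[] args) {
        // default: 40% guaranteed, 60% max; hive: 60% guaranteed, max assumed 100%.
        List<Queue> queues = Arrays.asList(new Queue("default", 40, 60), new Queue("hive", 60, 100));
        Map<String, Double> demand = new HashMap<>();
        demand.put("default", 100.0);
        demand.put("hive", 0.0);
        System.out.println("hive idle: " + allocate(queues, demand));
        demand.put("hive", 100.0);
        System.out.println("both busy: " + allocate(queues, demand));
    }
}
```

When hive is idle, default grows from its 40% guarantee only up to its 60% maximum; once hive has work again, each queue is back to its guaranteed share.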

Capacity Scheduler Multi-Queue Submission Case

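  • By default, Yarn's scheduler has a single queue, so in practice one job can block the whole queue
  • Companies also need to limit cluster usage per line of business
  • This calls for configuring multiple job queues by business type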

Modify capacity-scheduler.xml

<!-- Define multiple queues: add a hive queue next to default -->
<property>
    <name>yarn.scheduler.capacity.root.queues</name>
    <value>default,hive</value>
    <description>
      The queues at this level (root is the root queue).
    </description>
</property>

<!-- Lower the guaranteed capacity of the default queue to 40% (default: 100%) -->
<property>
    <name>yarn.scheduler.capacity.root.default.capacity</name>
    <value>40</value>
</property>

<!-- Lower the maximum capacity of the default queue to 60% (default: 100%) -->
<property>
    <name>yarn.scheduler.capacity.root.default.maximum-capacity</name>
    <value>60</value>
</property>
<!-- Guaranteed capacity of the hive queue (sibling queue capacities must add up to 100) -->
<property>
    <name>yarn.scheduler.capacity.root.hive.capacity</name>
    <value>60</value>
</property>

<!-- A single user may use at most 1x the hive queue's configured capacity -->
<property>
    <name>yarn.scheduler.capacity.root.hive.user-limit-factor</name>
    <value>1</value>
</property>

<!-- Maximum capacity of the hive queue -->
<property>
    <name>yarn.scheduler.capacity.root.hive.maximum-capacity</name>
    <value></value>
</property>

<!-- Queue state: RUNNING means the queue accepts applications -->
<property>
    <name>yarn.scheduler.capacity.root.hive.state</name>
    <value>RUNNING</value>
</property>

<!-- Who may submit applications to the hive queue (* = everyone) -->
<property>
    <name>yarn.scheduler.capacity.root.hive.acl_submit_applications</name>
    <value>*</value>
</property>

<!-- Who may administer the hive queue (* = everyone) -->
<property>
    <name>yarn.scheduler.capacity.root.hive.acl_administer_queue</name>
    <value>*</value>
</property>

<!-- Who may submit applications with a given priority (* = everyone) -->
<property>
    <name>yarn.scheduler.capacity.root.hive.acl_application_max_priority</name>
    <value>*</value>
</property>

<!-- Maximum application lifetime in seconds; -1 disables the limit -->
<property>
    <name>yarn.scheduler.capacity.root.hive.maximum-application-lifetime</name>
    <value>-1</value>
</property>

<!-- Default application lifetime in seconds; -1 disables the limit -->
<property>
    <name>yarn.scheduler.capacity.root.hive.default-application-lifetime</name>
    <value>-1</value>
</property>
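  • After the configuration is done, refresh the queues with the command yarn rmadmin -refreshQueues

  • Jobs are submitted to the default queue by default. To submit a job to another queue, declare the queue in the Driver class: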

    configuration.set("mapreduce.job.queuename","hive");