MapReduce之Shuffle,自定义对象,排序已经Combiner

Stella981
• 阅读 809

1. Shuffle:

MapReduce的计算模型主要分为三个阶段,Map, shuffle, Reduce。Map负责数据的过滤,将文件中的数据转化为键值对,Reduce负责合并将具有相同的键的值进行处理合并然后输出到HDFS。为了让Reduce可以并行处理map的结果,必须对Map的输出进行一定的排序和分割,然后交个Reduce,这个过程就是Shuffle。
官方给的图如下:

MapReduce之Shuffle,自定义对象,排序已经Combiner

在这里插入图片描述

上图Map和Reduce之间的就是shuffle,但是猛地一看就是云里雾里的,倒不如下面这个图清楚:

MapReduce之Shuffle,自定义对象,排序已经Combiner

在这里插入图片描述

Map端的shuffle简单来说就是对map的结果进行分区缓存,当缓存不够的时候进行溢写,在溢写的过程中,排序写入到文件,每一次溢写是一个文件,最后将这些文件合并成一个文件。分区排序的意思是相同partition的键值对存储在一起,partition之间是有序有的,每一个partition中的键值对也是有序的,默认是升序。
(1) 缓冲区
Map的输出结果不是直接写到文件的,是先写到缓存区中,缓存区是一个环形结构,是用环形缓存区的目的是尽可能高效的利用内存空间,默认大小是100M,可以通过参数调整缓冲区的大小。如下图:

MapReduce之Shuffle,自定义对象,排序已经Combiner

在这里插入图片描述

这个缓冲区其实是一个字节数组叫做kvbuffer,kvbuffer不只有数据键值对,还有数据的索引叫做kvmeta。

1byte[] kvbuffer;        // main output buffer2private static final int VALSTART = 0;         // val offset in acct3private static final int KEYSTART = 1;         // key offset in acct4private static final int PARTITION = 2;        // partition offset in acct5private static final int VALLEN = 3;           // length of value

索引和数据的放在不同的两个区域,用一个分界点来划分,这个分界点不是一层不变的,会随着每次的溢写而改变,初始的位置为0,数据向上增长,索引向下增长。上图中的buindex是数据的位置索引,一直向上增长,比如初始值为0,写入一个int的key之后变为4,再写入一个int的Value之后变为8.
索引的区记录的是数据键值对的位置,是一个四元组占用4个Int长度:
value的起始位置,key的起始位置,partition的值以及Value的长度。索引的写入是kvindex每次向下跳4个字节,然后再向上填充数据,比如Kvindex初始位置是-4,当第一个键值对写完之后,(Kvindex+0)的位置存放value的起始位置、(Kvindex+1)的位置存放key的起始位置、(Kvindex+2)的位置存放partition的值、(Kvindex+3)的位置存放value的长度,然后Kvindex跳到-8位置,等第二个键值对和索引写完之后,Kvindex跳到-12位置。
kvbuffer 的默认大小为100M,当然可以自己设置:

1public static final String IO_SORT_MB = "mapreduce.task.io.sort.mb";2final int sortmb = job.getInt(JobContext.IO_SORT_MB, 100);

当缓存区达到一定的比例之后,一个后台线程开始把缓存区的数据写入到磁盘,这个写入的过程叫做Spill,即溢写。开始Spill的比例默认是0.8,这个比列可以通过mapreduce.map.sort.spill.percent配置,在后台溢写的同时,map继续向这个剩余的缓存中继续写入数据,写入数据的起始位置是剩余空间的中间,分别向两边写入索引和数据,如果缓存区满了溢写还没与完成的话,map会阻塞直到Spill完成。
spill的比列默认是0.8 也是可以设置的:

1public static final String MAP_SORT_SPILL_PERCENT = "mapreduce.map.sort.spill.percent";2final float spillper = job.getFloat(JobContext.MAP_SORT_SPILL_PERCENT, (float)0.8);

分区是在写入缓存的时候完成的,看了很多博客说是在溢写的时候进行的分区,感觉不是很对。想想也能明白,既然索引中要写入数据了,实在是没必要溢写的时候补上,并且缓存总放到都是byte数组,来回转换不也是麻烦。我看了看源码确实是在写入缓存的时候进行的分区:

1@Override2public void collect(K key, V value) throws IOException {3  try {4    collector.collect(key, value, partitioner.getPartition(key, value, numPartitions));5  } catch (InterruptedException ie) {6    Thread.currentThread().interrupt();7    throw new IOException("interrupt exception", ie);8  }9}

其中 partitioner是自定义分区或者默认分区,默认分区的会就一个为0,后一个numPartitions 实际上为MapReduceTask的个数:

 1NewOutputCollector(org.apache.hadoop.mapreduce.JobContext jobContext, 2                   JobConf job, 3                   TaskUmbilicalProtocol umbilical, 4                   TaskReporter reporter 5                   ) throws IOException, ClassNotFoundException { 6  collector = createSortingCollector(job, reporter); 7  partitions = jobContext.getNumReduceTasks(); 8  if (partitions > 1) { 9    partitioner = (org.apache.hadoop.mapreduce.Partitioner<K,V>)10      ReflectionUtils.newInstance(jobContext.getPartitionerClass(), job);11  } else {12    partitioner = new org.apache.hadoop.mapreduce.Partitioner<K,V>() {13      @Override14      public int getPartition(K key, V value, int numPartitions) {15        return partitions - 1; //默认情况下 numPartitions 为1 所以返回的是016      }17    };18  }19}20其中getNumReduceTasks()为:21public static final String NUM_REDUCES = "mapreduce.job.reduces";22public int getNumReduceTasks() { return getInt(JobContext.NUM_REDUCES, 1); }

很多博客说分区的数量决定了reduceTak的数量,但是看源码知道不是这么回事,ReduceTask的数量是由自己定义的,默认是1,只有一个ReduTask,如果定了多个task,但是自定义分区的返回值超过了task的数量,则会抛异常:

1if (partition < 0 || partition >= partitions) {2  throw new IOException("Illegal partition for " + key + " (" +3      partition + ")");4}

定义 Reducetask的数量有两种方法 一个是在main法中的job定义

1job.setNumReduceTasks(2);

还有就是配置文件,配置文件是指mapper-default.xml

1name的值为mapreduce.job.reduces

如果定义了ReduceTask的个数,却没有之定义分区的话,默认使用的hash 取模的算法:

 1public static final String PARTITIONER_CLASS_ATTR = "mapreduce.job.partitioner.class"; 2public Class<? extends Partitioner<?,?>> getPartitionerClass()  3   throws ClassNotFoundException { 4  return (Class<? extends Partitioner<?,?>>)  5    conf.getClass(PARTITIONER_CLASS_ATTR, HashPartitioner.class); 6} 7------------ 8public class HashPartitioner<K, V> extends Partitioner<K, V> { 910  /** Use {@link Object#hashCode()} to partition. */11  public int getPartition(K key, V value,12                          int numReduceTasks) {13    return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;14  }1516}

(2) Spill: 溢写
前面已经说了当环形缓存区的达到一定的时候进行溢写。溢写由单独的线程完成,不耽误mapTask的执行:

1final Condition spillReady = spillLock.newCondition();2private void startSpill() {3  assert !spillInProgress;4  kvend = (kvindex + NMETA) % kvmeta.capacity();5  bufend = bufmark;6  spillInProgress = true;7  spillReady.signal();8}

上面代码中spillRead.signal()的意思是唤醒线程,Condition 是语言级别的等待唤醒机制,与object中的wait/notify意思相同,但是更可控。
唤醒Spill线程之后,首先执行的是排序。排序的规则是是按照缓存中的数据的partition和key进行排序,移动的只是索引数据,排序的结果是先按照分区,分区相同的按照key排序,key的排序规则我们可以自定义。排序过程中使用的算法是快排,当然如果不想用快排我们也可以定义自己的排序规则,排序完之后就是Spill。在Spill 之前如果设置了Combiner,则先调用Combiner,源码如下:

 1sorter.sort(MapOutputBuffer.this, mstart, mend, reporter); 2for (int i = 0; i < partitions; ++i) { 3  IFile.Writer<K, V> writer = null; 4    if (combinerRunner == null) {  5      // spill directly 6      DataInputBuffer key = new DataInputBuffer(); 7      while (spindex < mend && 8          kvmeta.get(offsetFor(spindex % maxRec) + PARTITION) == i) { 9        final int kvoff = offsetFor(spindex % maxRec);10        int keystart = kvmeta.get(kvoff + KEYSTART);11        int valstart = kvmeta.get(kvoff + VALSTART);12        key.reset(kvbuffer, keystart, valstart - keystart);13        getVBytesForOffset(kvoff, value);14        writer.append(key, value);15        ++spindex;16      }17    } else { 当combiner不为空18      int spstart = spindex;19      while (spindex < mend &&20          kvmeta.get(offsetFor(spindex % maxRec)21                    + PARTITION) == i) {22        ++spindex;23      }24      // Note: we would like to avoid the combiner if we've fewer25      // than some threshold of records for a partition26      if (spstart != spindex) {27        combineCollector.setWriter(writer);28        RawKeyValueIterator kvIter =29          new MRResultIterator(spstart, spindex);30        combinerRunner.combine(kvIter, combineCollector);31      }32    }33}

上面只是截取了一部分。Combiner本质上是一个Reduce,对结果进行预处理,先在map端对结果进行一次合并,以减少map 和reduce之间的数据的传输量,提高网络IO性能,是Mapreduce的一种优化手段,但并不是所有的mapReduce的结果都适合设置Combiner,设置Combiner的原则是在不改变Reduce最终结果的前提下。比如说网上说的求和或者最大值可以设置Combiner,但是求平均值就不行,原因也很简单,因为在map端进行了一次和并,如果求平均值的话,map端先求了一次平均值,到Reduce端的值就是每一个map端求平均值之后的平均值了,那么怎么可能最终的平均值不受影响呢!
开始溢写的时候先创建溢写文件,文件名字类似spill1.out,有个变量记录溢写的次数,文件名每次溢写累加1,溢写的过程中以此按照kvmeta中的partition一次写到一个文件中,但是在文件中怎么知道每一个partition呢,MapReduce中利用了索引,索引中记录了每个partition的位置,长度还有压缩之后的长度,刚开始是记录在内存中的,当达到了一定的内存(默认为1M),则写入index文件:

 1private static final int INDEX_CACHE_MEMORY_LIMIT_DEFAULT = 1024 * 1024; 2public static final String INDEX_CACHE_MEMORY_LIMIT = "mapreduce.task.index.cache.limit.bytes"; 3indexCacheMemoryLimit = job.getInt(JobContext.INDEX_CACHE_MEMORY_LIMIT,INDEX_CACHE_MEMORY_LIMIT_DEFAULT); 4 5//达到最大值 6if (totalIndexCacheMemory >= indexCacheMemoryLimit) { 7  // create spill index file 8  Path indexFilename = 9      mapOutputFile.getSpillIndexFileForWrite(numSpills, partitions10          * MAP_OUTPUT_INDEX_RECORD_LENGTH);11  spillRec.writeToFile(indexFilename, job);12} else {13  indexCacheList.add(spillRec);14  totalIndexCacheMemory +=15    spillRec.size() * MAP_OUTPUT_INDEX_RECORD_LENGTH;16}

文件的名字是spillxx.out.index,所以每次溢写至少有一个index文件和一个out文件:

1@Override2public Path getSpillIndexFileForWrite(int spillNumber, long size)3    throws IOException {4  return lDirAlloc.getLocalPathForWrite(MRJobConfig.OUTPUT + "/spill"5      + spillNumber + ".out.index", size, getConf());6}

map任务每次的执行的最终结果都要写到磁盘上,哪怕最后的结果不足与超过上面的0.8,因为后续的Reduce需要拉去数据进行Reduce任务。
当多次spill之后,会产生多个溢写文件,当map任务执行完之后需要合并当前maptask所产生的溢写文件:
merge的方法家叫做mergeParts() 在MapTask文件中,merge首先读取本地所有的的index文件,将index文件加载到内存中,按照partition一次读写所有index文件这个partition的索引信息,每一个partition的索引信息分装成一个Segment,然后对这个partition的所有Segment进行合并,最终合并成一个Segment。具体的merge方法是Merge.java中有个静态类叫做MergeQueue,其中有一个方法叫做的merge方法。规则是分批对Segment进行合并,就是先取出第一批进行合并,,然后从最小堆中每次取出最小的输出到一个临时文件中,这样就把这一批段合并成一个临时的段,把它加回到segment列表中;再从segment列表中把第二批取出来合并输出到一个临时segment,把其加入到列表中;这样往复执行,直到剩下的段是一批,输出到最终的文件中。最终的索引数据仍然输出到Index文件中。最终输出的文件叫做file.out,index文件叫做file.out.index文件。
(3)ReduceShuffle
ReduceShuffle 首先拉取Map端输出的数据,可能会首先copy到内存在内存中合并,也可能是直接copy到硬盘,视情况而定,判定情况如下:

1private boolean canShuffleToMemory(long requestedSize) {2  return (requestedSize < maxSingleShuffleLimit); 3}

首先说内存中的合并,Reduce要向每一个map拉取数据放到内存,当内存占到一定的比列的时候,开始merge数据,merge完之后把数据写到磁盘,如果设置了Combiner的话,name先调用combine,源码可以看下MergeManagerImpl类 中的startMerge这个方法,当属于该reducer的map输出全部拷贝完成,则会在reducer上生成多个文件(如果拖取的所有map数据总量都没有内存缓冲区,则数据就只存在于内存中),这时开始执行合并操作,即磁盘到磁盘merge,Map的输出数据已经是有序的,Merge进行一次合并排序,所谓Reduce端的sort过程就是这个合并的过程。一般Reduce是一边copy一边sort,即copy和sort两个阶段是重叠而不是完全分开的。最终Reduce shuffle过程会输出一个整体有序的数据块, 之后就是调用reduce了。

2. 序列化 :

如果我们我们要定义自己的数据类型的那话,那就需要我们自己实现了,MapReduce中数据的传输都要实现Writeable。很简单,如下

 1public class Student implements Writable { 2    private String name; 3    private int age; 4    public void write(DataOutput out) throws IOException { 5        out.writeUTF(name); 6        out.writeInt(age); 7 8    } 9    public void readFields(DataInput in) throws IOException {1011        this.name = in.readUTF();12        this.age = in.readInt();13    }

主要是实现write和readFields这两个方法,一个写一个读。但是这样是不够的,这样的话自定义的这个对象只能作为value来传递,如果作为key的话,那肯定不行的,这是因为看源码:

 1public RawComparator getOutputKeyComparator() { 2  Class<? extends RawComparator> theClass = getClass( 3    JobContext.KEY_COMPARATOR, null, RawComparator.class); 4  if (theClass != null) 5    return ReflectionUtils.newInstance(theClass, this); 6  return WritableComparator.get(getMapOutputKeyClass().asSubclass(WritableComparable.class), this); 7} 8public <U> Class<? extends U> asSubclass(Class<U> clazz) { 9    if (clazz.isAssignableFrom(this))10        return (Class<? extends U>) this;11    else12        throw new ClassCastException(this.toString());13}

看 上面的代码,如果没有实现WritableComparable这个类的话,就会爆ClassCastException的异常了,因为后面的排序需要用,所以必须实现。不过看上面的代码, if (theClass != null) 这个,如果这个不为空的话也是可以的,而这个类就是我们在main方法里面设置的自定义排序的排序:

1job.setSortComparatorClass();

那么这个WritableComparable是什么呢?

1@InterfaceAudience.Public2@InterfaceStability.Stable3public interface WritableComparable<T> extends Writable, Comparable<T> {4}

看上面的源码,他实现了Writable,所以我们如果想让自定义的对象作为key,那么实现这个接口就可以了,如下:

 1public class Student implements WritableComparable<Student> { 2    private String name; 3    private int age; 4    public void write(DataOutput out) throws IOException { 5        out.writeUTF(name); 6        out.writeInt(age); 7    } 8    public void readFields(DataInput in) throws IOException { 9        this.name = in.readUTF();10        this.age = in.readInt();11    }12    public int compareTo(Student o) {13        return age - o.age;14    }15}

3. 排序:

在MapReduce的输出中都会默认排序,排序规则是按照字典顺序。当然我们可以自定排序规则,上面的自定义对象已经说了。
如果是基本类型的排序呢?比如说 IntWritable 这个类,他的默认排序规则是升序。我如果想让他降序呢?其实也很简单,每一个基本类型中都有这么一个静态类:

 1public static class Comparator extends WritableComparator { 2  public Comparator() { 3    super(IntWritable.class); 4  } 5 6  @Override 7  public int compare(byte[] b1, int s1, int l1, 8                     byte[] b2, int s2, int l2) { 9    int thisValue = readInt(b1, s1);10    int thatValue = readInt(b2, s2);11    return (thisValue<thatValue ? -1 : (thisValue==thatValue ? 0 : 1));12  }13}1415static {                                        // register this comparator16  WritableComparator.define(IntWritable.class, new Comparator());17}

所以我们只要继承这个类,并比较的时候返回数取反即可:

1public class MyCompare extends IntWritable.Comparator {23    @Override4    public int compare(Object a, Object b) {5        return - super.compare(a, b);6    }7}

当然你需要在main方法中的job上设置自定义的比较器:

1job.setSortComparatorClass(MyCompare.class);

3. 分区:

分区的目的是将不同的内容放到不同的文件中,也可以加快处理速度,毕竟每一个分区对应不同的Reduce,但是需要注意的是返回的分区数不能大于设置的reduceTask的个数,上面的shuffle有提到原因:

 1public class MyPartitioner extends Partitioner<Text, IntWritable> { 2 3    public int getPartition(Text text, IntWritable intWritable, int numPartitions) { 4 5        int hashcode = text.hashCode(); 6        System.out.println("text = " + text + " ; hascode " + hashcode + " ; model = " + hashcode % 3); 7 8        return hashcode % 3; 9    }10}1112job.setPartitionerClass(MyPartitioner.class);

最后一个参数numPartitions 表示的是最大分区个数,也就是reduceTask的值,所以不要超过它。

  1. Combiner
    Combiner虽然本质上是一个reduce,但是没有默认的实现,需要自己定义并且在job中设置才可以。Combiner的作用是先做一次本地的合并,减少网络之间的传输量。但是并不是所有的输出都适合使用Combiner,只有那些不会改变最终结果的才适合使用。
    使用它其实很简单,直接继承Reduce即可。

参考文档:
https://blog.csdn.net/bingduanlbd/article/details/51933914
https://blog.csdn.net/u014374284/article/details/49205885

欢迎关注我的公众号,长按关注我:北风中独行的蜗牛。

MapReduce之Shuffle,自定义对象,排序已经Combiner

本文分享自微信公众号 - 北风中独行的蜗牛(manong_xiaodong)。
如有侵权,请联系 support@oschina.cn 删除。
本文参与“OSC源创计划”,欢迎正在阅读的你也加入,一起分享。

点赞
收藏
评论区
推荐文章
blmius blmius
3年前
MySQL:[Err] 1292 - Incorrect datetime value: ‘0000-00-00 00:00:00‘ for column ‘CREATE_TIME‘ at row 1
文章目录问题用navicat导入数据时,报错:原因这是因为当前的MySQL不支持datetime为0的情况。解决修改sql\mode:sql\mode:SQLMode定义了MySQL应支持的SQL语法、数据校验等,这样可以更容易地在不同的环境中使用MySQL。全局s
皕杰报表之UUID
​在我们用皕杰报表工具设计填报报表时,如何在新增行里自动增加id呢?能新增整数排序id吗?目前可以在新增行里自动增加id,但只能用uuid函数增加UUID编码,不能新增整数排序id。uuid函数说明:获取一个UUID,可以在填报表中用来创建数据ID语法:uuid()或uuid(sep)参数说明:sep布尔值,生成的uuid中是否包含分隔符'',缺省为
Jacquelyn38 Jacquelyn38
3年前
2020年前端实用代码段,为你的工作保驾护航
有空的时候,自己总结了几个代码段,在开发中也经常使用,谢谢。1、使用解构获取json数据let jsonData  id: 1,status: "OK",data: 'a', 'b';let  id, status, data: number   jsonData;console.log(id, status, number )
Stella981 Stella981
2年前
Hadoop(十四)——hadoop之MapReduce理论篇(五)——MapReduce详细工作流程
一、Shuffle机制Mapreduce确保每个reducer的输入都是按键排序的。系统执行排序的过程(即将map输出作为输入传给reducer)称为shuffle。二、MapReduce工作流程1.图示流程!(https://oscimg.oschina.net/oscnet/b44af54dac168
Stella981 Stella981
2年前
JS 对象数组Array 根据对象object key的值排序sort,很风骚哦
有个js对象数组varary\{id:1,name:"b"},{id:2,name:"b"}\需求是根据name或者id的值来排序,这里有个风骚的函数函数定义:function keysrt(key,desc) {  return function(a,b){    return desc ? ~~(ak
Stella981 Stella981
2年前
Hadoop学习之路(二十三)MapReduce中的shuffle详解
概述1、MapReduce中,mapper阶段处理的数据如何传递给reducer阶段,是MapReduce框架中最关键的一个流程,这个流程就叫Shuffle2、Shuffle:数据混洗——(核心机制:数据分区,排序,局部聚合,缓存,拉取,再合并排序)3、具体来说:就是将MapTask输出的处理结果数据,按照Par
Wesley13 Wesley13
2年前
00:Java简单了解
浅谈Java之概述Java是SUN(StanfordUniversityNetwork),斯坦福大学网络公司)1995年推出的一门高级编程语言。Java是一种面向Internet的编程语言。随着Java技术在web方面的不断成熟,已经成为Web应用程序的首选开发语言。Java是简单易学,完全面向对象,安全可靠,与平台无关的编程语言。
Stella981 Stella981
2年前
Django中Admin中的一些参数配置
设置在列表中显示的字段,id为django模型默认的主键list_display('id','name','sex','profession','email','qq','phone','status','create_time')设置在列表可编辑字段list_editable
Wesley13 Wesley13
2年前
MySQL部分从库上面因为大量的临时表tmp_table造成慢查询
背景描述Time:20190124T00:08:14.70572408:00User@Host:@Id:Schema:sentrymetaLast_errno:0Killed:0Query_time:0.315758Lock_
Python进阶者 Python进阶者
8个月前
Excel中这日期老是出来00:00:00,怎么用Pandas把这个去除
大家好,我是皮皮。一、前言前几天在Python白银交流群【上海新年人】问了一个Pandas数据筛选的问题。问题如下:这日期老是出来00:00:00,怎么把这个去除。二、实现过程后来【论草莓如何成为冻干莓】给了一个思路和代码如下:pd.toexcel之前把这