MapReduce之自定义分区器Partitioner

Stella981
• 阅读 484

@

目录

  • 问题引出
  • 默认Partitioner分区
  • 自定义Partitioner步骤
  • Partition分区案例实操
  • 分区总结

问题引出

要求将统计结果按照条件输出到不同文件中(分区)。

比如:将统计结果按照手机归属地不同省份输出到不同文件中(分区)

默认Partitioner分区

public class HashPartitioner<K,V> extends Partitioner<K,V>{
    public int getPartition(K key,V value, int numReduceTasks){
        return (key.hashCode() & Integer.MAX VALUE) & numReduceTasks;
    }
}
  • 默认分区是根据keyhashCodeReduceTasks个数取模得到的。
  • 用户没法控制哪个key存储到哪个分区。

自定义Partitioner步骤

  1. 自定义类继承Partitioner,重写getPartition()方法

    public class CustomPartitioner extends Partitioner<Text,FlowBea>{ @Override public int getPartition(Text key,FlowBean value,int numPartitions){ //控制分区代码逻辑 …… return partition; } }

  2. 在Job驱动类中,设置自定义Partitioner

    job.setPartitionerClass(CustomPartitioner.class)

  3. 自定义Partition后,要根据自定义Partitioner的逻辑设置相应数量的ReduceTask

    job.setNumReduceTask(5);//假设需要分5个区

Partition分区案例实操

将统计结果按照手机归属地不同省份输出到不同文件中(分区)

输入数据:
MapReduce之自定义分区器Partitioner

期望输出数据:
手机号136、137、138、139开头都分别放到一个独立的4个文件中,其他开头的放到一个文件中。所以总共分为5个文件,也就是五个区。

相比于之前的自定义flowbean,这次自定义分区,只需要多编写一个分区器,以及在job驱动类中设置分区器,mapper和reducer类不改变

MyPartitioner.java

/*
 * KEY, VALUE: Mapper输出的Key-value类型
 */
public class MyPartitioner extends Partitioner<Text, FlowBean>{

    // 计算分区  numPartitions为总的分区数,reduceTask的数量
    // 分区号必须为int型的值,且必须符合 0<= partitionNum < numPartitions
    @Override
    public int getPartition(Text key, FlowBean value, int numPartitions) {
        
        String suffix = key.toString().substring(0, 3);//前开后闭,取手机号前三位数
        
        int partitionNum=0;//分区编号
        
        
        switch (suffix) {
        case "136":
            partitionNum=numPartitions-1;//由于分区编号不能大于分区总数,所以用这种方法比较好
            break;
        case "137":
            partitionNum=numPartitions-2;
            break;
        case "138":
            partitionNum=numPartitions-3;
            break;
        case "139":
            partitionNum=numPartitions-4;
            break;

        default:
            break;
        }

        return partitionNum;
    }

}

FlowBeanDriver.java

public class FlowBeanDriver {
    
    public static void main(String[] args) throws Exception {
        
        Path inputPath=new Path("e:/mrinput/flowbean");
        Path outputPath=new Path("e:/mroutput/partitionflowbean");
        
        //作为整个Job的配置
        Configuration conf = new Configuration();
        
        //保证输出目录不存在
        FileSystem fs=FileSystem.get(conf);
        
        if (fs.exists(outputPath)) {
            fs.delete(outputPath, true);
        }
        
        // ①创建Job
        Job job = Job.getInstance(conf);
        
        // ②设置Job
        // 设置Job运行的Mapper,Reducer类型,Mapper,Reducer输出的key-value类型
        job.setMapperClass(FlowBeanMapper.class);
        job.setReducerClass(FlowBeanReducer.class);
        
        // Job需要根据Mapper和Reducer输出的Key-value类型准备序列化器,通过序列化器对输出的key-value进行序列化和反序列化
        // 如果Mapper和Reducer输出的Key-value类型一致,直接设置Job最终的输出类型
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(FlowBean.class);
        
        // 设置输入目录和输出目录
        FileInputFormat.setInputPaths(job, inputPath);
        FileOutputFormat.setOutputPath(job, outputPath);
        
        // 设置ReduceTask的数量为5
        job.setNumReduceTasks(5);
        
        // 设置使用自定义的分区器
        job.setPartitionerClass(MyPartitioner.class);
        
        // ③运行Job
        job.waitForCompletion(true);
        
    }
}

FlowBeanMapper.java

/*
 * 1. 统计手机号(String)的上行(long,int),下行(long,int),总流量(long,int)
 * 
 * 手机号为key,Bean{上行(long,int),下行(long,int),总流量(long,int)}为value
 *         
 * 
 * 
 * 
 */
public class FlowBeanMapper extends Mapper<LongWritable, Text, Text, FlowBean>{
    
    private Text out_key=new Text();
    private FlowBean out_value=new FlowBean();
    
    // (0,1    13736230513    192.196.100.1    www.atguigu.com    2481    24681    200)
    @Override
    protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, FlowBean>.Context context)
            throws IOException, InterruptedException {
        
        String[] words = value.toString().split("\t");
        
        //封装手机号
        out_key.set(words[1]);
        // 封装上行
        out_value.setUpFlow(Long.parseLong(words[words.length-3]));
        // 封装下行
        out_value.setDownFlow(Long.parseLong(words[words.length-2]));

        context.write(out_key, out_value);
    }
}

FlowBeanReducer.java

public class FlowBeanReducer extends Reducer<Text, FlowBean, Text, FlowBean>{
    
    private FlowBean out_value=new FlowBean();
    
    @Override
    protected void reduce(Text key, Iterable<FlowBean> values, Reducer<Text, FlowBean, Text, FlowBean>.Context context)
            throws IOException, InterruptedException {
        
        long sumUpFlow=0;
        long sumDownFlow=0;
        
        for (FlowBean flowBean : values) {
            
            sumUpFlow+=flowBean.getUpFlow();
            sumDownFlow+=flowBean.getDownFlow();
            
        }
        
        out_value.setUpFlow(sumUpFlow);
        out_value.setDownFlow(sumDownFlow);
        out_value.setSumFlow(sumDownFlow+sumUpFlow);
        
        context.write(key, out_value);
        
    }
}

FlowBean.java

public class FlowBean implements Writable{
    
    private long upFlow;
    private long downFlow;
    private long sumFlow;
    
    public FlowBean() {
        
    }

    public long getUpFlow() {
        return upFlow;
    }

    public void setUpFlow(long upFlow) {
        this.upFlow = upFlow;
    }

    public long getDownFlow() {
        return downFlow;
    }

    public void setDownFlow(long downFlow) {
        this.downFlow = downFlow;
    }

    public long getSumFlow() {
        return sumFlow;
    }

    public void setSumFlow(long sumFlow) {
        this.sumFlow = sumFlow;
    }

    // 序列化   在写出属性时,如果为引用数据类型,属性不能为null
    @Override
    public void write(DataOutput out) throws IOException {
        
        out.writeLong(upFlow);
        out.writeLong(downFlow);
        out.writeLong(sumFlow);
        
        
    }

    //反序列化   序列化和反序列化的顺序要一致
    @Override
    public void readFields(DataInput in) throws IOException {
        upFlow=in.readLong();
        downFlow=in.readLong();
        sumFlow=in.readLong();
        
    }

    @Override
    public String toString() {
        return  upFlow + "\t" + downFlow + "\t" + sumFlow;
    }
}

输出结果:
总共五个文件
MapReduce之自定义分区器Partitioner
一号区:
MapReduce之自定义分区器Partitioner
二号区:
MapReduce之自定义分区器Partitioner
三号区:
MapReduce之自定义分区器Partitioner

四号区:
MapReduce之自定义分区器Partitioner

其他号码为第五号区:
MapReduce之自定义分区器Partitioner

分区总结

  • 如果ReduceTask的数量 > getPartition的结果数,则会多产生几个空的输出文件part-r-000xx
  • 如果Reduceask的数量 < getPartition的结果数,则有一部分分区数据无处安放,会Exception
  • 如果ReduceTask的数量 = 1,则不管MapTask端输出多少个分区文件,最终结果都交给这一个ReduceTask,最终也就只会产生一个结果文件partr-00000

以刚才的案例分析:
例如:假设自定义分区数为5,则

  • job.setlNlurmReduce Task(1);会正常运行,只不过会产生一个输出文件
  • job.setlNlunReduce Task(2),会报错
  • job.setNumReduceTasks(6);大于5,程序会正常运行,会产生空文件
点赞
收藏
评论区
推荐文章
blmius blmius
2年前
MySQL:[Err] 1292 - Incorrect datetime value: ‘0000-00-00 00:00:00‘ for column ‘CREATE_TIME‘ at row 1
文章目录问题用navicat导入数据时,报错:原因这是因为当前的MySQL不支持datetime为0的情况。解决修改sql\mode:sql\mode:SQLMode定义了MySQL应支持的SQL语法、数据校验等,这样可以更容易地在不同的环境中使用MySQL。全局s
Stella981 Stella981
2年前
Linux VPS新硬盘分区与挂载教程
通过fdiskl我们可以看到/dev/xvdb(此名称因系统而异)容量有23.6G,而且没有分区,接下来我们对它进行分区和挂载,(按照如下方式挂载,重装系统不丢失挂载分区硬盘的数据)注:挂载硬盘过程请按照下面代码“蓝色”标记复制执行既可,不同Linux系统,只需途径名称修改对应系统名称。1.fdiskl
Stella981 Stella981
2年前
Linux和Uboot下eMMC boot分区读写
 关键词:eMMCboot、PARTITION\_CONFIG、force\_ro等。1\.eMMC的分区大部分eMMC都有类似如下的分区,其中BOOT、RPMB和UDA一般是默认存在的,gpp分区需要手动创建。!(https://oscimg.oschina.net/oscnet/bb03777529d529b2
Stella981 Stella981
2年前
Apache Hudi重磅RFC解读之记录级别全局索引
1\.摘要Hudi表允许多种类型操作,包括非常常用的upsert,当然为支持upsert,Hudi依赖索引机制来定位记录在哪些文件中。当前Hudi支持分区和非分区的数据集。分区数据集是将一组文件(数据)放在称为分区的桶中的数据集。一个Hudi数据集可能由N个分区和M个文件组成,这种组织结构也非常方便hive/presto/sp
Stella981 Stella981
2年前
Consistent hashing一致性算法原理
最近在整理redis分布式集群,首先就整理一下分布式算法原理。常见的分区规则有哈希分区和顺序分区两种,Redis采用的是哈希分区规则。节点取余分区使用特定的数据,如Redis的键或用户ID为key,节点数量为N,则:hash(key)%N,计算出哈希值,然后决定映射到哪个节点上,如节点数为4时,哈希值的结果可能为0、1、2,3.现假
Stella981 Stella981
2年前
Hadoop学习之路(二十三)MapReduce中的shuffle详解
概述1、MapReduce中,mapper阶段处理的数据如何传递给reducer阶段,是MapReduce框架中最关键的一个流程,这个流程就叫Shuffle2、Shuffle:数据混洗——(核心机制:数据分区,排序,局部聚合,缓存,拉取,再合并排序)3、具体来说:就是将MapTask输出的处理结果数据,按照Par
Wesley13 Wesley13
2年前
Mysql 表分区分类
针对Mysql数据库,表分区类型简析。【1】表分区类型(1)Range分区:按范围分区。按列值的范围区间进行分区存储;比如:id小于10存储在一个分区;id大于10小于20存储在另外一个分区;(2)List分区:按离散值集合分区。与range分区类似,不过它是按离散值进行分区。(3)Hash分区:按hash算法结果分区。对用户定义的表达式所返
Wesley13 Wesley13
2年前
MySQL 分区表原理及使用详解
1\.什么是表分区?表分区,是指根据一定规则,将数据库中的一张表分解成多个更小的,容易管理的部分。从逻辑上看,只有一张表,但是底层却是由多个物理分区组成。2\.表分区与分表的区别分表:指的是通过一定规则,将一张表分解成多张不同的表。比如将用户订单记录根据时间成多个表。分表与分区的区别在于:
Stella981 Stella981
2年前
Kafka重平衡机制
点击蓝色字体“肉眼品世界”,关注公众号深度价值体系传递!(https://oscimg.oschina.net/oscnet/cdaf2bb2b6804d68997f17d08fa4ea00.jpg)当集群中有新成员加入,或者某些主题增加了分区之后,消费者是怎么进行重新分配分区再进行消费的?这里就涉及到重平衡(Rebala
Wesley13 Wesley13
2年前
Mysql合并表原理
1.概述:合并表是一种早期的、简单的分区实现,和分区表相比有一些不同的限制,并且缺乏优化。分区表严格来说是一个逻辑上的概念,用户无法访问底层的各个分区,对用户来说分区是透明的。但是合并表允许用户单独访问各个子表。分区表和优化器的结合更紧密,这也是未来发展的趋势,而合并表则是一种将要被淘汰的技术,在未来的版本中可能被删除。2.原理: