Sharing two pieces of code that you can reuse directly in your own projects:
Map Side Join
===
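The idea of a map-side join is to skip the shuffle entirely: ship the smaller table (user.csv) to every mapper through the distributed cache, load it into memory in `setup()`, and do the lookup while scanning order.csv in a map-only job. The sketch below is a minimal version of that pattern, not a drop-in implementation: it assumes Hadoop 2.x behavior (cache files are symlinked into the task working directory under their file name), that user.csv is small enough to hold in each mapper's memory, and an illustrative argument order of user.csv URI / order.csv input path / output path. Adapt these to your environment.

```java
package MapJoin;

import java.io.BufferedReader;
import java.io.FileReader;
import java.io.IOException;
import java.net.URI;
import java.util.HashMap;
import java.util.Map;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

public class MapJoin {

    public static class MapClass extends Mapper<LongWritable, Text, Text, Text> {

        // In-memory copy of the small table (user.csv), keyed by user ID
        private Map<String, String> userMap = new HashMap<String, String>();
        private Text outKey = new Text();
        private Text outValue = new Text();

        @Override
        protected void setup(Context context) throws IOException, InterruptedException {
            // user.csv is shipped via the distributed cache; on Hadoop 2.x it is
            // symlinked into the task working directory under its file name
            BufferedReader reader = new BufferedReader(new FileReader("user.csv"));
            String line;
            while ((line = reader.readLine()) != null) {
                String[] fields = line.split(",", 2);
                userMap.put(fields[0], fields[1]);   // "ID" -> "NAME","SEX"
            }
            reader.close();
        }

        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            // Each input record comes from order.csv: "USER_ID","NAME"
            String[] fields = value.toString().split(",", 2);
            String user = userMap.get(fields[0]);
            if (user != null) {                      // inner join: drop unmatched orders
                outKey.set(fields[0]);
                outValue.set(user + "," + fields[1]);
                context.write(outKey, outValue);
            }
        }
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf, "MapJoin");

        job.setJarByClass(MapJoin.class);
        job.setMapperClass(MapClass.class);
        job.setNumReduceTasks(0);                    // map-only job, no shuffle or reduce

        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);

        job.setInputFormatClass(TextInputFormat.class);
        job.setOutputFormatClass(TextOutputFormat.class);

        // args[0]: user.csv on HDFS, args[1]: order.csv input dir, args[2]: output dir
        job.addCacheFile(new URI(args[0]));
        FileInputFormat.addInputPath(job, new Path(args[1]));
        FileOutputFormat.setOutputPath(job, new Path(args[2]));

        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
```

Because there is no shuffle and no reduce phase, this variant is usually faster than the reduce-side join below, but it only works as long as the cached table stays small enough to fit in each mapper's heap.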
Reduce Side Join
===
```java
package ReduceJoin;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

/*
user.csv file:
"ID","NAME","SEX"
"1","user1","0"
"2","user2","0"
"3","user3","0"
"4","user4","1"
"5","user5","0"
"6","user6","0"
"7","user7","1"
"8","user8","0"
"9","user9","0"

order.csv file:
"USER_ID","NAME"
"1","order1"
"2","order2"
"3","order3"
"4","order4"
"7","order7"
"8","order8"
"9","order9"
*/
public class ReduceJoin {

    public static class MapClass extends Mapper<LongWritable, Text, Text, Text> {

        // Define the output objects outside map() to avoid creating
        // new objects for every record processed
        private Text key = new Text();
        private Text value = new Text();
        private String[] keyValue = null;

        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            // value is one full line of input (Text), so we parse the join key out of it
            keyValue = value.toString().split(",", 2);
            this.key.set(keyValue[0]);      // use the foreign key as the MapReduce key
            this.value.set(keyValue[1]);
            context.write(this.key, this.value);
        }
    }

    public static class Reduce extends Reducer<Text, Text, Text, Text> {

        private Text value = new Text();

        @Override
        protected void reduce(Text key, Iterable<Text> values, Context context)
                throws IOException, InterruptedException {
            StringBuilder valueStr = new StringBuilder();

            // Each element of values comes from a different input file but shares
            // the same key, i.e. this is the set of map output values grouped by key
            for (Text val : values) {
                valueStr.append(val);
                valueStr.append(",");
            }

            this.value.set(valueStr.deleteCharAt(valueStr.length() - 1).toString());
            context.write(key, this.value);
        }
    }

    public static void main(String[] args) throws IllegalArgumentException, IOException,
            ClassNotFoundException, InterruptedException {
        Configuration conf = new Configuration();
        Job job = new Job(conf, "MyJoin");

        job.setJarByClass(ReduceJoin.class);
        job.setMapperClass(MapClass.class);
        job.setReducerClass(Reduce.class);
        //job.setCombinerClass(Reduce.class);

        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);

        job.setInputFormatClass(TextInputFormat.class);
        job.setOutputFormatClass(TextOutputFormat.class);

        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
```
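To try it, upload user.csv and order.csv into one HDFS input directory and pass that directory and an output path as the two arguments, for example `hadoop jar reducejoin.jar ReduceJoin.ReduceJoin /input/join /output/join` (the jar name and paths are placeholders). For a user present in both files, the reducer emits one joined line per key, e.g. key `"1"` with value `"user1","0","order1"`, though the relative order of the user and order fragments within a line is not guaranteed, since the reducer only receives an unordered list of values for each key.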

Sharing and liking are the biggest support~
This article is shared from the WeChat public account 大数据技术与架构 (import_bigdata).