MapReduce应用

Stella981
• 阅读 576

1、MapReduce实现矩阵相乘

一. 准备数据

#!/bin/bash
if [ $# -ne 3 ]
then
  echo "there must be 3 arguments to generate the two matries file!"
  exit 1
fi
cat /dev/null > M_$1_$2
cat /dev/null > N_$2_$3
for i in `seq 1 $1`
do
    for j in `seq 1 $2`
    do
        s=$((RANDOM%100))
        echo -e "$i,$j\t$s" >>M_$1_$2
    done
done
echo "we have built the matrix file M"

for i in `seq 1 $2`
    do
    for j in ` seq 1 $3`
    do
        s=$((RANDOM%100))
        echo -e "$i,$j\t$s" >>N_$2_$3 
    done
done
echo "we have built the matrix file N"

用一下脚本语言准备数组数据

M_3_2:
1,1    81
1,2    13
2,1    38
2,2    46
3,1    0
3,2    2

N_2_4:
1,1    99
1,2    38
1,3    34
1,4    19
2,1    21
2,2    4
2,3    36
2,4    64

二. 计算

public class Matrix {

    private static class MatrixMapper extends
            Mapper<LongWritable, Text, Text, Text> {

        private static int colN = 0;
        private static int rowM = 0;

        @Override
        protected void setup(
                Mapper<LongWritable, Text, Text, Text>.Context context)
                throws IOException, InterruptedException {

            Configuration configuration = context.getConfiguration();
            colN = configuration.getInt("colN", 0);
            rowM = configuration.getInt("rowM", 0);

        }

        @Override
        protected void map(LongWritable key, Text value,
                Mapper<LongWritable, Text, Text, Text>.Context context)
                throws IOException, InterruptedException {

            FileSplit fileSplit = (FileSplit) context.getInputSplit();
            String fileName = fileSplit.getPath().getName();
            String[] strings = value.toString().split(",");
            int i = Integer.parseInt(strings[0]);
            String[] ser = strings[1].split("\t");
            int j = Integer.parseInt(ser[0]);
            int val = Integer.parseInt(ser[1]);

            if (fileName.startsWith("M")) {

                for (int count = 1; count <= colN; count++) {
                    context.write(new Text(i + "," + count), new Text("M," + j
                            + "," + val + ""));
                }

            } else {

                for (int count = 1; count <= rowM; count++) {
                    context.write(new Text(count + "," + j), new Text("N," + i
                            + "," + val + ""));
                }

            }
        }
    }

    private static class MatrixReduce extends
            Reducer<Text, Text, Text, IntWritable> {

        private static int rowM = 0;

        @Override
        protected void setup(
                Reducer<Text, Text, Text, IntWritable>.Context context)
                throws IOException, InterruptedException {

            Configuration configuration = context.getConfiguration();
            rowM = configuration.getInt("rowM", 0);

        }

        @Override
        protected void reduce(Text key, Iterable<Text> values,
                Reducer<Text, Text, Text, IntWritable>.Context context)
                throws IOException, InterruptedException {

            int sumValue = 0;
            int[] m_Arr = new int[rowM + 1];
            int[] n_Arr = new int[rowM + 1];

            for (Text value : values) {

                String string = value.toString();
                String[] strings = string.split(",");

                if (strings[0].equals("M")) {
                    m_Arr[Integer.parseInt(strings[1])] = Integer
                            .parseInt(strings[2]);
                } else {
                    n_Arr[Integer.parseInt(strings[1])] = Integer
                            .parseInt(strings[2]);
                }
            }

            for (int i = 1; i < rowM + 1; i++) {
                sumValue += m_Arr[i] * n_Arr[i];
            }

            context.write(key, new IntWritable(sumValue));
        }

    }

    public static void main(String[] args) throws IllegalArgumentException,
            IOException, ClassNotFoundException, InterruptedException {

        Configuration configuration = HadoopConfig.getConfiguration();
        configuration.setInt("colN", 4);
        configuration.setInt("rowN", 2);
        configuration.setInt("colM", 2);
        configuration.setInt("rowM", 3);

        Job job = Job.getInstance(configuration, "矩阵相乘");

        job.setJarByClass(Sort.class);
        job.setMapperClass(MatrixMapper.class);

        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        job.setReducerClass(MatrixReduce.class);
        FileInputFormat.addInputPath(job, new Path("/matrix"));
        FileOutputFormat.setOutputPath(job, new Path("/matrixOutput"));
        job.waitForCompletion(true);
        System.exit(job.waitForCompletion(true) ? 0 : 1);

    }

}

三. 结果

1,1    8292
1,2    3130
1,3    3222
1,4    2371
2,1    4728
2,2    1628
2,3    2948
2,4    3666
3,1    42
3,2    8
3,3    72
3,4    128

2、MapReduce实现倒排索引

一、准备数据

file1:
one fish
two bird
two monkey

file2:
two peach
three watermelon

二、计算

public class InvertIndex {

    private static class InvertIndexMapper extends
            Mapper<LongWritable, Text, Text, Text> {

        @Override
        protected void map(LongWritable key, Text value,
                Mapper<LongWritable, Text, Text, Text>.Context context)
                throws IOException, InterruptedException {

            FileSplit fileSplit = (FileSplit) context.getInputSplit();
            String fileName = fileSplit.getPath().toString();
            String[] words = value.toString().split(" ");
            for (String string : words) {
                context.write(new Text(string), new Text(fileName + "#" + key.toString()));
            }
            
        }

    }

    private static class InvertIndexReduce extends
            Reducer<Text, Text, Text, Text> {

        @Override
        protected void reduce(Text key, Iterable<Text> values,
                Reducer<Text, Text, Text, Text>.Context context)
                throws IOException, InterruptedException {
            
            StringBuilder stringBuilder = new StringBuilder();
            
            for (Text text : values) {
                    stringBuilder.append(text.toString()).append(";");
            }
            
            context.write(key, new Text(stringBuilder.toString()));
        }
    }

    public static void main(String[] args) throws IOException,
    ClassNotFoundException, InterruptedException{

        Configuration configuration = HadoopConfig.getConfiguration();
        Job job = Job.getInstance(configuration, "倒排索引");
        job.setJarByClass(InvertIndex.class);
        job.setMapperClass(InvertIndexMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);
        job.setReducerClass(InvertIndexReduce.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);

        FileInputFormat.addInputPath(job, new Path("/data"));
        FileOutputFormat.setOutputPath(job, new Path("/ouput"));
        job.waitForCompletion(true);
        System.exit(job.waitForCompletion(true) ? 0 : 1);
        
    }

三、结果

bird    hdfs://127.0.0.1:8020/data/file1#9;
fish    hdfs://127.0.0.1:8020/data/file1#0;
monkey    hdfs://127.0.0.1:8020/data/file1#18;
one    hdfs://127.0.0.1:8020/data/file1#0;
peach    hdfs://127.0.0.1:8020/data/file2#0;
three    hdfs://127.0.0.1:8020/data/file2#10;
two    hdfs://127.0.0.1:8020/data/file2#0;hdfs://127.0.0.1:8020/data/file1#18;hdfs://127.0.0.1:8020/data/file1#9;
watermelon    hdfs://127.0.0.1:8020/data/file2#10;

3、MapReduce实现复杂倒排索引

一、准备数据

file1:
one fish
two bird
two monkey

file2:
two peach
three watermelon

二、计算

public class ComplexInvertIndex {

    private static class FileNameRecordReader extends RecordReader<Text, Text> {

        LineRecordReader lineRecordReader = new LineRecordReader();
        String fileName;

        @Override
        public void initialize(InputSplit split, TaskAttemptContext context)
                throws IOException, InterruptedException {
            lineRecordReader.initialize(split, context);
            fileName = ((FileSplit) split).getPath().getName();
        }

        @Override
        public boolean nextKeyValue() throws IOException, InterruptedException {
            return lineRecordReader.nextKeyValue();
        }

        @Override
        public Text getCurrentKey() throws IOException, InterruptedException {
            return new Text(fileName);
        }

        @Override
        public Text getCurrentValue() throws IOException, InterruptedException {
            return lineRecordReader.getCurrentValue();
        }

        @Override
        public float getProgress() throws IOException, InterruptedException {
            return lineRecordReader.getProgress();
        }

        @Override
        public void close() throws IOException {
            lineRecordReader.close();
        }

    }

    private static class FileNameInputFormat extends
            FileInputFormat<Text, Text> {

        @Override
        public RecordReader<Text, Text> createRecordReader(InputSplit split,
                TaskAttemptContext context) throws IOException,
                InterruptedException {
            FileNameRecordReader fileNameRecordReader = new FileNameRecordReader();
            fileNameRecordReader.initialize(split, context);
            return fileNameRecordReader;
        }

    }

    private static class ComplexInvertIndexMapper extends
            Mapper<Text, Text, Text, IntWritable> {

        @Override
        protected void map(Text key, Text value,
                Mapper<Text, Text, Text, IntWritable>.Context context)
                throws IOException, InterruptedException {

            String[] strs = value.toString().split(" ");
            for (String string : strs) {
                context.write(new Text( string+"#"+key.toString() ),new IntWritable(1));
            }

        }

    }

    private static class ComplexInvertIndexCombiner extends
            Reducer<Text, IntWritable, Text, IntWritable> {

        @Override
        protected void reduce(Text key, Iterable<IntWritable> values,
                Reducer<Text, IntWritable, Text, IntWritable>.Context context)
                throws IOException, InterruptedException {

            int sum = 0;
            for (IntWritable value : values) {
                sum += value.get();
            }
            context.write(key,new IntWritable(sum));
            System.out.println(key.toString() + sum +"");
        }

    }

    //把key的前面字段聚合,排序
    private static class InvertIndexPartitioner extends
            HashPartitioner<Text, IntWritable> {

        @Override
        public int getPartition(Text key, IntWritable value, int numReduceTasks) {
            String[] strs = key.toString().split("#");
            return super.getPartition(new Text(strs[0]), value, numReduceTasks);
        }

    }                

    private static class ComplexInvertIndexReduce extends
            Reducer<Text, IntWritable, Text, Text> {

        static Map<String, String> map = new HashMap<String, String>();
        
        @Override
        protected void reduce(Text key, Iterable<IntWritable> values,
                Reducer<Text, IntWritable, Text, Text>.Context context)
                throws IOException, InterruptedException {

            String[] strings = key.toString().split("#");
            String word = strings[0];
            String doc = strings[1];
            int sum = 0;
            for(IntWritable value : values){
                sum = sum + value.get();
            }
            if( map.get(word) == null ){
                map.put(word," ("+doc+","+sum+") ");
            }else{
                map.put(word,map.get(word)+" ("+doc+","+sum+") ");
            }
             
        }
        
        @Override
        protected void cleanup(
                Reducer<Text, IntWritable, Text, Text>.Context context)
                throws IOException, InterruptedException {
            for(String key:map.keySet()){
                context.write(new Text(key), new Text(map.get(key)));
            }
        }

    }

    public static void main(String[] args)throws IOException,
    ClassNotFoundException, InterruptedException{

        Configuration configuration = HadoopConfig.getConfiguration();
        Job job = Job.getInstance(configuration, "复杂倒排索引");
        job.setJarByClass(ComplexInvertIndex.class);
        job.setInputFormatClass(FileNameInputFormat.class);
        job.setMapperClass(ComplexInvertIndexMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);
        job.setCombinerClass(ComplexInvertIndexCombiner.class);
        job.setReducerClass(ComplexInvertIndexReduce.class);
        job.setPartitionerClass(InvertIndexPartitioner.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        
        FileInputFormat.addInputPath(job, new Path("/data"));
        FileOutputFormat.setOutputPath(job, new Path("/ouputdata"));
        job.waitForCompletion(true);
        System.exit(job.waitForCompletion(true) ? 0 : 1);
        
    }

三、结果查看

monkey     (file1,1) 
bird     (file1,1) 
fish     (file1,1) 
one     (file1,1) 
peach     (file2,1) 
watermelon     (file2,1) 
three     (file2,1) 
two     (file1,2)  (file2,1)
点赞
收藏
评论区
推荐文章
blmius blmius
2年前
MySQL:[Err] 1292 - Incorrect datetime value: ‘0000-00-00 00:00:00‘ for column ‘CREATE_TIME‘ at row 1
文章目录问题用navicat导入数据时,报错:原因这是因为当前的MySQL不支持datetime为0的情况。解决修改sql\mode:sql\mode:SQLMode定义了MySQL应支持的SQL语法、数据校验等,这样可以更容易地在不同的环境中使用MySQL。全局s
Wesley13 Wesley13
2年前
java将前端的json数组字符串转换为列表
记录下在前端通过ajax提交了一个json数组的字符串,在后端如何转换为列表。前端数据转化与请求varcontracts{id:'1',name:'yanggb合同1'},{id:'2',name:'yanggb合同2'},{id:'3',name:'yang
Stella981 Stella981
2年前
Opencv中Mat矩阵相乘——点乘、dot、mul运算详解
Opencv中Mat矩阵相乘——点乘、dot、mul运算详解2016年09月02日00:00:36 \牧野(https://www.oschina.net/action/GoToLink?urlhttps%3A%2F%2Fme.csdn.net%2Fdcrmg) 阅读数:59593
Stella981 Stella981
2年前
Nginx + lua +[memcached,redis]
精品案例1、Nginxluamemcached,redis实现网站灰度发布2、分库分表/基于Leaf组件实现的全球唯一ID(非UUID)3、Redis独立数据监控,实现订单超时操作/MQ死信操作SelectPollEpollReactor模型4、分布式任务调试Quartz应用
Stella981 Stella981
2年前
Android So动态加载 优雅实现与原理分析
背景:漫品Android客户端集成适配转换功能(基于目标识别(So库35M)和人脸识别库(5M)),导致apk体积50M左右,为优化客户端体验,决定实现So文件动态加载.!(https://oscimg.oschina.net/oscnet/00d1ff90e4b34869664fef59e3ec3fdd20b.png)点击上方“蓝字”关注我
Stella981 Stella981
2年前
MapReduce原理和WordCount数据详细过程
1.MapReduce原理 1.1MapReduce简介     MapReduce是一种分布式计算模型,是Google提出的,主要用于搜索领域,解决海量数据的计算问题。   MR有两个阶段组成:Map和Reduce,用户只需实现map()和reduce()两个函数,即可实现分布
Stella981 Stella981
2年前
Hadoop学习之路(二十三)MapReduce中的shuffle详解
概述1、MapReduce中,mapper阶段处理的数据如何传递给reducer阶段,是MapReduce框架中最关键的一个流程,这个流程就叫Shuffle2、Shuffle:数据混洗——(核心机制:数据分区,排序,局部聚合,缓存,拉取,再合并排序)3、具体来说:就是将MapTask输出的处理结果数据,按照Par
Stella981 Stella981
2年前
Hadoop源代码分析(包hadoop.mapred中的MapReduce接口)
前面已经完成了对org.apache.hadoop.mapreduce的分析,这个包提供了HadoopMapReduce部分的应用API,用于用户实现自己的MapReduce应用。但这些接口是给未来的MapReduce应用的,目前MapReduce框架还是使用老系统(参考补丁HADOOP1230(https://www.oschina.net/act
Stella981 Stella981
2年前
Hadoop之Mapreduce详解
1、什么是Mapreduce   Mapreduce是一个分布式运算程序的编程框架,是用户开发“基于hadoop的数据分析应用”的核心框架;Mapreduce核心功能是将用户编写的业务逻辑代码和自带默认组件整合成一个完整的分布式运算程序,并发运行在一个hadoop集群上;2、Mapreduce框架结构及核心运行机制
Stella981 Stella981
2年前
Shell编程之if简单判断两个数字大小
脚本编辑!/bin/bash定义变量num1$1num2$2判断是否输入两个参数,若是,将两个参数传递给下一个指令动作,若非两个参数,则打印输出内容输出并且退出exit脚本不执行下一个指令if\$ne2\;then  echo'pleaseinput