Spark RDD操作之Map系算子

Stella981
• 阅读 537

    本篇博客将介绍Spark RDD的Map系算子的基本用法。

    1、map

    map将RDD的元素一个个传入call方法,经过call方法的计算之后,逐个返回,生成新的RDD,计算之后,记录数不会缩减。示例代码,将每个数字加10之后再打印出来, 代码如下

import java.util.Arrays;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.VoidFunction;

public class Map {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf().setAppName("spark map").setMaster("local[*]");
        JavaSparkContext javaSparkContext = new JavaSparkContext(conf);
        JavaRDD<Integer> listRDD = javaSparkContext.parallelize(Arrays.asList(1, 2, 3, 4));

        JavaRDD<Integer> numRDD = listRDD.map(new Function<Integer, Integer>() {
            @Override
            public Integer call(Integer num) throws Exception {
                return num + 10;
            }
        });
        numRDD.foreach(new VoidFunction<Integer>() {
            @Override
            public void call(Integer num) throws Exception {
                System.out.println(num);
            }
        });
    }

}

    执行结果:

Spark RDD操作之Map系算子

    2、flatMap

    flatMap和map的处理方式一样,都是把原RDD的元素逐个传入进行计算,但是与之不同的是,flatMap返回值是一个Iterator,也就是会一生多,超生

import java.util.Arrays;
import java.util.Iterator;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.VoidFunction;

public class FlatMap {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf().setAppName("spark map").setMaster("local[*]");
        JavaSparkContext javaSparkContext = new JavaSparkContext(conf);
        JavaRDD<String> listRDD = javaSparkContext
                .parallelize(Arrays.asList("hello wold", "hello java", "hello spark"));
        JavaRDD<String> rdd = listRDD.flatMap(new FlatMapFunction<String, String>() {
            private static final long serialVersionUID = 1L;

            @Override
            public Iterator<String> call(String input) throws Exception {
                return Arrays.asList(input.split(" ")).iterator();
            }
        });
        rdd.foreach(new VoidFunction<String>() {
            private static final long serialVersionUID = 1L;
            @Override
            public void call(String num) throws Exception {
                System.out.println(num);
            }
        });
    }

}

    执行结果:

Spark RDD操作之Map系算子

    3、mapPartitions

    mapPartitions一次性将整个分区的数据传入函数进行计算,适用于一次性聚会整个分区的场景

public class MapPartitions {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf().setAppName("spark map").setMaster("local[*]");
        JavaSparkContext javaSparkContext = new JavaSparkContext(conf);
        JavaRDD<String> listRDD = javaSparkContext.parallelize(Arrays.asList("hello", "java", "wold", "spark"), 2);

        /**
         * mapPartitions回调的接口也是FlatMapFunction,FlatMapFunction的第一个泛型是Iterator表示传入的数据,
         * 第二个泛型表示返回数据的类型
         * 
         * mapPartitions传入FlatMapFunction接口处理的数据是一个分区的数据,所以,如果一个分区数据过大,会导致内存溢出
         * 
         */
        JavaRDD<String> javaRDD = listRDD.mapPartitions(new FlatMapFunction<Iterator<String>, String>() {
            int i = 0;

            @Override
            public Iterator<String> call(Iterator<String> input) throws Exception {
                List<String> list = new ArrayList<String>();
                while (input.hasNext()) {
                    list.add(input.next() + i);
                    ++i;
                }
                return list.iterator();
            }
        });

        javaRDD.foreach(new VoidFunction<String>() {
            @Override
            public void call(String t) throws Exception {
                System.out.println(t);
            }
        });
    }

}

    运行结果:

Spark RDD操作之Map系算子

    上面的运算结果,后面的尾标只有0和1,说明FlatMapFunction被调用了两次,与MapPartitions的功能吻合。

    4、mapPartitionsWithIndex

    mapPartitionsWithIndex和mapPartitions一样,一次性传入整个分区的数据进行处理,但是不同的是,这里会传入分区编号进来

public class mapPartitionsWithIndex {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf().setAppName("spark map").setMaster("local[*]");
        JavaSparkContext javaSparkContext = new JavaSparkContext(conf);
        JavaRDD<String> listRDD = javaSparkContext.parallelize(Arrays.asList("hello", "java", "wold", "spark"), 2);

        /**
         *和mapPartitions一样,一次性传入整个分区的数据进行处理,但是不同的是,这里会传入分区编号进来
         * 
         */
        JavaRDD<String> javaRDD = listRDD.mapPartitionsWithIndex(new Function2<Integer, Iterator<String>, Iterator<String>>() {

            @Override
            public Iterator<String> call(Integer v1, Iterator<String> v2) throws Exception {
                List<String> list = new ArrayList<String>();
                while (v2.hasNext()) {
                    list.add(v2.next() + "====分区编号:"+v1);
                }
                return list.iterator();
            }
        
        },true);

        javaRDD.foreach(new VoidFunction<String>() {
            @Override
            public void call(String t) throws Exception {
                System.out.println(t);
            }
        });
    }

}

    执行结果:

    Spark RDD操作之Map系算子

点赞
收藏
评论区
推荐文章
blmius blmius
2年前
MySQL:[Err] 1292 - Incorrect datetime value: ‘0000-00-00 00:00:00‘ for column ‘CREATE_TIME‘ at row 1
文章目录问题用navicat导入数据时,报错:原因这是因为当前的MySQL不支持datetime为0的情况。解决修改sql\mode:sql\mode:SQLMode定义了MySQL应支持的SQL语法、数据校验等,这样可以更容易地在不同的环境中使用MySQL。全局s
Easter79 Easter79
2年前
swap空间的增减方法
(1)增大swap空间去激活swap交换区:swapoff v /dev/vg00/lvswap扩展交换lv:lvextend L 10G /dev/vg00/lvswap重新生成swap交换区:mkswap /dev/vg00/lvswap激活新生成的交换区:swapon v /dev/vg00/lvswap
Jacquelyn38 Jacquelyn38
2年前
2020年前端实用代码段,为你的工作保驾护航
有空的时候,自己总结了几个代码段,在开发中也经常使用,谢谢。1、使用解构获取json数据let jsonData  id: 1,status: "OK",data: 'a', 'b';let  id, status, data: number   jsonData;console.log(id, status, number )
皕杰报表之UUID
​在我们用皕杰报表工具设计填报报表时,如何在新增行里自动增加id呢?能新增整数排序id吗?目前可以在新增行里自动增加id,但只能用uuid函数增加UUID编码,不能新增整数排序id。uuid函数说明:获取一个UUID,可以在填报表中用来创建数据ID语法:uuid()或uuid(sep)参数说明:sep布尔值,生成的uuid中是否包含分隔符'',缺省为
Wesley13 Wesley13
2年前
java8新特性
Stream将List转换为Map,使用Collectors.toMap方法进行转换背景:User类,类中分别有id,name,age三个属性。List集合,userList,存储User对象1、指定keyvalue,value是对象中的某个属性值。 Map<Integer,StringuserMap1userList.str
Stella981 Stella981
2年前
KVM调整cpu和内存
一.修改kvm虚拟机的配置1、virsheditcentos7找到“memory”和“vcpu”标签,将<namecentos7</name<uuid2220a6d1a36a4fbb8523e078b3dfe795</uuid
Stella981 Stella981
2年前
Python之time模块的时间戳、时间字符串格式化与转换
Python处理时间和时间戳的内置模块就有time,和datetime两个,本文先说time模块。关于时间戳的几个概念时间戳,根据1970年1月1日00:00:00开始按秒计算的偏移量。时间元组(struct_time),包含9个元素。 time.struct_time(tm_y
Wesley13 Wesley13
2年前
mysql设置时区
mysql设置时区mysql\_query("SETtime\_zone'8:00'")ordie('时区设置失败,请联系管理员!');中国在东8区所以加8方法二:selectcount(user\_id)asdevice,CONVERT\_TZ(FROM\_UNIXTIME(reg\_time),'08:00','0
Wesley13 Wesley13
2年前
MySQL部分从库上面因为大量的临时表tmp_table造成慢查询
背景描述Time:20190124T00:08:14.70572408:00User@Host:@Id:Schema:sentrymetaLast_errno:0Killed:0Query_time:0.315758Lock_
Python进阶者 Python进阶者
3个月前
Excel中这日期老是出来00:00:00,怎么用Pandas把这个去除
大家好,我是皮皮。一、前言前几天在Python白银交流群【上海新年人】问了一个Pandas数据筛选的问题。问题如下:这日期老是出来00:00:00,怎么把这个去除。二、实现过程后来【论草莓如何成为冻干莓】给了一个思路和代码如下:pd.toexcel之前把这