[完结13章]一课掌握Java并发编程精髓

赵嬷嬷
• 阅读 203

资料地址1:https://pan.baidu.com/s/1AcAiXR8afHlpMbdIGVkx3w 提取码:7skv 资料地址2:https://share.weiyun.com/VtbcAU8C 密码:gmqctf

Java并发编程从入门到进阶 多场景实战,众所周知,并发编程是优秀工程师的标准之一,但知识庞杂,复杂性高,常常让人望而却步。但如果没有掌握背后的核心原理,你开发的代码可能会成为难以调试和优化的头疼问题。在此,我将通过上百个案例场景驱动教学+动画直观演示,帮助大家深入、直观地理解并发编程核心概念和底层原理。助力大家在实际工作和面试中都能尽早脱颖而出。

首先,我们先来了解关于并发的基本概念。 并发情况主要会引出三个基本概念,分别是原子性、可见性、有序性三个基本概念

Java中线程的状态分为6种:

  1. 初始(NEW):新创建了一个线程对象,但还没有调用start()方法。
  2. 运行(RUNNABLE):Java线程中将就绪(ready)和运行中(running)两种状态笼统的称为“运行”。 线程对象创建后,其他线程(比如main线程)调用了该对象的start()方法。该状态的线程位于可运行线程池中,等待被线程调度选中,获取CPU的使用权,此时处于就绪状态(ready)。就绪状态的线程在获得CPU时间片后变为运行中状态(running)。
  3. 阻塞(BLOCKED):表示线程阻塞于锁。
  4. 等待(WAITING):进入该状态的线程需要等待其他线程做出一些特定动作(通知或中断)。
  5. 超时等待(TIMED_WAITING):该状态不同于WAITING,它可以在指定的时间后自行返回。
  6. 终止(TERMINATED):表示该线程已经执行完毕。

其实我们可以通过job.setPartitionerClass来设置分区类,不过目前我们是没有设置的,那框架中是不是有默认值啊,是有的,我们可以通过job.getPartitionerClass方法看到默认情况下会使用HashPartitioner这个分区类 那我们来看一下HashPartitioner的实现是什么样子的 /** Partition keys by their {@link Object#hashCode()}. */ @InterfaceAudience.Public @InterfaceStability.Stable public class HashPartitioner<K, V> extends Partitioner<K, V> {

/** Use {@link Object#hashCode()} to partition. */ public int getPartition(K key, V value, int numReduceTasks) { return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks; }

} 下面我们来具体跑一个这份数据,首先复制一份WordCountJob的代码,新的类名为WordCountJobSkew package com.imooc.mr;

import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.Reducer; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.slf4j.Logger; import org.slf4j.LoggerFactory;

import java.io.IOException;

/**

  • 数据倾斜-增加Reduce任务个数
  • Created by xuwei
  • / public class WordCountJobSkew { /**
    • Map阶段
    • / public static class MyMapper extends Mapper<LongWritable, Text,Text,LongWritable>{ Logger logger = LoggerFactory.getLogger(MyMapper.class); /**
      • 需要实现map函数
      • 这个map函数就是可以接收<k1,v1>,产生<k2,v2>
      • @param k1
      • @param v1
      • @param context
      • @throws IOException
      • @throws InterruptedException
      • / @Override protected void map(LongWritable k1, Text v1, Context context)
          throws IOException, InterruptedException {
        //输出k1,v1的值 //System.out.println("<k1,v1>=<"+k1.get()+","+v1.toString()+">"); //logger.info("<k1,v1>=<"+k1.get()+","+v1.toString()+">"); //k1 代表的是每一行数据的行首偏移量,v1代表的是每一行内容 //对获取到的每一行数据进行切割,把单词切割出来 String[] words = v1.toString().split(" "); //把单词封装成<k2,v2>的形式 Text k2 = new Text(words[0]); LongWritable v2 = new LongWritable(1L); //把<k2,v2>写出去 context.write(k2,v2); } }
/**
 * Reduce阶段
 */
public static class MyReducer extends Reducer<Text,LongWritable,Text,LongWritable>{
    Logger logger = LoggerFactory.getLogger(MyReducer.class);
    /**
     * 针对<k2,{v2...}>的数据进行累加求和,并且最终把数据转化为k3,v3写出去
     * @param k2
     * @param v2s
     * @param context
     * @throws IOException
     * @throws InterruptedException
     */
    @Override
    protected void reduce(Text k2, Iterable<LongWritable> v2s, Context context)
            throws IOException, InterruptedException {
        //创建一个sum变量,保存v2s的和
        long sum = 0L;
        //对v2s中的数据进行累加求和
        for(LongWritable v2: v2s){
            //输出k2,v2的值
            //System.out.println("<k2,v2>=<"+k2.toString()+","+v2.get()+">");
            //logger.info("<k2,v2>=<"+k2.toString()+","+v2.get()+">");
            sum += v2.get();

//模拟Reduce的复杂计算消耗的时间 if(sum % 200 ==0){ Thread.sleep(1); }

        }

        //组装k3,v3
        Text k3 = k2;
        LongWritable v3 = new LongWritable(sum);
        //输出k3,v3的值
        //System.out.println("<k3,v3>=<"+k3.toString()+","+v3.get()+">");
        //logger.info("<k3,v3>=<"+k3.toString()+","+v3.get()+">");
        // 把结果写出去
        context.write(k3,v3);
    }
}

/**
 * 组装Job=Map+Reduce
 */
public static void main(String[] args) {
    try{
        if(args.length!=3){
            //如果传递的参数不够,程序直接退出
            System.exit(100);
        }

        //指定Job需要的配置参数
        Configuration conf = new Configuration();
        //创建一个Job
        Job job = Job.getInstance(conf);

        //注意了:这一行必须设置,否则在集群中执行的时候是找不到WordCountJob这个类的
        job.setJarByClass(WordCountJobSkew.class);

        //指定输入路径(可以是文件,也可以是目录)
        FileInputFormat.setInputPaths(job,new Path(args[0]));
        //指定输出路径(只能指定一个不存在的目录)
        FileOutputFormat.setOutputPath(job,new Path(args[1]));

        //指定map相关的代码
        job.setMapperClass(MyMapper.class);
        //指定k2的类型
        job.setMapOutputKeyClass(Text.class);
        //指定v2的类型
        job.setMapOutputValueClass(LongWritable.class);



        //指定reduce相关的代码
        job.setReducerClass(MyReducer.class);
        //指定k3的类型
        job.setOutputKeyClass(Text.class);
        //指定v3的类型
        job.setOutputValueClass(LongWritable.class);
        //设置reduce任务个数
        job.setNumReduceTasks(Integer.parseInt(args[2]));

        //提交job
        job.waitForCompletion(true);
    }catch(Exception e){
        e.printStackTrace();
    }

}

} 针对这个操作我们需要去修改代码,在这里我们再重新复制一个类,基于WordCountJobSkew复制,新的类名是WordCountJobSkewRandKey package com.imooc.mr;

import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.Reducer; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.slf4j.Logger; import org.slf4j.LoggerFactory;

import java.io.IOException; import java.util.Random;

/**

  • 数据倾斜-把倾斜的数据打散
  • Created by xuwei
  • / public class WordCountJobSkewRandKey { /**
    • Map阶段
    • / public static class MyMapper extends Mapper<LongWritable, Text,Text,LongWritable>{ Logger logger = LoggerFactory.getLogger(MyMapper.class); Random random = new Random(); /**
      • 需要实现map函数
      • 这个map函数就是可以接收<k1,v1>,产生<k2,v2>
      • @param k1
      • @param v1
      • @param context
      • @throws IOException
      • @throws InterruptedException
      • / @Override protected void map(LongWritable k1, Text v1, Context context)
          throws IOException, InterruptedException {
        //输出k1,v1的值 //System.out.println("<k1,v1>=<"+k1.get()+","+v1.toString()+">"); //logger.info("<k1,v1>=<"+k1.get()+","+v1.toString()+">"); //k1 代表的是每一行数据的行首偏移量,v1代表的是每一行内容 //对获取到的每一行数据进行切割,把单词切割出来 String[] words = v1.toString().split(" "); //把单词封装成<k2,v2>的形式 String key = words[0]; if("5".equals(key)){
          //把倾斜的key打散,分成10份
          key = "5"+"_"+random.nextInt(10);
        } Text k2 = new Text(key); LongWritable v2 = new LongWritable(1L); //把<k2,v2>写出去 context.write(k2,v2); } }
/**
 * Reduce阶段
 */
public static class MyReducer extends Reducer<Text,LongWritable,Text,LongWritable>{
    Logger logger = LoggerFactory.getLogger(MyReducer.class);
    /**
     * 针对<k2,{v2...}>的数据进行累加求和,并且最终把数据转化为k3,v3写出去
     * @param k2
     * @param v2s
     * @param context
     * @throws IOException
     * @throws InterruptedException
     */
    @Override
    protected void reduce(Text k2, Iterable<LongWritable> v2s, Context context)
            throws IOException, InterruptedException {
        //创建一个sum变量,保存v2s的和
        long sum = 0L;
        //对v2s中的数据进行累加求和
        for(LongWritable v2: v2s){
            //输出k2,v2的值
            //System.out.println("<k2,v2>=<"+k2.toString()+","+v2.get()+">");
            //logger.info("<k2,v2>=<"+k2.toString()+","+v2.get()+">");
            sum += v2.get();
            //模拟Reduce的复杂计算消耗的时间
            if(sum % 200 ==0){
                Thread.sleep(1);
            }

        }

        //组装k3,v3
        Text k3 = k2;
        LongWritable v3 = new LongWritable(sum);
        //输出k3,v3的值
        //System.out.println("<k3,v3>=<"+k3.toString()+","+v3.get()+">");
        //logger.info("<k3,v3>=<"+k3.toString()+","+v3.get()+">");
        // 把结果写出去
        context.write(k3,v3);
    }
}

/**
 * 组装Job=Map+Reduce
 */
public static void main(String[] args) {
    try{
        if(args.length!=3){
            //如果传递的参数不够,程序直接退出
            System.exit(100);
        }

        //指定Job需要的配置参数
        Configuration conf = new Configuration();
        //创建一个Job
        Job job = Job.getInstance(conf);

        //注意了:这一行必须设置,否则在集群中执行的时候是找不到WordCountJob这个类的
        job.setJarByClass(WordCountJobSkewRandKey.class);

        //指定输入路径(可以是文件,也可以是目录)
        FileInputFormat.setInputPaths(job,new Path(args[0]));
        //指定输出路径(只能指定一个不存在的目录)
        FileOutputFormat.setOutputPath(job,new Path(args[1]));

        //指定map相关的代码
        job.setMapperClass(MyMapper.class);
        //指定k2的类型
        job.setMapOutputKeyClass(Text.class);
        //指定v2的类型
        job.setMapOutputValueClass(LongWritable.class);



        //指定reduce相关的代码
        job.setReducerClass(MyReducer.class);
        //指定k3的类型
        job.setOutputKeyClass(Text.class);
        //指定v3的类型
        job.setOutputValueClass(LongWritable.class);
        //设置reduce任务个数
        job.setNumReduceTasks(Integer.parseInt(args[2]));

        //提交job
        job.waitForCompletion(true);
    }catch(Exception e){
        e.printStackTrace();
    }

}

} 调用parallelize()时,有一个重要的参数可以指定,就是将集合切分成多少个partition。 Spark会为每一个partition运行一个task来进行处理。 Spark默认会根据集群的配置来设置partition的数量。我们也可以在调用parallelize()方法时,传入第二个参数,来设置RDD的partition数量,例如:parallelize(arr, 5) scala代码如下: package com.imooc.scala

import org.apache.spark.{SparkConf, SparkContext}

/**

  • 需求:使用集合创建RDD

  • Created by xuwei

  • / object CreateRddByArrayScala {

    def main(args: Array[String]): Unit = { //创建SparkContext val conf = new SparkConf() conf.setAppName("CreateRddByArrayScala ")//设置任务名称 .setMaster("local")//local表示在本地执行 val sc = new SparkContext(conf)

    //创建集合 val arr = Array(1,2,3,4,5) //基于集合创建RDD val rdd = sc.parallelize(arr) val sum = rdd.reduce(_ + _) println(sum)

    //停止SparkContext sc.stop()

    }

}

点赞
收藏
评论区
推荐文章
荀勗 荀勗
5个月前
首个基于Transformer的分割检测+视觉大模型视频课程(附源码+课件)
参考资料地址1:https://pan.baidu.com/s/14g2VTg8JeeZ0pDey7xwGg提取码:2bmp参考资料地址2:https://share.weiyun.com/tnVNHGMD密码:3fj7iy众所周知,视觉系统对于理解和推理
荀勗 荀勗
5个月前
高性能多级网关与多级缓存架构落地实战(完结+附电子书)
参考资料地址1:https://pan.baidu.com/s/12w0TT26aywnoIcogPg8Uw提取码:uzf4参考资料地址2:https://share.weiyun.com/SNltUNLW密码:zi3dc7什么是网关?网关(Gateway
赵颜 赵颜
5个月前
[16章]SpringBoot2 仿B站高性能前端+后端项目(2023新版)
资料地址1:https://pan.baidu.com/s/1cxQDKIi7iu1mGmjRr9a0Mw提取码:tz5s资料地址2:https://pan.baidu.com/s/1DjmuC6Id4oUCNVbxfgcMg提取码:qtf3今天给大家讲讲
笑面虎 笑面虎
5个月前
一课掌握Java并发编程精髓(完结13章)
一课掌握Java并发编程精髓(完结13章)分享一套Java课程——一课掌握Java并发编程精髓(完结13章),附源码PDF课件下载。并发编程1.多线程Java是最先支持多线程的开发的语言之一,Java从一开始就支持了多线程能力。由于现在的CPU已经多是多
吉太 吉太
4个月前
[2023新版16章]SpringBoot+Vue3 项目实战,打造企业级在线办公系统
参考资料地址1:https://pan.baidu.com/s/1ZJGS0SA9pIUr76VUXioNSg提取码:95bd参考资料地址2:https://share.weiyun.com/jVSDdcBU密码:cruqf9SpringBootVue3
荀勗 荀勗
4个月前
一课掌握Java并发编程精髓(完结13章)
资料地址1:https://pan.baidu.com/s/1bPKpzdB6ho4hPhxGfifFAA提取码:ik4v资料地址2:https://share.weiyun.com/VtbcAU8C密码:gmqctf一课掌握Java并发编程精髓,分13章
赵嬷嬷 赵嬷嬷
4个月前
[完结10章]Vue3+Pinia+Vite+TS 还原高性能外卖APP项目
参考资料地址1:https://pan.baidu.com/s/1u0uNBMkOA2NRk3N6myb4Zg提取码:tnlt参考资料地址2:https://share.weiyun.com/Wjw3QpeQ密码:gxrfcwVue3带来的改变,除了其自身
吉太 吉太
1个月前
新版React18+Next.js14+Nest.js全栈开发复杂低代码项目[21章]
资料地址1:https://pan.baidu.com/s/1CpBiE0X4vq9dAoZZCow0bw提取码:wwq9资料地址2:https://share.weiyun.com/vXd3qr0O密码:bcrymy2024版,React18Nest.
鲍二家的 鲍二家的
1个月前
[完结17章]SpringBoot3+Vue3 开发高并发秒杀抢购系统
学习地址1:https://pan.baidu.com/s/1DRZXkQeGkrPwhVTd2ko00g提取码:gpwn学习地址2:https://share.weiyun.com/ysK13sR2密码:74m96t众所周知,作为开发新手,入行、实习、转
鲍二家的 鲍二家的
1个月前
[完结12章]AI Agent智能应用从0到1定制开发
资料地址1:https://pan.baidu.com/s/19YsA0yYI3Q9ebr3iFSJBw提取码:hvhu资料地址2:https://share.weiyun.com/NfCB6NdF密码:rhame3AIAgent已成为企业在构建智能化的定