Java并发执行任务的几种方式

Wesley13
• 阅读 302

背景

在编写业务代码时经常遇到并发执行多个任务的需求,因为串行执行太慢,会影响业务代码性能。特别对于直接面向普通用户的业务来说用户体验至关重要,保证用户体验重要的一点是要“快”。业务代码中经常需要调用其它业务接口或者同时从多个数据源取数据再处理等,这种情况下势必要走网络请求,网络消耗必不可少,最好的情况是毫秒级别,一般情况下是几十毫秒级别,甚至几百毫秒,TimeoutException恐怕大家并不陌生。

例子

当你浏览微信朋友圈时,微信会把你朋友最近的动态展示在你朋友圈里,并且按照时间顺序最近的排在前面。你会发现,朋友圈里不会展示把你删除了或者你把他删除了的好友,不会展示被你设置过“不看他(她)的朋友圈”的好友或者对方把你设置过“不让他(她)看我的朋友圈”的好友,不会展示被你拉黑或把你拉黑的好友,不会展示被微信系统标记为spam的好友,等等。对于微信这种支持数亿人聊天的应用,其系统必定很复杂,解耦做得也比较好。 下面模拟下这个接口的实现:

    public List<Feed> lastestFeeds(String wxId) {
                //串行执行
        //1. 获取你得好友列表
        //2. 去掉把你删除的好友
        //3. 去掉被你删除的好友
        //4. 去掉被你设置过"不看他(她)的朋友圈"的好友
        //5. 去掉对你设置过"不让他(她)看我的朋友圈"的好友
        //6. 去掉被你拉黑和把你拉黑的好友
        //7. 去掉被微信系统标记为作弊的好友
        //...
                //8. 获取好友最近动态再返回
    }

对于微信这种复杂的系统,通常不可能从一个接口获取到这些信息,必须从多个接口获取到这些信息后再处理。如果说串行实现这些功能,你可以想象一下是不是慢到吐血,相信微信也不会这么干,否则朋友圈会刷半天也没响应,那么这个用户体验就太糟糕了。那么这个时候并发执行这些子任务就可以很高效的处理掉这种情况。具体到这个接口也就是会把1-7拆解成单个子任务,再丢到线程池异步的执行。最后执行完了,再汇总处理。

    public List<Feed> lastestFeedsV2(String wxId) {
        //并发运行,无先后先后执行
        //1. 获取你得好友列表
        //2. 去掉把你删除的好友
        //3. 去掉被你删除的好友
        //4. 去掉被你设置过"不看他(她)的朋友圈"的好友
        //5. 去掉对你设置过"不让他(她)看我的朋友圈"的好友
        //6. 去掉被你拉黑和把你拉黑的好友
        //7. 去掉被微信系统标记为作弊的好友
        //...
        //等待所有子任务完成,汇总处理
        //8. 获取好友最近动态再返回
    }

那么如何实现并发运行呢,下面讨论几种实现。

CountDownLatch.await() VS ExecutorService.invokeAll()

从功能上讲这两者均可以实现并发执行多个任务并等待的功能。先看代码如何完成以上功能: CountDownLatch实现:

    public List<Feed> lastestFeeds(String wxId) {
        ThreadPoolExecutor executor = ...;
        CountDownLatch latch = new CountDownLatch(7);
        executor.execute(new Runnable() {
            @Override
            public void run() {
                try{
                    //1. 获取你得好友列表
                } catch (Exception e) {

                } finally {
                    latch.countDown();
                }
            }
        });

        executor.execute(new Runnable() {
            @Override
            public void run() {
                try{
                    //2. 去掉把你删除的好友
                } catch (Exception e) {

                } finally {
                    latch.countDown();
                }
            }
        });

        executor.execute(new Runnable() {
            @Override
            public void run() {
                try{
                    //3. 去掉被你删除的好友
                } catch (Exception e) {

                } finally {
                    latch.countDown();
                }
            }
        });

        executor.execute(new Runnable() {
            @Override
            public void run() {
                try{
                    //4. 去掉被你设置过"不看他(她)的朋友圈"的好友
                } catch (Exception e) {

                } finally {
                    latch.countDown();
                }
            }
        });

        executor.execute(new Runnable() {
            @Override
            public void run() {
                try{
                    //5. 去掉对你设置过"不让他(她)看我的朋友圈"的好友
                } catch (Exception e) {

                } finally {
                    latch.countDown();
                }
            }
        });

        executor.execute(new Runnable() {
            @Override
            public void run() {
                try{
                    //6. 去掉被你拉黑和把你拉黑的好友
                } catch (Exception e) {

                } finally {
                    latch.countDown();
                }
            }
        });

        executor.execute(new Runnable() {
            @Override
            public void run() {
                try{
                    //7. 去掉被微信系统标记为作弊的好友
                } catch (Exception e) {

                } finally {
                    latch.countDown();
                }
            }
        });
        try {
            //latch.await();
            latch.await(500, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        //等待所有子任务完成,汇总处理
        //8. 获取好友最近动态再返回
    }

ExecutorService实现:

    public List<Feed> lastestFeeds(String wxId) {
        ThreadPoolExecutor executor = ...;
        CountDownLatch latch = new CountDownLatch(7);
        List<Callable<String>> tasks = new ArrayList<>(7);
        tasks.add(new Callable<String>() {
            @Override
            public String call() throws Exception {
                //1. 获取你得好友列表
                return ...;
            }
        });
        tasks.add(new Callable<String>() {
            @Override
            public String call() throws Exception {
                //2. 去掉把你删除的好友
                return ...;
            }
        });
        tasks.add(new Callable<String>() {
            @Override
            public String call() throws Exception {
                //3. 去掉被你删除的好友
                return ...;
            }
        });
        tasks.add(new Callable<String>() {
            @Override
            public String call() throws Exception {
                //4. 去掉被你设置过"不看他(她)的朋友圈"的好友
                return ...;
            }
        });
        tasks.add(new Callable<String>() {
            @Override
            public String call() throws Exception {
                //5. 去掉对你设置过"不让他(她)看我的朋友圈"的好友
                return ...;
            }
        });
        tasks.add(new Callable<String>() {
            @Override
            public String call() throws Exception {
                //6. 去掉被你拉黑和把你拉黑的好友
                return ...;
            }
        });
        tasks.add(new Callable<String>() {
            @Override
            public String call() throws Exception {
                //7. 去掉被微信系统标记为作弊的好友
                return ...;
            }
        });
        try {
//            List<Future<String>> futureList = executor.invokeAll(tasks);
            List<Future<String>> futureList = executor.invokeAll(tasks, 500, TimeUnit.MILLISECONDS);
            for(Future<String> future : futureList) {
                if(future.isCancelled()) {
                    //处理
                }
                try {
                    future.get();
                } catch (ExecutionException e) {
                    e.printStackTrace();
                }
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

区别

两者实现的区别是什么?从性能上讲应该是没有太大差异,在此要感谢Doug Lea大神,感谢他优雅的实现,具体的实现,请各位看官自行百度和google。 我主要是谈谈这两者使用上的细微差异,可以根据具体业务场景选择一个更合适的。两者在等待子任务结束时,均提供了限时和不限时版本。

如果使用限时版本

关键的差异在于CountDownLatch超时后不再阻塞主线程,而是会继续执行,对于没有完成的子任务,它不知道也不会处理,也就是说没有完成的子任务还是会继续执行。而ExecutorService.invokeAll超时后,会取消所有在线程池任务队列中等待运行的子任务,说白了就是超时的子任务会被取消。 对于我举的微信这个例子来说,主线程等待超时后,子任务再返回数据是没有意义的,继续执行超时的子任务只会浪费CPU而已,尤其是对于QPS较大的业务来说,影响更明显。这种情况下选择ExecutorService.invokeAll可能更好些,当然如果你提供额外的状态标识如定义个AtomicBoolean来辅助实现超时不执行子任务的功能也是可以的。当然有些场景,即便主线程超时了,子任务也必须执行(例如子任务中涉及数据存储,否则数据可能丢失),这种情况下,应该是使用CountDownLatch。 希望读者诸君能够认真体会这点差异。

对于不限时版本

两者无太大差异,但在生产环境中要慎用,因为它不可控。

其它

另外再一点要注意的是,使用CountDownLatch时子任务的结果需要通过线程安全的容器收集如ConcurrentHashMap等,再处理。ExecutorService.invokeAll因为返回的是List,所以可以拿到结果,但如果想通过Future找到是哪个子任务,就只能根据顺序来确认了。

参考文献

我的简书

点赞
收藏
评论区
推荐文章
blmius blmius
2年前
MySQL:[Err] 1292 - Incorrect datetime value: ‘0000-00-00 00:00:00‘ for column ‘CREATE_TIME‘ at row 1
文章目录问题用navicat导入数据时,报错:原因这是因为当前的MySQL不支持datetime为0的情况。解决修改sql\mode:sql\mode:SQLMode定义了MySQL应支持的SQL语法、数据校验等,这样可以更容易地在不同的环境中使用MySQL。全局s
Karen110 Karen110
2年前
一篇文章带你了解JavaScript日期
日期对象允许您使用日期(年、月、日、小时、分钟、秒和毫秒)。一、JavaScript的日期格式一个JavaScript日期可以写为一个字符串:ThuFeb02201909:59:51GMT0800(中国标准时间)或者是一个数字:1486000791164写数字的日期,指定的毫秒数自1970年1月1日00:00:00到现在。1\.显示日期使用
Jacquelyn38 Jacquelyn38
2年前
2020年前端实用代码段,为你的工作保驾护航
有空的时候,自己总结了几个代码段,在开发中也经常使用,谢谢。1、使用解构获取json数据let jsonData  id: 1,status: "OK",data: 'a', 'b';let  id, status, data: number   jsonData;console.log(id, status, number )
皕杰报表之UUID
​在我们用皕杰报表工具设计填报报表时,如何在新增行里自动增加id呢?能新增整数排序id吗?目前可以在新增行里自动增加id,但只能用uuid函数增加UUID编码,不能新增整数排序id。uuid函数说明:获取一个UUID,可以在填报表中用来创建数据ID语法:uuid()或uuid(sep)参数说明:sep布尔值,生成的uuid中是否包含分隔符'',缺省为
Karen110 Karen110
2年前
​一篇文章总结一下Python库中关于时间的常见操作
前言本次来总结一下关于Python时间的相关操作,有一个有趣的问题。如果你的业务用不到时间相关的操作,你的业务基本上会一直用不到。但是如果你的业务一旦用到了时间操作,你就会发现,淦,到处都是时间操作。。。所以思来想去,还是总结一下吧,本次会采用类型注解方式。time包importtime时间戳从1970年1月1日00:00:00标准时区诞生到现在
Wesley13 Wesley13
2年前
Java日期时间API系列31
  时间戳是指格林威治时间1970年01月01日00时00分00秒起至现在的总毫秒数,是所有时间的基础,其他时间可以通过时间戳转换得到。Java中本来已经有相关获取时间戳的方法,Java8后增加新的类Instant等专用于处理时间戳问题。 1获取时间戳的方法和性能对比1.1获取时间戳方法Java8以前
Easter79 Easter79
2年前
Twitter的分布式自增ID算法snowflake (Java版)
概述分布式系统中,有一些需要使用全局唯一ID的场景,这种时候为了防止ID冲突可以使用36位的UUID,但是UUID有一些缺点,首先他相对比较长,另外UUID一般是无序的。有些时候我们希望能使用一种简单一些的ID,并且希望ID能够按照时间有序生成。而twitter的snowflake解决了这种需求,最初Twitter把存储系统从MySQL迁移
Wesley13 Wesley13
2年前
00:Java简单了解
浅谈Java之概述Java是SUN(StanfordUniversityNetwork),斯坦福大学网络公司)1995年推出的一门高级编程语言。Java是一种面向Internet的编程语言。随着Java技术在web方面的不断成熟,已经成为Web应用程序的首选开发语言。Java是简单易学,完全面向对象,安全可靠,与平台无关的编程语言。
Wesley13 Wesley13
2年前
MySQL部分从库上面因为大量的临时表tmp_table造成慢查询
背景描述Time:20190124T00:08:14.70572408:00User@Host:@Id:Schema:sentrymetaLast_errno:0Killed:0Query_time:0.315758Lock_
Python进阶者 Python进阶者
4个月前
Excel中这日期老是出来00:00:00,怎么用Pandas把这个去除
大家好,我是皮皮。一、前言前几天在Python白银交流群【上海新年人】问了一个Pandas数据筛选的问题。问题如下:这日期老是出来00:00:00,怎么把这个去除。二、实现过程后来【论草莓如何成为冻干莓】给了一个思路和代码如下:pd.toexcel之前把这