netty系列之:可以自动通知执行结果的Future,有见过吗?

谜团
• 阅读 1504

简介

在我的心中,JDK有两个经典版本,第一个就是现在大部分公司都在使用的JDK8,这个版本引入了Stream、lambda表达式和泛型,让JAVA程序的编写变得更加流畅,减少了大量的冗余代码。

另外一个版本要早点,还是JAVA 1.X的时代,我们称之为JDK1.5,这个版本引入了java.util.concurrent并发包,从此在JAVA中可以愉快的使用异步编程。

虽然先JDK已经发展到了17版本,但是并发这一块的变动并不是很大。受限于JDK要保持稳定的需求,所以concurrent并发包提供的功能并不能完全满足某些业务场景。所以依赖于JDK的包自行研发了属于自己的并发包。

当然,netty也不例外,一起来看看netty并发包都有那些优势吧。

JDK异步缘起

怎么在java中创建一个异步任务,或者开启一个异步的线程,每个人可能都有属于自己的回答。

大家第一时间可能想到的是创建一个实现Runnable接口的类,然后将其封装到Thread中运行,如下所示:

new Thread(new(RunnableTask())).start()

每次都需要new一个Thread是JDK大神们不可接受的,于是他们产生了一个将thread调用进行封装的想法,而这个封装类就叫做Executor.

Executor是一个interface,首先看一下这个interface的定义:

public interface Executor {

    void execute(Runnable command);
}

接口很简单,就是定义了一个execute方法来执行传入的Runnable命令。

于是我们可以这样来异步开启任务:

   Executor executor = anExecutor;
   executor.execute(new RunnableTask1());
   executor.execute(new RunnableTask2());

看到这里,聪明的小伙伴可能就要问了,好像不对呀,Executor自定义了execute接口,好像跟异步和多线程并没有太大的关系呀?

别急,因为Executor是一个接口,所以我们可以有很多实现。比如下面的直接执行Runnable,让Runnable在当前线程中执行:

 class DirectExecutor implements Executor {
   public void execute(Runnable r) {
     r.run();
   }
 }

又比如下面的在一个新的线程中执行Runnable:

 class ThreadPerTaskExecutor implements Executor {
   public void execute(Runnable r) {
     new Thread(r).start();
   }
 }

又比如下面的将多个任务存放在一个Queue中,执行完一个任务再执行下一个任务的序列执行:

 class SerialExecutor implements Executor {
   final Queue<Runnable> tasks = new ArrayDeque<Runnable>();
   final Executor executor;
   Runnable active;

   SerialExecutor(Executor executor) {
     this.executor = executor;
   }

   public synchronized void execute(final Runnable r) {
     tasks.offer(new Runnable() {
       public void run() {
         try {
           r.run();
         } finally {
           scheduleNext();
         }
       }
     });
     if (active == null) {
       scheduleNext();
     }
   }

   protected synchronized void scheduleNext() {
     if ((active = tasks.poll()) != null) {
       executor.execute(active);
     }
   }
 }

这些Executor都非常完美。但是他们都只能提交任务,提交任务之后就什么都不知道了。这对于好奇的宝宝们是不可忍受的,因为我们需要知道执行的结果,或者对执行任务进行管控。

于是就有了ExecutorService。ExecutorService也是一个接口,不过他提供了shutdown方法来停止接受新的任务,和isShutdown来判断关闭的状态。

除此之外,它还提供了单独调用任务的submit方法和批量调用任务的invokeAll和invokeAny方法。

既然有了execute方法,submit虽然和execute方法基本上执行了相同的操作,但是在方法参数和返回值上有稍许区别。

首先是返回值,submit返回的是Future,Future表示异步计算的结果。 它提供了检查计算是否完成、等待其完成以及检索计算结果的方法。 Future提供了get方法,用来获取计算结果。但是如果调用get方法的同时,计算结果并没有准备好,则会发生阻塞。

其次是submit的参数,一般来说只有Callable才会有返回值,所以我们常用的调用方式是这样的:

<T> Future<T> submit(Callable<T> task);

如果我们传入Runnable,那么虽然也返回一个Future,但是返回的值是null:

Future<?> submit(Runnable task);

如果我又想传入Runnable,又想Future有返回值怎么办呢?

古人告诉我们,鱼和熊掌不可兼得!但是现在是2021年了,有些事情是可以发生改变了:

<T> Future<T> submit(Runnable task, T result);

上面我们可以传入一个result,当Future中的任务执行完毕之后直接将result返回。

既然ExecutorService这么强大,如何创建ExecutorService呢?

最简单的办法就是用new去创建对应的实例。但是这样不够优雅,于是JDK提供了一个Executors工具类,他提供了多种创建不同ExecutorService的静态方法,非常好用。

netty中的Executor

为了兼容JDK的并发框架,虽然netty中也有Executor,但是netty中的Executor都是从JDK的并发包中衍生出来的。

具体而言,netty中的Executor叫做EventExecutor,他继承自EventExecutorGroup:

public interface EventExecutor extends EventExecutorGroup 

而EventExecutorGroup又继承自JDK的ScheduledExecutorService:

public interface EventExecutorGroup extends ScheduledExecutorService, Iterable<EventExecutor>

为什么叫做Group呢?这个Group的意思是它里面包含了一个EventExecutor的集合。这些结合中的EventExecutor通过Iterable的next方法来进行遍历的。

这也就是为什么EventExecutorGroup同时继承了Iterable类。

然后netty中的其他具体Executor的实现再在EventExecutor的基础之上进行扩展。从而得到了netty自己的EventExecutor实现。

Future的困境和netty的实现

那么JDK中的Future会有什么问题呢?前面我们也提到了JDK中的Future虽然保存了计算结果,但是我们要获取的时候还是需要通过调用get方法来获取。

但是如果当前计算结果还没出来的话,get方法会造成当前线程的阻塞。

别怕,这个问题在netty中被解决了。

先看下netty中Future的定义:

public interface Future<V> extends java.util.concurrent.Future<V> 

可以看到netty中的Future是继承自JDK的Future。同时添加了addListener和removeListener,以及sync和await方法。

先讲一下sync和await方法,两者都是等待Future执行结束。不同之处在于,如果在执行过程中,如果future失败了,则会抛出异常。而await方法不会。

那么如果不想同步调用Future的get方法来获得计算结果。则可以给Future添加listener。

这样当Future执行结束之后,会自动通知listener中的方法,从而实现异步通知的效果,其使用代码如下:

EventExecutorGroup group = new DefaultEventExecutorGroup(4); // 4 threads
Future<?> f = group.submit(new Runnable() { ... });
f.addListener(new FutureListener<?> {
  public void operationComplete(Future<?> f) {
    ..
  }
});

还有一个问题,每次我们提交任务的时候,都需要创建一个EventExecutorGroup,有没有不需要创建就可以提交任务的方法呢?

有的!

netty为那些没有时间创建新的EventExecutorGroup的同志们,特意创建一个全局的GlobalEventExecutor,这是可以直接使用的:

GlobalEventExecutor.INSTANCE.execute(new Runnable() { ... });

GlobalEventExecutor是一个单线程的任务执行器,每隔一秒钟回去检测有没有新的任务,有的话就提交到executor执行。

总结

netty为JDK的并发包提供了非常有用的扩展。大家可以直接使用。

本文已收录于 http://www.flydean.com/46-netty-future-executor/

最通俗的解读,最深刻的干货,最简洁的教程,众多你不知道的小技巧等你来发现!

欢迎关注我的公众号:「程序那些事」,懂技术,更懂你!

点赞
收藏
评论区
推荐文章
blmius blmius
3年前
MySQL:[Err] 1292 - Incorrect datetime value: ‘0000-00-00 00:00:00‘ for column ‘CREATE_TIME‘ at row 1
文章目录问题用navicat导入数据时,报错:原因这是因为当前的MySQL不支持datetime为0的情况。解决修改sql\mode:sql\mode:SQLMode定义了MySQL应支持的SQL语法、数据校验等,这样可以更容易地在不同的环境中使用MySQL。全局s
Wesley13 Wesley13
3年前
MySQL部分从库上面因为大量的临时表tmp_table造成慢查询
背景描述Time:20190124T00:08:14.70572408:00User@Host:@Id:Schema:sentrymetaLast_errno:0Killed:0Query_time:0.315758Lock_
美凌格栋栋酱 美凌格栋栋酱
6个月前
Oracle 分组与拼接字符串同时使用
SELECTT.,ROWNUMIDFROM(SELECTT.EMPLID,T.NAME,T.BU,T.REALDEPART,T.FORMATDATE,SUM(T.S0)S0,MAX(UPDATETIME)CREATETIME,LISTAGG(TOCHAR(
皕杰报表之UUID
​在我们用皕杰报表工具设计填报报表时,如何在新增行里自动增加id呢?能新增整数排序id吗?目前可以在新增行里自动增加id,但只能用uuid函数增加UUID编码,不能新增整数排序id。uuid函数说明:获取一个UUID,可以在填报表中用来创建数据ID语法:uuid()或uuid(sep)参数说明:sep布尔值,生成的uuid中是否包含分隔符'',缺省为
Peter20 Peter20
4年前
mysql中like用法
like的通配符有两种%(百分号):代表零个、一个或者多个字符。\(下划线):代表一个数字或者字符。1\.name以"李"开头wherenamelike'李%'2\.name中包含"云",“云”可以在任何位置wherenamelike'%云%'3\.第二个和第三个字符是0的值wheresalarylike'\00%'4\
Wesley13 Wesley13
3年前
VBox 启动虚拟机失败
在Vbox(5.0.8版本)启动Ubuntu的虚拟机时,遇到错误信息:NtCreateFile(\\Device\\VBoxDrvStub)failed:0xc000000034STATUS\_OBJECT\_NAME\_NOT\_FOUND(0retries) (rc101)Makesurethekern
Wesley13 Wesley13
3年前
FLV文件格式
1.        FLV文件对齐方式FLV文件以大端对齐方式存放多字节整型。如存放数字无符号16位的数字300(0x012C),那么在FLV文件中存放的顺序是:|0x01|0x2C|。如果是无符号32位数字300(0x0000012C),那么在FLV文件中的存放顺序是:|0x00|0x00|0x00|0x01|0x2C。2.  
Easter79 Easter79
3年前
Twitter的分布式自增ID算法snowflake (Java版)
概述分布式系统中,有一些需要使用全局唯一ID的场景,这种时候为了防止ID冲突可以使用36位的UUID,但是UUID有一些缺点,首先他相对比较长,另外UUID一般是无序的。有些时候我们希望能使用一种简单一些的ID,并且希望ID能够按照时间有序生成。而twitter的snowflake解决了这种需求,最初Twitter把存储系统从MySQL迁移
Wesley13 Wesley13
3年前
mysql设置时区
mysql设置时区mysql\_query("SETtime\_zone'8:00'")ordie('时区设置失败,请联系管理员!');中国在东8区所以加8方法二:selectcount(user\_id)asdevice,CONVERT\_TZ(FROM\_UNIXTIME(reg\_time),'08:00','0
Wesley13 Wesley13
3年前
PHP创建多级树型结构
<!lang:php<?php$areaarray(array('id'1,'pid'0,'name''中国'),array('id'5,'pid'0,'name''美国'),array('id'2,'pid'1,'name''吉林'),array('id'4,'pid'2,'n
Python进阶者 Python进阶者
1年前
Excel中这日期老是出来00:00:00,怎么用Pandas把这个去除
大家好,我是皮皮。一、前言前几天在Python白银交流群【上海新年人】问了一个Pandas数据筛选的问题。问题如下:这日期老是出来00:00:00,怎么把这个去除。二、实现过程后来【论草莓如何成为冻干莓】给了一个思路和代码如下:pd.toexcel之前把这