Reactor中的Thread和Scheduler

Stella981
• 阅读 716

简介

今天我们要介绍的是Reactor中的多线程模型和定时器模型,Reactor之前我们已经介绍过了,它实际上是观察者模式的延伸。

所以从本质上来说,Reactor是和多线程无关的。你可以把它用在多线程或者不用在多线程。

今天将会给大家介绍一下如何在Reactor中使用多线程和定时器模型。

Thread多线程

先看一下之前举的Flux的创建的例子:

        Flux<String> flux = Flux.generate(
                () -> 0,
                (state, sink) -> {
                    sink.next("3 x " + state + " = " + 3*state);
                    if (state == 10) sink.complete();
                    return state + 1;
                });

        flux.subscribe(System.out::println);

可以看到,不管是Flux generator还是subscriber,他们实际上都是运行在同一个线程中的。

如果我们想让subscribe发生在一个新的线程中,我们需要新启动一个线程,然后在线程内部进行subscribe操作。

        Mono<String> mono = Mono.just("hello ");

        Thread t = new Thread(() -> mono
                .map(msg -> msg + "thread ")
                .subscribe(v ->
                        System.out.println(v + Thread.currentThread().getName())
                )
        );
        t.start();
        t.join();

上面的例子中,Mono在主线程中创建,而subscribe发生在新启动的Thread中。

Schedule定时器

很多情况下,我们的publisher是需要定时去调用一些方法,来产生元素的。Reactor提供了一个新的Schedule类来负责定时任务的生成和管理。

Scheduler是一个接口:

public interface Scheduler extends Disposable 

它定义了一些定时器中必须要实现的方法:

比如立即执行的:

Disposable schedule(Runnable task);

延时执行的:

default Disposable schedule(Runnable task, long delay, TimeUnit unit)

和定期执行的:

default Disposable schedulePeriodically(Runnable task, long initialDelay, long period, TimeUnit unit)

Schedule有一个工具类叫做Schedules,它提供了多个创建Scheduler的方法,它的本质就是对ExecutorService和ScheduledExecutorService进行封装,将其做为Supplier来创建Schedule。

简单点看Schedule就是对ExecutorService的封装。

Schedulers工具类

Schedulers工具类提供了很多个有用的工具类,我们来详细介绍一下:

Schedulers.immediate():

提交的Runnable将会立马在当前线程执行。

Schedulers.single():

使用同一个线程来执行所有的任务。

Schedulers.boundedElastic():

创建一个可重用的线程池,如果线程池中的线程在长时间内都没有被使用,那么将会被回收。boundedElastic会有一个最大的线程个数,一般来说是CPU cores x 10。 如果目前没有可用的worker线程,提交的任务将会被放入队列等待。

Schedulers.parallel():

创建固定个数的工作线程,个数和CPU的核数相关。

Schedulers.fromExecutorService(ExecutorService):

从一个现有的线程池创建Scheduler。

Schedulers.newXXX:

Schedulers提供了很多new开头的方法,来创建各种各样的Scheduler。

我们看一个Schedulers的具体应用,我们可以指定特定的Scheduler来产生元素:

Flux.interval(Duration.ofMillis(300), Schedulers.newSingle("test"))

publishOn 和 subscribeOn

publishOn和subscribeOn主要用来进行切换Scheduler的执行上下文。

先讲一个结论,就是在链式调用中,publishOn可以切换Scheduler,但是subscribeOn并不会起作用。

这是因为真正的publish-subscribe关系只有在subscriber开始subscribe的时候才建立。

下面我们来具体看一下这两个方法的使用情况:

publishOn

publishOn可以在链式调用的过程中,进行publish的切换:

    [@Test](https://my.oschina.net/azibug)
    public void usePublishOn() throws InterruptedException {
        Scheduler s = Schedulers.newParallel("parallel-scheduler", 4);
        final Flux<String> flux = Flux
                .range(1, 2)
                .map(i -> 10 + i + ":"+ Thread.currentThread())
                .publishOn(s)
                .map(i -> "value " + i+":"+ Thread.currentThread());

        new Thread(() -> flux.subscribe(System.out::println),"ThreadA").start();
        System.out.println(Thread.currentThread());
        Thread.sleep(5000);
    }

上面我们创建了一个名字为parallel-scheduler的scheduler。

然后创建了一个Flux,Flux先做了一个map操作,然后切换执行上下文到parallel-scheduler,最后右执行了一次map操作。

最后,我们采用一个新的线程来进行subscribe的输出。

先看下输出结果:

Thread[main,5,main]
value 11:Thread[ThreadA,5,main]:Thread[parallel-scheduler-1,5,main]
value 12:Thread[ThreadA,5,main]:Thread[parallel-scheduler-1,5,main]

可以看到,主线程的名字是Thread。Subscriber线程的名字是ThreadA。

那么在publishOn之前,map使用的线程就是ThreadA。 而在publishOn之后,map使用的线程就切换到了parallel-scheduler线程池。

subscribeOn

subscribeOn是用来切换Subscriber的执行上下文,不管subscribeOn出现在调用链的哪个部分,最终都会应用到整个调用链上。

我们看一个例子:

    [@Test](https://my.oschina.net/azibug)
    public void useSubscribeOn() throws InterruptedException {
        Scheduler s = Schedulers.newParallel("parallel-scheduler", 4);
        final Flux<String> flux = Flux
                .range(1, 2)
                .map(i -> 10 + i + ":" + Thread.currentThread())
                .subscribeOn(s)
                .map(i -> "value " + i + ":"+ Thread.currentThread());

        new Thread(() -> flux.subscribe(System.out::println), "ThreadA").start();
        Thread.sleep(5000);
    }

同样的,上面的例子中,我们使用了两个map,然后在两个map中使用了一个subscribeOn用来切换subscribe执行上下文。

看下输出结果:

value 11:Thread[parallel-scheduler-1,5,main]:Thread[parallel-scheduler-1,5,main]
value 12:Thread[parallel-scheduler-1,5,main]:Thread[parallel-scheduler-1,5,main]

可以看到,不管哪个map,都是用的是切换过的parallel-scheduler。

本文的例子learn-reactive

本文作者:flydean程序那些事

本文链接:http://www.flydean.com/reactor-thread-scheduler/

本文来源:flydean的博客

欢迎关注我的公众号:「程序那些事」最通俗的解读,最深刻的干货,最简洁的教程,众多你不知道的小技巧等你来发现!

点赞
收藏
评论区
推荐文章
blmius blmius
2年前
MySQL:[Err] 1292 - Incorrect datetime value: ‘0000-00-00 00:00:00‘ for column ‘CREATE_TIME‘ at row 1
文章目录问题用navicat导入数据时,报错:原因这是因为当前的MySQL不支持datetime为0的情况。解决修改sql\mode:sql\mode:SQLMode定义了MySQL应支持的SQL语法、数据校验等,这样可以更容易地在不同的环境中使用MySQL。全局s
皕杰报表之UUID
​在我们用皕杰报表工具设计填报报表时,如何在新增行里自动增加id呢?能新增整数排序id吗?目前可以在新增行里自动增加id,但只能用uuid函数增加UUID编码,不能新增整数排序id。uuid函数说明:获取一个UUID,可以在填报表中用来创建数据ID语法:uuid()或uuid(sep)参数说明:sep布尔值,生成的uuid中是否包含分隔符'',缺省为
Wesley13 Wesley13
2年前
java多线程和异步回调
   在实际开发过程中遇到的多线程情况不多,但是在生产环境中多线程是最基本的情况,java面试时也会考到,所以看看多线程的知识还是很有必要的。 Thread,Runnable,Callable,Future,FutureTask,Executors这是java常见的接口和类。  thread.run():线程具体要执行的代码,thread.jo
Wesley13 Wesley13
2年前
java多线程实现的三种方式
JAVA多线程实现方式主要有三种:继承Thread类、实现Runnable接口、使用ExecutorService、Callable、Future实现有返回结果的多线程。其中前两种方式线程执行完后都没有返回值,只有最后一种是带返回值的。1、继承Thread类实现多线程继承Thread类的方法尽管被我列为一种多线程实现方式,但Thread本质上也是实现
小万哥 小万哥
12个月前
C++多线程编程和同步机制:详解和实例演示
C中的多线程编程和同步机制使得程序员可以利用计算机的多核心来提高程序的运行效率和性能。本文将介绍多线程编程和同步机制的基本概念和使用方法。多线程编程基础在C中,使用库来创建和管理线程。线程可以通过函数、成员函数或者Lambda表达式来实现。以下是一
Stella981 Stella981
2年前
Netty之大名鼎鼎的EventLoop
EventLoopGroup与Reactor:前面的章节中我们已经知道了,一个Netty程序启动时,至少要指定一个EventLoopGroup(如果使用到的是NIO,通常是指NioEventLoopGroup),那么,这个NioEventLoopGroup在Netty中到底扮演着什么角色呢?我们知道,Netty是Reactor模型的
Stella981 Stella981
2年前
Django中Admin中的一些参数配置
设置在列表中显示的字段,id为django模型默认的主键list_display('id','name','sex','profession','email','qq','phone','status','create_time')设置在列表可编辑字段list_editable
Wesley13 Wesley13
2年前
Java面试问题——线程全面详解总结
一、多线程是什么?为什么要用多线程?介绍多线程之前要介绍线程,介绍线程则离不开进程。首先进程 :是一个正在执行中的程序,每一个进程执行都有一个执行顺序,该顺序是一个执行路径,或者叫一个控制单元;线程:就是进程中的一个独立控制单元,线程在控制着进程的执行。一个进程中至少有一个进程。多线程:一个进程中不只有一
Python进阶者 Python进阶者
3个月前
Excel中这日期老是出来00:00:00,怎么用Pandas把这个去除
大家好,我是皮皮。一、前言前几天在Python白银交流群【上海新年人】问了一个Pandas数据筛选的问题。问题如下:这日期老是出来00:00:00,怎么把这个去除。二、实现过程后来【论草莓如何成为冻干莓】给了一个思路和代码如下:pd.toexcel之前把这
韦康 韦康
1个月前
C++从0实现百万并发Reactor服务器(完结)
C从0实现百万并发Reactor服务器(完结)download》itzcw.com/9300/什么是Reactor服务器Reactor服务器是一种基于事件驱动的服务器架构,通常用于处理高并发的网络请求。它的核心思想是将服务器端的I/O操作和事件处理分离