RxJava2 只看这一篇文章就够了

循循善诱
• 阅读 512

0. 简介

RxJava 其实就是提供一套异步编程的 API,这套 API 是基于观察者模式的,而且是链式调用的,所以使用 RxJava 编写的代码的逻辑会非常简洁。
RxJava 有以下三个基本的元素:

  • 被观察者(Observable)
  • 观察者(Observer)
  • 订阅(subscribe)

下面来说说以上三者是如何协作的:
首先在 gradle 文件中添加依赖:

implementation 'io.reactivex.rxjava2:rxjava:2.1.4'
implementation 'io.reactivex.rxjava2:rxandroid:2.0.2'

创建被观察者:

Observable observable = Observable.create(new ObservableOnSubscribe<Integer>() {
    @Override
    public void subscribe(ObservableEmitter<Integer> e) throws Exception {
        Log.d(TAG, "=========================currentThread name: " + Thread.currentThread().getName());
        e.onNext(1);
        e.onNext(2);
        e.onNext(3);
        e.onComplete();
    }
});

创建观察者:

Observer observer = new Observer<Integer>() {
    @Override
    public void onSubscribe(Disposable d) {
        Log.d(TAG, "======================onSubscribe");
    }

    @Override
    public void onNext(Integer integer) {
        Log.d(TAG, "======================onNext " + integer);
    }

    @Override
    public void onError(Throwable e) {
        Log.d(TAG, "======================onError");
    }

    @Override
    public void onComplete() {
        Log.d(TAG, "======================onComplete");
    }
};




observable.subscribe(observer);
复制代码这里其实也可以使用链式调用:
Observable.create(new ObservableOnSubscribe < Integer > () {
    @Override
    public void subscribe(ObservableEmitter < Integer > e) throws Exception {
        Log.d(TAG, "=========================currentThread name: " + Thread.currentThread().getName());
        e.onNext(1);
        e.onNext(2);
        e.onNext(3);
        e.onComplete();
    }
})
.subscribe(new Observer < Integer > () {
    @Override
    public void onSubscribe(Disposable d) {
        Log.d(TAG, "======================onSubscribe");
    }

    @Override
    public void onNext(Integer integer) {
        Log.d(TAG, "======================onNext " + integer);
    }

    @Override
    public void onError(Throwable e) {
        Log.d(TAG, "======================onError");
    }

    @Override
    public void onComplete() {
        Log.d(TAG, "======================onComplete");
    }
});
复制代码被观察者发送的事件有以下几种,总结如下表:



事件种类
作用




onNext()
发送该事件时,观察者会回调 onNext() 方法


onError()
发送该事件时,观察者会回调 onError() 方法,当发送该事件之后,其他事件将不会继续发送


onComplete()
发送该事件时,观察者会回调 onComplete() 方法,当发送该事件之后,其他事件将不会继续发送



其实可以把 RxJava 比喻成一个做果汁,家里有很多种水果(要发送的原始数据),你想榨点水果汁喝一下,这时候你就要想究竟要喝什么水果汁呢?如果你想喝牛油果雪梨柠檬汁,那你就要把这三种水果混在一起榨汁(使用各种操作符变换你想发送给观察者的数据),榨完后,你就可以喝上你想要的果汁了(把处理好的数据发送给观察者)。
总结如下图:

![图片](https://p1-jj.byteimg.com/tos-cn-i-t2oaga2asx/gold-user-assets/2018/5/26/1639a8ee56b13c41~tplv-t2oaga2asx-zoom-in-crop-mark:3024:0:0:0.awebp)

下面就来讲解 RxJava 各种常见的操作符。
1. 创建操作符
以下就是讲解创建被观察者的各种操作符。
1.1 create()
方法预览:
public static <T> Observable<T> create(ObservableOnSubscribe<T> source)
复制代码有什么用:
创建一个被观察者
怎么用:
Observable<String> observable = Observable.create(new ObservableOnSubscribe<String>() {
    @Override
    public void subscribe(ObservableEmitter<String> e) throws Exception {
        e.onNext("Hello Observer");
        e.onComplete();
    }
});
复制代码上面的代码非常简单,创建 ObservableOnSubscribe 并重写其 subscribe 方法,就可以通过 ObservableEmitter 发射器向观察者发送事件。
以下创建一个观察者,来验证这个被观察者是否成功创建。
Observer<String> observer = new Observer<String>() {
    @Override
    public void onSubscribe(Disposable d) {

    }

    @Override
    public void onNext(String s) {
        Log.d("chan","=============onNext " + s);
    }

    @Override
    public void onError(Throwable e) {

    }

    @Override
    public void onComplete() {
        Log.d("chan","=============onComplete ");
    }
};
        
observable.subscribe(observer);
        
复制代码打印结果:
05-20 16:16:50.654 22935-22935/com.example.louder.rxjavademo D/chan: =============onNext Hello Observer
=============onComplete
复制代码1.2 just()
方法预览:
public static <T> Observable<T> just(T item) 
......
public static <T> Observable<T> just(T item1, T item2, T item3, T item4, T item5, T item6, T item7, T item8, T item9, T item10)
复制代码有什么用?
创建一个被观察者,并发送事件,发送的事件不可以超过10个以上。
怎么用?
Observable.just(1, 2, 3)
.subscribe(new Observer < Integer > () {
    @Override
    public void onSubscribe(Disposable d) {
        Log.d(TAG, "=================onSubscribe");
    }

    @Override
    public void onNext(Integer integer) {
        Log.d(TAG, "=================onNext " + integer);
    }

    @Override
    public void onError(Throwable e) {
        Log.d(TAG, "=================onError ");
    }

    @Override
    public void onComplete() {
        Log.d(TAG, "=================onComplete ");
    }
});

复制代码上面的代码直接使用链式调用,代码也非常简单,这里就不细说了,看看打印结果:
05-20 16:27:26.938 23281-23281/? D/chan: =================onSubscribe
=================onNext 1
=================onNext 2
=================onNext 3
=================onComplete 
复制代码1.3 From 操作符
1.3.1  fromArray()
方法预览:
public static <T> Observable<T> fromArray(T... items)作者:玉刚说链接:

有什么用?

这个方法和 just() 类似,只不过 fromArray 可以传入多于10个的变量,并且可以传入一个数组。

怎么用?

Integer array[] = {1, 2, 3, 4};
Observable.fromArray(array)
.subscribe(new Observer < Integer > () {
    @Override
    public void onSubscribe(Disposable d) {
        Log.d(TAG, "=================onSubscribe");
    }

    @Override
    public void onNext(Integer integer) {
        Log.d(TAG, "=================onNext " + integer);
    }

    @Override
    public void onError(Throwable e) {
        Log.d(TAG, "=================onError ");
    }

    @Override
    public void onComplete() {
        Log.d(TAG, "=================onComplete ");
    }
});

复制代码代码和 just() 基本上一样,直接看打印结果:

05-20 16:35:23.797 23574-23574/com.example.louder.rxjavademo D/chan: =================onSubscribe
=================onNext 1
=================onNext 2
=================onNext 3
=================onNext 4
=================onComplete 

1.3.2 fromCallable()

方法预览:

public static <T> Observable<T> fromCallable(Callable<? extends T> supplier)

复制代码有什么用?

这里的 Callablejava.util.concurrent 中的 Callable,CallableRunnable 的用法基本一致,只是它会返回一个结果值,这个结果值就是发给观察者的。

怎么用?

Observable.fromCallable(new Callable < Integer > () {

    @Override
    public Integer call() throws Exception {
        return 1;
    }
})
.subscribe(new Consumer < Integer > () {
    @Override
    public void accept(Integer integer) throws Exception {
        Log.d(TAG, "================accept " + integer);
    }
});
05-26 13:01:43.009 6890-6890/? D/chan: ================accept 1
复制代码1.3.3 fromFuture()
方法预览:
public static <T> Observable<T> fromFuture(Future<? extends T> future)

复制代码有什么用?

参数中的 Futurejava.util.concurrent 中的 Future,Future 的作用是增加了 cancel() 等方法操作 Callable,它可以通过 get() 方法来获取 Callable 返回的值。

点赞
收藏
评论区
推荐文章
代码哈士奇 代码哈士奇
4年前
uni-app使用uniCloud时做类似于拦截器和请求结果再处理(类似于请求和响应拦截)
想要在使用uniCloud的使用拦截请求怎么办再次封装uniCloud.callFunction特别说明这里的token是我自己存储成token如果你使用了uniid官方的推荐是('uniidtoken')('uniidtokenexpired')存储了uniidtoken后请求会自动携带这里的res.result.code0是因为我的云
徐小夕 徐小夕
4年前
什么是低代码(Low-Code)?
阿里云云原生应用研发平台EMAS彭群(楚衡)https://www.cnblogs.com/aliyunemas/p/14004815.html一、前言如果选择用一个关键词来代表即将过去的2020年,我相信所有人都会认同是“新冠”。疫情来得太快就像龙卷风,短短数月就阻断了全世界范围内无数人与人之间的物理连接。但好在,我们已经全面迈入
Wesley13 Wesley13
3年前
java通过sina端口提取股票历史数据并存入MySQL
 1.提取股票代码代码见:http://www.oschina.net/code/snippet\_2688840\_55337(http://www.oschina.net/code/snippet_2688840_55337) 2抓取sina股票的json页面数据;代码见:http://www.oschina.net/code/snip
虾米大王 虾米大王
3年前
java代码099
code099.jspInserttitlehere$pageScope.user.name
虾米大王 虾米大王
3年前
java代码020
code020.jsp解决中文乱码name参数的值为:sex参数的值为:
Stella981 Stella981
3年前
Linux自动检测网站心跳通知shell脚本
!/bin/bashLIST("http://xxxx.com")NAME("评价系统getwindowList接口")for((i0;i<${LIST@};i))doHTTP_CODEcurlo/dev/nullsw"%{http_code}""${LIST
Wesley13 Wesley13
3年前
Oracle:Pivot 转多列并包含多个名称
SELECTFROM(SELECTl.DISTRIBUTOR_ID,d.SKU_CODE,d.WH_CODE,d.ORDER_PACKAGES,d.PRICE,d.YEARLY||d.MONTHLYasYM,d
Stella981 Stella981
3年前
Guava中的EventBus
其实代码中经常会遇到跟主流程分支出去的异步逻辑,比如说:爬虫处理逻辑中,进行心跳打点,订单处理中,需要触发用户的个人信息变更等。这个时候就应该使用观察者模式。EventBus是Guava的事件处理机制,是设计模式中的观察者模式(生产/消费者编程模型)的优雅实现。对于事件监听和发布订阅模式,EventBus是一个非常优雅和简单解决方案,我们不用创建
Wesley13 Wesley13
3年前
mysql _01
\where中不可以使用别名,因为where先于select执行以下是错误的SQL:select    code,   continentcont,   name,   populationpop from    country where    cont'asia'
Stella981 Stella981
3年前
PowerDesigner列名、注释内容互换
在用PowerDesigner时,常常在NAME或Comment中写中文在Code中写英文,Name只会显示给我们看,Code会使用在代码中,但Comment中的文字会保存到数据库TABLE的Description中,有时候我们写好了Name再写一次Comment很麻烦,以下两段代码就可以解决这个问题。在PowerDesigner中PowerDesig
代码的艺术-Writing Code Like a Pianist
前言如何评定一个系统的质量?什么样的系统或者软件可以称之为高质量?可以从三个角度来看,一是架构设计,例如技术选型、分布式系统中的数据一致性考虑等,二是项目管理,无论是敏捷开发还是瀑布式开发,都应当对技术负债进行清理,对代码进行重构等,最后离不开的是代码质量