一、RxJava简介
What is Rx?
Rx,即Reactive Extensions,是一种编程模型,目标是提供一致的编程接口,有助于更方便的处理异步数据流。响应式编程主要由Observable、Operator和Subscriber组成,一般来讲,响应式编程的信息流如下:
也就是Observable是事件的生产者,Subscriber是事件最终的消费者,并且,Subscriber通常是在主线程中执行,只负责对事件进行响应,这就要求其处理的事情越少越简单越好,而对事件的处理尽量交由Observable和Operator。
RxJava是什么东东?
|
|
本质上来讲,RxJava就是一个封装异步操作的库!
RxJava核心概念
RxJava最核心的2个概念就是:Observable(被观察者、事件源)和Observer(一般用Subscriber,观察者)。Observable产生一系列事件(触摸事件,网络请求,文件读写),并将这些事件发送出去,Observer处理这些事件所产生的结果。
一个Observable可以发出多个事件,直到结束或者出错,每发出一个事件,就会调用它的Subscriber的onNext()方法,然后再调用onComplete()或者onError()方法作为结束。一个Observable如果没有任何Subscriber,则不会发出任何事件(可简单理解为一段不会执行的代码),也就是Observable和Subscriber必须成对使用。一个Observable可以同时被多个Subscriber订阅。
举一个最简单的例子来理解Observable和Subscriber,Android中通常这么用:
|
|
这里我们可以简单的理解view就是Observable,OnClickListener就是Subscriber,view和OlClickListener通过setOnClickListener()方法进行绑定,如下图所示:
RxJava基本使用步骤
举一个简单的例子来示例RxJava基本使用步骤:向屏幕发送一个字符串,并显示出来:
创建Observable
这里定义的Observable仅仅发出了一个”Hello World”字符串,然后就结束了。
创建Subscriber
创建一个Subscriber来处理Observable发出的字符串这个事件,其中onNext(),onCompleted(),onError()是回调方法供Observable调用。
Observable和Subscriber的绑定
最后将创建的observable和subscriber绑定,subscribe()会返回一个Subscription对象(后文会说明这个对象的作用)。
以上便是RxJava使用的“三步走”过程,简单明了。
为什么要用RxJava?
上面给出的打印字符串的例子,如果不用RxJava可能一两行就能搞定,那为什么还要如此大费周章的使用RxJava呢?的确,大多数场景下,RxJava是画蛇添足,但别忘了开头所说,RxJava是为异步而生!
也许Handler,new Thread,AsyncTask…等等已经在脑子里根深蒂固了,不错,这些已经让异步实现更简洁了,但,RxJava会让异步变得更!简!!单!!!并且,随着程序逻辑变得越来越复杂,RxJava依然能够保持代码的简洁性和可读性。注意,这里所说的简单,并不是说代码量会更少,而是指逻辑、结构会更加简单明了。
同样,举一个简单例子:给定一个图片url,设置ImageView【注:这里不考虑Picasso、Fresco等图片库】,
先来看看new Thread()是怎么做
AsyncTask实现的代码量不会比这更少。
再来看看RxJava是怎么实现的
暂且不要管上面每行代码是什么意思,看完下文之后自然会明白。上例也许并不恰当,粗看RxJava行数更多,的确,但RxJava以这种链式结构来实现异步,不至于将将代码搞得到处都是。试想,在此基础上作一点扩展,给定10个url来设置一组ImageView呢?来看看RxJava怎么做:
对的,就是这么简单,只需改动一两个地方,就可以很完美的应对扩展。这也就是上面所说的,异步操作越复杂,越能体现出RxJava的优势!
代码简化
前面举了打印字符的例子,那个例子中代码过于复杂,我们关注的其实也就是一个回调方法而已,那么,能将这些代码精简些吗?实际上,RxJava内置了很多便捷的函数方便我们来简化代码。下面列出一些常用的简化版方法:
Observable简化
Single
Single是Observable的一个精简版,其功能基本与Observable一样;Observable在某些场景下过于重量级,需要关注其回调方法的onNext()/onComplete()/onError(),但实际上大部分场合只需关注onNext()即可,Single正是这么干的。
Observable.just(T…)
Observable.just()用于创建只发出一个事件就结束的Observable对象:
如果传入的是一个数据,则将传入的参数依次发送出去。
Observable.from(T[])
将传入的数组或者Iterator拆分成具体对象,然后依次发送出去。
本质上同上面的just()差不多,归根到底和create()是等价的。
Subscribe的简化
Subscribe有3个方法,其实大部分时候我们只需要关心onNext()方法而不用在意onComplete()和onError()(上面的例子中不难看出这两个方法占用了不少代码体积),这时候我们可以使用Action类(Action类无返回值),RxJava提供了多个参数的Action类,Action1, Action2, Action3, Action4,…,分别表示能处理多少个结果。
Observable.subscribe()方法有个重载的版本,接受3个Action1类型的参数,分别对应其onNext(),onCompleted(),OnError()方法:
如果不关心onError和onCompleted,则只需一个参数即可:
鉴于此,上例中的代码最终可以简化为:
Function简化
RxJava中提供了类似上述Action类的简化,同时也提供了包装多个参数并且有返回值的类Func,
以上便是Func1的源码,T为输入参数,R为返回值;同理,Func2接收两个输入一个返回,以此类推。Func在中间变换时非常重要。
二、RxJava操作符
Why Operator?
对Rx来说,Observable和Subscriber仅仅是个开始,它们本身不过是标准观察者模式的一些轻量级扩展,目的是为了更好的处理事件序列。但Rx真正强大的地方在于它的操作符!
操作符是为了解决对Observable对象的变换问题,其目的在于对Observable发出的事件在最终的Subscriber得到事件结果之前对Observable发出的事件进行修改。变换不仅能处理单个事件,也能同时处理整个序列中的事件,将这些事件转换成不同的事件。
为什么需要操作符?根据响应式函数编程的概念,Subscriber更应该做的事情是“响应”,响应Observable发出的事件,仅此而已!Observable做的事情仅仅是产生事件,事件产生了就再也不关我事!那么问题来了,如果在事件处理完成之前,想对事件做些特别的处理呢?(这种需求大多数场景下应该都会有的吧)。RxJava中的操作符正是为此而产生的!RxJava的强大和精髓就在于其提供了丰富的操作符。并且,Rx操作符拥有简明的异步操作方式,避免了异步系统中的嵌套回调的回调陷阱问题。下面着重介绍几个常用的操作符:
map操作符
map操作符对事件对象直接变化,是RxJava中最常用的操作符。回到前面根据url设置ImageView的例子:
上例中,map()操作符将String对象转换成了一个Bitmap对象后返回,经过map()操作符后,事件的类型也就由String转换为了Bitmap类型。map()是RxJava中最常见的变换,但RxJava提供的变换远不止于此,RxJava还可以对整个事件队列进行变换。
flatMap()操作符
flatMap()操作符比较难理解,为了更好的说明flatMap()原理及如何使用好它,先来看一个简单的例子:打印一组学生中每个学生的所有课程。
从上面的代码中看,flatMap()和map()有一个相同点:把传入的参数经过转换之后返回一个对象,但map()返回的是普通类型的对象,而flatMap()返回的则是Observable对象,并将这个Observable对象激活,藉由这个Observable对象再将一系列事件发送出去。对于分组网络请求使用flatMap()最好不过了。
subscribeOn操作符
用于指定订阅事件所在的线程,也就是异步任务执行的线程,下文详细讲解。
observeOn操作符
指定回调所在的线程,常见的就是主线程,下文详细讲解。
RxJava还提供了很多其他的操作符,有兴趣可以查阅相关文档。
三、RxJava中的线程调度
既然RxJava处理的是异步事件,那么涉及到线程的调度问题是必然的。RxJava强大的另外一点就是其提供了非常非常非常简便的线程操作!
默认情况下,如果不指定Observable和Subscriber所在的线程,则所有操作默认与subscribe()所在的线程保持一致,即:在哪个线程中调用subscribe(),就在那个线程中生产事件,同时也就意味着在哪个线程中消费事件。如果需要控制操作所在的线程,这时Scheduler就派上了用场。
RxJava中的几个Scheduler
RxJava本身已经内置了几个常用的Scheduler,绝大多数场景下,这些Scheduler已经够用了,这些Scheduler主要有:
- Scheduler.immediate()
不指定线程,直接在当前线程中执行,这也是默认的Scheduler。 - Scheduler.newThread()
开启新的线程,并且在新线程中执行相关操作。 - Scheduler.io()
I/O操作相关的Scheduler,当有涉及到文件读写、数据库操作、网络操作等时,强烈建议制定这些操作在IO线程中执行,io()线程内部实现是一个无数量上限的线程池,这时不需要开启新线程,直接重用空闲的线程,因此io()比newThread()具有更高的效率。 - Scheduler.computation()
计算所使用的线程,主要用于执行CPU密集型的计算操作,比如图形计算等。这个Scheduler拥有固定的线程池,大小为CPU核数。 - Scheduler.from(Executor)
使用指定的Executor来作为Scheduler,Executor本质上也是使用了线程池机制,其效率由于new Thread。 - Scheduler.trampoline()
在当前线程中的工作放入到队列中排队,并以此执行队列中的事件。 - Scheduler.test()
用于测试,支持单元测试的高级事件。 - AndroidSchedulers.mainThread()
针对Android引入的一个特殊Scheduler,即Android主线程。
RxJava中Scheduler的使用
没什么好说的,结合上面所说的observeOn()和subscribeOn操作符,直接上例子:
上例中,subscribeOn(Schedulers.io())指定事件的产生实在IO线程中发生,observeOn(Schedulers.mainThread())则指定回调发生在主线中,这种方式对MVP模型非常适用。事实上,RxJava可以指定中间任意一个Operator所在的线程,线程之间的切换非常非常方便!
Scheduler原理
不难发现,RxJava变换的本质都是对事件序列进行处理然后再发送出去,再来看看几个典型的变换源码:
进一步阅读源码我们不难发现,基本上所有的变换都是基于lift(Operator)实现的。
可能存在的内存泄漏问题
每个Observable和Subscriber绑定时就会生成一个Subscription对象,一个Subscription代表Observable和Subscriber之间的连接。当在多线程场景下,有时Activity的onDestroy()执行之后线程才结束(有可能该线程永远都不会结束),那么这是就有可能出现内存泄漏或者其他异常信息,这一点值得引起注意。
Subscription正是避免这种问题的关键,通过调用unsubscribe()方法,通知Observable其所发送的事件不会再被Subscriber接收,Observble也就不会继续发送事件,这就避免了上述问题。
需要注意的是,如果有多组Observable和Subscriber,则每组都需做这个操作,RxJava提供了CompositeSubscription这个类用来集合多组订阅,销毁Activity时,清除CompositeSubscription即可。
四、RxAndroid
RxAndroid是RxJava针对Android的扩展,其目的主要是简化异步数据处理,RxJava的引入为在Android中灵活的使用函数式响应编程提供便利,并且Android的线程机制也使得RxJava能够完美的应用在Android开发中。
除了上面那个根据url设置ImageView这个例子之外(如其说是个例子,倒不如说提供的是一种在Android中处理异步任务的思想),RxJava还为Android提供了几个方便的API。
RxAndroid相关API
HandlerScheduler
除了上面所讲的AndroidSchedulers之外,RxJava还提供了HandlerScheduler,一个用来指定Handler的Scheduler。
这个api方便定制事件所在的子线程。
与Retrofit相结合
对Retrofie,目前我们用到最多的还是Callback接口,事实上Retrofit本身其实已经提供了Observable形式的接口,举一个从服务器拉取网站列表的例子:
先来看Callback形式:
再来看RxJava形式:
对比发现,其实Callback形式和RxJava形式差不多。但,如果改动一下需求:获取的是一些视频文件的url,要求将这些视频下载下来保存到本地呢?可能你已经凌乱了,这用Retrofit怎么搞啊?嵌套Callback可能是条路,你不妨试试看!反正我是已经凌乱了!但是,我知道,RxJava可以很容易的写出来:
See?奏是如此简单!
五、总结
So,上面讲了这么多,希望能够带你入门RxJava和RxAndroid,更多用法不妨去看下源码;如果你已入门,那么,滚蛋吧AsyncTask,忘掉该死new Thread,Callback?再见!!
其他的用法目前暂未知,后续学到了或者用到了,再补充!
参考
[1] https://github.com/ReactiveX/RxJava
[2] https://github.com/ReactiveX/rxjava/wiki
[3] ReactiveX/RxJava文档中文版
[4] 给Android开发者的RxJava详解