XDRush

聊聊RxJava & RxAndroid

一、RxJava简介

What is Rx?

Rx,即Reactive Extensions,是一种编程模型,目标是提供一致的编程接口,有助于更方便的处理异步数据流。响应式编程主要由Observable、Operator和Subscriber组成,一般来讲,响应式编程的信息流如下:

1
Observable -> Operator1 -> Operator2 -> ... -> Subscriber

也就是Observable是事件的生产者,Subscriber是事件最终的消费者,并且,Subscriber通常是在主线程中执行,只负责对事件进行响应,这就要求其处理的事情越少越简单越好,而对事件的处理尽量交由Observable和Operator。

RxJava是什么东东?

1
RxJava is a Java VM implementation of Reactive Extensions: a library for composing asynchronous and event-based programs by using observable sequences.

本质上来讲,RxJava就是一个封装异步操作的库

RxJava核心概念

RxJava最核心的2个概念就是:Observable(被观察者、事件源)和Observer(一般用Subscriber,观察者)。Observable产生一系列事件(触摸事件,网络请求,文件读写),并将这些事件发送出去,Observer处理这些事件所产生的结果。

一个Observable可以发出多个事件,直到结束或者出错,每发出一个事件,就会调用它的Subscriber的onNext()方法,然后再调用onComplete()或者onError()方法作为结束。一个Observable如果没有任何Subscriber,则不会发出任何事件(可简单理解为一段不会执行的代码),也就是Observable和Subscriber必须成对使用。一个Observable可以同时被多个Subscriber订阅。

举一个最简单的例子来理解Observable和Subscriber,Android中通常这么用:

1
2
3
view.setOnClickListener(new View.OnClickListener() {
// ...
});

这里我们可以简单的理解view就是Observable,OnClickListener就是Subscriber,view和OlClickListener通过setOnClickListener()方法进行绑定,如下图所示:

RxJava基本使用步骤

举一个简单的例子来示例RxJava基本使用步骤:向屏幕发送一个字符串,并显示出来:

创建Observable

1
2
3
4
5
6
7
8
9
10
11
12
// String为输入
Observable<String> observable = Observable.create(new OnSubscribee<String>() {
@Override
public void call(Subscribe<? super String> subscriber) {
// 产生事件
// TODO: ......
// 事件完成的结果回调
subscriber.onNext("Hello World.");
subscripber.onCompleted();
}
)

这里定义的Observable仅仅发出了一个”Hello World”字符串,然后就结束了。

创建Subscriber

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
Subscriber<String> subscriber = new Subscriber<String> {
@Override
public void onCompleted() {
}
@Override
public void onError(Throwable e) {
e.printStackTrace();
}
@Override
public void onNext(String string) {
System.out.println(string);
}
}

创建一个Subscriber来处理Observable发出的字符串这个事件,其中onNext(),onCompleted(),onError()是回调方法供Observable调用。

Observable和Subscriber的绑定

1
Subscription subscription = observable.subscribe(subscriber);

最后将创建的observable和subscriber绑定,subscribe()会返回一个Subscription对象(后文会说明这个对象的作用)。
以上便是RxJava使用的“三步走”过程,简单明了。

为什么要用RxJava?

上面给出的打印字符串的例子,如果不用RxJava可能一两行就能搞定,那为什么还要如此大费周章的使用RxJava呢?的确,大多数场景下,RxJava是画蛇添足,但别忘了开头所说,RxJava是为异步而生!

也许Handler,new Thread,AsyncTask…等等已经在脑子里根深蒂固了,不错,这些已经让异步实现更简洁了,但,RxJava会让异步变得更!简!!单!!!并且,随着程序逻辑变得越来越复杂,RxJava依然能够保持代码的简洁性和可读性。注意,这里所说的简单,并不是说代码量会更少,而是指逻辑、结构会更加简单明了。

同样,举一个简单例子:给定一个图片url,设置ImageView【注:这里不考虑Picasso、Fresco等图片库】,

先来看看new Thread()是怎么做

1
2
3
4
5
6
7
8
9
10
11
12
new Thread(new Runnable() {
@Override
public void run() {
mBitmap = downloadBitmap(url);
getActivity().runOnUiThread(new Runnable() {
@Override
public void run() {
mImageView.setImageBitmap(mBitmap);
}
});
}
}).start();

AsyncTask实现的代码量不会比这更少。

再来看看RxJava是怎么实现的

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
Observable.just(url)
.map(new Func1<String, Bitmap>) {
@Override
public Bitmap call(String url) {
return downloadBitmap(url);
}
}
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Action1<Bitmap>() {
@Override
public void call(Bitmap bitmap) {
mImageView.setImageBitmap(mBitmap);
}
});

暂且不要管上面每行代码是什么意思,看完下文之后自然会明白。上例也许并不恰当,粗看RxJava行数更多,的确,但RxJava以这种链式结构来实现异步,不至于将将代码搞得到处都是。试想,在此基础上作一点扩展,给定10个url来设置一组ImageView呢?来看看RxJava怎么做:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
String[] iconUrls = new String[]{...};
// ...
Observable.from(iconUrls)
.map(new Func1<String, Bitmap>) {
@Override
public Bitmap call(String url) {
return downloadBitmap(url);
}
}
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Action1<Bitmap>() {
@Override
public void call(Bitmap bitmap) {
mImageView[i].setImageBitmap(mBitmap);
}
});

对的,就是这么简单,只需改动一两个地方,就可以很完美的应对扩展。这也就是上面所说的,异步操作越复杂,越能体现出RxJava的优势!

代码简化

前面举了打印字符的例子,那个例子中代码过于复杂,我们关注的其实也就是一个回调方法而已,那么,能将这些代码精简些吗?实际上,RxJava内置了很多便捷的函数方便我们来简化代码。下面列出一些常用的简化版方法:

Observable简化

Single

Single是Observable的一个精简版,其功能基本与Observable一样;Observable在某些场景下过于重量级,需要关注其回调方法的onNext()/onComplete()/onError(),但实际上大部分场合只需关注onNext()即可,Single正是这么干的。

1
2
3
4
5
6
7
8
9
10
11
Single.just(T t)
.subscribe(new SingleSubscriber<T> {
@Override
public void onSuccess(T t) {
// ...
}
@Override void onError(Throwable error) {
}
});

Observable.just(T…)
Observable.just()用于创建只发出一个事件就结束的Observable对象:

1
Observable<String> observable = Observable.just("Hello World");

如果传入的是一个数据,则将传入的参数依次发送出去。

1
2
3
4
5
Observable observable = Observable.just("hello", "world");
// 将会依次调用:
// onNext("hello");
// onNext("world");
// onComplete();

Observable.from(T[])
将传入的数组或者Iterator拆分成具体对象,然后依次发送出去。

1
2
String[] strings = {"hello", "world"};
Observable observable = Observable.from(strings);

本质上同上面的just()差不多,归根到底和create()是等价的。

Subscribe的简化

Subscribe有3个方法,其实大部分时候我们只需要关心onNext()方法而不用在意onComplete()和onError()(上面的例子中不难看出这两个方法占用了不少代码体积),这时候我们可以使用Action类(Action类无返回值),RxJava提供了多个参数的Action类,Action1, Action2, Action3, Action4,…,分别表示能处理多少个结果。

1
2
3
4
5
6
Action1<String> onNextAction = new Action1<String>() {
@Override
public void call(String s) {
// TODO: do something...
}
}

Observable.subscribe()方法有个重载的版本,接受3个Action1类型的参数,分别对应其onNext(),onCompleted(),OnError()方法:

1
Observable.subscribe(onNextAction, onErrorAction, onCompleteAction);

如果不关心onError和onCompleted,则只需一个参数即可:

1
Observable.subscribe(onNextAction);

鉴于此,上例中的代码最终可以简化为:

1
2
3
4
5
6
7
Observable.just("Hello World")
.subscribe(new Action1<String>() {
@Override
public void call(String s) {
// TODO: do something...
}
});

Function简化

RxJava中提供了类似上述Action类的简化,同时也提供了包装多个参数并且有返回值的类Func,

1
2
3
public interface Func1<T, R> extends Function {
R call(T t);
}

以上便是Func1的源码,T为输入参数,R为返回值;同理,Func2接收两个输入一个返回,以此类推。Func在中间变换时非常重要。

二、RxJava操作符

Why Operator?

对Rx来说,Observable和Subscriber仅仅是个开始,它们本身不过是标准观察者模式的一些轻量级扩展,目的是为了更好的处理事件序列。但Rx真正强大的地方在于它的操作符!

操作符是为了解决对Observable对象的变换问题,其目的在于对Observable发出的事件在最终的Subscriber得到事件结果之前对Observable发出的事件进行修改。变换不仅能处理单个事件,也能同时处理整个序列中的事件,将这些事件转换成不同的事件。

为什么需要操作符?根据响应式函数编程的概念,Subscriber更应该做的事情是“响应”,响应Observable发出的事件,仅此而已!Observable做的事情仅仅是产生事件,事件产生了就再也不关我事!那么问题来了,如果在事件处理完成之前,想对事件做些特别的处理呢?(这种需求大多数场景下应该都会有的吧)。RxJava中的操作符正是为此而产生的!RxJava的强大和精髓就在于其提供了丰富的操作符。并且,Rx操作符拥有简明的异步操作方式,避免了异步系统中的嵌套回调的回调陷阱问题。下面着重介绍几个常用的操作符:

map操作符

map操作符对事件对象直接变化,是RxJava中最常用的操作符。回到前面根据url设置ImageView的例子:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
Observable.just(url)
.map(new Func1<String, Bitmap>) {
@Override
public Bitmap call(String url) {
return downloadBitmap(url);
}
}
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Action1<Bitmap>() {
@Override
public void call(Bitmap bitmap) {
mImageView.setImageBitmap(mBitmap);
}
});

上例中,map()操作符将String对象转换成了一个Bitmap对象后返回,经过map()操作符后,事件的类型也就由String转换为了Bitmap类型。map()是RxJava中最常见的变换,但RxJava提供的变换远不止于此,RxJava还可以对整个事件队列进行变换。

flatMap()操作符

flatMap()操作符比较难理解,为了更好的说明flatMap()原理及如何使用好它,先来看一个简单的例子:打印一组学生中每个学生的所有课程。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
Student[] students = {...};
Subscriber<Course> subscriber = new Subscriber<Course>() {
@Override
public void onNext(Course course) {
System.out.println(course.getName());
}
// ... 忽略onComplete()和onError()
};
Observable.from(students)
.flatMap(new Func1<Student, Observable<Course>() {
@Override
public Observable<Course> call(Student student) {
return Observable.from(student.getCourses());
}
})
.subscribe(subscriber);

从上面的代码中看,flatMap()和map()有一个相同点:把传入的参数经过转换之后返回一个对象,但map()返回的是普通类型的对象,而flatMap()返回的则是Observable对象,并将这个Observable对象激活,藉由这个Observable对象再将一系列事件发送出去。对于分组网络请求使用flatMap()最好不过了。

subscribeOn操作符

用于指定订阅事件所在的线程,也就是异步任务执行的线程,下文详细讲解。

observeOn操作符

指定回调所在的线程,常见的就是主线程,下文详细讲解。

RxJava还提供了很多其他的操作符,有兴趣可以查阅相关文档。

三、RxJava中的线程调度

既然RxJava处理的是异步事件,那么涉及到线程的调度问题是必然的。RxJava强大的另外一点就是其提供了非常非常非常简便的线程操作!

默认情况下,如果不指定Observable和Subscriber所在的线程,则所有操作默认与subscribe()所在的线程保持一致,即:在哪个线程中调用subscribe(),就在那个线程中生产事件,同时也就意味着在哪个线程中消费事件。如果需要控制操作所在的线程,这时Scheduler就派上了用场。

RxJava中的几个Scheduler

RxJava本身已经内置了几个常用的Scheduler,绝大多数场景下,这些Scheduler已经够用了,这些Scheduler主要有:

  • Scheduler.immediate()
    不指定线程,直接在当前线程中执行,这也是默认的Scheduler。
  • Scheduler.newThread()
    开启新的线程,并且在新线程中执行相关操作。
  • Scheduler.io()
    I/O操作相关的Scheduler,当有涉及到文件读写、数据库操作、网络操作等时,强烈建议制定这些操作在IO线程中执行,io()线程内部实现是一个无数量上限的线程池,这时不需要开启新线程,直接重用空闲的线程,因此io()比newThread()具有更高的效率。
  • Scheduler.computation()
    计算所使用的线程,主要用于执行CPU密集型的计算操作,比如图形计算等。这个Scheduler拥有固定的线程池,大小为CPU核数。
  • Scheduler.from(Executor)
    使用指定的Executor来作为Scheduler,Executor本质上也是使用了线程池机制,其效率由于new Thread。
  • Scheduler.trampoline()
    在当前线程中的工作放入到队列中排队,并以此执行队列中的事件。
  • Scheduler.test()
    用于测试,支持单元测试的高级事件。
  • AndroidSchedulers.mainThread()
    针对Android引入的一个特殊Scheduler,即Android主线程。

RxJava中Scheduler的使用

没什么好说的,结合上面所说的observeOn()和subscribeOn操作符,直接上例子:

1
2
3
4
5
6
7
8
9
Observable.just(1, 2, 3, 4)
.subscribeOn(Schedulers.io()) // 指定subscribe()发生在IO线程中
.observeOn(AndroidSchedulers.mainThread()) // 指定Subscribe的回调在主线程中进行
.subscribe(new Action1<Integer> {
@Override
public void call(Integer number) {
Log.d(TAG, String.valueOf(number));
}
});

上例中,subscribeOn(Schedulers.io())指定事件的产生实在IO线程中发生,observeOn(Schedulers.mainThread())则指定回调发生在主线中,这种方式对MVP模型非常适用。事实上,RxJava可以指定中间任意一个Operator所在的线程,线程之间的切换非常非常方便!

Scheduler原理

不难发现,RxJava变换的本质都是对事件序列进行处理然后再发送出去,再来看看几个典型的变换源码:

1
2
3
4
5
6
7
8
9
// map()实现
public final <R> Observable<R> map(Func1<? super T, ? extends R> func) {
return lift(new OperatorMap<T, R>(func));
}
// isEmpty()实现
public final Observable<Boolean> isEmpty() {
return lift((OperatorAny<T>) HolderAnyForEmpty.INSTANCE);
}

进一步阅读源码我们不难发现,基本上所有的变换都是基于lift(Operator)实现的。

可能存在的内存泄漏问题

每个Observable和Subscriber绑定时就会生成一个Subscription对象,一个Subscription代表Observable和Subscriber之间的连接。当在多线程场景下,有时Activity的onDestroy()执行之后线程才结束(有可能该线程永远都不会结束),那么这是就有可能出现内存泄漏或者其他异常信息,这一点值得引起注意。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
// subscribe源码摘录
public final Subscription subscribe(final Action1<? super T> onNext) {
if (onNext == null) {
throw new IllegalArgumentException("onNext can not be null");
}
return subscribe(new Subscriber<T>() {
@Override
public final void onCompleted() {
// do nothing
}
@Override
public final void onError(Throwable e) {
throw new OnErrorNotImplementedException(e);
}
@Override
public final void onNext(T args) {
onNext.call(args);
}
});
}

Subscription正是避免这种问题的关键,通过调用unsubscribe()方法,通知Observable其所发送的事件不会再被Subscriber接收,Observble也就不会继续发送事件,这就避免了上述问题。

1
2
3
4
5
6
7
8
9
@Override
protected void onDestroy() {
super.onDestroy();
if (null != mSubscription && !mSubscription.isUnsubscribed()) {
// 解除Observable和Subscriber之间的绑定
mSubscription.unsubscribe();
}
}

需要注意的是,如果有多组Observable和Subscriber,则每组都需做这个操作,RxJava提供了CompositeSubscription这个类用来集合多组订阅,销毁Activity时,清除CompositeSubscription即可。

1
2
3
4
5
6
7
8
private final CompositeSubscription mCompositeSubscription = new CompositeSubscription();
// ...
mCompositeSubscription.add(subscription);
@Override
protected void onDestroy() {
mCompositeSubscription.clear();
}

四、RxAndroid

RxAndroid是RxJava针对Android的扩展,其目的主要是简化异步数据处理,RxJava的引入为在Android中灵活的使用函数式响应编程提供便利,并且Android的线程机制也使得RxJava能够完美的应用在Android开发中。

除了上面那个根据url设置ImageView这个例子之外(如其说是个例子,倒不如说提供的是一种在Android中处理异步任务的思想),RxJava还为Android提供了几个方便的API。

RxAndroid相关API

HandlerScheduler
除了上面所讲的AndroidSchedulers之外,RxJava还提供了HandlerScheduler,一个用来指定Handler的Scheduler。

1
2
3
4
private Handler mHandler = new Handler(getMainLooper());
// ...
subscribeOn(HandlerScheduler.from(mHandler)) // 指定发出事件所在的子线程
// ...

这个api方便定制事件所在的子线程。

与Retrofit相结合
对Retrofie,目前我们用到最多的还是Callback接口,事实上Retrofit本身其实已经提供了Observable形式的接口,举一个从服务器拉取网站列表的例子:
先来看Callback形式:

1
2
3
4
5
6
7
8
9
10
11
@Get("/list_url")
public void getWebsiteList(@QueryMap Map<String, String>, Callback<WebsiteStruct> callback);
// ...
getWebsiteList(queryMap, new Callback<WebsiteStruct> {
@Override
public void onResponse(...) {
// ...
}
// ... onFailure()
});

再来看RxJava形式:

1
2
3
4
5
6
7
8
9
10
11
12
13
@GET("/list_url")
public Observable<WebsiteStruct> getWebsiteList(@Query Map<String, String>);
// ...
getWebsiteList(queryMap)
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Observer<WebsiteStruce>() {
@Override
public void onNext(WebsiteStruce websiteStruce) {
// ...
}
// ...
});

对比发现,其实Callback形式和RxJava形式差不多。但,如果改动一下需求:获取的是一些视频文件的url,要求将这些视频下载下来保存到本地呢?可能你已经凌乱了,这用Retrofit怎么搞啊?嵌套Callback可能是条路,你不妨试试看!反正我是已经凌乱了!但是,我知道,RxJava可以很容易的写出来:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
getWebsiteList(queryMap)
.flatMap(new Func1<WebsiteStruct, Observable<String> > {
@Override
public Observable<String> call(WebsiteStruct websiteStruct) {
return Observable.from(websietStruct.getData());
}
})
.map(new Func1<String, String> {
@Override
public String call(String url) {
File file = downloadFile(url);
saveFileToLocal(file);
}
})
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Subscriber<String> {
// ...
@Override
public void onNext(String string) {
Toast.makeText(...string...);
}
})

See?奏是如此简单!

五、总结

So,上面讲了这么多,希望能够带你入门RxJava和RxAndroid,更多用法不妨去看下源码;如果你已入门,那么,滚蛋吧AsyncTask,忘掉该死new Thread,Callback?再见!!

其他的用法目前暂未知,后续学到了或者用到了,再补充!

参考

[1] https://github.com/ReactiveX/RxJava
[2] https://github.com/ReactiveX/rxjava/wiki
[3] ReactiveX/RxJava文档中文版
[4] 给Android开发者的RxJava详解