前言
众所周知RxJava有许多优点比如强大的链式调用,方便的线程调度,但是我对其原理还是了解的太少了,因此打算阅读下源码,先从一个最基本的例子开始。注:仓库地址
例子
以下代码只是为了示例,正常情况下不会这么写。
fun main() { |
输出结果:
onObserverSubscribe |
那么为什么会按这个顺序输出呢?从代码中也可以看出从始至终也只调用了 create、subscribe 两个方法,先来看看 create 的源码。
public static <T> Observable<T> create(ObservableOnSubscribe<T> source) { |
可以看出当我们没有设置 onObservableAssembly 时其实就是直接创建了一个 ObservableCreate 实例返回,接着看看 subscribe 。
public final void subscribe(Observer<? super T> observer) { |
可以看到内部就是调用了 subscribeActual 方法,而这个方法是个抽象方法,ObservableCreate 实现了该方法。
protected void subscribeActual(Observer<? super T> observer) { |
内部主要就是先创建了一个 CreateEmitter 实例,然后调用 observer.onSubscribe 方法,最后再调用 source.subscribe 方法,这就解释了 onObserverSubscribe 和 onSourceSubscribe 的输出,而 source 的subscribe 方法又调用了三次 onNext 方法和一次 onComplete 方法,先看看 onNext。
// CreateEmitter.java |
如果还没 dispose 那么直接就调用了 observer.onNext,这也就解释了 onNext 1、onNext 2、onNext 3 三个输出接着看 onComplete 。
// CreateEmitter.java |
如果还没 dispose 就直接调用 observer.onComplete ,这就解释了 onComplete 的输出,并且最后还会执行 dispose ,此外注意到 Observer还有一个 onError 回调,该方法可以通过调用 emitter.onError 手动触发。
// CreateEmitter.java |
可以看到当还没被 dispose 就会调用到 observer.onError 方法,至此这个基本 demo 的源码已经分析完毕。
总结下上述代码其实就分为如下几步:
- 创建 Observable 实例。
- 调用 observable.subscribeActual 方法。
- 调用 observer.onSubscribe 方法。
- 调用 source.subscribe方法。
- 上述的 subscribe 方法内部可以执行若干次 onNext ,最多一次 onError、onComplete。
下面来从源码的角度研究研究 RxJava 中的几个基本方法。
基本方法
首先从最基本的 map 方法开始。
map
map 方法用于对上游事件进行一次转化,如下图所示:
![](/Users/hefuwei/GitHub/blog/source/_posts/RxJava2 源码初探/map.png)
示例代码如下所示:
fun main() { |
输出结果:
onObserverSubscribe |
很明显 map 方法会对所有的 next 的数据做一次变化这里是加1,接着看看 map 的源码实现:
public final <R> Observable<R> map(Function<? super T, ? extends R> mapper) { |
内部创建了一个 ObservableMap 实例并将当前的 Observable 实例和 Function 实例传入,根据本文一开始的分析当调用 Observable.subscribe 方法其实会调用 subscribeActual 方法。
// ObservableMap.java |
创建了 MapObserver 实例将 Observer 实例进行包装然后调用 source.subscribe,这个 source 其实就是上一级 Observable 实例本例中对应 ObservableCreate 实例,接着根据上文的分析会调用该 MapObserver 实例的onNext 三次然后调用一次 onComplete 。
// MapObserver.java |
可以看到内部调用了 mapper.apply 方法,接着将拿到的结果当做参数调用 downstream.onNext 方法,注意这里的 downStream 就是外界创建的一个 Observer 对象,因此 map 就是通过代理下游 Observer 实例完成数据转换,使用 Kotlin 重写 map 方法来加深下理解,代码如下:
fun <T, R> Observable<T>.newMap(converter: (T) -> R): Observable<R> { |
newMap 新建一个 Observable 实例假设叫 mapObservable 返回,下游会调用 mapObservable.subscribe ,因此 mapObservable.subscribeActual 会被执行,在这里面新建一个 Observer 实例假设叫 mapObserver,再去调用上游 observable.subscribe 传入 mapObserver,这样从下到上一条线就串起来了,接着上游执行 onNext ,mapObserver.onNext 得到执行,将值进行转化后调用下游 observer.onNext ,onComplete、onError 也就类似。
flapMap
flatMap 方法用于将上游的每一个 onNext 事件都转换成一个 Observable 实例,然后将这些个 Observable 事件合并起来传递给下游。
![](/Users/hefuwei/GitHub/blog/source/_posts/RxJava2 源码初探/flatMap.png)
fun main() { |
输出结果:
onObserverSubscribe |
很显然 flatMap 将每一个事件比如 1 转换成一个拥有1、2 两个事件的 Observable 实例,来看看其源码实现。
public final <R> Observable<R> flatMap(Function<? super T, ? extends ObservableSource<? extends R>> mapper) { |
默认 delayErrors 为 false 表示当一个事件出现异常就会停止整个事件序列,默认并发数为 Int 的最大值,默认缓存大小为 128,然后根据这些参数和当前 Observable 实例构建出一个 ObservableFlatMap 实例,看看其subscribeActual 方法。
// ObservableFlatMap.java |
内部又通过这些参数和下游的 Observer 实例构建了一个 MergeObserver 实例,接着看看其 onSubscribe 方法。
// MergeObserver.java |
如果已经有上游了就不做任何处理不然进行上游的赋值,然后回调了下游也就是自定义的那个 Observer 的onSubscribe 方法,接着看看其 onNext 方法是怎么把一个输入源转化成一个 Observable 的。
public void onNext(T t) { |
先是调用了传入的 apply 方法将每个 onNext 数据源转化为 Observable 实例,接着调用 subscribeInner 方法。
void subscribeInner(ObservableSource<? extends U> p) { |
为每个 Observable 对象都创建了一个 InnerObserver 实例,然后将其放入到一个数组中去,最后调用 subscribe 方法进行订阅,由于 apply 方法返回了一个 ObservableFromArray 实例,所以看看其 subscribeActual 方法。
// ObservableFromArray.java |
observer 指代 InnerObserver,看看其 onSubscribe 方法。
public void onSubscribe(Disposable d) { |
接着会调用到 MergeObserver.drain 方法。
void drain() { |
可以看到内部调用了 mapper.apply 方法,接着将拿到的结果 v 当做参数调用 downstream.onNext 方法,注意这里的 downStream 就是外界创建的一个 Observer 对象。
zip
zip 方法通过一个函数将多个 Observables 的发射物结合到一起,基于这个函数的结果为每个结合体发射单个数据项。如下图所示:
![](/Users/hefuwei/GitHub/blog/source/_posts/RxJava2 源码初探/zip.png)
下面是一个使用zip操作符的一个简单例子:
fun main() { |
输出结果如下,onSubscribe 输出后过一秒输出 A1,再过一秒输出 A2,再过一秒输出 A3。
onSubscribe |
那么为什么输出结果会是这个样子的呢?来看看zip的源码实现。
public static <T1, T2, R> Observable<R> zip( |
根据源码可以看出 zip
方法其实最终创建了一个 ObservableZip
实例,直接看其 subscribeActual
。
// ObservableZip |
可以看出 ZipCoordinator的 subscribe
内部创建了输入源大小的 ZipObserver 实例,然后调用每个输入源的 subscribe
方法,这样当输入源发送事件时就会调用 ZipObserver 的 onNext
方法。
// ZipObserver.java |
主要看看 ZipCoordinator 的 drain
方法。
public void drain() { |
结合示例,首先 Obserable1 会发送一个 A 事件,将其放入到了一个队列中去,接着 drain
遍历所有的 ZipObserver,第一个 ZipObserver 可以从队列中事件将其赋值给 os[0]
,第二个取不到因此 emptyCount++
,然后退出循环。接着 Observable1 又发送了一个 B 事件,再将其放入队列中,然后执行 drain
,这次因为 os[0]
已经不为 null 所以不会从队列中取,os[1]
还是 null,退出循环继续执行,接着 Observable1 再次发送一个 C 事件,这个跟 B 事件处理逻辑一样,再接着 Observable2 会发送一个 1 事件,将其放入队列,执行 drain
将 os[1]
赋值成1,由于 emptyCount
等于0,因此会执行 zipper.apply
,这个方法内部会回调传入的 BiFunction 的 apply
方法(示例中仅仅进行了字符串拼接),获取到结果 A1,回调下游的 onNext 方法,然后将 row 这个数组置空,接着线程睡眠 1 秒,然后再次发送事件 2,将其放入队列中,执行 drain
,方法内部遍历两个 ZipObserver 并且都能从队列中取到事件,所以 emptyCount
等于 0,接着就会执行 apply
然后获取到结果 B2,调用下游的 onNext
,后面 Observable2 的 3 事件也跟 2 事件一样就不说了。
三、总结
通过分析 map、flatMap、zip 两个方法可以总结出以下几个的结论。
- subscribeActual 方法总是会调用上游的 subscribe 方法。
- onSubscribe 方法总是会调用下游的 onSubscribe 方法。
- Observer 实例的 onSubscribe 会在事件发射前调用。
- RxJava 提供的一些操作符其实会在内部创建自己的 Observable 和 Observer 实例,这样可以对下游 Observer 事件进行拦截。