前言
从这篇文章开始,系统地学习RxJava2设计思想和源码实现。说起大热门RxJava,网上有很多例如响应式编程、观察者模式等介绍,也有一些优秀的文章以上、下游等概念引初学者入门,在初步学习之后,可能感觉有所收获,但是总觉得不够解渴,要真正知晓其原理,还得结合源码加深理解。
例子
通过生活中的几个角色来学习RxJava2:饭店、厨师、服务员、顾客。
模拟一个情景:饭店有一个很火的套餐,顾客来店默认就要这个套餐(不存在服务员咨询顾客要什么的过程),所以情况应该是这样的
上面的漫画写成RxJava2就是很多入门文章中看到的:事件发起者(上游)
Observable<String> observable = Observable.create(new ObservableOnSubscribe<String>() { @Override public void subscribe(ObservableEmitter<String> e) throws Exception { System.out.println("服务员从厨师那取得 扁食"); e.onNext("扁食"); System.out.println("服务员从厨师那取得 拌面"); e.onNext("拌面"); System.out.println("服务员从厨师那取得 蒸饺"); e.onNext("蒸饺"); System.out.println("厨师告知服务员菜上好了"); e.onComplete(); } });
事件接收者(下游)
Observer<String> observer = new Observer<String>() { @Override public void onSubscribe(Disposable d) { System.out.println("来个沙县套餐!!!"); } @Override public void onNext(String s) { System.out.println("服务员端给顾客 " + s); } @Override public void onError(Throwable e) { } @Override public void onComplete() { System.out.println("服务员告诉顾客菜上好了"); } };
建立联系
observable.subscribe(observer);
打印如下:
来个沙县套餐!!! 服务员从厨师那取得 拌面 服务员端给顾客 拌面 服务员从厨师那取得 扁食 服务员端给顾客 扁食 服务员从厨师那取得 蒸饺 服务员端给顾客 蒸饺 厨师告知服务员菜上好了 服务员告诉顾客菜上好了
下面把一些类代入角色结合源码分析,演员表
源码分析
最初看源码的时候容易因为各个类名字起得很相似看晕,因此先把涉及到的类之间的关系画出来
Observable 是个抽象类,其子类是 ObservableCreate ,如果把 Observable 比成饭店,那 ObservableCreate 就是沙县小吃,看下 Observable 的 create 方法
public static <T> Observable<T> create(ObservableOnSubscribe<T> source) { ObjectHelper.requireNonNull(source, "source is null"); return RxJavaPlugins.onAssembly(new ObservableCreate<T>(source)); }
Observable 的 create 方法只是将接收的 ObservableOnSubscribe 作为参数传递给子类 ObservableCreate 真正实例化
public final class ObservableCreate<T> extends Observable<T> { final ObservableOnSubscribe<T> source; public ObservableCreate(ObservableOnSubscribe<T> source) { this.source = source; } ... }
上面这些代码就是漫画的第一格:饭店要开张(Observable.create),开张的前提是要有一个会做菜的厨师(new ObservableOnSubscribe),接着饭店起名叫沙县小吃(new ObservableCreate),并把这个厨师和沙县小吃建立联系(this.source = source)。厨师有了,但是他并没有立即开始做菜(ObservableOnSubscribe.subscribe()),这个也很好理解,现实生活中厨师也是这样,他做不做菜取决于饭店,毕竟是饭店给他开工资;而饭店是否让厨师做菜很大一个原因取决于有没有顾客上门,看下顾客:
顾客没有什么套路,上菜就吃(onNext),菜上完或菜出问题会有相应的提醒(onComplete/onError),对应上面漫画2。接着看饭店接客 observable.subscribe(observer) 的源码
public final void subscribe(Observer<? super T> observer) { //省略部分代码 subscribeActual(observer); //省略部分代码 } protected abstract void subscribeActual(Observer<? super T> observer);
Observable(饭店) 的 subscribe 方法又会调用 subscribeActual 方法,该方法是个抽象方法,具体实现在子类,看看子类 ObservableCreate(沙县小吃)
protected void subscribeActual(Observer<? super T> observer) { //步骤① CreateEmitter<T> parent = new CreateEmitter<T>(observer); //步骤② observer.onSubscribe(parent); try { //步骤③ source.subscribe(parent); } catch (Throwable ex) { Exceptions.throwIfFatal(ex); parent.onError(ex); } }
先看下涉及到的类以及所属关系
步骤① Emitter 翻译为发射器,这里名字起得也很形象 CreateEmitter(创建发射器) ,即对应服务员,CreateEmitter 创建的时候接收 Observer,就像一个服务员接待一个顾客一样(对应漫画3服务员说话)
步骤② 执行 onSubscribe 方法并接收 CreateEmitter ,所以看到 log 中最先打印该方法的内容,就像顾客认准之后自己的菜是由这个服务员上的(对应漫画3顾客说话)
步骤③ 调用 ObservableOnSubscribe.subscribe ,并接收 CreateEmitter ,就像厨师和该服务员建立联系,之后厨师做的菜都由该服务员端出去。上什么菜取决于 ObservableOnSubscribe.subscribe 中的实现。
分析到这里发现 CreateEmitter(服务员) 起到枢纽作用,看下代码中 e.onNext/e.onComplete 的实现
public void onNext(T t) { if (t == null) { onError(new NullPointerException("onNext called with null. Null values are generally not allowed in 2.x operators and sources.")); return; } if (!isDisposed()) { observer.onNext(t); } } public void onComplete() { if (!isDisposed()) { try { observer.onComplete(); } finally { dispose(); } } }
onNext 中首先判空,服务员端个空盘子出来要被顾客锤成麻瓜;接着发送之前需要执行 isDisposed() 判断,可以理解成顾客是否还需要菜,默认情况下是需要的(!isDisposed() 为 true ),当执行完 onComplete() 方法后会执行 dispose() ,表明顾客不再需要菜了,后续的菜服务员不会再端上来给顾客了。
Observer<String> observer = new Observer<String>() { Disposable disposable; @Override public void onSubscribe(Disposable d) { this.disposable = d; System.out.println("来个沙县套餐!!!"); } @Override public void onNext(String s) { if (s.equals("拌面")) { disposable.dispose(); } System.out.println("服务员端给顾客 " + s); } @Override public void onError(Throwable e) { } @Override public void onComplete() { System.out.println("服务员告诉顾客菜上好了"); } };
打印如下:
来个沙县套餐!!! 服务员从厨师那取得 拌面 服务员端给顾客 拌面 服务员从厨师那取得 扁食 服务员从厨师那取得 蒸饺 厨师告知服务员菜上好了
从上面可以看到一旦执行完 Disposable.dispose() 方法,顾客和服务员就没有后续交流了,就像 Disposable 翻译的那样「一次性」,理解成顾客对服务员说「后续的菜都别上了,你也不要再出现在我面前」;但是服务员和厨师的交流还是保持着,默认情况下厨师并不知道顾客不需要菜了,因此他还是继续做菜,然后交给服务员端出去。当然我们也可以在厨师做下一道菜的之前,判断下顾客还要不要:
Observable<String> observable = Observable.create(new ObservableOnSubscribe<String>() { @Override public void subscribe(ObservableEmitter<String> e) throws Exception { if (!e.isDisposed()) { System.out.println("服务员从厨师那取得 拌面"); e.onNext("拌面"); } if (!e.isDisposed()) { System.out.println("服务员从厨师那取得 扁食"); e.onNext("扁食"); } if (!e.isDisposed()) { System.out.println("服务员从厨师那取得 蒸饺"); e.onNext("蒸饺"); } if (!e.isDisposed()) { System.out.println("厨师告知服务员菜上好了"); e.onComplete(); } } });
打印如下:
来个沙县套餐!!! 服务员从厨师那取得 拌面 服务员端给顾客 拌面
再看下另一种情况
Observable<String> observable = Observable.create(new ObservableOnSubscribe<String>() { @Override public void subscribe(ObservableEmitter<String> e) throws Exception { System.out.println("服务员从厨师那取得 拌面"); e.onNext("拌面"); System.out.println("服务员从厨师那取得 扁食"); e.onNext("扁食"); System.out.println("服务员从厨师那取得 蒸饺"); e.onNext("蒸饺"); System.out.println("厨师告知服务员菜上好了"); e.onComplete(); } }); observable.subscribe();
打印如下
服务员从厨师那取得 拌面 服务员从厨师那取得 扁食 服务员从厨师那取得 蒸饺 厨师告知服务员菜上好了
上面分析了要有顾客厨师才会做菜,这都没顾客怎么也做菜呢?看下源码
public final Disposable subscribe() { return subscribe(Functions.emptyConsumer(), Functions.ON_ERROR_MISSING, Functions.EMPTY_ACTION, Functions.emptyConsumer()); } public final Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError, Action onComplete, Consumer<? super Disposable> onSubscribe) { //省略判空 //默认的顾客 LambdaObserver<T> ls = new LambdaObserver<T>(onNext, onError, onComplete, onSubscribe); subscribe(ls); return ls; } public final void subscribe(Observer<? super T> observer) { subscribeActual(observer); } protected abstract void subscribeActual(Observer<? super T> observer);
原来系统会默认创建一个 LambdaObserver(默认顾客) ,服务员从厨师那端的菜会传给这个顾客。所以可以看出厨师做不做菜只取决于饭店(Observable.subscribe),后面的流程和上面分析的一致。另外上面的代码还出现了Consumer、Action类,这些类里也有对事件的处理,可以理解成顾客选择接收服务员的哪些信息,在 functions 包下还有其他实现
subscribe() 有下面几个重载方法:
//顾客只关心上什么菜 public final Disposable subscribe() {} public final Disposable subscribe(Consumer<? super T> onNext) {} //顾客关心上什么菜以及菜是不是出问题 public final Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError) {} //顾客关心上什么菜、菜是不是有问题、菜是不是上完了 public final Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError, Action onComplete) {} //顾客关心上什么菜、菜是不是有问题、菜是不是上完了、 //onSubscribe()中可以获取Disposable引用,之后选择告诉服务员是否继续上菜 public final Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError, Action onComplete, Consumer<? super Disposable> onSubscribe) {} //由顾客自己决定关心哪些事件,和上一条效果一样 public final void subscribe(Observer<? super T> observer) {}
如果顾客只关心上什么菜,我们可以这么写:
Consumer<String> consumer = new Consumer<String>() { @Override public void accept(String s) throws Exception { System.out.println("服务员端给顾客 " + s); } }; observable.subscribe(consumer);
打印如下:
服务员从厨师那取得 拌面 服务员端给顾客 拌面 服务员从厨师那取得 扁食 服务员端给顾客 扁食 服务员从厨师那取得 蒸饺 服务员端给顾客 蒸饺 厨师告知服务员菜上好了
本文作者: HuYounger