自定义操作符.md
实现自己的操作符
你可以实现你自己的Observable操作符,本文展示怎么做。
如果你的操作符是被用于_创造_一个Observable,而不是变换或者响应一个Observable,使用 [create( )](http://reactivex.io/documentation/operators/create.html)
方法,不要试图手动实现 Observable
。另外,你可以按照下面的用法说明创建一个自定义的操作符。
如果你的操作符是用于Observable发射的单独的数据项,按照下面的说明做:Sequence Operators 。如果你的操作符是用于变换Observable发射的整个数据序列,按照这个说明做:Transformational Operators 。
提示: 在一个类似于Groovy的语言Xtend中,你可以以 extension methods 的方式实现你自己的操作符 ,不使用本文的方法,它们也可以链式调用。详情参见 RxJava and Xtend
序列操作符
下面的例子向你展示了怎样使用lift( )
操作符将你的自定义操作符(在这个例子中是 myOperator
)与标准的RxJava操作符(如ofType
和map
)一起使用:
1 | fooObservable = barObservable.ofType(Integer).map({it*2}).lift(new MyOperator<T>()).map({"transformed by myOperator: " + it}); |
下面这部分向你展示了你的操作符的脚手架形式,以便它能正确的与lift()
搭配使用。
实现你的操作符
将你的自定义操作符定义为实现了 [Operator](http://reactivex.io/RxJava/javadoc/rx/Observable.Operator.html)
接口的一个公开类, 就像这样:
1 | public class MyOperator<T> implements Operator<T> { |
变换操作符
下面的例子向你展示了怎样使用 compose( )
操作符将你得自定义操作符(在这个例子中,是一个名叫myTransformer
的操作符,它将一个发射整数的Observable转换为发射字符串的)与标准的RxJava操作符(如ofType
和map
)一起使用:
1 | fooObservable = barObservable.ofType(Integer).map({it*2}).compose(new MyTransformer<Integer,String>()).map({"transformed by myOperator: " + it}); |
下面这部分向你展示了你的操作符的脚手架形式,以便它能正确的与compose()
搭配使用。
实现你的变换器
将你的自定义操作符定义为实现了 [Transformer](http://reactivex.io/RxJava/javadoc/rx/Observable.Transformer.html)
接口的一个公开类,就像这样:
1 | public class MyTransformer<Integer,String> implements Transformer<Integer,String> { |
参见
其它需要考虑的
- 在发射任何数据(或者通知)给订阅者之前,你的序列操作符可能需要检查它的
[Subscriber.isUnsubscribed( )](https://mcxiaoke.gitbooks.io/rxdocs/content/topics/Observable#unsubscribing)
状态,如果没有订阅者了,没必要浪费时间生成数据项。 - 请注意:你的序列操作符必须复合Observable协议的核心原则:
- 它可能调用订阅者的
[onNext( )](https://mcxiaoke.gitbooks.io/rxdocs/content/topics/Observable#onnext-oncompleted-and-onerror)
方法任意次,但是这些调用必须是不重叠的。 - 它只能调用订阅者的
[onCompleted( )](https://mcxiaoke.gitbooks.io/rxdocs/content/topics/Observable#onnext-oncompleted-and-onerror)
或[onError( )](https://mcxiaoke.gitbooks.io/rxdocs/content/topics/Observable#onnext-oncompleted-and-onerror)
正好一次,但不能都调用,而且不能在这之后调用订阅者的[onNext( )](https://mcxiaoke.gitbooks.io/rxdocs/content/topics/Observable#onnext-oncompleted-and-onerror)
方法。 - 如果你不能保证你得操作符遵从这两个原则,你可以给它添加
[serialize( )](https://mcxiaoke.gitbooks.io/rxdocs/content/topics/Observable-Utility-Operators#serialize)
操作符,它会强制保持正确的行为。
- 它可能调用订阅者的
- 请关注这里 Issue #1962 &mdash;需要有一个计划创建一个测试脚手架,你可以用它来写测试验证你的新操作符遵从了Observable协议。
- 不要让你的操作符阻塞别的操作。
- When possible, you should compose new operators by combining existing operators, rather than implementing them with new code. RxJava itself does this with some of its standard operators, for example:
- 如果可能,你应该组合现有的操作符创建你的新操作符,而不是从零开始实现它。RxJava自身的标准操作符也是这样做的,例如:
[first( )](http://reactivex.io/documentation/operators/first.html)
被定义为take(1).single( )
[ignoreElements( )](http://reactivex.io/documentation/operators/ignoreelements.html)
被定义为filter(alwaysFalse( ))
[reduce(a)](http://reactivex.io/documentation/operators/reduce.html)
被定义为scan(a).last( )
- 如果你的操作符使用了函数或者lambda表达式作为参数,请注意它们可能是异常的来源,而且要准备好捕获这些异常,并且使用
1
onError()
通知订阅者。
- 某些异常被认为是致命的,对它们来说,调用
onError()
毫无意义,那样或者是无用的,或者只是对问题的妥协。你可以使用Exceptions.throwIfFatal(throwable)
方法过滤掉这些致命的异常,并重新抛出它们,而不是试图发射关于它们的通知。 - 一般说来,一旦发生错误应立即通知订阅者,而不是首先尝试发射更多的数据。
- 请注意
null
可能是Observable发射的一个合法数据。频繁发生错误的一个来源是:测试一些变量并且将持有一个非null
值作为是否发射了数据的替代。一个值为null
的数据仍然是一个发射数据项,它与没有发射任何东西是不能等同的。 - 想让你的操作符在反压(backpressure)场景中变得得好可能会非常棘手。可以参考Dávid Karnok的博客 Advanced RxJava,这里有一个涉及到的各种因素和怎样处理它们的很值得看的讨论。
插件让你可以用多种方式修改RxJava的默认行为:
- 修改默认的计算、IO和新线程调度器集合
- 为RxJava可能遇到的特殊错误注册一个错误处理器
- 注册一个函数记录一些常规RxJava活动的发生
RxJavaSchedulersHook
这个插件让你可以使用你选择的调度器覆盖默认的计算、IO和新线程调度 (Scheduler
),要做到这些,需要继承 RxJavaSchedulersHook
类并覆写这些方法:
Scheduler getComputationScheduler( )
Scheduler getIOScheduler( )
Scheduler getNewThreadScheduler( )
Action0 onSchedule(action)
然后是下面这些步骤:
- 创建一个你实现的
RxJavaSchedulersHook
子类的对象。 - 使用
RxJavaPlugins.getInstance( )
获取全局的RxJavaPlugins对象。 - 将你的默认调度器对象传递给
RxJavaPlugins
的registerSchedulersHook( )
方法。
完成这些后,RxJava会开始使用你的方法返回的调度器,而不是内置的默认调度器。
RxJavaErrorHandler
这个插件让你可以注册一个函数处理传递给 Subscriber.onError(Throwable)
的错误。要做到这一点,需要继承 RxJavaErrorHandler
类并覆写这个方法:
void handleError(Throwable e)
然后是下面这些步骤:
- 创建一个你实现的
RxJavaErrorHandler
子类的对象。 - 使用
RxJavaPlugins.getInstance( )
获取全局的RxJavaPlugins对象。 - 将你的错误处理器对象传递给
RxJavaPlugins
的registerErrorHandler( )
方法。
完成这些后,RxJava会开始使用你的错误处理器处理传递给 Subscriber.onError(Throwable)
的错误。
RxJavaObservableExecutionHook
这个插件让你可以注册一个函数用于记录日志或者性能数据收集,RxJava在某些常规活动时会调用它。要做到这一点,需要继承 RxJavaObservableExecutionHook
类并覆写这些方法:
方法 | 何时调用 |
---|---|
onCreate( ) |
在 Observable.create( ) |
方法中 | |
onSubscribeStart( ) |
在 Observable.subscribe( ) |
之前立刻 | |
onSubscribeReturn( ) |
在 Observable.subscribe( ) |
之后立刻 | |
onSubscribeError( ) |
在Observable.subscribe( ) |
执行失败时 | |
onLift( ) |
在Observable.lift( ) |
方法中 |
然后是下面这些步骤:
- 创建一个你实现的
RxJavaObservableExecutionHook
子类的对象。 - 使用
RxJavaPlugins.getInstance( )
获取全局的RxJavaPlugins对象。 - 将你的Hook对象传递给
RxJavaPlugins
的registerObservableExecutionHook( )
方法。
When you do this, RxJava will begin to call your functions when it encounters the specific conditions they were designed to take note of. 完成这些后,在满足某些特殊的条件时,RxJava会开始调用你的方法。