连接操作.md
连接操作
这一节解释[ConnectableObservable](http://reactivex.io/RxJava/javadoc/rx/observables/ConnectableObservable.html)
和它的子类以及它们的操作符:
- ConnectableObservable.connect( ) — 指示一个可连接的Observable开始发射数据
- Observable.publish( ) — 将一个Observable转换为一个可连接的Observable
- Observable.replay( ) — 确保所有的订阅者看到相同的数据序列,即使它们在Observable开始发射数据之后才订阅
- ConnectableObservable.refCount( ) — 让一个可连接的Observable表现得像一个普通的Observable
一个可连接的Observable与普通的Observable差不多,除了这一点:可连接的Observable在被订阅时并不开始发射数据,只有在它的connect()
被调用时才开始。用这种方法,你可以等所有的潜在订阅者都订阅了这个Observable之后才开始发射数据。
The following example code shows two Subscribers subscribing to the same Observable. In the first case, they subscribe to an ordinary Observable; in the second case, they subscribe to a Connectable Observable that only connects after both Subscribers subscribe. Note the difference in the output: 下面的示例代码展示了两个订阅者订阅同一个Observable的情况。第一种情形,它们订阅一个普通的Observable;第二种情形,它们订阅一个可连接的Observable,并且在两个都订阅后再连接。注意输出的不同:
示例 #1:
1 | def firstMillion = Observable.range( 1, 1000000 ).sample(7, java.util.concurrent.TimeUnit.MILLISECONDS); |
示例 #2:
1 | def firstMillion = Observable.range( 1, 1000000 ).sample(7, java.util.concurrent.TimeUnit.MILLISECONDS).publish(); |
Connect
让一个可连接的Observable开始发射数据给订阅者
可连接的Observable (connectable Observable)与普通的Observable差不多,不过它并不会在被订阅时开始发射数据,而是直到使用了Connect
操作符时才会开始。用这个方法,你可以等待所有的观察者都订阅了Observable之后再开始发射数据。
RxJava中connect
是ConnectableObservable
接口的一个方法,使用publish
操作符可以将一个普通的Observable转换为一个ConnectableObservable
。
调用ConnectableObservable
的connect
方法会让它后面的Observable开始给发射数据给订阅者。
connect
方法返回一个Subscription
对象,可以调用它的unsubscribe
方法让Observable停止发射数据给观察者。
即使没有任何订阅者订阅它,你也可以使用connect
方法让一个Observable开始发射数据(或者开始生成待发射的数据)。这样,你可以将一个”冷”的Observable变为”热”的。
- Javadoc: connect()
- Javadoc: connect(Action1)
Publish
将普通的Observable转换为可连接的Observable
可连接的Observable (connectable Observable)与普通的Observable差不多,不过它并不会在被订阅时开始发射数据,而是直到使用了Connect
操作符时才会开始。用这种方法,你可以在任何时候让一个Observable开始发射数据。
RxJava的实现为publish
。
- Javadoc: publish()
有一个变体接受一个函数作为参数。这个函数用原始Observable发射的数据作为参数,产生一个新的数据作为ConnectableObservable
给发射,替换原位置的数据项。实质是在签名的基础上添加一个Map
操作。
- Javadoc: publish(Func1)
RefCount
让一个可连接的Observable行为像普通的Observable
可连接的Observable (connectable Observable)与普通的Observable差不多,不过它并不会在被订阅时开始发射数据,而是直到使用了Connect
操作符时才会开始。用这种方法,你可以在任何时候让一个Observable开始发射数据。
RefCount
操作符把从一个可连接的Observable连接和断开的过程自动化了。它操作一个可连接的Observable,返回一个普通的Observable。当第一个订阅者订阅这个Observable时,RefCount
连接到下层的可连接Observable。RefCount
跟踪有多少个观察者订阅它,直到最后一个观察者完成才断开与下层可连接Observable的连接。
RxJava中的实现为refCount
,还有一个操作符叫share
,它的作用等价于对一个Observable同时应用publish
和refCount
操作。
- Javadoc: refCount()
- Javadoc: share()
Replay
保证所有的观察者收到相同的数据序列,即使它们在Observable开始发射数据之后才订阅
可连接的Observable (connectable Observable)与普通的Observable差不多,不过它并不会在被订阅时开始发射数据,而是直到使用了Connect
操作符时才会开始。用这种方法,你可以在任何时候让一个Observable开始发射数据。
如果在将一个Observable转换为可连接的Observable之前对它使用Replay
操作符,产生的这个可连接Observable将总是发射完整的数据序列给任何未来的观察者,即使那些观察者在这个Observable开始给其它观察者发射数据之后才订阅。
RxJava的实现为replay
,它有多个接受不同参数的变体,有的可以指定replay
的最大缓存数量,有的还可以指定调度器。
- Javadoc: replay()
- Javadoc: replay(int)
- Javadoc: replay(long,TimeUnit)
- Javadoc: replay(int,long,TimeUnit)
有一种 replay
返回一个普通的Observable。它可以接受一个变换函数为参数,这个函数接受原始Observable发射的数据项为参数,返回结果Observable要发射的一项数据。因此,这个操作符其实是replay
变换之后的数据项。
- Javadoc: replay(Func1)
- Javadoc: replay(Func1,int)
- Javadoc: replay(Func1,long,TimeUnit)
- Javadoc: replay(Func1,int,long,TimeUnit)