连接操作

这一节解释[ConnectableObservable](http://reactivex.io/RxJava/javadoc/rx/observables/ConnectableObservable.html) 和它的子类以及它们的操作符:

一个可连接的Observable与普通的Observable差不多,除了这一点:可连接的Observable在被订阅时并不开始发射数据,只有在它的connect()被调用时才开始。用这种方法,你可以等所有的潜在订阅者都订阅了这个Observable之后才开始发射数据。

The following example code shows two Subscribers subscribing to the same Observable. In the first case, they subscribe to an ordinary Observable; in the second case, they subscribe to a Connectable Observable that only connects after both Subscribers subscribe. Note the difference in the output: 下面的示例代码展示了两个订阅者订阅同一个Observable的情况。第一种情形,它们订阅一个普通的Observable;第二种情形,它们订阅一个可连接的Observable,并且在两个都订阅后再连接。注意输出的不同:

示例 #1:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
def firstMillion  = Observable.range( 1, 1000000 ).sample(7, java.util.concurrent.TimeUnit.MILLISECONDS);

firstMillion.subscribe(
{ println("Subscriber #1:" + it); }, // onNext
{ println("Error: " + it.getMessage()); }, // onError
{ println("Sequence #1 complete"); } // onCompleted
);

firstMillion.subscribe(
{ println("Subscriber #2:" + it); }, // onNext
{ println("Error: " + it.getMessage()); }, // onError
{ println("Sequence #2 complete"); } // onCompleted
);
Subscriber #1:211128
Subscriber #1:411633
Subscriber #1:629605
Subscriber #1:841903
Sequence #1 complete
Subscriber #2:244776
Subscriber #2:431416
Subscriber #2:621647
Subscriber #2:826996
Sequence #2 complete

示例 #2:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
def firstMillion  = Observable.range( 1, 1000000 ).sample(7, java.util.concurrent.TimeUnit.MILLISECONDS).publish();

firstMillion.subscribe(
{ println("Subscriber #1:" + it); }, // onNext
{ println("Error: " + it.getMessage()); }, // onError
{ println("Sequence #1 complete"); } // onCompleted
);

firstMillion.subscribe(
{ println("Subscriber #2:" + it); }, // onNext
{ println("Error: " + it.getMessage()); }, // onError
{ println("Sequence #2 complete"); } // onCompleted
);

firstMillion.connect();
Subscriber #2:208683
Subscriber #1:208683
Subscriber #2:432509
Subscriber #1:432509
Subscriber #2:644270
Subscriber #1:644270
Subscriber #2:887885
Subscriber #1:887885
Sequence #2 complete
Sequence #1 complete

Connect

让一个可连接的Observable开始发射数据给订阅者

可连接的Observable (connectable Observable)与普通的Observable差不多,不过它并不会在被订阅时开始发射数据,而是直到使用了Connect操作符时才会开始。用这个方法,你可以等待所有的观察者都订阅了Observable之后再开始发射数据。

RxJava中connectConnectableObservable接口的一个方法,使用publish操作符可以将一个普通的Observable转换为一个ConnectableObservable

调用ConnectableObservableconnect方法会让它后面的Observable开始给发射数据给订阅者。

connect方法返回一个Subscription对象,可以调用它的unsubscribe方法让Observable停止发射数据给观察者。

即使没有任何订阅者订阅它,你也可以使用connect方法让一个Observable开始发射数据(或者开始生成待发射的数据)。这样,你可以将一个”冷”的Observable变为”热”的。

Publish

将普通的Observable转换为可连接的Observable

可连接的Observable (connectable Observable)与普通的Observable差不多,不过它并不会在被订阅时开始发射数据,而是直到使用了Connect操作符时才会开始。用这种方法,你可以在任何时候让一个Observable开始发射数据。

RxJava的实现为publish

有一个变体接受一个函数作为参数。这个函数用原始Observable发射的数据作为参数,产生一个新的数据作为ConnectableObservable给发射,替换原位置的数据项。实质是在签名的基础上添加一个Map操作。

RefCount

让一个可连接的Observable行为像普通的Observable

可连接的Observable (connectable Observable)与普通的Observable差不多,不过它并不会在被订阅时开始发射数据,而是直到使用了Connect操作符时才会开始。用这种方法,你可以在任何时候让一个Observable开始发射数据。

RefCount操作符把从一个可连接的Observable连接和断开的过程自动化了。它操作一个可连接的Observable,返回一个普通的Observable。当第一个订阅者订阅这个Observable时,RefCount连接到下层的可连接Observable。RefCount跟踪有多少个观察者订阅它,直到最后一个观察者完成才断开与下层可连接Observable的连接。

RxJava中的实现为refCount,还有一个操作符叫share,它的作用等价于对一个Observable同时应用publishrefCount操作。

Replay

保证所有的观察者收到相同的数据序列,即使它们在Observable开始发射数据之后才订阅

可连接的Observable (connectable Observable)与普通的Observable差不多,不过它并不会在被订阅时开始发射数据,而是直到使用了Connect操作符时才会开始。用这种方法,你可以在任何时候让一个Observable开始发射数据。

如果在将一个Observable转换为可连接的Observable之前对它使用Replay操作符,产生的这个可连接Observable将总是发射完整的数据序列给任何未来的观察者,即使那些观察者在这个Observable开始给其它观察者发射数据之后才订阅。

RxJava的实现为replay,它有多个接受不同参数的变体,有的可以指定replay的最大缓存数量,有的还可以指定调度器。

有一种 replay返回一个普通的Observable。它可以接受一个变换函数为参数,这个函数接受原始Observable发射的数据项为参数,返回结果Observable要发射的一项数据。因此,这个操作符其实是replay变换之后的数据项。