算术和聚合操作

本页展示的操作符用于对整个序列执行算法操作或其它操作,由于这些操作必须等待数据发射完成(通常也必须缓存这些数据),它们对于非常长或者无限的序列来说是危险的,不推荐使用。

rxjava-math 模块的操作符

其它聚合操作符

  • concat( ) — 顺序连接多个Observables
  • count( ) and countLong( ) — 计算数据项的个数并发射结果
  • reduce( ) — 对序列使用reduce()函数并发射最终的结果
  • collect( ) — 将原始Observable发射的数据放到一个单一的可变的数据结构中,然后返回一个发射这个数据结构的Observable
  • toList( ) — 收集原始Observable发射的所有数据到一个列表,然后返回这个列表
  • toSortedList( ) — 收集原始Observable发射的所有数据到一个有序列表,然后返回这个列表
  • toMap( ) — 将序列数据转换为一个Map,Map的key是根据一个函数计算的
  • toMultiMap( ) — 将序列数据转换为一个列表,同时也是一个Map,Map的key是根据一个函数计算的

算术和聚合操作

Average

计算原始Observable发射数字的平均值并发射它

Average操作符操作符一个发射数字的Observable,并发射单个值:原始Observable发射的数字序列的平均值。

这个操作符不包含在RxJava核心模块中,它属于不同的rxjava-math模块。它被实现为四个操作符:averageDouble, averageFloat, averageInteger, averageLong

如果原始Observable不发射任何数据,这个操作符会抛异常:IllegalArgumentException

Min

发射原始Observable的最小值

Min操作符操作一个发射数值的Observable并发射单个值:最小的那个值。

RxJava中,min属于rxjava-math模块。

min接受一个可选参数,用于比较两项数据的大小,如果最小值的数据超过一项,min会发射原始Observable最近发射的那一项。

minBy类似于min,但是它发射的不是最小值,而是发射Key最小的项,Key由你指定的一个函数生成。

Max

发射原始Observable的最大值

Max操作符操作一个发射数值的Observable并发射单个值:最大的那个值。

RxJava中,max属于rxjava-math模块。

max接受一个可选参数,用于比较两项数据的大小,如果最大值的数据超过一项,max会发射原始Observable最近发射的那一项。

maxBy类似于max,但是它发射的不是最大值,而是发射Key最大的项,Key由你指定的一个函数生成。

Count

计算原始Observable发射物的数量,然后只发射这个值

Count操作符将一个Observable转换成一个发射单个值的Observable,这个值表示原始Observable发射的数据的数量。

如果原始Observable发生错误终止,Count不发射数据而是直接传递错误通知。如果原始Observable永远不终止,Count既不会发射数据也不会终止。

RxJava的实现是countcountLong

示例代码

1
2
String[] items = new String[] { "one", "two", "three" };
assertEquals( new Integer(3), Observable.from(items).count().toBlocking().single() );

Sum

计算Observable发射的数值的和并发射这个和

Sum操作符操作一个发射数值的Observable,仅发射单个值:原始Observable所有数值的和。

RxJava的实现是sumDouble, sumFloat, sumInteger, sumLong,它们不是RxJava核心模块的一部分,属于rxjava-math模块。

你可以使用一个函数,计算Observable每一项数据的函数返回值的和。

StringObservable类(这个类不是RxJava核心模块的一部分)中有一个stringConcat操作符,它将一个发射字符串序列的Observable转换为一个发射单个字符串的Observable,后者这个字符串表示的是前者所有字符串的连接。

StringObservable类还有一个join操作符,它将一个发射字符串序列的Observable转换为一个发射单个字符串的Observable,后者这个字符串表示的是前者所有字符串以你指定的分界符连接的结果。

Concat

不交错的发射两个或多个Observable的发射物

Concat操作符连接多个Observable的输出,就好像它们是一个Observable,第一个Observable发射的所有数据在第二个Observable发射的任何数据前面,以此类推。

直到前面一个Observable终止,Concat才会订阅额外的一个Observable。注意:因此,如果你尝试连接一个”热”Observable(这种Observable在创建后立即开始发射数据,即使没有订阅者),Concat将不会看到也不会发射它之前发射的任何数据。

在ReactiveX的某些实现中有一种ConcatMap操作符(名字可能叫concat_all, concat_map, concatMapObserver, for, forIn/for_in, mapcat, selectConcatselectConcatObserver),他会变换原始Observable发射的数据到一个对应的Observable,然后再按观察和变换的顺序进行连接操作。

StartWith操作符类似于Concat,但是它是插入到前面,而不是追加那些Observable的数据到原始Observable发射的数据序列。

Merge操作符也差不多,它结合两个或多个Observable的发射物,但是数据可能交错,而Concat不会让多个Observable的发射物交错。

RxJava中的实现叫concat

还有一个实例方法叫concatWith,这两者是等价的:Observable.concat(a,b)a.concatWith(b)

Reduce

按顺序对Observable发射的每项数据应用一个函数并发射最终的值

Reduce操作符对原始Observable发射数据的第一项应用一个函数,然后再将这个函数的返回值与第二项数据一起传递给函数,以此类推,持续这个过程知道原始Observable发射它的最后一项数据并终止,此时Reduce返回的Observable发射这个函数返回的最终值。

在其它场景中,这种操作有时被称为累积聚集压缩折叠注射等。

注意如果原始Observable没有发射任何数据,reduce抛出异常IllegalArgumentException

reduce默认不在任何特定的调度器上执行。

还有一个版本的reduce额外接受一个种子参数。注意传递一个值为null的种子是合法的,但是与不传种子参数的行为是不同的。如果你传递了种子参数,并且原始Observable没有发射任何数据,reduce操作符将发射这个种子值然后正常终止,而不是抛异常。

提示:不建议使用reduce收集发射的数据到一个可变的数据结构,那种场景你应该使用collect

collectreduce类似,但它的目的是收集原始Observable发射的所有数据到一个可变的数据结构,collect生成的这个Observable会发射这项数据。它需要两个参数:

  1. 一个函数返回可变数据结构
  2. 另一个函数,当传递给它这个数据结构和原始Observable发射的数据项时,适当地修改数据结构。

collect默认不在任何特定的调度器上执行。