组合操作符
# concatAll
定义:
public concatAll(): Observable
顾名思义,该操作符有点像我们js
中数组方法concat
,用于将多个Observable
合成一个,不过它有个注意点在于它是串行的,也就是合并了两个Observable
,那订阅者在获取值的时候会先获取完第一个Observable
,之后才开始接收到后一个Observable
的值。
const source1 = Rx.Observable.of(1, 2);
const source2 = source1.map(x => Rx.Observable.interval(1000).take(3));
const result = source2.concatAll();
result.subscribe(x => console.log(x));
2
3
4
根据上面的文字介绍,相信大家对于这段代码应该也能多少看得懂一些,没错,这段代码的含义就是我们的数据源发送了两个数,并且采用map
操作符处理完返回了一个新的Observable
,这个时候为了订阅者能够正常的接收多个Observable
,则采用concatAll
合并一下,并且最终订阅者收到的结果依次为:0、1、2、0、1、2。
# mergeAll
定义:
public mergeAll(concurrent: number): Observable
与concatAll
几乎没太大差别,唯一不同的就是它是并行的,也就是合并的多个Observable
发送数据时是不分先后的。
# combineLatest
定义:
public combineLatest(other: ObservableInput, project: function): Observable
组合多个 Observables
来创建一个 Observable
,该 Observable
的值根据每个输入 Observable
的最新值计算得出的。
这个操作符光从简介来看不太好理解,我们来结合实例进行讲解吧。
const s1 = Rx.Observable.interval(2000).take(3);
const s2 = Rx.Observable.interval(1000).take(5);
const result = s1.combineLatest(s2, (a, b) => a + b);
result.subscribe(x => console.log(x));
2
3
4
打印结果依次是:0、1、2、3、4、5、6。
首先我们看这个combineLatest
的使用方式,它是一个实例操作符,这里演示的是将s1
与s2
结合到一起,并且第二个参数需要传入回调,对结合的值进行处理,由于我们这里只结合了两个,故只接收a
、b
两个参数,该回调函数的返回值即为订阅者获取到的值。
从结果看其实也看不出来啥,主要是这个过程如下:
s2
发送一个0,而此时s1
未发送值,则我们传入的回调不会执行,订阅者也不会接收到值。s1
发送一个0,而s2
最后一次发送的值为0,故调用回调函数,并把这两个参数传入,最终订阅者收到s2
发送一个1,而s1
最后一次发送的为0,故结果为1。s1
发送一个1,而s2
最后一次发送的值为1,故结果为2。s2
发送一个值为2,而s1
最后一次发送的值为1,故结果为3。s2
发送一个值为3,而s1
最后一次发送的值为1,故结果为4。- ...重复上述步骤。
这里有个注意点,我们会发现
s1
、s2
在某些时候会同时发送数据,但是这个也会有先后顺序的,所以这个时候就看他们谁先定义那么谁就会先发送,从上面步骤中你们应该也能发现这个现象。
其实也就是结合的多个源之间存在一种依赖关系,也就是两个源都至少发送了一个值,订阅者才会收到消息,等到两个源都发送完毕,最后才会发出结束信号。
# zip
定义:
public static zip(observables: *): Observable<R>
将多个 Observable
组合以创建一个 Observable
,该 Observable
的值是由所有输入 Observables
的值按顺序计算而来的。如果最后一个参数是函数, 这个函数被用来计算最终发出的值.否则, 返回一个顺序包含所有输入值的数组.
通俗点说就是多个源之间会进行顺位对齐计算,跟前面的combineLatest
有点差别。
话不多说,上码:
const s1 = Rx.Observable.interval(1000).take(3);
const s2 = Rx.Observable.interval(2000).take(5);
const result = s1.zip(s2, (a, b) => a + b);
result.subscribe(x => console.log(x));
2
3
4
打印结果依次是:0、2、4。
怎么理解呢,首先我们记住一句话,多个源之间用来计算的数是顺位对齐的,也就是说s1
的第一个数对齐s2
的第一个数,这种一一对应的计算,最终订阅者收到的就是将多个对齐的数传入我们在调用zip
的最后一个回调函数,也就是用来计算完值最终返回给用户的结果,这是可选的。
等到两个源中的任意一个源结束了之后,整体就会发出结束信号,因为后续不存在可以对齐的数了。
# startWidth
定义:
public startWith(values: ...T, scheduler: Scheduler): Observable
返回的 Observable
会先发出作为参数指定的项,然后再发出由源 Observable
所发出的项。
怎么理解呢,其实很好举例,比如有一串糖葫芦,整体都是一个颜色,你觉得不好看,于是你在这串糖葫芦的前面插了几个颜色不一样的糖葫芦,这个时候用户吃的时候就会先吃到你插在最前面的糖葫芦。
const source = Rx.Observable.interval(1000).take(3);
const result = source.startWith(666)
result.subscribe(x => console.log(x));
2
3
打印结果为:666、0、1、2。
是不是很好理解呢。
# switch
定义:
public switch(): Observable<T>
通过只订阅最新发出的内部 Observable
,将高阶 Observable
转换成一阶 Observable
。
对于该操作符的用法其实前面我们在介绍switchMap
这个转换操作符时就已经说到了,相当于map
+switch
=switchMap
。
举个栗子:
const btn = document.createElement('button');
btn.innerText = '我要发言!'
document.body.appendChild(btn);
const source = Rx.Observable.fromEvent(btn, 'click');
const source2 = source.map(x => Rx.Observable.interval(1000).take(3));
const result = source2.switch();
result.subscribe(x => console.log(x));
2
3
4
5
6
7
上述代码实现的效果与switchMap
一致,当用户点击按钮时会开始发送数据,当这次数据发送未完成时,再次点击按钮,则会开始一个新的发射数据流程,将原先的发射数据流程直接抛弃。
# 其他组合操作符
官网传送门:组合操作符
combineAll
concat
exhaust
forkJoin
merge
race
withLatestFrom
zipAll
# 多播操作符
官网传送门:多播操作符
cache
multicast
publish
publishBehavior
publishLast
publishReplay
share
待完善...
# 错误处理操作符
官网传送门:错误处理操作符
catch
retry
retryWhen
待完善...
# 工具操作符
官网传送门:工具操作符
do
delay
delayWhen
dematerialize
finally
let
materialize
observeOn
subscribeOn
timeInterval
timestamp
timeout
timeoutWith
toArray
toPromise
待完善...
# 条件和布尔操作符
官网传送门:条件和布尔操作符
defaultIfEmpty
every
find
findIndex
isEmpty
待完善...
# 数学和聚合操作符
官网传送门:数学和聚合操作符
count
max
min
reduce
待完善...