转换操作符
那么什么是转换操作符呢,众所周知,我们在日常业务中,总是需要与各种各样的数据打交道,很多时候我们都不是直接就会对传输过来的数据进行使用,而是会对其做一定的转换,让他成为更加契合我们需求的形状,这就是转换操作符的作用所在了。
# buffer
定义:
public buffer(closingNotifier: Observable<any>): Observable<T[]>
将过往的值收集到一个数组中,并且仅当另一个 Observable
发出通知时才发出此数组。这相当于有一个缓冲区,将数据收集起来,等到一个信号来临,再释放出去。
;
改操作符就有点像一个大水坝,一些时候我们会选择蓄水,等到特定时候,再由领导下命令打开水坝,让水流出去。
举个栗子:
假设我们有这样一个需求,我们有一个接口是专门用于获取特定数据的,但是呢该接口一次性只返回一个数据,这让我们很苦恼,因为产品想让数据量达到特定值再控制进行操作,也就是他点击一下某个按钮,再去将这些数据渲染出来,那该怎么办呢?
这个时候就需要我们的buffer
操作符大展身手了:
const btn = document.createElement('button');
btn.innerText = '你点我啊!'
document.body.appendChild(btn);
const click = Rx.Observable.fromEvent(btn, 'click');
const interval = Rx.Observable.interval(1000);
const source = interval.buffer(click);
source.subscribe(x => console.log(x));
2
3
4
5
6
7
这里我们直接用
interval
来演示接口获取数据,然后再配合buffer
进行功能实现。
这里我们等四秒之后再点击一下按钮,打印出来的值为:[0, 1, 2, 3]
,然后再等8秒,点击按钮:[4, 5, 6, 7, 8, 9, 10, 11]
。
从现象看,我们不难看出,我们已经实现了通过按钮来控制数据的发送。同时我们可以发现另一个现象,发送出去的数据就直接会在缓冲区中被清空,然后重新收集新的数据。
这其实也不难理解,我们还是用水坝来举例,我们打开水坝放水一段时间之后,然后关闭它继续蓄水,那么我第二次打开水坝放出去的水自然是我新蓄的水。
# concatMap
定义:
public concatMap(project: function(value: T, ?index: number): ObservableInput, resultSelector: function(outerValue: T, innerValue: I, outerIndex: number, innerIndex: number): any): Observable
这个操作符还是有点意思的,我们先看看官网的描述:
将源值投射为一个合并到输出
Observable
的Observable
,以串行的方式等待前一个完成再合并下一个Observable
。
不知道各位读者是否感受到了“一丝丝”的不好理解呢,不过等笔者举个小例子就能轻松的搞懂了:
假设你遇到了这样一个场景,你和女朋友一起在小吃街逛街,但是呢女朋友有个不好的毛病,她总喜欢这家买完吃一口然后剩下让你吃,然后另一家买一点吃一口然后剩下还是让你吃,而你呢每次吃东西也是要时间的,一般会心疼男朋友的女朋友就会等你吃完再去买下一家的,这种情况下,你还是能吃完再休息会;另一种情况呢,女朋友不管你吃完没,她继续买买买,然后你手里的吃的越来越多,你吃的速度完全赶不上女朋友买的速度,那这个时候呢就会导致你负重越来越大,最后顶不住心态爆炸了。
以上情景包含了concatMap
的几个核心点以及需要注意的地方:
- 源值发送一个数据,然后你传入的内部
Observable
就会开始工作或者是发送数据,订阅者就能收到数据了,也就是内部的Observable
相当于总是要等源对象发送一个数据才会进行新一轮工作,并且要等本轮工作完成了才能继续下一轮。 - 如果本轮工作还未完成又接受到了源对象发送的数据,那么将会用一个队列保存,然后等本轮完成立即检查该队列里是否还有,如果有则立马开启下一轮。
- 如果内部
Observable
的工作时间大于源对象发送的数据的间隔时间,那么就会导致缓存队列越来越大,最后造成性能问题
其实通俗点理解就是,一个工厂流水线,一个负责发材料的,另一个负责制作产品的,发材料的就是源对象,制作产品的就是这个内部Observable
,这个工厂里产出的只会是成品也就是制作完成的,所以订阅者要等这个制作产品的人做完一个才能拿到一个。
如果发材料的速度比制作的人制作一个产品要快就会产生材料堆积,那么随着时间推移就会越堆越多,导致工厂装不下。
借助代码理解:
const source = Rx.Observable.interval(3000);
const result = source.concatMap(val => Rx.Observable.interval(1000).take(2));
result.subscribe(x => console.log(x));
2
3
首先分析一下代码结构,我们先创建了一个每隔三秒发送一个数据的源对象,接着调用实例方法concatMap
,并给该方法传入一个返回Observable
对象的函数,最终获得经过concatMap
转化后的Observable
对象,并对其进行订阅。
运行结果为:首先程序运行的第三秒source
会发送第一个数据,然后这时我们传入的内部Observable
,开始工作,经过两秒发送两个递增的数,接着订阅函数逐步打印出这两个数,等待一秒后也就是程序运行的第6秒,source
发送第二个数,这个时候重复上述流程。
# map
定义:
public map(project: function(value: T, index: number): R, thisArg: any): Observable<R>
如果说你使用js
中数组的map
方法较多的话,可能这里基本就不用看了,用法完全一致。
你只需要传入一个函数,那么函数的第一个参数就是数据源的每个数据,第二个参数就是该数据的索引值,你只需要返回一个计算或者其他操作之后的返回值即可作为订阅者实际获取到的值。
const source = Rx.Observable.interval(1000).take(3);
const result = source.map(x => x * 2);
result.subscribe(x => console.log(x));
2
3
take
操作符其实也就是限定拿多少个数就不在发送数据了。
这里用于演示将每个数据源的值都乘以2然后发送给订阅者,所以打印的值分别为:0、2、4。
# mapTo
定义:
public mapTo(value: any): Observable
忽略数据源发送的数据,只发送指定的值(传参)。
就像是一个你讨厌的人让你帮忙传话,他说了一大堆表白的话,然后让你传给某个妹子,你因为讨厌他所以不想帮他,于是跟那个妹子说我喜欢你,最后你们幸福的生活在一起了。
const source = Rx.Observable.interval(1000).take(3);
const result = source.mapTo(666);
result.subscribe(x => console.log(x));
2
3
就像这段代码,数据源发送的是0、1、2,而订阅者实际收到的是三个666。
# mergeMap
定义:
public mergeMap(project: function(value: T, ?index: number): ObservableInput, resultSelector: function(outerValue: T, innerValue: I, outerIndex: number, innerIndex: number): any, concurrent: number): Observable
这个定义看上有点吓人,不过我们不要慌,我们只需要了解他得大多数情况的用法即可。
这里你是否还记得前面在
empty
操作符介绍的部分提到的,笔者留了个坑没补,就是演示mergeMap
与empty
是如何进行配合的?这里就把这个坑填上。
const source = Rx.Observable.interval(1000).take(3);
const result = source.mergeMap(x => x % 2 === 0 ? Rx.Observable.of(x) : Rx.Observable.empty());
result.subscribe(x => console.log(x));
2
3
输入源是一个会发送0、1、2三个数的数据源,我们调用mergeMap
操作符,并传入一个函数,该函数的功能就是,如果输入源发送的当前值是偶数则发送给订阅者,否则就不发送。
这里面mergeMap
主要做了一个整合的能力,我们可以将它与map
进行对比,我们可以发现map
的返回值必须是一个数值,而mergeMap
返回值是要求是一个Observable
,也就是说,我们可以返回任意转换或具备其他能力的Observable
。
# pluck
定义:
public pluck(properties: ...string): Observable
用于选择出每个数据对象上的指定属性值。
就比如某个数据源发送的数据是一个对象,对象上面有一个name
属性,并且订阅者指向知道这个name
属性,那么就可以使用该操作符来提取该属性值给用户。
const source = Rx.Observable.of({name: '张三'}, {name: '李四'});
const result = source.pluck('name');
result.subscribe(x => console.log(x));
// 张三
// 李四
2
3
4
5
6
毫无疑问,这个操作符就是为了提取属性来的,相当于我们使用map
操作符来处理一下提取出name
再返回给订阅者。
# scan
定义:
public scan(accumulator: function(acc: R, value: T, index: number): R, seed: T | R): Observable<R>
累加器操作符,可以用来做状态管理,用处挺多。
就用法来看,我们可以参考一下
js
中数组的reduce
函数。
假设我们现在有一个需求,我们想要将数据源发送过来的数据累加之后再返回给订阅者,这又该怎么做呢?
const source = Rx.Observable.interval(1000).take(4);
const result = source.scan((acc, cur) => acc + cur, 0);
result.subscribe(x => console.log(x));
2
3
从代码上看,数据源发送了四个值:0、1、2、3,而订阅者每次收到的值将分别是前面已接收到的数与当前数的和也就是:0、1、3、6。
然后再看用法,我们给scan
操作符第一个参数传入了一个函数,接收两个值:acc
(前一次累加的结果或初始值)、cur
(当前值),第二个参数则是计算的初始值。
# switchMap
定义:
public switchMap(project: function(value: T, ?index: number): ObservableInput, resultSelector: function(outerValue: T, innerValue: I, outerIndex: number, innerIndex: number): any): Observable
其实也就是
switch
操作符与map
操作符的结合,switch
操作符会在组合操作符中讲到。
主要作用首先会对多个Observable
进行合并,并且具备打断能力,也就是说合并的这个几个Observable
,某个Observable
最先开始发送数据,这个时候订阅者能正常的接收到它的数据,但是这个时候另一个Observable
也开始发送数据了,那么第一个Observable
发送数据就被打断了,只会发送后来者发送的数据。
用通俗的话来说就是,有人在说话,突然你大声开始说话,人家就被你打断了,这个时候大家就只能听到你说话了。
const btn = document.createElement('button');
btn.innerText = '我要发言!'
document.body.appendChild(btn);
const source = Rx.Observable.fromEvent(btn, 'click');
const result = source.switchMap(x => Rx.Observable.interval(1000).take(3));
result.subscribe(x => console.log(x));
2
3
4
5
6
代码实现的功能就是,当某位同学点击按钮,则开始从0开始发送数字,这个时候如果同学一还没发送完数据,同学二再点一下,则同学一的数据就不会再发了,开始发同学二的。
假设同学一点完之后,第二秒同学二点击了一下按钮,则打印结果:0、1、0、1、2,这里从第二个0开始就是同学二发送的数据了。
# 其他转换操作符
官网传送门:转换操作符
bufferCount
bufferTime
bufferToggle
bufferWhen
concatMapTo
exhaustMap
expand
groupBy
mergeMapTo
mergeScan
pairwise
partition
switchMapTo
window
windowCount
windowTime
windowToggle
windowWhen