Cold-Observables与Hot-Observables
# Cold Observables
Cold Observables
只有被 observers
订阅的时候,才会开始产生值。是单播的,有多少个订阅就会生成多少个订阅实例,每个订阅都是从第一个产生的值开始接收值,所以每个订阅接收到的值都是一样的。
如果大家想要参考
Cold Observables
相关代码,直接看前面的单播示例就行了。
正如单播描述的能力,不管观察者们什么时候开始订阅,源对象都会从初始值开始把所有的数都发给该观察者。
# Hot Observables
Hot Observables
不管有没有被订阅都会产生值。是多播的,多个订阅共享同一个实例,是从订阅开始接受到值,每个订阅接收到的值是不同的,取决于它们是从什么时候开始订阅。
这里有几种场景,我们可以逐一分析一下便于理解:
# “加热”
首先可以忽略代码中出现的陌生的函数,后面会细说。
const source = Rx.Observable.of(1, 2).publish();
source.connect();
source.subscribe((value) => console.log('A:' + value));
setTimeout(() => {
source.subscribe((value) => console.log('B:' + value));
}, 1000);
2
3
4
5
6
这里首先用Rx
的操作符of
创建了一个Observable
,并且后面跟上了一个publish
函数,在创建完之后调用connect
函数进行开始数据发送。
最终代码的执行结果就是没有任何数据打印出来,分析一下原因其实也比较好理解,由于开启数据发送的时候还没有订阅,并且这是一个Hot Observables
,它是不会理会你是否有没有订阅它,开启之后就会直接发送数据,所以A
和B
都没有接收到数据。
当然你这里如果把
connect
方法放到最后,那么最终的结果就是A
接收到了,B
还是接不到,因为A
在开启发数据之前就订阅了,而B
还要等一秒。
# 更直观的场景
正如上述多播所描述的,其实我们更多想看到的现象是能够A
和B
两个观察者能够都有接收到数据,然后观察数据的差别,这样会方便理解。
这里直接换一个发射源:
const source = Rx.Observable.interval(1000).take(3).publish();
source.subscribe((value: number) => console.log('A:' + value));
setTimeout(() => {
source.subscribe((value: number) => console.log('B:' + value));
}, 3000);
source.connect();
// A:0
// A:1
// A:2
// B:2
2
3
4
5
6
7
8
9
10
11
这里我们利用interval
配合take
操作符每秒发射一个递增的数,最多三个,然后这个时候的打印结果就更清晰了,A
正常接收到了三个数,B
三秒之后才订阅,所以只接收到了最后一个数2,这种方式就是上述多播所描述的并无一二。
# 两者对比
Cold Observables
:举个栗子会比较好理解一点:比如我们上B站看番,更新了新番,我们不论什么时候去看,都能从头开始看到完整的剧集,与其他人看不看毫无关联,互不干扰。Hot Observables
:这就好比我们上B站看直播,直播开始之后就直接开始播放了,不管是否有没有订阅者,也就是说如果你没有一开始就订阅它,那么你过一段时候后再去看,是不知道前面直播的内容的。
# 上述代码中出现的操作符解析
在创建Hot Observables
时我们用到了publish
与connect
函数的结合,其实调用了publish
操作符之后返回的结果是一个ConnectableObservable
,然后该对象上提供了connect
方法让我们控制发送数据的时间。
publish
:这个操作符把正常的Observable
(Cold Observables
)转换成ConnectableObservable
。ConnectableObservable
:ConnectableObservable
是多播的共享Observable
,可以同时被多个observers
共享订阅,它是Hot Observables
。ConnectableObservable
是订阅者和真正的源头Observables
(上面例子中的interval
,每隔一秒发送一个值,就是源头Observables
)的中间人,ConnectableObservable
从源头Observables
接收到值然后再把值转发给订阅者。connect()
:ConnectableObservable
并不会主动发送值,它有个connect
方法,通过调用connect
方法,可以启动共享ConnectableObservable
发送值。当我们调用ConnectableObservable.prototype.connect
方法,不管有没有被订阅,都会发送值。订阅者共享同一个实例,订阅者接收到的值取决于它们何时开始订阅。
其实这种手动控制的方式还挺麻烦的,有没有什么更加方便的操作方式呢,比如监听到有订阅者订阅了才开始发送数据,一旦所有订阅者都取消了,就停止发送数据?其实也是有的,让我们看看引用计数(refCount
):
# 引用计数
这里主要用到了publish
结合refCount
实现一个“自动挡”的效果。
const source = Rx.Observable.interval(1000).take(3).publish().refCount();
setTimeout(() => {
source.subscribe(data => { console.log("A:" + data) });
setTimeout(() => {
source.subscribe(data => { console.log("B:" + data) });
}, 1000);
}, 2000);
// A:0
// A:1
// B:1
// A:2
// B:2
2
3
4
5
6
7
8
9
10
11
12
13
我们透过结果看本质,能够很轻松的发现,只有当A
订阅的时候才开始发送数据(A
拿到的数据是从0开始的),并且当B
订阅时,也是只能获取到当前发送的数据,而不能获取到之前的数据。
不仅如此,这种“自动挡”当所有订阅者都取消订阅的时候它就会停止再发送数据了。