陌小路的个人博客 陌小路的个人博客
首页
  • 技术专区

    • 面试
    • Vue
    • Electron
    • TypeScript
    • Serverless
    • GraphQL
  • 我的秋招之旅
  • 2019年终总结
Todo
收藏夹
关于作者
GitHub

陌小路

前端切图仔
首页
  • 技术专区

    • 面试
    • Vue
    • Electron
    • TypeScript
    • Serverless
    • GraphQL
  • 我的秋招之旅
  • 2019年终总结
Todo
收藏夹
关于作者
GitHub
  • Vue

  • React

  • 面试

  • Electron

  • Serverless

  • GraphQL

  • TypeScript

  • RxJS

    • 介绍
    • 前置知识点
    • Observable
    • Observer
    • Subscription与Subject
    • Cold-Observables与Hot-Observables
    • Schedulers
    • Operators概念
    • 创建型Operators
    • 转换操作符
    • 过滤操作符
    • 组合操作符
    • 总结
  • 工程化

  • Webpack

  • Nestjs

  • WebRTC & P2P

  • Docker

  • Linux

  • Git

  • Svelte

  • 踩坑日记 & 小Tips

  • 其他

  • technology
  • RxJS
陌小路
2020-12-22

Cold-Observables与Hot-Observables

Cold and Hot Observables

# Cold Observables

Cold Observables 只有被 observers 订阅的时候,才会开始产生值。是单播的,有多少个订阅就会生成多少个订阅实例,每个订阅都是从第一个产生的值开始接收值,所以每个订阅接收到的值都是一样的。

如果大家想要参考Cold Observables相关代码,直接看前面的单播示例就行了。

正如单播描述的能力,不管观察者们什么时候开始订阅,源对象都会从初始值开始把所有的数都发给该观察者。

# Hot Observables

Hot Observables 不管有没有被订阅都会产生值。是多播的,多个订阅共享同一个实例,是从订阅开始接受到值,每个订阅接收到的值是不同的,取决于它们是从什么时候开始订阅。

这里有几种场景,我们可以逐一分析一下便于理解:

# “加热”

首先可以忽略代码中出现的陌生的函数,后面会细说。

const source = Rx.Observable.of(1, 2).publish();
source.connect();
source.subscribe((value) => console.log('A:' + value));
setTimeout(() => {
	source.subscribe((value) => console.log('B:' + value));
}, 1000);
1
2
3
4
5
6

这里首先用Rx的操作符of创建了一个Observable,并且后面跟上了一个publish函数,在创建完之后调用connect函数进行开始数据发送。

最终代码的执行结果就是没有任何数据打印出来,分析一下原因其实也比较好理解,由于开启数据发送的时候还没有订阅,并且这是一个Hot Observables,它是不会理会你是否有没有订阅它,开启之后就会直接发送数据,所以A和B都没有接收到数据。

当然你这里如果把connect方法放到最后,那么最终的结果就是A接收到了,B还是接不到,因为A在开启发数据之前就订阅了,而B还要等一秒。

# 更直观的场景

正如上述多播所描述的,其实我们更多想看到的现象是能够A和B两个观察者能够都有接收到数据,然后观察数据的差别,这样会方便理解。

这里直接换一个发射源:

const source = Rx.Observable.interval(1000).take(3).publish();
source.subscribe((value: number) => console.log('A:' + value));
setTimeout(() => {
	source.subscribe((value: number) => console.log('B:' + value));
}, 3000);
source.connect();

// A:0
// A:1
// A:2
// B:2
1
2
3
4
5
6
7
8
9
10
11

这里我们利用interval配合take操作符每秒发射一个递增的数,最多三个,然后这个时候的打印结果就更清晰了,A正常接收到了三个数,B三秒之后才订阅,所以只接收到了最后一个数2,这种方式就是上述多播所描述的并无一二。

# 两者对比

  • Cold Observables:举个栗子会比较好理解一点:比如我们上B站看番,更新了新番,我们不论什么时候去看,都能从头开始看到完整的剧集,与其他人看不看毫无关联,互不干扰。
  • Hot Observables:这就好比我们上B站看直播,直播开始之后就直接开始播放了,不管是否有没有订阅者,也就是说如果你没有一开始就订阅它,那么你过一段时候后再去看,是不知道前面直播的内容的。

# 上述代码中出现的操作符解析

在创建Hot Observables时我们用到了publish与connect函数的结合,其实调用了publish操作符之后返回的结果是一个ConnectableObservable,然后该对象上提供了connect方法让我们控制发送数据的时间。

  • publish:这个操作符把正常的 Observable(Cold Observables )转换成 ConnectableObservable。

  • ConnectableObservable:ConnectableObservable 是多播的共享 Observable,可以同时被多个 observers共享订阅,它是 Hot Observables。ConnectableObservable 是订阅者和真正的源头 Observables(上面例子中的 interval,每隔一秒发送一个值,就是源头 Observables)的中间人,ConnectableObservable 从源头 Observables 接收到值然后再把值转发给订阅者。

  • connect():ConnectableObservable 并不会主动发送值,它有个 connect方法,通过调用 connect 方法,可以启动共享 ConnectableObservable 发送值。当我们调用 ConnectableObservable.prototype.connect 方法,不管有没有被订阅,都会发送值。订阅者共享同一个实例,订阅者接收到的值取决于它们何时开始订阅。

其实这种手动控制的方式还挺麻烦的,有没有什么更加方便的操作方式呢,比如监听到有订阅者订阅了才开始发送数据,一旦所有订阅者都取消了,就停止发送数据?其实也是有的,让我们看看引用计数(refCount):

# 引用计数

这里主要用到了publish结合refCount实现一个“自动挡”的效果。

const source = Rx.Observable.interval(1000).take(3).publish().refCount();
setTimeout(() => {
	source.subscribe(data => { console.log("A:" + data) });
	setTimeout(() => {
		source.subscribe(data => { console.log("B:" + data) });
	}, 1000);
}, 2000);

// A:0
// A:1
// B:1
// A:2
// B:2
1
2
3
4
5
6
7
8
9
10
11
12
13

我们透过结果看本质,能够很轻松的发现,只有当A订阅的时候才开始发送数据(A拿到的数据是从0开始的),并且当B订阅时,也是只能获取到当前发送的数据,而不能获取到之前的数据。

不仅如此,这种“自动挡”当所有订阅者都取消订阅的时候它就会停止再发送数据了。

编辑
上次更新: 2023/11/25, 4:11:00
Subscription与Subject
Schedulers

← Subscription与Subject Schedulers→

最近更新
01
github加速
01-01
02
在线小工具
01-01
03
Lora-Embeddings
11-27
更多文章>
Theme by Vdoing | Copyright © 2020-2024 STDSuperman | MIT License
  • 跟随系统
  • 浅色模式
  • 深色模式
  • 阅读模式