核心概念
# Observable
表示一个概念,这个概念是一个可调用的未来值或事件的集合。它能被多个observer
订阅,每个订阅关系相互独立、互不影响。
举个栗子:
假设你订阅了一个博客或者是推送文章的服务号(微信公众号之类的),之后只要公众号更新了新的内容,那么该公众号就会把新的文章推送给你,在这段关系中,这个公众号就是一个Observable
,用来产生数据的数据源。
相信看完上面的描述,你应该对Observable
是个什么东西有了一定的了解了,那么这就好办了,下面我们来看看在RxJS
中如何创建一个Observable
。
const Rx = require('rxjs/Rx')
const myObservable = Rx.Observable.create(observer => {
observer.next('foo');
setTimeout(() => observer.next('bar'), 1000);
});
2
3
4
5
6
我们可以调用Observable.create
方法来创建一个Observable
,这个方法接受一个函数作为参数,这个函数叫做 producer
函数, 用来生成 Observable
的值。这个函数的入参是 observer
,在函数内部通过调用 observer.next()
便可生成有一系列值的一个 Observable
。
我们先不应理会
observer
是个什么东西,从创建一个Observable
的方式来看,其实也就是调用一个API
的事,十分简单,这样一个简单的Observable
对象就创建出来了。
# Observer
一个回调函数的集合,它知道如何去监听由Observable
提供的值。Observer
在信号流中是一个观察者(哨兵)的角色,它负责观察任务执行的状态并向流中发射信号。
这里我们简单实现一下内部的构造:
const observer = {
next: function(value) {
console.log(value);
},
error: function(error) {
console.log(error)
},
complete: function() {
console.log('complete')
}
}
2
3
4
5
6
7
8
9
10
11
在RxJS
中,Observer
是可选的。在next
、error
和 complete
处理逻辑部分缺失的情况下,Observable
仍然能正常运行,为包含的特定通知类型的处理逻辑会被自动忽略。
比如我们可以这样定义:
const observer = {
next: function(value) {
console.log(value);
},
error: function(error) {
console.log(error)
}
}
2
3
4
5
6
7
8
它依旧是可以正常的运行。
那么它又是怎么来配合我们在实际战斗中使用的呢:
const myObservable = Rx.Observable.create((observer) => {
observer.next('111')
setTimeout(() => {
observer.next('777')
}, 3000)
})
myObservable.subscribe((text) => console.log(text));
2
3
4
5
6
7
8
这里直接使用subscribe
方法让一个observer
订阅一个Observable
,我们可以看看这个subscribe
的函数定义来看看怎么实现订阅的:
subscribe(next?: (value: T) => void, error?: (error: any) => void, complete?: () => void): Subscription;
源码是用ts
写的,代码即文档,十分清晰,这里笔者给大家解读一下,我们从入参来看,从左至右依次是next
、error
,complete
,且是可选的,我们可以自己选择性的传入相关回调,从这里也就印证了我们上面所说next
、error
和 complete
处理逻辑部分缺失的情况下仍可以正常运行,因为他们都是可选的。
# Subscription
Subscription
就是表示Observable
的执行,可以被清理。这个对象最常用的方法就是unsubscribe
方法,它不需要任何参数,只是用来清理由Subscription
占用的资源。同时,它还有add
方法可以使我们取消多个订阅。
const myObservable = Rx.Observable.create(observer => {
observer.next('foo');
setTimeout(() => observer.next('bar'), 1000);
});
const subscription = myObservable.subscribe(x => console.log(x));
// 稍后:
// 这会取消正在进行中的 Observable 执行
// Observable 执行是通过使用观察者调用 subscribe 方法启动的
subscription.unsubscribe();
2
3
4
5
6
7
8
9
# Subject (主体)
它是一个代理对象,既是一个 Observable
又是一个 Observer
,它可以同时接受 Observable
发射出的数据,也可以向订阅了它的 observer
发射数据,同时,Subject
会对内部的 observers
清单进行多播(multicast
)
Subjects
是将任意Observable
执行共享给多个观察者的唯一方式
这个时候眼尖的读者会发现,这里产生了一个新概念——多播。
- 那么多播又是什么呢?
- 有了多播是不是还有单播?
- 他们的区别又是什么呢?
接下来就让笔者给大家好好分析这两个概念吧。
# 单播
普通的Observable
是单播的,那么什么是单播呢?
单播的意思是,每个普通的 Observables
实例都只能被一个观察者订阅,当它被其他观察者订阅的时候会产生一个新的实例。也就是普通 Observables
被不同的观察者订阅的时候,会有多个实例,不管观察者是从何时开始订阅,每个实例都是从头开始把值发给对应的观察者。
const Rx = require('rxjs/Rx')
const source = Rx.Observable.interval(1000).take(3);
source.subscribe((value) => console.log('A ' + value))
setTimeout(() => {
source.subscribe((value) => console.log('B ' + value))
}, 1000)
// A 0
// A 1
// B 0
// A 2
// B 1
// B 2
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
看到陌生的调用不要慌,后面会进行详细解析,这里的
source
你可以理解为就是一个每隔一秒发送一个从0开始递增整数的Observable
就行了,且只会发送三次。
从这里我们可以看出两个不同观察者订阅了同一个源(source
),一个是直接订阅,另一个延时一秒之后再订阅。
从打印的结果来看,A
从0开始每隔一秒打印一个递增的数,而B
延时了一秒,然后再从0开始打印,由此可见,A
与B
的执行是完全分开的,也就是每次订阅都创建了一个新的实例。
在许多场景下,我们可能会希望B
能够不从最初始开始接受数据,而是接受在订阅的那一刻开始接受当前正在发送的数据,这就需要用到多播能力了。
# 多播
那么如果实现多播能力呢,也就是实现我们不论什么时候订阅只会接收到实时的数据的功能。
可能这个时候会有小伙伴跳出来了,直接给个中间人来订阅这个源,然后将数据转发给A
和B
不就行了?
const source = Rx.Observable.interval(1000).take(3);
const subject = {
observers: [],
subscribe(target) {
this.observers.push(target);
},
next: function(value) {
this.observers.forEach((next) => next(value))
}
}
source.subscribe(subject);
subject.subscribe((value) => console.log('A ' + value))
setTimeout(() => {
subject.subscribe((value) => console.log('B ' + value))
}, 1000)
// A 0
// A 1
// B 1
// A 2
// B 2
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
先分析一下代码,A
和B
的订阅和单播里代码并无差别,唯一变化的是他们订阅的对象由source
变成了subject
,然后再看看这个subject
包含了什么,这里做了一些简化,移除了error
、complete
这样的处理函数,只保留了next
,然后内部含有一个observers
数组,这里包含了所有的订阅者,暴露一个subscribe
用于观察者对其进行订阅。
在使用过程中,让这个中间商subject
来订阅source
,这样便做到了统一管理,以及保证数据的实时性,因为本质上对于source
来说只有一个订阅者。
这里主要是方便理解,简易实现了
RxJS
中的Subject
的实例,这里的中间人可以直接换成RxJS
的Subject
类实例,效果是一样的
const source = Rx.Observable.interval(1000).take(3);
const subject = new Rx.Subject();
source.subscribe(subject);
subject.subscribe((value) => console.log('A ' + value))
setTimeout(() => {
subject.subscribe((value) => console.log('B ' + value))
}, 1000)
2
3
4
5
6
7
8
9
10
11
12
同样先来看看打印的结果是否符合预期,首先A
的打印结果并无变化,B
首次打印的数字现在是从1开始了,也就当前正在传输的数据,这下满足了我们需要获取实时数据的需求了。
不同于单播订阅者总是需要从头开始获取数据,多播模式能够保证数据的实时性。
除了以上这些,RxJS
还提供了Subject
的三个变体:
BehaviorSubject
ReplaySubject
AsyncSubject
# BehaviorSubject
BehaviorSubject
是一种在有新的订阅时会额外发出最近一次发出的值的Subject
。
]
同样我们结合现实场景来进行理解,假设有我们需要使用它来维护一个状态,在它变化之后给所有重新订阅的人都能发送一个当前状态的数据,这就好比我们要实现一个计算属性,我们只关心该计算属性最终的状态,而不关心过程中变化的数,那么又该怎么处理呢?
我们知道普通的Subject
只会在当前有新数据的时候发送当前的数据,而发送完毕之后就不会再发送已发送过的数据,那么这个时候我们就可以引入BehaviorSubject
来进行终态维护了,因为订阅了该对象的观察者在订阅的同时能够收到该对象发送的最近一次的值,这样就能满足我们上述的需求了。
然后再结合代码来分析这种Subject
应用的场景:
const subject = new Rx.Subject();
subject.subscribe((value) => console.log('A:' + value))
subject.next(1);
// A:1
subject.next(2);
// A:2
setTimeout(() => {
subject.subscribe((value) => console.log('B:' + value)); // 1s后订阅,无法收到值
}, 1000)
2
3
4
5
6
7
8
9
10
11
12
首先演示的是采用普通Subject
来作为订阅的对象,然后观察者A
在实例对象subject
调用next
发送新的值之前订阅的,然后观察者是延时一秒之后订阅的,所以A
接受数据正常,那么这个时候由于B
在数据发送的时候还没订阅,所以它并没有收到数据。
那么我们再来看看采用BehaviorSubject
实现的效果:
const subject = new Rx.BehaviorSubject(0); // 需要传入初始值
subject.subscribe((value: number) => console.log('A:' + value))
// A:0
subject.next(1);
// A:1
subject.next(2);
// A:2
setTimeout(() => {
subject.subscribe((value: number) => console.log('B:' + value))
// B:2
}, 1000)
2
3
4
5
6
7
8
9
10
11
12
13
同样从打印的结果来看,与普通Subject
的区别在于,在订阅的同时源对象就发送了最近一次改变的值(如果没改变则发送初始值),这个时候我们的B
也如愿获取到了最新的状态。
这里在实例化
BehaviorSubject
的时候需要传入一个初始值。
# ReplaySubject
在理解了BehaviorSubject
之后再来理解ReplaySubject
就比较轻松了,ReplaySubject
会保存所有值,然后回放给新的订阅者,同时它提供了入参用于控制重放值的数量(默认重放所有)。
]
什么?还不理解?看码:
const subject = new Rx.ReplaySubject(2);
subject.next(0);
subject.next(1);
subject.next(2);
subject.subscribe((value: number) => console.log('A:' + value))
// A:1
// A:2
subject.next(3);
// A:3
subject.next(4);
// A:4
setTimeout(() => {
subject.subscribe((value: number) => console.log('B:' + value))
// B:3
// B:4
}, 1000)
// 整体打印顺序:
// A:1
// A:2
// A:3
// A:4
// B:3
// B:4
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
我们先从构造函数传参来看,BehaviorSubject
与ReplaySubject
都需要传入一个参数,对BehaviorSubject
来说是初始值,而对于ReplaySubject
来说就是重放先前多少次的值,如果不传入重放次数,那么它将重放所有发射过的值。
从结果上看,如果你不传入确定的重放次数,那么实现的效果与之前介绍的单播效果几乎没有差别。
所以我们再分析代码可以知道在订阅的那一刻,观察者们就能收到源对象前多少次发送的值。
# AsyncSubject
AsyncSubject
只有当 Observable
执行完成时(执行complete()
),它才会将执行的最后一个值发送给观察者,如果因异常而终止,AsyncSubject
将不会释放任何数据,但是会向Observer
传递一个异常通知。
]
AsyncSubject
一般用的比较少,更多的还是使用前面三种。
const subject = new Rx.AsyncSubject();
subject.next(1);
subject.subscribe(res => {
console.log('A:' + res);
});
subject.next(2);
subject.subscribe(res => {
console.log('B:' + res);
});
subject.next(3);
subject.subscribe(res => {
console.log('C:' + res);
});
subject.complete();
subject.next(4);
// 整体打印结果:
// A:3
// B:3
// C:3
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
从打印结果来看其实已经很好理解了,也就是说对于所有的观察者们来说,源对象只会在所有数据发送完毕也就是调用complete
方法之后才会把最后一个数据返回给观察者们。
这就好比小说里经常有的,当你要放技能的时候,先要打一套起手式,打完之后才会放出你的大招。
# Cold Observables 与 Hot Observables
# Cold Observables
Cold Observables
只有被 observers
订阅的时候,才会开始产生值。是单播的,有多少个订阅就会生成多少个订阅实例,每个订阅都是从第一个产生的值开始接收值,所以每个订阅接收到的值都是一样的。
如果大家想要参考
Cold Observables
相关代码,直接看前面的单播示例就行了。
正如单播描述的能力,不管观察者们什么时候开始订阅,源对象都会从初始值开始把所有的数都发给该观察者。
# Hot Observables
Hot Observables
不管有没有被订阅都会产生值。是多播的,多个订阅共享同一个实例,是从订阅开始接受到值,每个订阅接收到的值是不同的,取决于它们是从什么时候开始订阅。
这里有几种场景,我们可以逐一分析一下便于理解:
# “加热”
首先可以忽略代码中出现的陌生的函数,后面会细说。
const source = Rx.Observable.of(1, 2).publish();
source.connect();
source.subscribe((value) => console.log('A:' + value));
setTimeout(() => {
source.subscribe((value) => console.log('B:' + value));
}, 1000);
2
3
4
5
6
这里首先用Rx
的操作符of
创建了一个Observable
,并且后面跟上了一个publish
函数,在创建完之后调用connect
函数进行开始数据发送。
最终代码的执行结果就是没有任何数据打印出来,分析一下原因其实也比较好理解,由于开启数据发送的时候还没有订阅,并且这是一个Hot Observables
,它是不会理会你是否有没有订阅它,开启之后就会直接发送数据,所以A
和B
都没有接收到数据。
当然你这里如果把
connect
方法放到最后,那么最终的结果就是A
接收到了,B
还是接不到,因为A
在开启发数据之前就订阅了,而B
还要等一秒。
# 更直观的场景
正如上述多播所描述的,其实我们更多想看到的现象是能够A
和B
两个观察者能够都有接收到数据,然后观察数据的差别,这样会方便理解。
这里直接换一个发射源:
const source = Rx.Observable.interval(1000).take(3).publish();
source.subscribe((value: number) => console.log('A:' + value));
setTimeout(() => {
source.subscribe((value: number) => console.log('B:' + value));
}, 3000);
source.connect();
// A:0
// A:1
// A:2
// B:2
2
3
4
5
6
7
8
9
10
11
这里我们利用interval
配合take
操作符每秒发射一个递增的数,最多三个,然后这个时候的打印结果就更清晰了,A
正常接收到了三个数,B
三秒之后才订阅,所以只接收到了最后一个数2,这种方式就是上述多播所描述的并无一二。
# 两者对比
Cold Observables
:举个栗子会比较好理解一点:比如我们上B站看番,更新了新番,我们不论什么时候去看,都能从头开始看到完整的剧集,与其他人看不看毫无关联,互不干扰。Hot Observables
:这就好比我们上B站看直播,直播开始之后就直接开始播放了,不管是否有没有订阅者,也就是说如果你没有一开始就订阅它,那么你过一段时候后再去看,是不知道前面直播的内容的。
# 上述代码中出现的操作符解析
在创建Hot Observables
时我们用到了publish
与connect
函数的结合,其实调用了publish
操作符之后返回的结果是一个ConnectableObservable
,然后该对象上提供了connect
方法让我们控制发送数据的时间。
publish
:这个操作符把正常的Observable
(Cold Observables
)转换成ConnectableObservable
。ConnectableObservable
:ConnectableObservable
是多播的共享Observable
,可以同时被多个observers
共享订阅,它是Hot Observables
。ConnectableObservable
是订阅者和真正的源头Observables
(上面例子中的interval
,每隔一秒发送一个值,就是源头Observables
)的中间人,ConnectableObservable
从源头Observables
接收到值然后再把值转发给订阅者。connect()
:ConnectableObservable
并不会主动发送值,它有个connect
方法,通过调用connect
方法,可以启动共享ConnectableObservable
发送值。当我们调用ConnectableObservable.prototype.connect
方法,不管有没有被订阅,都会发送值。订阅者共享同一个实例,订阅者接收到的值取决于它们何时开始订阅。
其实这种手动控制的方式还挺麻烦的,有没有什么更加方便的操作方式呢,比如监听到有订阅者订阅了才开始发送数据,一旦所有订阅者都取消了,就停止发送数据?其实也是有的,让我们看看引用计数(refCount
):
# 引用计数
这里主要用到了publish
结合refCount
实现一个“自动挡”的效果。
const source = Rx.Observable.interval(1000).take(3).publish().refCount();
setTimeout(() => {
source.subscribe(data => { console.log("A:" + data) });
setTimeout(() => {
source.subscribe(data => { console.log("B:" + data) });
}, 1000);
}, 2000);
// A:0
// A:1
// B:1
// A:2
// B:2
2
3
4
5
6
7
8
9
10
11
12
13
我们透过结果看本质,能够很轻松的发现,只有当A
订阅的时候才开始发送数据(A
拿到的数据是从0开始的),并且当B
订阅时,也是只能获取到当前发送的数据,而不能获取到之前的数据。
不仅如此,这种“自动挡”当所有订阅者都取消订阅的时候它就会停止再发送数据了。
# Schedulers (调度器)
用来控制并发并且是中央集权的调度员,允许我们在发生计算时进行协调,例如 setTimeout
或 requestAnimationFrame
或其他。
- 调度器是一种数据结构。 它知道如何根据优先级或其他标准来存储任务和将任务进行排序。
- 调度器是执行上下文。 它表示在何时何地执行任务(举例来说,立即的,或另一种回调函数机制(比如
setTimeout
或process.nextTick
),或动画帧)。 - 调度器有一个(虚拟的)时钟。 调度器功能通过它的
getter
方法now()
提供了“时间”的概念。在具体调度器上安排的任务将严格遵循该时钟所表示的时间。
学到这相信大家也已经或多或少对RxJS
有一定了解了,不知道大家有没有发现一个疑问,前面所展示的代码示例中有同步也有异步,而笔者却没有显示的控制他们的执行,他们的这套执行机制到底是什么呢?
其实他们的内部的调度就是靠的Schedulers
来控制数据发送的时机,许多操作符会预设不同的Scheduler
,所以我们不需要进行特殊处理他们就能良好的进行同步或异步运行。
const source = Rx.Observable.create(function (observer: any) {
observer.next(1);
observer.next(2);
observer.next(3);
observer.complete();
});
console.log('订阅前');
source.observeOn(Rx.Scheduler.async) // 设为 async
.subscribe({
next: (value: number) => { console.log(value); },
error: (err: any) => { console.log('Error: ' + err); },
complete: () => { console.log('complete'); }
});
console.log('订阅后');
// 订阅前
// 订阅后
// 1
// 2
// 3
// complete
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
从打印结果上来看,数据的发送时机的确已经由同步变成了异步,如果不进行调度方式修改,那么“订阅后”的打印应该是在数据发送完毕之后才会执行的。
看完示例之后我们再来研究这个调度器能做哪几种调度:
queue
asap
async
animationFrame
# queue
将每个下一个任务放在队列中,而不是立即执行
queue
延迟使用调度程序时,其行为与async
调度程序相同。
当没有延迟使用时,它将同步安排给定的任务-在安排好任务后立即执行。但是,当递归调用时(即在已调度的任务内部),将使用队列调度程序调度另一个任务,而不是立即执行,该任务将被放入队列并等待当前任务完成。
这意味着,当您使用 queue
调度程序执行任务时,您确定它会在该调度程序调度的其他任何任务开始之前结束。
这个同步与我们平常理解的同步可能不太一样,笔者当时也都困惑了一会。
还是用一个官方的例子来讲解这种调度方式是怎么理解吧:
import { queueScheduler } from 'rxjs';
queueScheduler.schedule(() => {
queueScheduler.schedule(() => console.log('second'));
console.log('first');
});
2
3
4
5
6
我们无需关注陌生的函数调用,我们这里着重于看这种调度方式与平常的同步调度的区别。
首先我们调用queueScheduler
的schedule
方法开始执行,然后函数内部又同样再以同样的方式调用(这里也可以改成递归,不过这里用这个示例去理解可能会好一点),并且传入一个函数,打印second
。
然后继续看下面的语句,一个普通的console.log('first')
,然后我们再来看看打印结果:
// first
// second
2
是不是有点神奇,如果没看明白为啥的,可以再回头看看前面queue
对于递归执行的处理方式。也就是说如果递归调用,它内部会维护一个队列,然后等待先加入队列的任务先执行完成(也就是上面的console.log('first')
执行完才会执行console.log('second')
,因为console.log('second')
这个任务是后加入该队列的)。
# asap
内部基于Promise
实现(Node
端采用process.nextTick
),他会使用可用的最快的异步传输机制,如果不支持Promise
或process.nextTick
或者Web Worker
的 MessageChannel
也可能会调用setTimeout
方式进行调度。
# async
与asap
方式很像,只不过内部采用setInterval
进行调度,大多用于基于时间的操作符。
# animationFrame
从名字看其实相信大家已经就能略知一二了,内部基于requestAnimationFrame
来实现调度,所以执行的时机将与window.requestAnimationFrame
保持一致,适用于需要频繁渲染或操作动画的场景。
# Operators(操作符)
采用函数式编程风格的纯函数 (pure function
),使用像 map
、filter
、concat
、flatMap
等这样的操作符来处理集合。