Subscription与Subject
# Subscription
Subscription
就是表示Observable
的执行,可以被清理。这个对象最常用的方法就是unsubscribe
方法,它不需要任何参数,只是用来清理由Subscription
占用的资源。同时,它还有add
方法可以使我们取消多个订阅。
const myObservable = Rx.Observable.create(observer => {
observer.next('foo');
setTimeout(() => observer.next('bar'), 1000);
});
const subscription = myObservable.subscribe(x => console.log(x));
// 稍后:
// 这会取消正在进行中的 Observable 执行
// Observable 执行是通过使用观察者调用 subscribe 方法启动的
subscription.unsubscribe();
2
3
4
5
6
7
8
9
# Subject (主体)
它是一个代理对象,既是一个 Observable
又是一个 Observer
,它可以同时接受 Observable
发射出的数据,也可以向订阅了它的 observer
发射数据,同时,Subject
会对内部的 observers
清单进行多播(multicast
)
Subjects
是将任意Observable
执行共享给多个观察者的唯一方式
这个时候眼尖的读者会发现,这里产生了一个新概念——多播。
- 那么多播又是什么呢?
- 有了多播是不是还有单播?
- 他们的区别又是什么呢?
接下来就让笔者给大家好好分析这两个概念吧。
# 单播
普通的Observable
是单播的,那么什么是单播呢?
单播的意思是,每个普通的 Observables
实例都只能被一个观察者订阅,当它被其他观察者订阅的时候会产生一个新的实例。也就是普通 Observables
被不同的观察者订阅的时候,会有多个实例,不管观察者是从何时开始订阅,每个实例都是从头开始把值发给对应的观察者。
const Rx = require('rxjs/Rx')
const source = Rx.Observable.interval(1000).take(3);
source.subscribe((value) => console.log('A ' + value))
setTimeout(() => {
source.subscribe((value) => console.log('B ' + value))
}, 1000)
// A 0
// A 1
// B 0
// A 2
// B 1
// B 2
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
看到陌生的调用不要慌,后面会进行详细解析,这里的
source
你可以理解为就是一个每隔一秒发送一个从0开始递增整数的Observable
就行了,且只会发送三次(take
操作符其实也就是限定拿多少个数就不在发送数据了。)。
从这里我们可以看出两个不同观察者订阅了同一个源(source
),一个是直接订阅,另一个延时一秒之后再订阅。
从打印的结果来看,A
从0开始每隔一秒打印一个递增的数,而B
延时了一秒,然后再从0开始打印,由此可见,A
与B
的执行是完全分开的,也就是每次订阅都创建了一个新的实例。
在许多场景下,我们可能会希望B
能够不从最初始开始接受数据,而是接受在订阅的那一刻开始接受当前正在发送的数据,这就需要用到多播能力了。
# 多播
那么如果实现多播能力呢,也就是实现我们不论什么时候订阅只会接收到实时的数据的功能。
可能这个时候会有小伙伴跳出来了,直接给个中间人来订阅这个源,然后将数据转发给A
和B
不就行了?
const source = Rx.Observable.interval(1000).take(3);
const subject = {
observers: [],
subscribe(target) {
this.observers.push(target);
},
next: function(value) {
this.observers.forEach((next) => next(value))
}
}
source.subscribe(subject);
subject.subscribe((value) => console.log('A ' + value))
setTimeout(() => {
subject.subscribe((value) => console.log('B ' + value))
}, 1000)
// A 0
// A 1
// B 1
// A 2
// B 2
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
先分析一下代码,A
和B
的订阅和单播里代码并无差别,唯一变化的是他们订阅的对象由source
变成了subject
,然后再看看这个subject
包含了什么,这里做了一些简化,移除了error
、complete
这样的处理函数,只保留了next
,然后内部含有一个observers
数组,这里包含了所有的订阅者,暴露一个subscribe
用于观察者对其进行订阅。
在使用过程中,让这个中间商subject
来订阅source
,这样便做到了统一管理,以及保证数据的实时性,因为本质上对于source
来说只有一个订阅者。
这里主要是方便理解,简易实现了
RxJS
中的Subject
的实例,这里的中间人可以直接换成RxJS
的Subject
类实例,效果是一样的
const source = Rx.Observable.interval(1000).take(3);
const subject = new Rx.Subject();
source.subscribe(subject);
subject.subscribe((value) => console.log('A ' + value))
setTimeout(() => {
subject.subscribe((value) => console.log('B ' + value))
}, 1000)
2
3
4
5
6
7
8
9
10
11
12
同样先来看看打印的结果是否符合预期,首先A
的打印结果并无变化,B
首次打印的数字现在是从1开始了,也就当前正在传输的数据,这下满足了我们需要获取实时数据的需求了。
不同于单播订阅者总是需要从头开始获取数据,多播模式能够保证数据的实时性。
除了以上这些,RxJS
还提供了Subject
的三个变体:
BehaviorSubject
ReplaySubject
AsyncSubject
# BehaviorSubject
BehaviorSubject
是一种在有新的订阅时会额外发出最近一次发出的值的Subject
。
]
同样我们结合现实场景来进行理解,假设有我们需要使用它来维护一个状态,在它变化之后给所有重新订阅的人都能发送一个当前状态的数据,这就好比我们要实现一个计算属性,我们只关心该计算属性最终的状态,而不关心过程中变化的数,那么又该怎么处理呢?
我们知道普通的Subject
只会在当前有新数据的时候发送当前的数据,而发送完毕之后就不会再发送已发送过的数据,那么这个时候我们就可以引入BehaviorSubject
来进行终态维护了,因为订阅了该对象的观察者在订阅的同时能够收到该对象发送的最近一次的值,这样就能满足我们上述的需求了。
然后再结合代码来分析这种Subject
应用的场景:
const subject = new Rx.Subject();
subject.subscribe((value) => console.log('A:' + value))
subject.next(1);
// A:1
subject.next(2);
// A:2
setTimeout(() => {
subject.subscribe((value) => console.log('B:' + value)); // 1s后订阅,无法收到值
}, 1000)
2
3
4
5
6
7
8
9
10
11
12
首先演示的是采用普通Subject
来作为订阅的对象,然后观察者A
在实例对象subject
调用next
发送新的值之前订阅的,然后观察者是延时一秒之后订阅的,所以A
接受数据正常,那么这个时候由于B
在数据发送的时候还没订阅,所以它并没有收到数据。
那么我们再来看看采用BehaviorSubject
实现的效果:
const subject = new Rx.BehaviorSubject(0); // 需要传入初始值
subject.subscribe((value: number) => console.log('A:' + value))
// A:0
subject.next(1);
// A:1
subject.next(2);
// A:2
setTimeout(() => {
subject.subscribe((value: number) => console.log('B:' + value))
// B:2
}, 1000)
2
3
4
5
6
7
8
9
10
11
12
13
同样从打印的结果来看,与普通Subject
的区别在于,在订阅的同时源对象就发送了最近一次改变的值(如果没改变则发送初始值),这个时候我们的B
也如愿获取到了最新的状态。
这里在实例化
BehaviorSubject
的时候需要传入一个初始值。
# ReplaySubject
在理解了BehaviorSubject
之后再来理解ReplaySubject
就比较轻松了,ReplaySubject
会保存所有值,然后回放给新的订阅者,同时它提供了入参用于控制重放值的数量(默认重放所有)。
]
什么?还不理解?看码:
const subject = new Rx.ReplaySubject(2);
subject.next(0);
subject.next(1);
subject.next(2);
subject.subscribe((value: number) => console.log('A:' + value))
// A:1
// A:2
subject.next(3);
// A:3
subject.next(4);
// A:4
setTimeout(() => {
subject.subscribe((value: number) => console.log('B:' + value))
// B:3
// B:4
}, 1000)
// 整体打印顺序:
// A:1
// A:2
// A:3
// A:4
// B:3
// B:4
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
我们先从构造函数传参来看,BehaviorSubject
与ReplaySubject
都需要传入一个参数,对BehaviorSubject
来说是初始值,而对于ReplaySubject
来说就是重放先前多少次的值,如果不传入重放次数,那么它将重放所有发射过的值。
从结果上看,如果你不传入确定的重放次数,那么实现的效果与之前介绍的单播效果几乎没有差别。
所以我们再分析代码可以知道在订阅的那一刻,观察者们就能收到源对象前多少次发送的值。
# AsyncSubject
AsyncSubject
只有当 Observable
执行完成时(执行complete()
),它才会将执行的最后一个值发送给观察者,如果因异常而终止,AsyncSubject
将不会释放任何数据,但是会向Observer
传递一个异常通知。
]
AsyncSubject
一般用的比较少,更多的还是使用前面三种。
const subject = new Rx.AsyncSubject();
subject.next(1);
subject.subscribe(res => {
console.log('A:' + res);
});
subject.next(2);
subject.subscribe(res => {
console.log('B:' + res);
});
subject.next(3);
subject.subscribe(res => {
console.log('C:' + res);
});
subject.complete();
subject.next(4);
// 整体打印结果:
// A:3
// B:3
// C:3
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
从打印结果来看其实已经很好理解了,也就是说对于所有的观察者们来说,源对象只会在所有数据发送完毕也就是调用complete
方法之后才会把最后一个数据返回给观察者们。
这就好比小说里经常有的,当你要放技能的时候,先要打一套起手式,打完之后才会放出你的大招。