陌小路的个人博客 陌小路的个人博客
首页
  • 技术专区

    • 面试
    • Vue
    • Electron
    • TypeScript
    • Serverless
    • GraphQL
  • 我的秋招之旅
  • 2019年终总结
Todo
收藏夹
关于作者
GitHub

陌小路

前端切图仔
首页
  • 技术专区

    • 面试
    • Vue
    • Electron
    • TypeScript
    • Serverless
    • GraphQL
  • 我的秋招之旅
  • 2019年终总结
Todo
收藏夹
关于作者
GitHub
  • Vue

  • React

  • 面试

  • Electron

  • Serverless

  • GraphQL

  • TypeScript

  • RxJS

    • 介绍
    • 前置知识点
    • Observable
    • Observer
    • Subscription与Subject
      • Cold-Observables与Hot-Observables
      • Schedulers
      • Operators概念
      • 创建型Operators
      • 转换操作符
      • 过滤操作符
      • 组合操作符
      • 总结
    • 工程化

    • Webpack

    • Nestjs

    • WebRTC & P2P

    • Docker

    • Linux

    • Git

    • Svelte

    • 踩坑日记 & 小Tips

    • 其他

    • technology
    • RxJS
    陌小路
    2020-12-22

    Subscription与Subject

    # Subscription

    Subscription就是表示Observable的执行,可以被清理。这个对象最常用的方法就是unsubscribe方法,它不需要任何参数,只是用来清理由Subscription占用的资源。同时,它还有add方法可以使我们取消多个订阅。

    const myObservable = Rx.Observable.create(observer => {
      observer.next('foo');
      setTimeout(() => observer.next('bar'), 1000);
    });
    const subscription = myObservable.subscribe(x => console.log(x));
    // 稍后:
    // 这会取消正在进行中的 Observable 执行
    // Observable 执行是通过使用观察者调用 subscribe 方法启动的
    subscription.unsubscribe();
    
    1
    2
    3
    4
    5
    6
    7
    8
    9

    # Subject (主体)

    它是一个代理对象,既是一个 Observable 又是一个 Observer,它可以同时接受 Observable 发射出的数据,也可以向订阅了它的 observer 发射数据,同时,Subject 会对内部的 observers 清单进行多播(multicast)

    Subject

    Subjects 是将任意 Observable 执行共享给多个观察者的唯一方式

    这个时候眼尖的读者会发现,这里产生了一个新概念——多播。

    • 那么多播又是什么呢?
    • 有了多播是不是还有单播?
    • 他们的区别又是什么呢?

    接下来就让笔者给大家好好分析这两个概念吧。

    单播与多播

    # 单播

    普通的Observable是单播的,那么什么是单播呢?

    单播的意思是,每个普通的 Observables 实例都只能被一个观察者订阅,当它被其他观察者订阅的时候会产生一个新的实例。也就是普通 Observables 被不同的观察者订阅的时候,会有多个实例,不管观察者是从何时开始订阅,每个实例都是从头开始把值发给对应的观察者。

    const Rx = require('rxjs/Rx')
    
    const source = Rx.Observable.interval(1000).take(3);
    
    source.subscribe((value) => console.log('A ' + value))
    
    setTimeout(() => {
        source.subscribe((value) => console.log('B ' + value))
    }, 1000)
    
    // A 0
    // A 1
    // B 0
    // A 2
    // B 1
    // B 2
    
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16

    看到陌生的调用不要慌,后面会进行详细解析,这里的source你可以理解为就是一个每隔一秒发送一个从0开始递增整数的Observable就行了,且只会发送三次(take操作符其实也就是限定拿多少个数就不在发送数据了。)。

    从这里我们可以看出两个不同观察者订阅了同一个源(source),一个是直接订阅,另一个延时一秒之后再订阅。

    从打印的结果来看,A从0开始每隔一秒打印一个递增的数,而B延时了一秒,然后再从0开始打印,由此可见,A与B的执行是完全分开的,也就是每次订阅都创建了一个新的实例。

    在许多场景下,我们可能会希望B能够不从最初始开始接受数据,而是接受在订阅的那一刻开始接受当前正在发送的数据,这就需要用到多播能力了。

    # 多播

    那么如果实现多播能力呢,也就是实现我们不论什么时候订阅只会接收到实时的数据的功能。

    可能这个时候会有小伙伴跳出来了,直接给个中间人来订阅这个源,然后将数据转发给A和B不就行了?

    
    const source = Rx.Observable.interval(1000).take(3);
    
    const subject = {
    	observers: [],
    	subscribe(target) {
    		this.observers.push(target);
    	},
    	next: function(value) {
    		this.observers.forEach((next) => next(value))
    	}
    }
    
    source.subscribe(subject);
    
    subject.subscribe((value) => console.log('A ' + value))
    
    setTimeout(() => {
    	subject.subscribe((value) => console.log('B ' + value))
    }, 1000)
    
    // A 0
    // A 1
    // B 1
    // A 2
    // B 2
    
    
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27

    先分析一下代码,A和B的订阅和单播里代码并无差别,唯一变化的是他们订阅的对象由source变成了subject,然后再看看这个subject包含了什么,这里做了一些简化,移除了error、complete这样的处理函数,只保留了next,然后内部含有一个observers数组,这里包含了所有的订阅者,暴露一个subscribe用于观察者对其进行订阅。

    在使用过程中,让这个中间商subject来订阅source,这样便做到了统一管理,以及保证数据的实时性,因为本质上对于source来说只有一个订阅者。

    这里主要是方便理解,简易实现了RxJS中的Subject的实例,这里的中间人可以直接换成RxJS的Subject类实例,效果是一样的

    const source = Rx.Observable.interval(1000).take(3);
    
    const subject = new Rx.Subject();
    
    source.subscribe(subject);
    
    subject.subscribe((value) => console.log('A ' + value))
    
    setTimeout(() => {
    	subject.subscribe((value) => console.log('B ' + value))
    }, 1000)
    
    
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12

    同样先来看看打印的结果是否符合预期,首先A的打印结果并无变化,B首次打印的数字现在是从1开始了,也就当前正在传输的数据,这下满足了我们需要获取实时数据的需求了。

    不同于单播订阅者总是需要从头开始获取数据,多播模式能够保证数据的实时性。

    除了以上这些,RxJS还提供了Subject的三个变体:

    • BehaviorSubject
    • ReplaySubject
    • AsyncSubject

    # BehaviorSubject

    BehaviorSubject 是一种在有新的订阅时会额外发出最近一次发出的值的Subject。

    rdtonI.png]

    同样我们结合现实场景来进行理解,假设有我们需要使用它来维护一个状态,在它变化之后给所有重新订阅的人都能发送一个当前状态的数据,这就好比我们要实现一个计算属性,我们只关心该计算属性最终的状态,而不关心过程中变化的数,那么又该怎么处理呢?

    我们知道普通的Subject只会在当前有新数据的时候发送当前的数据,而发送完毕之后就不会再发送已发送过的数据,那么这个时候我们就可以引入BehaviorSubject来进行终态维护了,因为订阅了该对象的观察者在订阅的同时能够收到该对象发送的最近一次的值,这样就能满足我们上述的需求了。

    然后再结合代码来分析这种Subject应用的场景:

    const subject = new Rx.Subject();
    
    subject.subscribe((value) => console.log('A:' + value))
    
    subject.next(1);
    // A:1
    subject.next(2);
    // A:2
    
    setTimeout(() => {
    	subject.subscribe((value) => console.log('B:' + value)); // 1s后订阅,无法收到值
    }, 1000)
    
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12

    首先演示的是采用普通Subject来作为订阅的对象,然后观察者A在实例对象subject调用next发送新的值之前订阅的,然后观察者是延时一秒之后订阅的,所以A接受数据正常,那么这个时候由于B在数据发送的时候还没订阅,所以它并没有收到数据。

    那么我们再来看看采用BehaviorSubject实现的效果:

    const subject = new Rx.BehaviorSubject(0); // 需要传入初始值
    
    subject.subscribe((value: number) => console.log('A:' + value))
    // A:0
    subject.next(1);
    // A:1
    subject.next(2);
    // A:2
    
    setTimeout(() => {
    	subject.subscribe((value: number) => console.log('B:' + value))
    	// B:2
    }, 1000)
    
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13

    同样从打印的结果来看,与普通Subject的区别在于,在订阅的同时源对象就发送了最近一次改变的值(如果没改变则发送初始值),这个时候我们的B也如愿获取到了最新的状态。

    这里在实例化BehaviorSubject的时候需要传入一个初始值。

    # ReplaySubject

    在理解了BehaviorSubject之后再来理解ReplaySubject就比较轻松了,ReplaySubject会保存所有值,然后回放给新的订阅者,同时它提供了入参用于控制重放值的数量(默认重放所有)。

    ReplaySubject]

    什么?还不理解?看码:

    const subject = new Rx.ReplaySubject(2);
    
    subject.next(0);
    subject.next(1);
    subject.next(2);
    
    subject.subscribe((value: number) => console.log('A:' + value))
    // A:1
    // A:2
    
    subject.next(3);
    // A:3
    subject.next(4);
    // A:4
    
    setTimeout(() => {
    	subject.subscribe((value: number) => console.log('B:' + value))
    	// B:3
    	// B:4
    }, 1000)
    
    // 整体打印顺序:
    // A:1
    // A:2
    // A:3
    // A:4
    // B:3
    // B:4
    
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28

    我们先从构造函数传参来看,BehaviorSubject与ReplaySubject都需要传入一个参数,对BehaviorSubject来说是初始值,而对于ReplaySubject来说就是重放先前多少次的值,如果不传入重放次数,那么它将重放所有发射过的值。

    从结果上看,如果你不传入确定的重放次数,那么实现的效果与之前介绍的单播效果几乎没有差别。

    所以我们再分析代码可以知道在订阅的那一刻,观察者们就能收到源对象前多少次发送的值。

    # AsyncSubject

    AsyncSubject 只有当 Observable 执行完成时(执行complete()),它才会将执行的最后一个值发送给观察者,如果因异常而终止,AsyncSubject将不会释放任何数据,但是会向Observer传递一个异常通知。

    AsyncSubject]

    AsyncSubject一般用的比较少,更多的还是使用前面三种。

    const subject = new Rx.AsyncSubject();
    subject.next(1);
    subject.subscribe(res => {
    	console.log('A:' + res);
    });
    subject.next(2);
    subject.subscribe(res => {
    	console.log('B:' + res);
    });
    subject.next(3);
    subject.subscribe(res => {
    	console.log('C:' + res);
    });
    subject.complete();
    subject.next(4);
    
    // 整体打印结果:
    // A:3
    // B:3
    // C:3
    
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20

    从打印结果来看其实已经很好理解了,也就是说对于所有的观察者们来说,源对象只会在所有数据发送完毕也就是调用complete方法之后才会把最后一个数据返回给观察者们。

    这就好比小说里经常有的,当你要放技能的时候,先要打一套起手式,打完之后才会放出你的大招。

    编辑
    上次更新: 2023/11/25, 4:11:00
    Observer
    Cold-Observables与Hot-Observables

    ← Observer Cold-Observables与Hot-Observables→

    最近更新
    01
    github加速
    01-01
    02
    在线小工具
    01-01
    03
    Lora-Embeddings
    11-27
    更多文章>
    Theme by Vdoing | Copyright © 2020-2024 STDSuperman | MIT License
    • 跟随系统
    • 浅色模式
    • 深色模式
    • 阅读模式