Rxjs的使用

Rxjs概述

Rxjs通过使用 observable 序列来编写异步和基于事件的程序,它提供了一个核心类型 Observable(可观察者对象),下面一切操作的源头都始于这个对象

可以把 RxJS 当做是用来处理事件的 Lodash 。


创建 Observables

可以使用Observable.create 创建“可观察者对象”,Observable.create 是 Observable 构造函数的别名,它接收一个参数:subscribe 函数。
下面的示例创建了一个 Observable,它每隔一秒会向观察者发送字符串 ‘hi’ :

1
2
3
4
5
6
7
8
9
10
11
12
const observable = Observable.create(function subscribe(observer) {
const id = setInterval(() => {
observer.next('hi')
}, 1000);
});

// 或者
const observable = new Observable((observer) => {
const id = setInterval(() => {
observer.next('hi')
}, 1000);
});

Observables 可以使用 create 来创建, 但通常我们使用所谓的创建操作符, 像 of、from、interval、等等。


订阅 Observables

在上面的示例中,subscribe 函数是用来描述 Observable 最重要的一块。

1
observable.subscribe(x => console.log(x));

执行 Observables

Observable 执行可以传递三种类型的值:

  • “Next” 通知: 发送一个值,比如数字、字符串、对象,等等。
  • “Error” 通知: 发送一个 JavaScript 错误 或 异常。
  • “Complete” 通知: 不再发送任何值。

“Next” 通知是最重要,也是最常见的类型:它们表示传递给观察者的实际数据。”Error” 和 “Complete” 通知只会在 Observable 执行期间发生一次,并且只会执行其中的一个。

在 Observable 执行中, 可能会发送零个到无穷多个 “Next” 通知。如果发送的是 “Error” 或 “Complete” 通知的话,那么之后不会再发送任何通知了。

Observable 严格遵守自身的规则,所以下面的代码不会发送 “Next” 通知 4:

1
2
3
4
5
6
const observable = Observable.create(observer => {
observer.next('🍔');
observer.next('🍟'));
observer.complete();
observer.next('🌭');
});

只要next返回错误就会返回 Error

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
const observable = Observable.create(function subscribe(observer) {
observer.next('🍔');
observer.next('🍟');
observer.error(new Error('Bad 🌭'));
observer.complete();
});

observable.subscribe(res => {
console.log(res);
}, err => {
console.error(err)
},() => {
console.log('done!');
});

输出:
🍔
🍟
Error: Bad 🌭

清理 Observable

Observable 的执行可能是无限的,这样会一直消耗内存,因此我们需要中止它。

当调用了 observable.subscribe ,观察者会被附加到新创建的 Observable 执行中。这个调用还返回一个对象,即 Subscription (订阅):

1
2
3
const subscription = observable.subscribe(res => {
console.log(res);
});

Subscription 表示进行中的执行,使用 subscription.unsubscribe() 你可以取消进行中的执行:

1
subscription.unsubscribe();

Observer (观察者)

观察者是由 Observable 发送的值的消费者。观察者只是一组回调函数的集合每个回调函数对应一种 Observable 发送的通知类型:next、error 和complete 。下面的示例是一个典型的观察者对象:

1
2
3
4
5
const observer = {
next: x => console.log('Observer got a next value: ' + x),
error: err => console.error('Observer got an error: ' + err),
complete: () => console.log('Observer got a complete notification'),
};

要使用观察者,需要把它提供给 Observable 的 subscribe 方法:

1
observable.subscribe(observer);

观察者只是有三个回调函数的对象,每个回调函数对应一种 Observable 发送的通知类型。另外这三个回调不是必须的

另一种写法:

1
2
3
4
5
observable.subscribe(
x => console.log('Observer got a next value: ' + x),
err => console.error('Observer got an error: ' + err),
() => console.log('Observer got a complete notification')
);

操作符

操作符是 Observable 类型上的方法,比如 .map(…)、.filter(…)、.merge(…)等等。当操作符被调用时,它们不会改变已经存在的 Observable 实例。相反,它们返回一个新的 Observable ,它的 subscription 逻辑基于第一个 Observable。

操作符是函数,它基于当前的 Observable 创建一个新的 Observable。这是一个无副作用的操作(除非你想搞事情):前面的 Observable 保持不变。

在下面的示例中,我们创建一个自定义操作符函数,它将从输入 Observable 接收的每个值都乘以10:

1
2
3
4
5
6
7
// 例子一
const observable = from([1, 2, 3]).pipe(
map(x => x * 10),
filter(o => x > 10)
);

observable.subscribe(x => console.log(x));

剖析图:

另外操作符的使用推荐”例子一”的方式(Pipeable操作符)

1
2
3
// 例子二
const observable = from([1, 2, 3]).map(x => x * 10).filter(o => o > 10);
observable.subscribe(x => console.log(x));

有人会用”例子二”的这种写法,这种写法不推荐。别问为什么,问就是Pipeable 操作符😏


Subject (主体)

什么是Subject?
RxJS Subject 是一种特殊类型的 Observable,它允许将值多播给多个观察者

每个 Subject 都是 Observable。 - 对于 Subject,你可以提供一个观察者并使用 subscribe 方法,就可以开始正常接收值。从观察者的角度而言,它无法判断 Observable 执行是来自普通的 Observable 还是 Subject 。

在 Subject 的内部,subscribe 不会调用发送值的新执行。它只是将给定的观察者注册到观察者列表中,类似于其他库或语言中的 addListener 的工作方式。

每个 Subject 都是观察者。 - Subject 是一个有如下方法的对象: next(v)、error(e) 和 complete() 。要给 Subject 提供新值,只要调用 next(theValue),它会将值多播给已注册监听该 Subject 的观察者们。

在下面的示例中,我们为 Subject 添加了两个观察者,然后给 Subject 提供一些值:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
const subject = new Subject();
subject.subscribe({
next: (v) => console.log(`${v}吃🌭`)
});
subject.subscribe({
next: (v) => console.log(`${v}吃🍗`)
});

subject.next('李三');
subject.next('赵四');

// 输出
// 李三吃🌭
// 李三吃🍗
// 赵四吃🌭
// 赵四吃🍗

多播可以理解为一个人(函数)同时(按顺序)去做多件事;比如上面例子:李三一次就吃了🌭🍗

因为 Subject 是观察者,这也就在意味着你可以把 Subject 作为参数传给任何 Observable 的 subscribe 方法,因此可以改改上面的代码:

1
2
3
4
5
6
7
8
9
10
11
12
import { from } from 'rxjs';

var subject = new Subject();
subject.subscribe({
next: (v) => console.log(`${v}吃🌭`)
});
subject.subscribe({
next: (v) => console.log(`${v}吃🍗`)
});
const observable = from(['李三', '赵四']);

observable.subscribe(subject); // 你可以提供一个 Subject 进行订阅

使用上面的方法,我们基本上只是通过 Subject 将单播的 Observable 执行转换为多播的。这也说明了 Subjects 是将任意 Observable 执行共享给多个观察者的唯一方式。

未完待续