操作符
简单来讲,操作符就是解决某个具体应用问题的模式,在针对不同问题时,需要使用不同的操作符,如合并、过滤、异常捕获等
我的理解就是,如果把一个Observalbe比做一个数组,那么操作符就是数组的方法,可以使用map、filter、forEach来对这个数组进行操作
操作符大致分为以下几类
- 创建类
- 转化类
- 过滤类
- 合并类
- 多播类
- 错误处理类
- 辅助工具类
- 条件分支类
- 数学和合计类
创建类
如果使用new Observable这样的方式来创建,定制性很高,如果只是使用一些简单的数据来创建Observable,那么RxJS中提供了专门用来创建Observable的操作符
部分操作符接受一个可选参数**scheduler**,不过我暂时没有学习,所以先忽略掉
of
of操作符可以指定固定的数据来创建Observable对象
import { of } from "rxjs";
const source$ = of(1, 2, 3);
source$.subscribe(value => console.log(value));
// 1 2 3range
of是指定数据,range则是指定一个范围
import { range } from "rxjs";
const source$ = range(5, 10);
source$.subscribe(value => console.log(value));
// 5 6 7 8 ... 14range函数接受 2 个可选参数,第一个参数为起始值,默认值为 0,第二个参数为个数,默认值为undefined
后续的每一个数都会加 1,向上述例子,从 5 开始,每次加 1,知道个数达到 10 个,也就是到达数字 14 时结束
generate
range可以连续的创建数字,如果想要每次输出相隔两个数字呢?
generate就像是RxJS中的for循环
import { generate } from "rxjs";
const source$ = generate(0, x => x < 10, x => x + 1);
source$.subscribe(value => console.log(value));
// 0 1 2 3 4 5 6 7 8 9generate可接受 4 个参数,第一个参数为初始值,第二个参数为判断条件,第三个参数为值的递增,第四个参数可以是结果的处理函数
const source$ = generate(0, x => x < 10, x => x + 1, x => x * 10);
source$.subscribe(value => console.log(value));
// 0 10 20 30 40 50 60 70 80 90增加第四个参数后,输入会根据处理函数的返回值
相较于for循环来看,RxJS中的generate更纯了,函数式,比如for循环,每次都会对条件变量做处理,所以不能用const
repeat
repeat可以对上游的数据进行重复的输出,如果想要一个1,2,3,1,2,3,1,2,3的数据,用of当然是可以做,但是用repeat更加清晰
import { of } from "rxjs";
import { repeat } from "rxjs/operators";
const source$ = of(1, 2, 3).pipe(repeat(3));
source$.subscribe(value => console.log(value));
// 1 2 3 1 2 3 1 2 3重复 3 次of(1,2,3)的数据,这里调用了一个函数叫pipe,在RxJS v5中,这里的调用方式应该是
of(1, 2, 3).repeat(3)
就是链式操作,但在RxJS v6中,需要使用pipe操作符来,pipe就是一个管道,数据从管道进入,进过处理,再从管道出去
EMPTY、NEVER、throwError
EMPTY是直接返回一个完成的Observable,会直接调用观察者的completed函数
NEVER则是返回一个永不完结的Observable,不会调用观察者的任何函数
throwError是抛出一个错误,如果观察者有error函数则调用,否则抛出一个错误
import { EMPTY, NEVER, throwError } from "rxjs";
const source1$ = EMPTY;
const source2$ = NEVER;
const source3$ = throwError("error");
source1$.subscribe({
complete() {
console.log("completed");
},
});
source2$.subscribe(value => console.log(value));
source3$.subscribe({
error(err) {
console.log(err);
},
});
// completed
// 什么都没有
// errorinterval 和 timer
上面的操作符,都是产生同步数据的,最简单的产生异步数据的方法就是interval和timer,从名字可以看出,它们的功能和setInterval和setTimeout类似
interval
import { interval } from "rxjs";
const source$ = interval(1000);
source$.subscribe(value => console.log(value));
// 0 1 2 3 4 ...interval可接受1个参数,如上每隔1000ms`输出依次,从 0 开始,每次加 1
1 秒后输出 0,再一秒后输出 1,再一秒后输出 2...永不完结
timer
import { timer } from "rxjs";
const source$ = timer(1000);
source$.subscribe(value => console.log(value));
// 0timer可接受 2 个参数,不同的是第一个参数可以为数字,也可以为Date对象,如果是数字,则代表毫秒,等待相应时间后输出 0
上例中,一秒后输出 0,完结
第二个参数则是周期,如果有第二个参数,timer也能用不完结
const source$ = timer(1000, 1000);
source$.subscribe(value => console.log(value));
// 0 1 2 3 4 ...这样的效果就和interval一样了
from
from可以把一个像Observable的参数,转为真正的Observable对象
一个数组像Observable,一个伪数组对象也像,字符串也像,Promise也,都可以通过from转为真正的Observable对象
数组
import { from } from "rxjs";
const source$ = from([1, 2, 3]);
source$.subscribe(value => console.log(value));
// 1 2 3伪数组
const source$ = from({
0: 0,
1: 1,
2: 2,
length: 3,
});
source$.subscribe(value => console.log(value));
// 1 2 3字符串
const source$ = from("hello RxJS");
source$.subscribe(value => console.log(value));
// h e l l o R x J SPromise
const source$ = from(Promise.resolve("resolve"));
source$.subscribe(value => console.log(value));
// 输出resolve这里的Promise是resolve的,如果是reject,则会触发抛出错误或调用观察者的error函数
const source$ = from(Promise.reject("reject"));
source$.subscribe(value => console.log(value));
// 抛出错误rejectfromEvent
fromEvent可以把网页中的DOM事件转化为Observalbe对象
import { fromEvent } from "rxjs";
const source$ = fromEvent(document, "click");
source$.subscribe(value => console.log(value));
// MouseEventfromEvent可以接受 3 个参数,第一个参数为事件的DOM对象,第二个参数为事件的名称,第三个参数是EventListenerOptions
const source$ = fromEvent(document, "click", { once: true });
source$.subscribe(value => console.log(value));如这个事件因为传入了第三个参数,只会触发一次
repeatWhen
repeat能够重复订阅上游的Observable,但是不能控制订阅时间,如等待 2 秒再重新订阅
repeatWhen就可以满足上面的需求
import { of } from "rxjs";
import { delay, repeatWhen } from "rxjs/operators";
const source$ = of(1, 2, 3).pipe(
repeatWhen(notifications => notifications.pipe(delay(2000)))
);
source$.subscribe(value => console.log(value));
// 1 2 3 等待2秒 1 2 3当上游Observable completed的时候,会传给repeatWhen一个notifications参数,这个参数也是Observable对象
defer
defer可以只在观察中订阅的时候才创建Observable对象,通常用于Observable工厂函数
import { defer, fromEvent, interval } from "rxjs";
const clicksOrInterval = defer(function() {
return Math.random() > 0.5 ? fromEvent(document, "click") : interval(1000);
});
clicksOrInterval.subscribe(x => console.log(x));刷新页面后,可能输出MouseEvent,也可能输出interval的数字