RxJS中的Operator


本文作者: jsweibo

本文链接: https://jsweibo.github.io/2019/11/24/RxJS%E4%B8%AD%E7%9A%84Operator/

摘要

本文主要讲述了:

  1. 什么是 Operator
  2. 创建类运算符
  3. 管道类运算符
  4. 高阶 Observable

正文

什么是 Operator

Operator,意为运算符。

Operator是 RxJS 中的函数。

Operator可以分为创建类运算符和管道类运算符

创建类运算符

创建类运算符用于创建Observable实例。

创建类运算符包括:

  • ajax
  • from
  • of
  • range

示例:使用from运算符将数组转换为Observable实例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8" />
<meta name="viewport" content="width=device-width, initial-scale=1.0" />
<meta http-equiv="X-UA-Compatible" content="ie=edge" />
<title>Document</title>
<script src="https://unpkg.com/core-js-bundle/index.js"></script>
<script src="https://unpkg.com/regenerator-runtime/runtime.js"></script>
<script src="https://unpkg.com/@babel/standalone@7/babel.min.js"></script>
<script src="https://unpkg.com/@reactivex/rxjs@6/dist/global/rxjs.umd.js"></script>
<script type="text/babel">
// 将数组中的元素一个接一个地同步推送给订阅者
const observable = new rxjs.from([1, 2, 3]);
</script>
</head>
<body></body>
</html>

示例:使用of运算符将数组转换为Observable实例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8" />
<meta name="viewport" content="width=device-width, initial-scale=1.0" />
<meta http-equiv="X-UA-Compatible" content="ie=edge" />
<title>Document</title>
<script src="https://unpkg.com/core-js-bundle/index.js"></script>
<script src="https://unpkg.com/regenerator-runtime/runtime.js"></script>
<script src="https://unpkg.com/@babel/standalone@7/babel.min.js"></script>
<script src="https://unpkg.com/@reactivex/rxjs@6/dist/global/rxjs.umd.js"></script>
<script type="text/babel">
// 将数组一次性同步推送给订阅者
const observable = new rxjs.of([1, 2, 3]);

// 将参数列表一个接一个地同步推送给订阅者
const observable = new rxjs.of(1, 2, 3);
</script>
</head>
<body></body>
</html>

示例:使用ajax运算符将 Ajax 转换为Observable实例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8" />
<meta name="viewport" content="width=device-width, initial-scale=1.0" />
<meta http-equiv="X-UA-Compatible" content="ie=edge" />
<title>Document</title>
<script src="https://unpkg.com/core-js-bundle/index.js"></script>
<script src="https://unpkg.com/regenerator-runtime/runtime.js"></script>
<script src="https://unpkg.com/@babel/standalone@7/babel.min.js"></script>
<script src="https://unpkg.com/@reactivex/rxjs@6/dist/global/rxjs.umd.js"></script>
<script type="text/babel">
// 注意:不需要使用 new 关键字
const observable = rxjs.ajax.ajax(
'https://api.github.com/users?per_page=1&page=1'
);
const subscription = observable.subscribe({
next(res) {
console.log(res);
},
error(error) {
// 如果Ajax请求发生错误,那么错误会被捕获
console.log(error);
},
complete() {
console.log('complete');
},
});
</script>
</head>
<body></body>
</html>

管道类运算符

管道类运算符接受一个Observable实例作为参数,返回一个新的Observable实例。

注意:管道类运算符并不会修改原有的Observable实例。

管道类运算符包括:

  • map
  • filter
  • catchError

示例:使用map运算符

输入:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8" />
<meta name="viewport" content="width=device-width, initial-scale=1.0" />
<meta http-equiv="X-UA-Compatible" content="ie=edge" />
<title>Document</title>
<script src="https://unpkg.com/core-js-bundle/index.js"></script>
<script src="https://unpkg.com/regenerator-runtime/runtime.js"></script>
<script src="https://unpkg.com/@babel/standalone@7/babel.min.js"></script>
<script src="https://unpkg.com/@reactivex/rxjs@6/dist/global/rxjs.umd.js"></script>
<script type="text/babel">
const foo = new rxjs.Observable(function (subscriber) {
subscriber.next(1);
subscriber.next(2);
subscriber.next(3);
subscriber.complete();
});

const bar = rxjs.operators.map(function (value, index) {
return value * 2;
})(foo);

bar.subscribe({
next(x) {
console.log(x);
},
complete() {
console.log('complete');
},
});
</script>
</head>
<body></body>
</html>

输出:

1
2
3
4
2
4
6
"complete"

示例:使用catchError运算符

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8" />
<meta name="viewport" content="width=device-width, initial-scale=1.0" />
<meta http-equiv="X-UA-Compatible" content="ie=edge" />
<title>Document</title>
<script src="https://unpkg.com/core-js-bundle/index.js"></script>
<script src="https://unpkg.com/regenerator-runtime/runtime.js"></script>
<script src="https://unpkg.com/@babel/standalone@7/babel.min.js"></script>
<script src="https://unpkg.com/@reactivex/rxjs@6/dist/global/rxjs.umd.js"></script>
<script type="text/babel">
const observable = rxjs.ajax
.ajax('https://api.github.com/users?per_page=1&page=1')
.pipe(
rxjs.operators.catchError(function (error) {
console.log(error);
return rxjs.of([]);
})
);
const subscription = observable.subscribe({
next(res) {
console.log(res);
},
error(error) {
// 如果Ajax请求发生错误,那么错误会被catchError提前捕获,这里实际不会执行
console.log(error);
},
complete() {
console.log('complete');
},
});
</script>
</head>
<body></body>
</html>

管道写法

由于管道运算符都是operator(callback)(observable)的形式,当连续使用了多个管道类运算符时,代码可读性很差。例如:c(callback)(b(callback)(a(callback)(observable)))

为了提升可读性,Observable.prototype.pipe应运而生。例如:c(callback)(b(callback)(a(callback)(observable))) === observable.pipe(a(callback), b(callback), c(callback)),且后者的可读性大大提升

示例:

输入:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8" />
<meta name="viewport" content="width=device-width, initial-scale=1.0" />
<meta http-equiv="X-UA-Compatible" content="ie=edge" />
<title>Document</title>
<script src="https://unpkg.com/core-js-bundle/index.js"></script>
<script src="https://unpkg.com/regenerator-runtime/runtime.js"></script>
<script src="https://unpkg.com/@babel/standalone@7/babel.min.js"></script>
<script src="https://unpkg.com/@reactivex/rxjs@6/dist/global/rxjs.umd.js"></script>
<script type="text/babel">
const foo = new rxjs.Observable(function (subscriber) {
subscriber.next(1);
subscriber.next(2);
subscriber.next(3);
subscriber.complete();
});

// 先计算*2,再计算+1
const bar = rxjs.operators.map(function (currentValue, index) {
return currentValue + 1;
})(
rxjs.operators.map(function (currentValue, index) {
return currentValue * 2;
})(foo)
);

bar.subscribe({
next(x) {
console.log(x);
},
complete() {
console.log('complete');
},
});
</script>
</head>
<body></body>
</html>

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8" />
<meta name="viewport" content="width=device-width, initial-scale=1.0" />
<meta http-equiv="X-UA-Compatible" content="ie=edge" />
<title>Document</title>
<script src="https://unpkg.com/core-js-bundle/index.js"></script>
<script src="https://unpkg.com/regenerator-runtime/runtime.js"></script>
<script src="https://unpkg.com/@babel/standalone@7/babel.min.js"></script>
<script src="https://unpkg.com/@reactivex/rxjs@6/dist/global/rxjs.umd.js"></script>
<script type="text/babel">
const foo = new rxjs.Observable(function (subscriber) {
subscriber.next(1);
subscriber.next(2);
subscriber.next(3);
subscriber.complete();
});

// 先计算*2,再计算+1
const bar = foo.pipe(
rxjs.operators.map(function (currentValue, index) {
return currentValue * 2;
}),
rxjs.operators.map(function (currentValue, index) {
return currentValue + 1;
})
);

bar.subscribe({
next(x) {
console.log(x);
},
complete() {
console.log('complete');
},
});
</script>
</head>
<body></body>
</html>

输出:

1
2
3
4
3
5
7
"complete"

高阶 Observable

高阶ObservableObservableObservable。例如:有一个Observable实例,订阅它会得到Observable实例。

示例:

输入:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8" />
<meta name="viewport" content="width=device-width, initial-scale=1.0" />
<meta http-equiv="X-UA-Compatible" content="ie=edge" />
<title>Document</title>
<script src="https://unpkg.com/core-js-bundle/index.js"></script>
<script src="https://unpkg.com/regenerator-runtime/runtime.js"></script>
<script src="https://unpkg.com/@babel/standalone@7/babel.min.js"></script>
<script src="https://unpkg.com/@reactivex/rxjs@6/dist/global/rxjs.umd.js"></script>
<script type="text/babel">
const observable = new rxjs.Observable(function (subscriber) {
subscriber.next(
new rxjs.Observable(function (subscriber) {
subscriber.next(1);
subscriber.complete();
})
);
subscriber.next(
new rxjs.Observable(function (subscriber) {
subscriber.next(2);
subscriber.complete();
})
);
subscriber.next(
new rxjs.Observable(function (subscriber) {
subscriber.next(3);
subscriber.complete();
})
);
subscriber.complete();
});
const subscription = observable.subscribe({
next(x) {
console.log(x);
},
error(error) {
console.log(error);
},
complete() {
console.log('complete');
},
});
</script>
</head>
<body></body>
</html>

输出:

1
2
3
4
Observable
Observable
Observable
"complete"

在高阶Observableobserver.next中继续订阅低阶Observable

示例:

输入:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8" />
<meta name="viewport" content="width=device-width, initial-scale=1.0" />
<meta http-equiv="X-UA-Compatible" content="ie=edge" />
<title>Document</title>
<script src="https://unpkg.com/core-js-bundle/index.js"></script>
<script src="https://unpkg.com/regenerator-runtime/runtime.js"></script>
<script src="https://unpkg.com/@babel/standalone@7/babel.min.js"></script>
<script src="https://unpkg.com/@reactivex/rxjs@6/dist/global/rxjs.umd.js"></script>
<script type="text/babel">
const observable = new rxjs.Observable(function (subscriber) {
subscriber.next(
new rxjs.Observable(function (subscriber) {
subscriber.next(1);
subscriber.complete();
})
);
subscriber.next(
new rxjs.Observable(function (subscriber) {
subscriber.next(2);
subscriber.complete();
})
);
subscriber.next(
new rxjs.Observable(function (subscriber) {
subscriber.next(3);
subscriber.complete();
})
);
subscriber.complete();
});
const subscription = observable.subscribe({
next(x) {
console.log(x);
x.subscribe({
next(x) {
console.log(x);
},
error(error) {
console.log(error);
},
complete() {
console.log('x complete');
},
});
},
error(error) {
console.log(error);
},
complete() {
console.log('complete');
},
});
</script>
</head>
<body></body>
</html>

输出:

1
2
3
4
5
6
7
8
9
10
Observable
1
"x complete"
Observable
2
"x complete"
Observable
3
"x complete"
"complete"

扁平化

concatAllmergeAll是管道运算符。使用它们可以将高阶Observable实例扁平化。

concatAll

concatAll会整合并转发低阶Observable实例推送的通知。但concatAll一次只能转发一个低阶Observable实例推送的通知。直到一个实例complete之后,它才会处理下一个实例。若一个实例error了,它将停止转发。

注意:如果一次性向concatAll推送大量的Observable实例,由于concatAll一次性只能处理一个,多余的实例需要排队,这将占用大量内存。

举个例子:老师听取甲乙二人的汇报并以班级的名义发布公告,老师一次只能听一个人的汇报。只有甲汇报完毕之后,老师才能听乙汇报。如果甲说话滔滔不绝,那么老师永远都不会听乙的汇报。如果在甲汇报的过程中,丙也来了,那么丙只能排在乙后面。如果甲在汇报过程中突然倒地不省人事,那么老师会马上送甲去医院而不是继续听乙和丙汇报。

示例:

输入:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8" />
<meta name="viewport" content="width=device-width, initial-scale=1.0" />
<meta http-equiv="X-UA-Compatible" content="ie=edge" />
<title>Document</title>
<script src="https://unpkg.com/core-js-bundle/index.js"></script>
<script src="https://unpkg.com/regenerator-runtime/runtime.js"></script>
<script src="https://unpkg.com/@babel/standalone@7/babel.min.js"></script>
<script src="https://unpkg.com/@reactivex/rxjs@6/dist/global/rxjs.umd.js"></script>
<script type="text/babel">
const observable = new rxjs.Observable(function (subscriber) {
subscriber.next(
new rxjs.Observable(function (subscriber) {
subscriber.next(1);

// 如果第一个实例没有complete,那么第二个实例就不会被订阅,尽管它已经在排队
// subscriber.complete();
})
);
subscriber.next(
new rxjs.Observable(function (subscriber) {
subscriber.next(2);
subscriber.error(new Error('hello, error'));
})
);
subscriber.next(
new rxjs.Observable(function (subscriber) {
subscriber.next(3);
subscriber.complete();
})
);
subscriber.complete();
}).pipe(rxjs.operators.concatAll());
const subscription = observable.subscribe({
next(x) {
console.log(x);
},
error(error) {
console.log(error);
},
complete() {
console.log('complete');
},
});
</script>
</head>
<body></body>
</html>

输出:

1
2
3
1
2
"Error: hello, error"

mergeAll

mergeAll会整合并转发低阶Observable实例推送的通知。和concatAll不同,mergeAll一次可以转发多个低阶Observable实例推送的通知。若一个实例error了,它将停止转发。

示例:

输入:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8" />
<meta name="viewport" content="width=device-width, initial-scale=1.0" />
<meta http-equiv="X-UA-Compatible" content="ie=edge" />
<title>Document</title>
<script src="https://unpkg.com/core-js-bundle/index.js"></script>
<script src="https://unpkg.com/regenerator-runtime/runtime.js"></script>
<script src="https://unpkg.com/@babel/standalone@7/babel.min.js"></script>
<script src="https://unpkg.com/@reactivex/rxjs@6/dist/global/rxjs.umd.js"></script>
<script type="text/babel">
const observable = new rxjs.Observable(function (subscriber) {
subscriber.next(
new rxjs.Observable(function (subscriber) {
subscriber.next(1);

// 尽管第一个实例没有complete,也不会影响第二个实例的订阅
// subscriber.complete();
})
);
subscriber.next(
new rxjs.Observable(function (subscriber) {
subscriber.next(2);
subscriber.error(new Error('hello, error'));
})
);
subscriber.next(
new rxjs.Observable(function (subscriber) {
subscriber.next(3);
subscriber.complete();
})
);
subscriber.complete();
}).pipe(rxjs.operators.mergeAll());
const subscription = observable.subscribe({
next(x) {
console.log(x);
},
error(error) {
console.log(error);
},
complete() {
console.log('complete');
},
});
</script>
</head>
<body></body>
</html>

输出:

1
2
3
1
2
"Error: hello, error"

参考资料

本文作者: jsweibo

本文链接: https://jsweibo.github.io/2019/11/24/RxJS%E4%B8%AD%E7%9A%84Operator/


本文对你有帮助?请支持我


支付宝
微信