Summary
This article covers:
- What an Operator is
- Creation operators
- Pipeable operators
- Higher-order Observables
Body
What is an Operator
In RxJS, an Operator is a function. Operators fall into two groups: creation operators and pipeable operators.
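Creation operators
Creation operators are used to create Observable instances.
Creation operators include from, of, and ajax, each shown in the examples that follow.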
Example: use the from operator to convert an array into an Observable instance

    <!DOCTYPE html>
    <html lang="en">
      <head>
        <meta charset="UTF-8" />
        <meta name="viewport" content="width=device-width, initial-scale=1.0" />
        <meta http-equiv="X-UA-Compatible" content="ie=edge" />
        <title>Document</title>
        <script src="https://unpkg.com/core-js-bundle/index.js"></script>
        <script src="https://unpkg.com/regenerator-runtime/runtime.js"></script>
        <script src="https://unpkg.com/@babel/standalone@7/babel.min.js"></script>
        <script src="https://unpkg.com/@reactivex/rxjs@6/dist/global/rxjs.umd.js"></script>
        <script type="text/babel">
          // from is a plain function, so it is called without `new`
          const observable = rxjs.from([1, 2, 3]);
        </script>
      </head>
      <body></body>
    </html>
Example: use the of operator to turn values into an Observable instance
of emits each of its arguments as one notification, so passing an array emits the whole array once, while passing 1, 2, 3 emits three separate values.

    <!DOCTYPE html>
    <html lang="en">
      <head>
        <meta charset="UTF-8" />
        <meta name="viewport" content="width=device-width, initial-scale=1.0" />
        <meta http-equiv="X-UA-Compatible" content="ie=edge" />
        <title>Document</title>
        <script src="https://unpkg.com/core-js-bundle/index.js"></script>
        <script src="https://unpkg.com/regenerator-runtime/runtime.js"></script>
        <script src="https://unpkg.com/@babel/standalone@7/babel.min.js"></script>
        <script src="https://unpkg.com/@reactivex/rxjs@6/dist/global/rxjs.umd.js"></script>
        <script type="text/babel">
          // Emits a single value: the array [1, 2, 3]
          const arrayObservable = rxjs.of([1, 2, 3]);

          // Emits three values: 1, 2, 3
          const valuesObservable = rxjs.of(1, 2, 3);
        </script>
      </head>
      <body></body>
    </html>
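To make the difference between from and of concrete, here is a minimal sketch in plain JavaScript. The names sketchOf, sketchFrom, and collect are hypothetical stand-ins written for this illustration; they are not the RxJS implementations and leave out unsubscription and error handling.

```javascript
// Simplified stand-ins for illustration only; these are NOT the RxJS implementations.
function sketchOf(...values) {
  return {
    subscribe(observer) {
      // Each argument is pushed as one notification, whatever its type.
      values.forEach((value) => observer.next(value));
      observer.complete();
    },
  };
}

function sketchFrom(iterable) {
  return {
    subscribe(observer) {
      // The iterable is unpacked: one notification per element.
      for (const value of iterable) observer.next(value);
      observer.complete();
    },
  };
}

// Hypothetical helper: collects everything a source pushes synchronously.
function collect(source) {
  const received = [];
  source.subscribe({ next: (value) => received.push(value), complete() {} });
  return received;
}

const fromArray = collect(sketchFrom([1, 2, 3])); // three notifications
const ofArray = collect(sketchOf([1, 2, 3]));     // one notification: the array itself
const ofValues = collect(sketchOf(1, 2, 3));      // three notifications
console.log(fromArray, ofArray, ofValues);
```

Under these assumptions, an array passed to the from-like function is unpacked element by element, while the of-like function forwards it untouched as a single value.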
Example: use the ajax operator to wrap an Ajax request in an Observable instance

    <!DOCTYPE html>
    <html lang="en">
      <head>
        <meta charset="UTF-8" />
        <meta name="viewport" content="width=device-width, initial-scale=1.0" />
        <meta http-equiv="X-UA-Compatible" content="ie=edge" />
        <title>Document</title>
        <script src="https://unpkg.com/core-js-bundle/index.js"></script>
        <script src="https://unpkg.com/regenerator-runtime/runtime.js"></script>
        <script src="https://unpkg.com/@babel/standalone@7/babel.min.js"></script>
        <script src="https://unpkg.com/@reactivex/rxjs@6/dist/global/rxjs.umd.js"></script>
        <script type="text/babel">
          const observable = rxjs.ajax.ajax(
            'https://api.github.com/users?per_page=1&page=1'
          );
          const subscription = observable.subscribe({
            next(res) {
              console.log(res);
            },
            error(error) {
              console.log(error);
            },
            complete() {
              console.log('complete');
            },
          });
        </script>
      </head>
      <body></body>
    </html>
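Pipeable operators
Calling a pipeable operator with its arguments, for example map(callback), returns a function that accepts an Observable instance and returns a new Observable instance.
Note: pipeable operators do not modify the original Observable instance.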
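The shape operator(callback)(observable) can be sketched without RxJS. The code below is a simplified illustration, not how RxJS implements map: createObservable and sketchMap are hypothetical names, and the stand-in Observable has no unsubscription logic.

```javascript
// Simplified stand-in for an Observable; NOT the RxJS class.
function createObservable(producer) {
  return {
    subscribe(observer) {
      producer(observer);
    },
  };
}

// Sketch of a map-like pipeable operator: sketchMap(project)(source) returns a new source.
function sketchMap(project) {
  return function (source) {
    return createObservable(function (observer) {
      let index = 0;
      source.subscribe({
        next(value) {
          observer.next(project(value, index++));
        },
        error(err) {
          observer.error(err);
        },
        complete() {
          observer.complete();
        },
      });
    });
  };
}

const source = createObservable(function (observer) {
  observer.next(1);
  observer.next(2);
  observer.next(3);
  observer.complete();
});
const doubled = sketchMap((value) => value * 2)(source);

const fromDoubled = [];
doubled.subscribe({ next: (v) => fromDoubled.push(v), error() {}, complete() {} });

// Subscribing to the original source again still yields the untouched values.
const fromSource = [];
source.subscribe({ next: (v) => fromSource.push(v), error() {}, complete() {} });
console.log(fromDoubled, fromSource);
```

The operator wraps the source in a new object instead of changing it, which is why the original source keeps emitting its own values.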
Pipeable operators include map and catchError, among others; both are shown in the examples that follow.
Example: using the map operator
Input:

    <!DOCTYPE html>
    <html lang="en">
      <head>
        <meta charset="UTF-8" />
        <meta name="viewport" content="width=device-width, initial-scale=1.0" />
        <meta http-equiv="X-UA-Compatible" content="ie=edge" />
        <title>Document</title>
        <script src="https://unpkg.com/core-js-bundle/index.js"></script>
        <script src="https://unpkg.com/regenerator-runtime/runtime.js"></script>
        <script src="https://unpkg.com/@babel/standalone@7/babel.min.js"></script>
        <script src="https://unpkg.com/@reactivex/rxjs@6/dist/global/rxjs.umd.js"></script>
        <script type="text/babel">
          const foo = new rxjs.Observable(function (subscriber) {
            subscriber.next(1);
            subscriber.next(2);
            subscriber.next(3);
            subscriber.complete();
          });

          const bar = rxjs.operators.map(function (value, index) {
            return value * 2;
          })(foo);

          bar.subscribe({
            next(x) {
              console.log(x);
            },
            complete() {
              console.log('complete');
            },
          });
        </script>
      </head>
      <body></body>
    </html>

Output:

    2
    4
    6
    "complete"
Example: using the catchError operator

    <!DOCTYPE html>
    <html lang="en">
      <head>
        <meta charset="UTF-8" />
        <meta name="viewport" content="width=device-width, initial-scale=1.0" />
        <meta http-equiv="X-UA-Compatible" content="ie=edge" />
        <title>Document</title>
        <script src="https://unpkg.com/core-js-bundle/index.js"></script>
        <script src="https://unpkg.com/regenerator-runtime/runtime.js"></script>
        <script src="https://unpkg.com/@babel/standalone@7/babel.min.js"></script>
        <script src="https://unpkg.com/@reactivex/rxjs@6/dist/global/rxjs.umd.js"></script>
        <script type="text/babel">
          const observable = rxjs.ajax
            .ajax('https://api.github.com/users?per_page=1&page=1')
            .pipe(
              rxjs.operators.catchError(function (error) {
                console.log(error);
                return rxjs.of([]);
              })
            );
          const subscription = observable.subscribe({
            next(res) {
              console.log(res);
            },
            error(error) {
              console.log(error);
            },
            complete() {
              console.log('complete');
            },
          });
        </script>
      </head>
      <body></body>
    </html>
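Pipe syntax
Pipeable operators are all applied in the form operator(callback)(observable), so chaining several of them quickly becomes hard to read. For example: c(callback)(b(callback)(a(callback)(observable)))
Observable.prototype.pipe was introduced to improve readability. The nested call c(callback)(b(callback)(a(callback)(observable))) is equivalent to observable.pipe(a(callback), b(callback), c(callback)), and the latter is far easier to read.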
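The equivalence between the nested form and the pipe form comes down to left-to-right function composition. The sketch below shows the idea with a hypothetical sketchPipe helper; to stay self-contained it uses plain arrays as the "source" and array-to-array functions as the "operators", so it only illustrates the composition and is not the RxJS pipe implementation.

```javascript
// Sketch of pipe: feed the source through each operator function, left to right.
function sketchPipe(source, ...operatorFns) {
  return operatorFns.reduce((current, operatorFn) => operatorFn(current), source);
}

// Hypothetical operator factory: mapArray(project) returns an array -> array function,
// mirroring the operator(callback)(source) shape.
const mapArray = (project) => (values) => values.map(project);

const double = mapArray((value) => value * 2);
const addOne = mapArray((value) => value + 1);

// Nested form: the innermost call runs first, so it has to be read inside out.
const nested = addOne(double([1, 2, 3]));

// Piped form: the operators are listed in the order they run.
const piped = sketchPipe([1, 2, 3], double, addOne);

console.log(nested, piped);
```

Both forms apply double first and addOne second; the piped form just states that order directly.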
Example:
Input:

    <!DOCTYPE html>
    <html lang="en">
      <head>
        <meta charset="UTF-8" />
        <meta name="viewport" content="width=device-width, initial-scale=1.0" />
        <meta http-equiv="X-UA-Compatible" content="ie=edge" />
        <title>Document</title>
        <script src="https://unpkg.com/core-js-bundle/index.js"></script>
        <script src="https://unpkg.com/regenerator-runtime/runtime.js"></script>
        <script src="https://unpkg.com/@babel/standalone@7/babel.min.js"></script>
        <script src="https://unpkg.com/@reactivex/rxjs@6/dist/global/rxjs.umd.js"></script>
        <script type="text/babel">
          const foo = new rxjs.Observable(function (subscriber) {
            subscriber.next(1);
            subscriber.next(2);
            subscriber.next(3);
            subscriber.complete();
          });

          const bar = rxjs.operators.map(function (currentValue, index) {
            return currentValue + 1;
          })(
            rxjs.operators.map(function (currentValue, index) {
              return currentValue * 2;
            })(foo)
          );

          bar.subscribe({
            next(x) {
              console.log(x);
            },
            complete() {
              console.log('complete');
            },
          });
        </script>
      </head>
      <body></body>
    </html>
or

    <!DOCTYPE html>
    <html lang="en">
      <head>
        <meta charset="UTF-8" />
        <meta name="viewport" content="width=device-width, initial-scale=1.0" />
        <meta http-equiv="X-UA-Compatible" content="ie=edge" />
        <title>Document</title>
        <script src="https://unpkg.com/core-js-bundle/index.js"></script>
        <script src="https://unpkg.com/regenerator-runtime/runtime.js"></script>
        <script src="https://unpkg.com/@babel/standalone@7/babel.min.js"></script>
        <script src="https://unpkg.com/@reactivex/rxjs@6/dist/global/rxjs.umd.js"></script>
        <script type="text/babel">
          const foo = new rxjs.Observable(function (subscriber) {
            subscriber.next(1);
            subscriber.next(2);
            subscriber.next(3);
            subscriber.complete();
          });

          const bar = foo.pipe(
            rxjs.operators.map(function (currentValue, index) {
              return currentValue * 2;
            }),
            rxjs.operators.map(function (currentValue, index) {
              return currentValue + 1;
            })
          );

          bar.subscribe({
            next(x) {
              console.log(x);
            },
            complete() {
              console.log('complete');
            },
          });
        </script>
      </head>
      <body></body>
    </html>

Output:

    3
    5
    7
    "complete"
Higher-order Observable
A higher-order Observable is an Observable of Observables: an Observable instance whose subscribers receive Observable instances as values.
Example:
Input:

    <!DOCTYPE html>
    <html lang="en">
      <head>
        <meta charset="UTF-8" />
        <meta name="viewport" content="width=device-width, initial-scale=1.0" />
        <meta http-equiv="X-UA-Compatible" content="ie=edge" />
        <title>Document</title>
        <script src="https://unpkg.com/core-js-bundle/index.js"></script>
        <script src="https://unpkg.com/regenerator-runtime/runtime.js"></script>
        <script src="https://unpkg.com/@babel/standalone@7/babel.min.js"></script>
        <script src="https://unpkg.com/@reactivex/rxjs@6/dist/global/rxjs.umd.js"></script>
        <script type="text/babel">
          const observable = new rxjs.Observable(function (subscriber) {
            subscriber.next(
              new rxjs.Observable(function (subscriber) {
                subscriber.next(1);
                subscriber.complete();
              })
            );
            subscriber.next(
              new rxjs.Observable(function (subscriber) {
                subscriber.next(2);
                subscriber.complete();
              })
            );
            subscriber.next(
              new rxjs.Observable(function (subscriber) {
                subscriber.next(3);
                subscriber.complete();
              })
            );
            subscriber.complete();
          });
          const subscription = observable.subscribe({
            next(x) {
              console.log(x);
            },
            error(error) {
              console.log(error);
            },
            complete() {
              console.log('complete');
            },
          });
        </script>
      </head>
      <body></body>
    </html>

Output:

    Observable
    Observable
    Observable
    "complete"
To receive the values of the lower-order Observables, subscribe to each of them inside the observer.next of the higher-order Observable.
Example:
Input:

    <!DOCTYPE html>
    <html lang="en">
      <head>
        <meta charset="UTF-8" />
        <meta name="viewport" content="width=device-width, initial-scale=1.0" />
        <meta http-equiv="X-UA-Compatible" content="ie=edge" />
        <title>Document</title>
        <script src="https://unpkg.com/core-js-bundle/index.js"></script>
        <script src="https://unpkg.com/regenerator-runtime/runtime.js"></script>
        <script src="https://unpkg.com/@babel/standalone@7/babel.min.js"></script>
        <script src="https://unpkg.com/@reactivex/rxjs@6/dist/global/rxjs.umd.js"></script>
        <script type="text/babel">
          const observable = new rxjs.Observable(function (subscriber) {
            subscriber.next(
              new rxjs.Observable(function (subscriber) {
                subscriber.next(1);
                subscriber.complete();
              })
            );
            subscriber.next(
              new rxjs.Observable(function (subscriber) {
                subscriber.next(2);
                subscriber.complete();
              })
            );
            subscriber.next(
              new rxjs.Observable(function (subscriber) {
                subscriber.next(3);
                subscriber.complete();
              })
            );
            subscriber.complete();
          });
          const subscription = observable.subscribe({
            next(x) {
              console.log(x);
              x.subscribe({
                next(x) {
                  console.log(x);
                },
                error(error) {
                  console.log(error);
                },
                complete() {
                  console.log('x complete');
                },
              });
            },
            error(error) {
              console.log(error);
            },
            complete() {
              console.log('complete');
            },
          });
        </script>
      </head>
      <body></body>
    </html>

Output:

    Observable
    1
    "x complete"
    Observable
    2
    "x complete"
    Observable
    3
    "x complete"
    "complete"
Flattening
concatAll and mergeAll are pipeable operators. They flatten a higher-order Observable instance.
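concatAll
concatAll merges and forwards the notifications pushed by the lower-order Observable instances, but it forwards from only one lower-order instance at a time. It moves on to the next instance only after the current one completes. If an instance errors, concatAll stops forwarding.
Note: if a large number of Observable instances are pushed to concatAll at once, it can still handle only one at a time, so the rest must wait in a queue, which can take up a lot of memory.
An analogy: a teacher listens to reports from A and B and publishes announcements on behalf of the class, but can listen to only one person at a time. Only after A finishes can the teacher listen to B. If A never stops talking, the teacher never hears B's report. If C arrives while A is still reporting, C has to queue behind B. If A suddenly collapses mid-report, the teacher rushes A to the hospital instead of going on to hear B and C.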
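The queueing behaviour described above can be sketched in plain JavaScript. This is a simplified model under stated assumptions, not the RxJS source: createObservable, sketchConcatAll, and run are hypothetical names, every source is synchronous, and unsubscription is not modelled.

```javascript
// Simplified stand-in for an Observable; NOT the RxJS class.
function createObservable(producer) {
  return { subscribe(observer) { producer(observer); } };
}

// Sketch of concatAll-like flattening: one inner source at a time, the rest wait in a queue.
function sketchConcatAll(higherOrder) {
  return createObservable(function (observer) {
    const queue = [];      // inner sources waiting for their turn
    let active = false;    // true while the current inner source has not completed
    let outerDone = false; // true once the higher-order source has completed
    let stopped = false;   // true after an error or the final complete

    function fail(err) {
      if (stopped) return;
      stopped = true;
      observer.error(err);
    }

    function subscribeNext() {
      if (stopped) return;
      if (queue.length === 0) {
        if (outerDone) {
          stopped = true;
          observer.complete();
        }
        return;
      }
      active = true;
      queue.shift().subscribe({
        next(value) { if (!stopped) observer.next(value); },
        error: fail,
        complete() {
          if (stopped) return;
          active = false;
          subscribeNext(); // only now does the next queued source get its turn
        },
      });
    }

    higherOrder.subscribe({
      next(inner) {
        if (stopped) return;
        queue.push(inner);
        if (!active) subscribeNext();
      },
      error: fail,
      complete() {
        outerDone = true;
        if (!active) subscribeNext();
      },
    });
  });
}

// Hypothetical helper: flattens the given inner sources and records every notification.
function run(inners) {
  const log = [];
  const higherOrder = createObservable((observer) => {
    inners.forEach((inner) => observer.next(inner));
    observer.complete();
  });
  sketchConcatAll(higherOrder).subscribe({
    next: (value) => log.push(value),
    error: (err) => log.push('error: ' + err.message),
    complete: () => log.push('complete'),
  });
  return log;
}

const one = createObservable((o) => { o.next(1); o.complete(); });
const oneForever = createObservable((o) => { o.next(1); }); // never completes
const twoThenError = createObservable((o) => { o.next(2); o.error(new Error('hello, error')); });
const three = createObservable((o) => { o.next(3); o.complete(); });

const withError = run([one, twoThenError, three]);      // stops at the error
const blocked = run([oneForever, twoThenError, three]); // later sources never get a turn
console.log(withError, blocked);
```

In the second run the first inner source never completes, so the other two stay in the queue, which matches the teacher who never gets to hear B.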
Example:
Input:

    <!DOCTYPE html>
    <html lang="en">
      <head>
        <meta charset="UTF-8" />
        <meta name="viewport" content="width=device-width, initial-scale=1.0" />
        <meta http-equiv="X-UA-Compatible" content="ie=edge" />
        <title>Document</title>
        <script src="https://unpkg.com/core-js-bundle/index.js"></script>
        <script src="https://unpkg.com/regenerator-runtime/runtime.js"></script>
        <script src="https://unpkg.com/@babel/standalone@7/babel.min.js"></script>
        <script src="https://unpkg.com/@reactivex/rxjs@6/dist/global/rxjs.umd.js"></script>
        <script type="text/babel">
          const observable = new rxjs.Observable(function (subscriber) {
            subscriber.next(
              new rxjs.Observable(function (subscriber) {
                subscriber.next(1);
                // concatAll subscribes to the next instance only after this one completes
                subscriber.complete();
              })
            );
            subscriber.next(
              new rxjs.Observable(function (subscriber) {
                subscriber.next(2);
                subscriber.error(new Error('hello, error'));
              })
            );
            subscriber.next(
              new rxjs.Observable(function (subscriber) {
                subscriber.next(3);
                subscriber.complete();
              })
            );
            subscriber.complete();
          }).pipe(rxjs.operators.concatAll());
          const subscription = observable.subscribe({
            next(x) {
              console.log(x);
            },
            error(error) {
              console.log(error);
            },
            complete() {
              console.log('complete');
            },
          });
        </script>
      </head>
      <body></body>
    </html>

Output:

    1
    2
    "Error: hello, error"
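mergeAll
mergeAll also merges and forwards the notifications pushed by the lower-order Observable instances. Unlike concatAll, mergeAll can forward notifications from several lower-order instances at the same time. If an instance errors, mergeAll stops forwarding.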
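For comparison with concatAll, here is a minimal sketch of mergeAll-like flattening in plain JavaScript. It is an illustration under assumptions rather than the RxJS implementation: createObservable, sketchMergeAll, and run are hypothetical names, the sources are synchronous, and neither unsubscription nor the optional concurrency limit of the real operator is modelled.

```javascript
// Simplified stand-in for an Observable; NOT the RxJS class.
function createObservable(producer) {
  return { subscribe(observer) { producer(observer); } };
}

// Sketch of mergeAll-like flattening: every inner source is subscribed as soon as it arrives.
function sketchMergeAll(higherOrder) {
  return createObservable(function (observer) {
    let activeCount = 0;   // inner sources that have not completed yet
    let outerDone = false; // true once the higher-order source has completed
    let stopped = false;   // true after an error or the final complete

    function fail(err) {
      if (stopped) return;
      stopped = true;
      observer.error(err);
    }

    function maybeComplete() {
      if (!stopped && outerDone && activeCount === 0) {
        stopped = true;
        observer.complete();
      }
    }

    higherOrder.subscribe({
      next(inner) {
        if (stopped) return;
        activeCount++;
        // No queue: the inner source is subscribed immediately.
        inner.subscribe({
          next(value) { if (!stopped) observer.next(value); },
          error: fail,
          complete() {
            activeCount--;
            maybeComplete();
          },
        });
      },
      error: fail,
      complete() {
        outerDone = true;
        maybeComplete();
      },
    });
  });
}

// Hypothetical helper: flattens the given inner sources and records every notification.
function run(inners) {
  const log = [];
  const higherOrder = createObservable((observer) => {
    inners.forEach((inner) => observer.next(inner));
    observer.complete();
  });
  sketchMergeAll(higherOrder).subscribe({
    next: (value) => log.push(value),
    error: (err) => log.push('error: ' + err.message),
    complete: () => log.push('complete'),
  });
  return log;
}

const oneForever = createObservable((o) => { o.next(1); }); // never completes
const two = createObservable((o) => { o.next(2); o.complete(); });
const twoThenError = createObservable((o) => { o.next(2); o.error(new Error('hello, error')); });
const three = createObservable((o) => { o.next(3); o.complete(); });

const notBlocked = run([oneForever, two, three]);         // later sources still emit
const withError = run([oneForever, twoThenError, three]); // the error stops everything
console.log(notBlocked, withError);
```

Because nothing is queued, a source that never completes does not hold back the others; it only prevents the merged result from completing.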
Example:
Input:

    <!DOCTYPE html>
    <html lang="en">
      <head>
        <meta charset="UTF-8" />
        <meta name="viewport" content="width=device-width, initial-scale=1.0" />
        <meta http-equiv="X-UA-Compatible" content="ie=edge" />
        <title>Document</title>
        <script src="https://unpkg.com/core-js-bundle/index.js"></script>
        <script src="https://unpkg.com/regenerator-runtime/runtime.js"></script>
        <script src="https://unpkg.com/@babel/standalone@7/babel.min.js"></script>
        <script src="https://unpkg.com/@reactivex/rxjs@6/dist/global/rxjs.umd.js"></script>
        <script type="text/babel">
          const observable = new rxjs.Observable(function (subscriber) {
            subscriber.next(
              new rxjs.Observable(function (subscriber) {
                // This instance never completes; mergeAll still handles the next one
                subscriber.next(1);
              })
            );
            subscriber.next(
              new rxjs.Observable(function (subscriber) {
                subscriber.next(2);
                subscriber.error(new Error('hello, error'));
              })
            );
            subscriber.next(
              new rxjs.Observable(function (subscriber) {
                subscriber.next(3);
                subscriber.complete();
              })
            );
            subscriber.complete();
          }).pipe(rxjs.operators.mergeAll());
          const subscription = observable.subscribe({
            next(x) {
              console.log(x);
            },
            error(error) {
              console.log(error);
            },
            complete() {
              console.log('complete');
            },
          });
        </script>
      </head>
      <body></body>
    </html>

Output:

    1
    2
    "Error: hello, error"