摘要
本文主要讲述了:
- 什么是 Operator
- 创建类运算符
- 管道类运算符
- 高阶 Observable
正文
什么是 Operator
Operator,意为运算符。
Operator是 RxJS 中的函数。
Operator可以分为创建类运算符和管道类运算符
创建类运算符
创建类运算符用于创建Observable实例。
创建类运算符包括:
示例:使用from运算符将数组转换为Observable实例
| 12
 3
 4
 5
 6
 7
 8
 9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 
 | <!DOCTYPE html><html lang="en">
 <head>
 <meta charset="UTF-8" />
 <meta name="viewport" content="width=device-width, initial-scale=1.0" />
 <meta http-equiv="X-UA-Compatible" content="ie=edge" />
 <title>Document</title>
 <script src="https://unpkg.com/core-js-bundle/index.js"></script>
 <script src="https://unpkg.com/regenerator-runtime/runtime.js"></script>
 <script src="https://unpkg.com/@babel/standalone@7/babel.min.js"></script>
 <script src="https://unpkg.com/@reactivex/rxjs@6/dist/global/rxjs.umd.js"></script>
 <script type="text/babel">
 
 const observable = new rxjs.from([1, 2, 3]);
 </script>
 </head>
 <body></body>
 </html>
 
 | 
示例:使用of运算符将数组转换为Observable实例
| 12
 3
 4
 5
 6
 7
 8
 9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 
 | <!DOCTYPE html><html lang="en">
 <head>
 <meta charset="UTF-8" />
 <meta name="viewport" content="width=device-width, initial-scale=1.0" />
 <meta http-equiv="X-UA-Compatible" content="ie=edge" />
 <title>Document</title>
 <script src="https://unpkg.com/core-js-bundle/index.js"></script>
 <script src="https://unpkg.com/regenerator-runtime/runtime.js"></script>
 <script src="https://unpkg.com/@babel/standalone@7/babel.min.js"></script>
 <script src="https://unpkg.com/@reactivex/rxjs@6/dist/global/rxjs.umd.js"></script>
 <script type="text/babel">
 
 const observable = new rxjs.of([1, 2, 3]);
 
 
 const observable = new rxjs.of(1, 2, 3);
 </script>
 </head>
 <body></body>
 </html>
 
 | 
示例:使用ajax运算符将 Ajax 转换为Observable实例
| 12
 3
 4
 5
 6
 7
 8
 9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 
 | <!DOCTYPE html><html lang="en">
 <head>
 <meta charset="UTF-8" />
 <meta name="viewport" content="width=device-width, initial-scale=1.0" />
 <meta http-equiv="X-UA-Compatible" content="ie=edge" />
 <title>Document</title>
 <script src="https://unpkg.com/core-js-bundle/index.js"></script>
 <script src="https://unpkg.com/regenerator-runtime/runtime.js"></script>
 <script src="https://unpkg.com/@babel/standalone@7/babel.min.js"></script>
 <script src="https://unpkg.com/@reactivex/rxjs@6/dist/global/rxjs.umd.js"></script>
 <script type="text/babel">
 
 const observable = rxjs.ajax.ajax(
 'https://api.github.com/users?per_page=1&page=1'
 );
 const subscription = observable.subscribe({
 next(res) {
 console.log(res);
 },
 error(error) {
 
 console.log(error);
 },
 complete() {
 console.log('complete');
 },
 });
 </script>
 </head>
 <body></body>
 </html>
 
 | 
管道类运算符
管道类运算符接受一个Observable实例作为参数,返回一个新的Observable实例。
注意:管道类运算符并不会修改原有的Observable实例。
管道类运算符包括:
示例:使用map运算符
输入:
| 12
 3
 4
 5
 6
 7
 8
 9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 
 | <!DOCTYPE html><html lang="en">
 <head>
 <meta charset="UTF-8" />
 <meta name="viewport" content="width=device-width, initial-scale=1.0" />
 <meta http-equiv="X-UA-Compatible" content="ie=edge" />
 <title>Document</title>
 <script src="https://unpkg.com/core-js-bundle/index.js"></script>
 <script src="https://unpkg.com/regenerator-runtime/runtime.js"></script>
 <script src="https://unpkg.com/@babel/standalone@7/babel.min.js"></script>
 <script src="https://unpkg.com/@reactivex/rxjs@6/dist/global/rxjs.umd.js"></script>
 <script type="text/babel">
 const foo = new rxjs.Observable(function (subscriber) {
 subscriber.next(1);
 subscriber.next(2);
 subscriber.next(3);
 subscriber.complete();
 });
 
 const bar = rxjs.operators.map(function (value, index) {
 return value * 2;
 })(foo);
 
 bar.subscribe({
 next(x) {
 console.log(x);
 },
 complete() {
 console.log('complete');
 },
 });
 </script>
 </head>
 <body></body>
 </html>
 
 | 
输出:
示例:使用catchError运算符
| 12
 3
 4
 5
 6
 7
 8
 9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 
 | <!DOCTYPE html><html lang="en">
 <head>
 <meta charset="UTF-8" />
 <meta name="viewport" content="width=device-width, initial-scale=1.0" />
 <meta http-equiv="X-UA-Compatible" content="ie=edge" />
 <title>Document</title>
 <script src="https://unpkg.com/core-js-bundle/index.js"></script>
 <script src="https://unpkg.com/regenerator-runtime/runtime.js"></script>
 <script src="https://unpkg.com/@babel/standalone@7/babel.min.js"></script>
 <script src="https://unpkg.com/@reactivex/rxjs@6/dist/global/rxjs.umd.js"></script>
 <script type="text/babel">
 const observable = rxjs.ajax
 .ajax('https://api.github.com/users?per_page=1&page=1')
 .pipe(
 rxjs.operators.catchError(function (error) {
 console.log(error);
 return rxjs.of([]);
 })
 );
 const subscription = observable.subscribe({
 next(res) {
 console.log(res);
 },
 error(error) {
 
 console.log(error);
 },
 complete() {
 console.log('complete');
 },
 });
 </script>
 </head>
 <body></body>
 </html>
 
 | 
管道写法
由于管道运算符都是operator(callback)(observable)的形式,当连续使用了多个管道类运算符时,代码可读性很差。例如:c(callback)(b(callback)(a(callback)(observable)))
为了提升可读性,Observable.prototype.pipe应运而生。例如:c(callback)(b(callback)(a(callback)(observable))) === observable.pipe(a(callback), b(callback), c(callback)),且后者的可读性大大提升
示例:
输入:
| 12
 3
 4
 5
 6
 7
 8
 9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 
 | <!DOCTYPE html><html lang="en">
 <head>
 <meta charset="UTF-8" />
 <meta name="viewport" content="width=device-width, initial-scale=1.0" />
 <meta http-equiv="X-UA-Compatible" content="ie=edge" />
 <title>Document</title>
 <script src="https://unpkg.com/core-js-bundle/index.js"></script>
 <script src="https://unpkg.com/regenerator-runtime/runtime.js"></script>
 <script src="https://unpkg.com/@babel/standalone@7/babel.min.js"></script>
 <script src="https://unpkg.com/@reactivex/rxjs@6/dist/global/rxjs.umd.js"></script>
 <script type="text/babel">
 const foo = new rxjs.Observable(function (subscriber) {
 subscriber.next(1);
 subscriber.next(2);
 subscriber.next(3);
 subscriber.complete();
 });
 
 
 const bar = rxjs.operators.map(function (currentValue, index) {
 return currentValue + 1;
 })(
 rxjs.operators.map(function (currentValue, index) {
 return currentValue * 2;
 })(foo)
 );
 
 bar.subscribe({
 next(x) {
 console.log(x);
 },
 complete() {
 console.log('complete');
 },
 });
 </script>
 </head>
 <body></body>
 </html>
 
 | 
或
| 12
 3
 4
 5
 6
 7
 8
 9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 
 | <!DOCTYPE html><html lang="en">
 <head>
 <meta charset="UTF-8" />
 <meta name="viewport" content="width=device-width, initial-scale=1.0" />
 <meta http-equiv="X-UA-Compatible" content="ie=edge" />
 <title>Document</title>
 <script src="https://unpkg.com/core-js-bundle/index.js"></script>
 <script src="https://unpkg.com/regenerator-runtime/runtime.js"></script>
 <script src="https://unpkg.com/@babel/standalone@7/babel.min.js"></script>
 <script src="https://unpkg.com/@reactivex/rxjs@6/dist/global/rxjs.umd.js"></script>
 <script type="text/babel">
 const foo = new rxjs.Observable(function (subscriber) {
 subscriber.next(1);
 subscriber.next(2);
 subscriber.next(3);
 subscriber.complete();
 });
 
 
 const bar = foo.pipe(
 rxjs.operators.map(function (currentValue, index) {
 return currentValue * 2;
 }),
 rxjs.operators.map(function (currentValue, index) {
 return currentValue + 1;
 })
 );
 
 bar.subscribe({
 next(x) {
 console.log(x);
 },
 complete() {
 console.log('complete');
 },
 });
 </script>
 </head>
 <body></body>
 </html>
 
 | 
输出:
高阶 Observable
高阶Observable即Observable的Observable。例如:有一个Observable实例,订阅它会得到Observable实例。
示例:
输入:
| 12
 3
 4
 5
 6
 7
 8
 9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 
 | <!DOCTYPE html><html lang="en">
 <head>
 <meta charset="UTF-8" />
 <meta name="viewport" content="width=device-width, initial-scale=1.0" />
 <meta http-equiv="X-UA-Compatible" content="ie=edge" />
 <title>Document</title>
 <script src="https://unpkg.com/core-js-bundle/index.js"></script>
 <script src="https://unpkg.com/regenerator-runtime/runtime.js"></script>
 <script src="https://unpkg.com/@babel/standalone@7/babel.min.js"></script>
 <script src="https://unpkg.com/@reactivex/rxjs@6/dist/global/rxjs.umd.js"></script>
 <script type="text/babel">
 const observable = new rxjs.Observable(function (subscriber) {
 subscriber.next(
 new rxjs.Observable(function (subscriber) {
 subscriber.next(1);
 subscriber.complete();
 })
 );
 subscriber.next(
 new rxjs.Observable(function (subscriber) {
 subscriber.next(2);
 subscriber.complete();
 })
 );
 subscriber.next(
 new rxjs.Observable(function (subscriber) {
 subscriber.next(3);
 subscriber.complete();
 })
 );
 subscriber.complete();
 });
 const subscription = observable.subscribe({
 next(x) {
 console.log(x);
 },
 error(error) {
 console.log(error);
 },
 complete() {
 console.log('complete');
 },
 });
 </script>
 </head>
 <body></body>
 </html>
 
 | 
输出:
| 12
 3
 4
 
 | ObservableObservable
 Observable
 "complete"
 
 | 
在高阶Observable的observer.next中继续订阅低阶Observable。
示例:
输入:
| 12
 3
 4
 5
 6
 7
 8
 9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 
 | <!DOCTYPE html><html lang="en">
 <head>
 <meta charset="UTF-8" />
 <meta name="viewport" content="width=device-width, initial-scale=1.0" />
 <meta http-equiv="X-UA-Compatible" content="ie=edge" />
 <title>Document</title>
 <script src="https://unpkg.com/core-js-bundle/index.js"></script>
 <script src="https://unpkg.com/regenerator-runtime/runtime.js"></script>
 <script src="https://unpkg.com/@babel/standalone@7/babel.min.js"></script>
 <script src="https://unpkg.com/@reactivex/rxjs@6/dist/global/rxjs.umd.js"></script>
 <script type="text/babel">
 const observable = new rxjs.Observable(function (subscriber) {
 subscriber.next(
 new rxjs.Observable(function (subscriber) {
 subscriber.next(1);
 subscriber.complete();
 })
 );
 subscriber.next(
 new rxjs.Observable(function (subscriber) {
 subscriber.next(2);
 subscriber.complete();
 })
 );
 subscriber.next(
 new rxjs.Observable(function (subscriber) {
 subscriber.next(3);
 subscriber.complete();
 })
 );
 subscriber.complete();
 });
 const subscription = observable.subscribe({
 next(x) {
 console.log(x);
 x.subscribe({
 next(x) {
 console.log(x);
 },
 error(error) {
 console.log(error);
 },
 complete() {
 console.log('x complete');
 },
 });
 },
 error(error) {
 console.log(error);
 },
 complete() {
 console.log('complete');
 },
 });
 </script>
 </head>
 <body></body>
 </html>
 
 | 
输出:
| 12
 3
 4
 5
 6
 7
 8
 9
 10
 
 | Observable1
 "x complete"
 Observable
 2
 "x complete"
 Observable
 3
 "x complete"
 "complete"
 
 | 
扁平化
concatAll和mergeAll是管道运算符。使用它们可以将高阶Observable实例扁平化。
concatAll
concatAll会整合并转发低阶Observable实例推送的通知。但concatAll一次只能转发一个低阶Observable实例推送的通知。直到一个实例complete之后,它才会处理下一个实例。若一个实例error了,它将停止转发。
注意:如果一次性向concatAll推送大量的Observable实例,由于concatAll一次性只能处理一个,多余的实例需要排队,这将占用大量内存。
举个例子:老师听取甲乙二人的汇报并以班级的名义发布公告,老师一次只能听一个人的汇报。只有甲汇报完毕之后,老师才能听乙汇报。如果甲说话滔滔不绝,那么老师永远都不会听乙的汇报。如果在甲汇报的过程中,丙也来了,那么丙只能排在乙后面。如果甲在汇报过程中突然倒地不省人事,那么老师会马上送甲去医院而不是继续听乙和丙汇报。
示例:
输入:
| 12
 3
 4
 5
 6
 7
 8
 9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 
 | <!DOCTYPE html><html lang="en">
 <head>
 <meta charset="UTF-8" />
 <meta name="viewport" content="width=device-width, initial-scale=1.0" />
 <meta http-equiv="X-UA-Compatible" content="ie=edge" />
 <title>Document</title>
 <script src="https://unpkg.com/core-js-bundle/index.js"></script>
 <script src="https://unpkg.com/regenerator-runtime/runtime.js"></script>
 <script src="https://unpkg.com/@babel/standalone@7/babel.min.js"></script>
 <script src="https://unpkg.com/@reactivex/rxjs@6/dist/global/rxjs.umd.js"></script>
 <script type="text/babel">
 const observable = new rxjs.Observable(function (subscriber) {
 subscriber.next(
 new rxjs.Observable(function (subscriber) {
 subscriber.next(1);
 
 
 
 })
 );
 subscriber.next(
 new rxjs.Observable(function (subscriber) {
 subscriber.next(2);
 subscriber.error(new Error('hello, error'));
 })
 );
 subscriber.next(
 new rxjs.Observable(function (subscriber) {
 subscriber.next(3);
 subscriber.complete();
 })
 );
 subscriber.complete();
 }).pipe(rxjs.operators.concatAll());
 const subscription = observable.subscribe({
 next(x) {
 console.log(x);
 },
 error(error) {
 console.log(error);
 },
 complete() {
 console.log('complete');
 },
 });
 </script>
 </head>
 <body></body>
 </html>
 
 | 
输出:
| 12
 3
 
 | 12
 "Error: hello, error"
 
 | 
mergeAll
mergeAll会整合并转发低阶Observable实例推送的通知。和concatAll不同,mergeAll一次可以转发多个低阶Observable实例推送的通知。若一个实例error了,它将停止转发。
示例:
输入:
| 12
 3
 4
 5
 6
 7
 8
 9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 
 | <!DOCTYPE html><html lang="en">
 <head>
 <meta charset="UTF-8" />
 <meta name="viewport" content="width=device-width, initial-scale=1.0" />
 <meta http-equiv="X-UA-Compatible" content="ie=edge" />
 <title>Document</title>
 <script src="https://unpkg.com/core-js-bundle/index.js"></script>
 <script src="https://unpkg.com/regenerator-runtime/runtime.js"></script>
 <script src="https://unpkg.com/@babel/standalone@7/babel.min.js"></script>
 <script src="https://unpkg.com/@reactivex/rxjs@6/dist/global/rxjs.umd.js"></script>
 <script type="text/babel">
 const observable = new rxjs.Observable(function (subscriber) {
 subscriber.next(
 new rxjs.Observable(function (subscriber) {
 subscriber.next(1);
 
 
 
 })
 );
 subscriber.next(
 new rxjs.Observable(function (subscriber) {
 subscriber.next(2);
 subscriber.error(new Error('hello, error'));
 })
 );
 subscriber.next(
 new rxjs.Observable(function (subscriber) {
 subscriber.next(3);
 subscriber.complete();
 })
 );
 subscriber.complete();
 }).pipe(rxjs.operators.mergeAll());
 const subscription = observable.subscribe({
 next(x) {
 console.log(x);
 },
 error(error) {
 console.log(error);
 },
 complete() {
 console.log('complete');
 },
 });
 </script>
 </head>
 <body></body>
 </html>
 
 | 
输出:
| 12
 3
 
 | 12
 "Error: hello, error"
 
 | 
参考资料
        
          
        
        
          
          
  本文对你有帮助?请支持我
  
  
    
       支付宝
      支付宝
    
    
       微信
      微信