梦回苍石居  |
Cangshi Live

RxJS 简单入门

前端JAVASCRIPTRXJS
苍石 发表于:2024-04-19 17:48:47  最后编辑于:5 个月前 41 Views

RxJS 入门

什么是RxJS?

RxJS代表响应式扩展JavaScript(Reactive Extensions Library for JavaScript)。它是一个用于处理事件流和异步数据流的库,可以将这些流组合起来以产生更复杂的结果。

响应式编程

响应式编程(Reactive Programming)是一种编程范式,它关注于数据流和变化的传播。这种编程方式特别适用于处理异步数据流

  • 事件驱动(addEventListener/dispatchEvent)

容易产生“回调地狱”(Callback Hell),使代码难以阅读和维护。

// 注册事件
addEventListener('click',(e)=>{
    console.log(e)
    console.log('click')
})

// 触发事件
dispatchEvent(new Event('click'))
  • Promise (Async await)
const p = new Promise((resolve, reject) => {
    resolve('Success')
})

p.then((value) => {
    console.log(value)
})
  • RxJS
const a = new Observable((observer) => {
    observer.next(1);
    observer.next(2);
    observer.next(3);
    setTimeout(() => {
        observer.next(4);
        observer.complete();
    }, 3);
});
a.subscribe((x) => console.log(x))
不同点 事件驱动 Promise RxJS
是否可以中途取消 可以 不可以 可以
是否可以多次触发 可以 不可以 可以
是否支持无状态 不可以 可以 可以

RxJS的主要核心概念

  • Observable: 表示一个可调用的未来值或事件的集合。Observable并不是实际的数据,而是产生数据的源头。
  • Observer: 一个回调函数的集合,用于处理由Observable发出的值,包括nexterrorcomplete
  • Subscription: 表示ObservableObserver之间订阅关系的对象。它提供了一种机制来管理这种关系,包括取消订阅以释放资源。当一个Observer订阅了一个Observable时,会返回一个Subscription对象。这个对象允许Observer取消订阅,或者管理多个订阅。 image.png

RxJS:一个返回值是数据流的函数

Observables 像是没有参数, 但可以泛化为多个值的函数。

考虑如下代码:

function foo() {
  console.log('Hello');
  return 42;
}

var x = foo.call(); // 等同于 foo()
console.log(x);
var y = foo.call(); // 等同于 foo()
console.log(y);

我们期待看到的输出:

"Hello"
42
"Hello"
42

你可以使用Observables重写上面的代码:

var foo = new Observable(function (observer) {
  console.log('Hello');
  observer.next(42);
});

foo.subscribe(function (x) {
  console.log(x);
});
foo.subscribe(function (y) {
  console.log(y);
});

输出是一样的:

"Hello"
42
"Hello"
42

区别是Observables的返回值可以是不同时间产生的多个值。

函数和Observables都是惰性计算,如果你不调用函数,console.log('Hello') 就不会执行。Observables也是如此,如果你不“调用”它(使用subscribe),console.log('Hello') 也不会执行。此外,“调用”或“订阅”是独立的操作:两个函数调用会触发两个单独的副作用,两个Observable 订阅同样也是触发两个单独的副作用。EventEmitters共享副作用并且无论是否存在订阅者都会尽早执行,Observables与之相反,不会共享副作用并且是延迟执行。

Observables 传递值可以是同步的,也可以是异步的。

Observable 可以随着时间的推移“返回”多个值,这是函数所做不到的。你无法这样:

function foo() {
  console.log('Hello');
  return 42;
  return 100; // 死代码,永远不会执行
}

函数只能返回一个值。但 Observables 可以这样:

var foo = new Observable(function (observer) {
  console.log('Hello');
  observer.next(42);
  observer.next(100);
  observer.next(200);
  setTimeout(() => {
    observer.next(300); // 异步执行
  }, 1000);
});

console.log('before');
foo.subscribe(function (x) {
  console.log(x);
});
console.log('after');

输出

"before"
"Hello"
42
100
200
"after"
300

Observables不仅返回了多个值,其中还在1秒后异步返回了一个300 我们可以理解这是一个同步或异步的数据流,当你完成订阅后,就能够获得一个持续的数据流动。

RxJS:Operators操作数据流的函数

我们通过订阅一个Observable可以得到一个数据流,如果我们在订阅后,再通过一个新的Observable将订阅获得的数据流进行一定操作再发送出去,这样如果有新的Observer订阅。

import { Observable } from "rxjs";

const a = new Observable((observer) => {
    observer.next(1);
    observer.next(2);
    observer.next(3);
    setTimeout(() => {
        observer.next(4);
        observer.complete();
    }, 3);
});

const b = new Observable<number>((observer) => {
    // 想要每个值+1
    a.subscribe((x) => {
        observer.next(x + 1);
    });
});

console.log("just before subscribe");
b.subscribe((x) => console.log(x));
console.log("just after subscribe");

得到输出

just before subscribe
2
3
4
just after subscribe
5

可以看到,我们通过订阅b得到了对订阅a后的数据流+1的数据流。 像这样的操作,我将其理解为对数据流的操作,如果有多个这样操作,比如c订阅b再进行操作,这样就形成了订阅链

正常情况下,数据流的处理都是相当复杂的,在RxJS中提供了很多的操作符(Operator)来操作数据流。

如果要实现类似上面+1的逻辑,可以使用map操作符:

import { Observable, map } from "rxjs";

const a = new Observable((observer) => {
    observer.next(1);
    observer.next(2);
    observer.next(3);
    setTimeout(() => {
        observer.next(4);
        observer.complete();
    }, 3);
});

const b = a.pipe(
    map((x) => x + 1)
)

console.log("just before subscribe");
b.subscribe((x) => console.log(x));
console.log("just after subscribe");

map操作符的作用是将每一个数据流中的数据操作一遍,以上代码执行会获得一样的输出结果。 我们通过pipe去使用RxJS给我们提供的操作符,如字面意思一样,可以理解为管道。 pipe函数可以接受多个操作符,并且按顺序处理数据流:

import { Observable, map } from "rxjs";

const a = new Observable((observer) => {
    observer.next(1);
    observer.next(2);
    observer.next(3);
    setTimeout(() => {
        observer.next(4);
        observer.complete();
    }, 3);
});

const b = a.pipe(
    map((x) => x + 1),
    map((x) => x * 2 + "2")
)

console.log("just before subscribe");
b.subscribe((x) => console.log(x));
console.log("just after subscribe");

以上代码在pipe中连续使用了两个map操作符,这使得数据流会按顺序先给第一个map处理,再给第二个map处理,得到的输出结果是:

just before subscribe
42
62
82
just after subscribe
102

更多操作符

https://rxjs.dev/guide/operators

https://segmentfault.com/a/1190000044727567

文章评论 ( 0 )

Person name
未登录用户可以发表匿名评论