«
Rxjs 在 React 中的实践

时间:2022-4   


引言

怎样快速上手一个新的框架或者类库——当然是先从一个 Demo 入手,先会用再去理解背后的原理和思想。

本文主要结合 Rxjs v6 在 React v16 中的使用,从实例出发,谈谈 Rxjs 的应用场景和使用技巧。

概念理解

TL;DR

Observable & Observer

在 Rxjs 的世界中, Observable 和 Observer 是两个最重要的两个概念,所有操作都是基于这两个对象产生的。

Observable 是一个“可以被观察的对象/发布者”,而 Observer 则是一个“观察者”。一个 Observable 可以生产数据,如果一个观察者对某个被观察者生产的数据感兴趣的话,可以通过主动订阅( subscrible )这个被观察者。之后,如果被观察者本身的状态发生改变,会发送一个通知,调用观察者的更新方法。如果被观察者不想再接收通知,也可以主动取订( unsubscribe )。

在 Rxjs 中,Observer 其实就是一个对象,对象上可以包含 nextcomplete  和  error  三种方法。

一般来说,Observer 可以只定义  next ,其余两个方法都是完结状态,可以只在必要的时候处理。

Subject

单播、广播和多播

Cold Observable & Hot Observable

在很多情况下,我们希望多个 Observer 订阅到的是同一个数据流。那么,我们要如何把 Cold Observable “加热” 成 Hot Observable —— 利用 Subject!

Subject

在 Rxjs 中,Subject 是一个代理对象,它既有 Observable 的接口,又有 Observer 的接口。 利用 Subject 可以实现多播,当下游需要 Hot Observable 的时候,可以订阅一个 Subject 对象。

Rxjs 提供了三个基础的多播操作符:

利用这三个子类,我们可以在产生新的订阅时,指定数据重播的方式。

Scheduler

Scheduler 可以解释为“调度器”,它能用于控制 Rxjs 数据流中数据消息的推送节奏,增强数据流处理的性能。

我们知道 JavaScript 是单线程运行的,在 Rxjs 中的操作一般是通过调用栈来完成的,但是有时我们需要生产一个数据量很大的数据,同步执行可能导致内存占用过大。因此,Rxjs 提供了 4 种增强性能的任务调度器:

这四个调度器可以作为 Observable 的第二个参数来改变 Observable,比如  offromrangeinterval  等。同时,Rxjs 也提供了两个用于指定特定调度器的操作符:

起手式

订阅数据流

在 React 组件中使用 Rxjs,最简单的用法就是:在组件初始挂载的时候,订阅一个数据流;在组件卸载的时候,取消订阅这个数据流

import { fromEvent } from "rxjs";

const Component = () => {
  useEffect(() => {
    let subscription = fromEvent(document.body, "click").subscribe((e) => {
      console.log("click", e);
    });
    return () => subscription.unsubscribe();
  }, []);
  return <div></div>;
};
const Counter = () => {
  const numbers$ = useMemo(() => interval(1000), []);
  const count = useObservable(() => numbers$);
  return <div>{count}</div>;
};

数据隔离

在开发中,我们通常会将业务组件的视图和应用逻辑进行分离,以保证组件的精简和纯净,同时便于复用逻辑层。

Angular 通过依赖注入来将应用逻辑分解为服务,并让这些服务可用于各个组件中。我们可以利用这个思想,将 Rxjs 的操作单独抽离成一个 Service 层,在组件中通过注册使用这些服务。

我们通过一个 TodoList 来分解这个过程。

// todo/model/service.js
class TodoService {
  // 用 $ 标记一个 Observable
  private refresh$ = new BehaviorSubject(0);
  private loadingSource$ = new BehaviorSubject(false);
  loading$ = this.loadingSource$.asObservable();
  todoList$ = combineLatest(this.refresh$).pipe(
    debounceTime(250),
    tap(() => {
      this.loadingSource$.next(true);
    }),
    switchMap(() => {
      return from(TodoApi.requestList());
    }),
    tap((res) => {
      this.loadingSource$.next(false);
    })
  );

  refresh() {
    this.refresh$.next(Math.random());
  }
}
const useInstance = (instanceClass) => {
  const instance = useRef(null);
  return instance.current || (instance.current = new instanceClass());
};
const Todo = (props) => {
  const todoService = useInstance(TodoService);
  return <div>...</div>;
};
// todo/index.jsx
export const Todo = () => {
  const todoService = useInstance(TodoService);
  const todoList = useObservable(() => todoService.todoList$, []);
  const loading = useObservable(() => todoService.loading$, false);
  return (
    <div>
      <div>
        {todoList.map((todo) => (
          <div key={todo.id}>
            <input type="checkbox" checked={todo.done}></input>
            <span>{todo.name}</span>
          </div>
        ))}
        <div className={cx({ [styles.loading]: loading })}></div>
      </div>
    </div>
  );
};

数据流管理

在上述的例子中,我们在单个组件中使用  useInstance  来注册数据源,但是,如果需要在多个组件中共享数据源呢?我们可以基于 Context 甚至 Redux 来管理。

const InjectorContext = createContext({ getInstance: () => null });
const useInjector = (classes) => {
  const instanceMap = useInstance(Map);
  useEffect(() => {
    classes.forEach((cls) => {
      if (!instanceMap.has(cls.name)) {
        instanceMap.set(cls.name, new cls());
      }
    });
  }, [classes, instanceMap]);

  const getInstance = useCallback(
    (cls) => {
      if (!instanceMap.has(cls.name)) {
        instanceMap.set(cls.name, new cls());
      }
      return instanceMap.get(cls.name);
    },
    [instanceMap]
  );
  return { getInstance };
};

const useInject = (injectClass) => {
  const instance = useRef();
  const Injector = useContext(InjectorContext);
  return (
    instance.current || (instance.current = Injector.getInstance(injectClass))
  );
};
const DataA = (props) => {
  const shareService = useInject(ShareService);
  return <div>...</div>;
};
const DataB = (props) => {
  const shareService = useInject(ShareService);
  return <div>...</div>;
};

const SourceApp = () => {
  const injector = useInjector([ShareService]);
  return (
    <InjectorContext.Provider value={injector}>
      <div>
        <DataA></DataA>
        <DataB></DataB>
      </div>
    </InjectorContext.Provider>
  );
};

More

更复杂的情况是,如果两个服务之间需要互相引用,我们可以参考 injection-js、 react-ioc 进行处理,这里就不作深入研究了。

在案例中解读

使用一个新框架,应该从实际业务场景出发,看它是否能真正解决开发中的一些难点。毕竟 Rxjs 只是一个数据控制的工具,落地到实际应用上才有价值。

案例一:TodoList

本节结合一个 TodoList 体会  switchMapconcatMapexhaustMapmergeMap  如何优化我们的异步逻辑管理。

这几个操作符都有相同的一个效果,即将一个高阶操作映射成一个可观测的值(比如异步请求的结果),但是接收与处理请求的时机不同。

switchMap

解读:每次只处理最近一次接收的请求,如果上个请求还没完成即发起下个请求,则未完成的请求将被抛弃。

示例:给每个操作增加 1500 ms 的延时,在多次点击之后,只有最后一次的请求被成功处理。

concatMap

解读:每个请求被依次处理,当上个请求处理成功再发起下次请求。

示例:给每个操作增加 1500 ms 的延时,在多次点击之后,请求结果每 1500ms emit 一次。

mergeMap

解读:每个请求都被接收并执行,不做特殊处理。

示例:给每个操作增加 1500 ms 的延时,点击后立即执行无需等待。

exhaustMap

解读:上个请求结束之后才能接收新的请求。

示例:给每个操作增加 1500 ms 的延时,在多次点击过程中,第一个请求还没结束,因此只有第一个请求生效。

案例二:运动的小球

本节通过一个 小球的 JS 动画案例来体会 Rxjs 在开发上带来的逻辑简化以及 Scheduler 的妙用。 我们来看这样一个案例:一个小球能在一片围栏限定的正方形场地内运动,运动方向限定了 上|下|左|右 ,小球运动到围栏边缘即终止。

初步分析

这看起来很简单?我们只需要完成:

// 一个参考的实现
const XY = {
  UP: [0, -1],
  DOWN: [0, 1],
  LEFT: [-1, 0],
  RIGHT: [1, 0],
};
const position = { x: 0, y: 0 };
const isValid = (x, y) => x >= 0 &amp;&amp; x <= MAX_LEN &amp;&amp; y >= 0 &amp;&amp; y <= MAX_LEN;
const run = (action) => {
  const factor = XY[action];
  let x = factor[0] * step + position.x;
  let y = factor[1] * step + position.y;
  if (!isValid(x, y)) {
    ball.style.left = `${x}px`;
    ball.style.top = `${y}px`;
    position.x = x;
    position.y = y;
    requestAnimationFrame(() => run(action));
    return true;
  }
  return false;
};

进一步分析

如果多个命令连续发出,小球应该怎样运动呢?

这里如果不做处理,几个命令的效果可能会叠加,造成掉帧或者失效,这当然不是我们想要的。其他几个方案:

这里其实对应了很多异步请求的场景。在这种情况下,我们想象下代码会变成什么样子?—— 我们可能需要写额外的钩子函数、维护命令队列、结合发布订阅模式或者命令模式去处理不同的情况。

使用操作符简化

结合上一节的知识,这里我们只需要简单改变一个操作符就能实现不同需求下想要的效果。

import { animationFrameScheduler, fromEvent, of } from "rxjs";
import { concatMap, filter, repeat, takeWhile, map } from "rxjs/operators";

fromEvent(toolBarRef, "click").pipe(
  concatMap((event) =>
    of(event.target.dataset.action, animationFrameScheduler).pipe(
      filter((action) => !!action),
      map(run),
      repeat(),
      takeWhile((r) => r)
    )
  )
);

案例三:聊天室

在这一节,我们通过一个纯前端的简易聊天室来体会 Rxjs 中的  Subject  是如何应用的。

Subject(主体)是一个代理对象,它既是一个 Observable 可以被订阅,又是一个 Observer 可以发送订阅。它的实例可以被多个对象共享。

我们先定义一个 Service 用于管理传输的消息。

class ChatRoomService {
  private messageSource$ = new Subject();
  // 作为 Observable
  message$ = this.messageSource$.pipe(
    scan((acc, cur) => [...acc, cur], [])
  );
  // 作为 Observer
  send(msg) {
    this.messageSource$.next(msg);
  }
}

Subject

Subject 只有被订阅的时候才能接收到数据,因此初始进入聊天室的时候,消息记录为空,类似实时聊天室。

BehaviorSubject

BehaviorSubject 是 Subject 的一个子类,它在有新的订阅时,会重播最近一个值。

ReplaySubject

BehaviorSubject 是 Subject 的一个子类,它在有新的订阅时,会重播全部值。

AsyncSubject

AsyncSubject 是 Subject 的一个子类,它在有新的订阅时,只有异步结果成功才会重播最近一个值。

总结

本文提供了 Rxjs 在 React 中应用的一种方法,并简单介绍了 Rxjs 的基础概念和一些操作符的实际使用。实际上,在定时任务管理、数据流整合、数据流缓存上 Rxjs 也有很大用处。使用 Rxjs 其实就像基于操作符的组合游戏,玩法多样。后面有时间再写一些进阶用法~

涉及到的代码已经放到 github

参考