引言
怎样快速上手一个新的框架或者类库——当然是先从一个 Demo 入手,先会用再去理解背后的原理和思想。
本文主要结合 Rxjs v6 在 React v16 中的使用,从实例出发,谈谈 Rxjs 的应用场景和使用技巧。
概念理解
TL;DR
-
Observer 通过观察者模式订阅 Observable 产生的数据
-
Observable 是单播的;基于代理对象 Subject 可以支持多播
-
Scheduler 基于事件循环机制和任务调度,可以优化 Observable 产生数据的时机
-
Rxjs 其实就是一个简化数据流操作的工具集
Observable & Observer
在 Rxjs 的世界中, Observable 和 Observer 是两个最重要的两个概念,所有操作都是基于这两个对象产生的。
Observable 是一个“可以被观察的对象/发布者”,而 Observer 则是一个“观察者”。一个 Observable 可以生产数据,如果一个观察者对某个被观察者生产的数据感兴趣的话,可以通过主动订阅( subscrible
)这个被观察者。之后,如果被观察者本身的状态发生改变,会发送一个通知,调用观察者的更新方法。如果被观察者不想再接收通知,也可以主动取订( unsubscribe
)。
在 Rxjs 中,Observer 其实就是一个对象,对象上可以包含 next
、 complete
和 error
三种方法。
-
next
是 Observable 通知 Observer “当前有推送的新数据”,不确定时候还有“更多数据” -
complete
是 Observable 通知 Observer 当前推送已经完结 -
error
是出错时的处理状态
一般来说,Observer 可以只定义 next
,其余两个方法都是完结状态,可以只在必要的时候处理。
Subject
单播、广播和多播
-
单播(unicast):一对一
-
广播(broadcast):全局通知,不区分受众
-
多播(multicast):可以通知给指定的一群受众
Cold Observable & Hot Observable
-
Cold Observable 指的是,当一个 Observable 被订阅时,每次都产生一个独立的数据流,无法被多个 Observer 共享,当有新的订阅时其实是生成了新的 Observable 对象。实现的是单播。
-
Hot Observable 指的是,一个 Observable 产生的数据流可以被多个 Observer 订阅并共享。实现的是多播。
在很多情况下,我们希望多个 Observer 订阅到的是同一个数据流。那么,我们要如何把 Cold Observable “加热” 成 Hot Observable —— 利用 Subject!
Subject
在 Rxjs 中,Subject 是一个代理对象,它既有 Observable 的接口,又有 Observer 的接口。 利用 Subject 可以实现多播,当下游需要 Hot Observable 的时候,可以订阅一个 Subject 对象。
Rxjs 提供了三个基础的多播操作符:
-
multicast
-
publish
-
share
Rxjs 提供了三个高级的多播操作符: -
publishLast
-
publishRelpay
-
publishBehavior
这背后其实对应了 Subject 的三个子类提供支持: -
AsyncSubject
-
ReplaySubject
-
BehaviorSubject
利用这三个子类,我们可以在产生新的订阅时,指定数据重播的方式。
Scheduler
Scheduler 可以解释为“调度器”,它能用于控制 Rxjs 数据流中数据消息的推送节奏,增强数据流处理的性能。
我们知道 JavaScript 是单线程运行的,在 Rxjs 中的操作一般是通过调用栈来完成的,但是有时我们需要生产一个数据量很大的数据,同步执行可能导致内存占用过大。因此,Rxjs 提供了 4 种增强性能的任务调度器:
-
asapScheduler
尽快执行 -
asyncScheduler
基于 setTimeout 延时执行,用于基于时间吐出数据的场景 -
queueScheduler
基于队列实现,用于迭代一个大的集合的场景 -
animationFrameScheduler
基于 animationFrame 延迟执行,用于动画场景
这四个调度器可以作为 Observable 的第二个参数来改变 Observable,比如 of
、 from
、 range
、 interval
等。同时,Rxjs 也提供了两个用于指定特定调度器的操作符:
-
observeOn
-
subscribeOn
起手式
订阅数据流
在 React 组件中使用 Rxjs,最简单的用法就是:在组件初始挂载的时候,订阅一个数据流;在组件卸载的时候,取消订阅这个数据流。
- 订阅事件流
import { fromEvent } from "rxjs";
const Component = () => {
useEffect(() => {
let subscription = fromEvent(document.body, "click").subscribe((e) => {
console.log("click", e);
});
return () => subscription.unsubscribe();
}, []);
return <div></div>;
};
- 订阅计数流 这里我们使用 rxjs-hooks 提供的
useObservable
订阅数据流,其实就是将前面的subscribe
和unsubscribe
封装使用。
const Counter = () => {
const numbers$ = useMemo(() => interval(1000), []);
const count = useObservable(() => numbers$);
return <div>{count}</div>;
};
数据隔离
在开发中,我们通常会将业务组件的视图和应用逻辑进行分离,以保证组件的精简和纯净,同时便于复用逻辑层。
Angular 通过依赖注入来将应用逻辑分解为服务,并让这些服务可用于各个组件中。我们可以利用这个思想,将 Rxjs 的操作单独抽离成一个 Service 层,在组件中通过注册使用这些服务。
我们通过一个 TodoList 来分解这个过程。
- 定义一个 TodoService
// todo/model/service.js
class TodoService {
// 用 $ 标记一个 Observable
private refresh$ = new BehaviorSubject(0);
private loadingSource$ = new BehaviorSubject(false);
loading$ = this.loadingSource$.asObservable();
todoList$ = combineLatest(this.refresh$).pipe(
debounceTime(250),
tap(() => {
this.loadingSource$.next(true);
}),
switchMap(() => {
return from(TodoApi.requestList());
}),
tap((res) => {
this.loadingSource$.next(false);
})
);
refresh() {
this.refresh$.next(Math.random());
}
}
- 在组件中使用 Service 我们可以通过一个简单的
useInstance
,在组件的整个生命周期内使用这个 Service 的单例。
const useInstance = (instanceClass) => {
const instance = useRef(null);
return instance.current || (instance.current = new instanceClass());
};
const Todo = (props) => {
const todoService = useInstance(TodoService);
return <div>...</div>;
};
- 在组件中订阅并使用数据流
// todo/index.jsx
export const Todo = () => {
const todoService = useInstance(TodoService);
const todoList = useObservable(() => todoService.todoList$, []);
const loading = useObservable(() => todoService.loading$, false);
return (
<div>
<div>
{todoList.map((todo) => (
<div key={todo.id}>
<input type="checkbox" checked={todo.done}></input>
<span>{todo.name}</span>
</div>
))}
<div className={cx({ [styles.loading]: loading })}></div>
</div>
</div>
);
};
数据流管理
在上述的例子中,我们在单个组件中使用 useInstance
来注册数据源,但是,如果需要在多个组件中共享数据源呢?我们可以基于 Context 甚至 Redux 来管理。
- 定义 Context
const InjectorContext = createContext({ getInstance: () => null });
- 使用 Map 管理多个 Service
const useInjector = (classes) => {
const instanceMap = useInstance(Map);
useEffect(() => {
classes.forEach((cls) => {
if (!instanceMap.has(cls.name)) {
instanceMap.set(cls.name, new cls());
}
});
}, [classes, instanceMap]);
const getInstance = useCallback(
(cls) => {
if (!instanceMap.has(cls.name)) {
instanceMap.set(cls.name, new cls());
}
return instanceMap.get(cls.name);
},
[instanceMap]
);
return { getInstance };
};
const useInject = (injectClass) => {
const instance = useRef();
const Injector = useContext(InjectorContext);
return (
instance.current || (instance.current = Injector.getInstance(injectClass))
);
};
- 在组件中共享 Service 实例
const DataA = (props) => {
const shareService = useInject(ShareService);
return <div>...</div>;
};
const DataB = (props) => {
const shareService = useInject(ShareService);
return <div>...</div>;
};
const SourceApp = () => {
const injector = useInjector([ShareService]);
return (
<InjectorContext.Provider value={injector}>
<div>
<DataA></DataA>
<DataB></DataB>
</div>
</InjectorContext.Provider>
);
};
More
更复杂的情况是,如果两个服务之间需要互相引用,我们可以参考 injection-js、 react-ioc 进行处理,这里就不作深入研究了。
在案例中解读
使用一个新框架,应该从实际业务场景出发,看它是否能真正解决开发中的一些难点。毕竟 Rxjs 只是一个数据控制的工具,落地到实际应用上才有价值。
案例一:TodoList
本节结合一个 TodoList 体会 switchMap
、 concatMap
、 exhaustMap
、 mergeMap
如何优化我们的异步逻辑管理。
这几个操作符都有相同的一个效果,即将一个高阶操作映射成一个可观测的值(比如异步请求的结果),但是接收与处理请求的时机不同。
switchMap
解读:每次只处理最近一次接收的请求,如果上个请求还没完成即发起下个请求,则未完成的请求将被抛弃。
示例:给每个操作增加 1500 ms 的延时,在多次点击之后,只有最后一次的请求被成功处理。
concatMap
解读:每个请求被依次处理,当上个请求处理成功再发起下次请求。
示例:给每个操作增加 1500 ms 的延时,在多次点击之后,请求结果每 1500ms emit 一次。
mergeMap
解读:每个请求都被接收并执行,不做特殊处理。
示例:给每个操作增加 1500 ms 的延时,点击后立即执行无需等待。
exhaustMap
解读:上个请求结束之后才能接收新的请求。
示例:给每个操作增加 1500 ms 的延时,在多次点击过程中,第一个请求还没结束,因此只有第一个请求生效。
案例二:运动的小球
本节通过一个 小球的 JS 动画案例来体会 Rxjs 在开发上带来的逻辑简化以及 Scheduler 的妙用。 我们来看这样一个案例:一个小球能在一片围栏限定的正方形场地内运动,运动方向限定了 上|下|左|右
,小球运动到围栏边缘即终止。
初步分析
这看起来很简单?我们只需要完成:
-
实现
上|下|左|右
4 个方向上的命令函数 -
结合
requestAnimationFrame
,每次步进step
的长度,来显示运动轨迹 -
实现一个钩子函数,在每次步进之前判断下一步是否超出范围,超出则终止
// 一个参考的实现
const XY = {
UP: [0, -1],
DOWN: [0, 1],
LEFT: [-1, 0],
RIGHT: [1, 0],
};
const position = { x: 0, y: 0 };
const isValid = (x, y) => x >= 0 && x <= MAX_LEN && y >= 0 && y <= MAX_LEN;
const run = (action) => {
const factor = XY[action];
let x = factor[0] * step + position.x;
let y = factor[1] * step + position.y;
if (!isValid(x, y)) {
ball.style.left = `${x}px`;
ball.style.top = `${y}px`;
position.x = x;
position.y = y;
requestAnimationFrame(() => run(action));
return true;
}
return false;
};
进一步分析
如果多个命令连续发出,小球应该怎样运动呢?
这里如果不做处理,几个命令的效果可能会叠加,造成掉帧或者失效,这当然不是我们想要的。其他几个方案:
-
阻塞:如果当前运动未停止,则禁止下一个命令的操作按钮
-
排队:连续发出命令,通过一个命令队列存储,当一个操作结束之后判断命令队列是否为空,不为空则取出下一个命令执行
-
竞争:连续发出命令,下一个命令会取消上一个命令
这里其实对应了很多异步请求的场景。在这种情况下,我们想象下代码会变成什么样子?—— 我们可能需要写额外的钩子函数、维护命令队列、结合发布订阅模式或者命令模式去处理不同的情况。
使用操作符简化
结合上一节的知识,这里我们只需要简单改变一个操作符就能实现不同需求下想要的效果。
-
定义一个鼠标点击流
-
指定 animationFrameScheduler 作为任务调度
-
使用 takeWhile,当 run 函数返回
false
值时终止 repeat -
使用 filter 过滤无效的命令
import { animationFrameScheduler, fromEvent, of } from "rxjs";
import { concatMap, filter, repeat, takeWhile, map } from "rxjs/operators";
fromEvent(toolBarRef, "click").pipe(
concatMap((event) =>
of(event.target.dataset.action, animationFrameScheduler).pipe(
filter((action) => !!action),
map(run),
repeat(),
takeWhile((r) => r)
)
)
);
- 使用 concatMap 的效果如下:
- 使用 switchMap 的效果如下:
案例三:聊天室
在这一节,我们通过一个纯前端的简易聊天室来体会 Rxjs 中的 Subject
是如何应用的。
Subject(主体)是一个代理对象,它既是一个 Observable 可以被订阅,又是一个 Observer 可以发送订阅。它的实例可以被多个对象共享。
我们先定义一个 Service 用于管理传输的消息。
class ChatRoomService {
private messageSource$ = new Subject();
// 作为 Observable
message$ = this.messageSource$.pipe(
scan((acc, cur) => [...acc, cur], [])
);
// 作为 Observer
send(msg) {
this.messageSource$.next(msg);
}
}
Subject
Subject 只有被订阅的时候才能接收到数据,因此初始进入聊天室的时候,消息记录为空,类似实时聊天室。
BehaviorSubject
BehaviorSubject 是 Subject 的一个子类,它在有新的订阅时,会重播最近一个值。
ReplaySubject
BehaviorSubject 是 Subject 的一个子类,它在有新的订阅时,会重播全部值。
AsyncSubject
AsyncSubject 是 Subject 的一个子类,它在有新的订阅时,只有异步结果成功才会重播最近一个值。
总结
本文提供了 Rxjs 在 React 中应用的一种方法,并简单介绍了 Rxjs 的基础概念和一些操作符的实际使用。实际上,在定时任务管理、数据流整合、数据流缓存上 Rxjs 也有很大用处。使用 Rxjs 其实就像基于操作符的组合游戏,玩法多样。后面有时间再写一些进阶用法~
涉及到的代码已经放到 github。