20 January 2020 — Tech
在字节跳动新社区 EX 前端组实习期间的产物,主要是介绍一下如何入门比较好以及 RxJS 和 React Hooks 的结合使用。
建议学习顺序
- 官网 overview
- 自己写经典场景的代码
- 对比一下别人的实现
- 看看 operators
- 有兴趣看看源码啥的?
整体概念
基本概念
- Observable:
- represents the idea of an invokable collection of future values or events.
- 未来的一系列可调用值或事件
- Observer:
- is a collection of callbacks that knows how to listen to values delivered by the Observable.
- 知晓如何监听 Observable 发出的值的回调函数
- Subscription:
- represents the execution of an Observable, is primarily useful for cancelling the execution.
- 是 Observer 和 Observable 之间的关系,subscribe 之后,Observable 才会执行。一般用来取消
- Operators:
- are pure functions that enable a functional programming style of dealing with collections with operations like map, filter, concat, reduce, etc.
- 对 Input Observable 发出的值做处理,发出 Output Observable
- Subject:
- is the equivalent to an EventEmitter, and the only way of multicasting a value or event to multiple Observers.
- 观察者模式 -> 发布订阅模式
- Schedulers:
- are centralized dispatchers to control concurrency, allowing us to coordinate when computation happens on e.g. setTimeout or requestAnimationFrame or others.
- 控制并发性,协调 setTimeout 或 requestAnimationFrame
宝石图
Multicasted Observables & Reference counting
Observable 可通过 Subject 向多个 subscribers(监听器)发送消息,这成为 multicasted(多播)Observable,但是 unicast(单播的) Observable,即普通的 Observable 只有一个 Observer。 注意:这里的【一个 】Observer 意思在于,同一个 Observable 被两次 subscribe,第二次监听的是一个用同样方法构建出来的新的 Observer。比如对 fromEvent(document, ‘click’).pipe(mapTo(1),scan((acc, one) => acc + one, 0)) subscribe 两次,两次的计数是独立的。
multicast returns an Observable that looks like a normal Observable, but works like a Subject when it comes to subscribing. multicast returns a ConnectableObservable, which is simply an Observable with the connect() method.
The connect() method is important to determine exactly when the shared Observable execution will start. Because connect() does source.subscribe(subject) under the hood, connect() returns a Subscription, which you can unsubscribe from in order to cancel the shared Observable execution.
import { from, Subject } from "rxjs";
import { multicast } from "rxjs/operators";
const source = from([1, 2, 3]);
const subject = new Subject();
const multicasted = source.pipe(multicast(subject));
// These are, under the hood, `subject.subscribe({...})`:
multicasted.subscribe({
next: v => console.log(`observerA: ${v}`)
});
multicasted.subscribe({
next: v => console.log(`observerB: ${v}`)
});
// This is, under the hood, `source.subscribe(subject)`:
multicasted.connect();
但是在 subscribe 之后显式调用 connect 有点弱智,所以就有了 refCount,监听器数量从 0 到 1,自动 connect,变为 0 时完全 unsubscribe。
const source = interval(500);
const subject = new Subject();
const refCounted = source.pipe(multicast(subject), refCount());
let subscription1, subscription2;
// This calls `connect()`, because
// it is the first subscriber to `refCounted`
console.log("observerA subscribed");
subscription1 = refCounted.subscribe({
next: v => console.log(`observerA: ${v}`)
});
// This is when the shared Observable execution will stop, because
// `refCounted` would have no more subscribers after this
setTimeout(() => {
console.log("observerA unsubscribed");
subscription1.unsubscribe();
}, 2000);
Subject
- BehaviorSubject 有监听器加入时,先发送当前值
- ReplaySubject 会记录多个值,并且在有新监听器加入时,把最近的 n 个值(可配置)一起发送过去。
- AsyncSubject 只在 complete 的时候,发送最近一次的值
Operators 总结
触发条件控制
用 Observable 做条件限制
- audit:给定 Observable 触发,才会把 source 的值传出来
- debounce:防抖,一般传 ( ) => interval,如果在给定时间内,source 没有新事件,则将之前的值传出,否则丢弃并重新计时
- delayWhen:一般传 ( ) => interval, source 的每一个值的延迟,都满足给定 Observable 的条件。
- 比如 source 是 of(1,2,3);但是给定 Observable 是 (num) => interval((4-num)*1000),则 1 会在 3 秒后发出,3 会在 1 秒或发出
- Sample:给定 Observable 触发,如果距上一次触发,source 有新值,则发送最新值(在此期间内的旧的忽略),没有则不传
- throttle:节流,一般传 () => interval,在给定时间内的新事件会被忽略
- observeOn:在监听事件发生时才触发,比如 observeOn(animationFrameScheduler) 可以做渲染优化
- skipUntil:直到传入的 Observable 触发时,source 的值才会被发送
- takeUntil:传入的 Observable 触发后,source complete
- timeoutWith:(time, Observable),指定时间内 source 没有发出值,则 subscribe 给定 Observable
用值做条件限制
- auditTime:每隔多长时间发一次 source Observable 的最新值
- debounceTime:防抖
- delay:source 值发送延迟 xx 时间,还可以传 Date,Observable 在此日期后才有效
- distinctUntilChanged:邻位去重,即如果当前 source 值与上一次一样,忽略此次
- distinctUntilKeyChanged:邻位去重,即如果当前 source 值与上一次的值,键为 key 的值一样,忽略
- filter:source 值传入,返回为 true 才触发
- sampleTime:给定时间内,如果 source 有新值,则发送最新值(在此期间内的旧的忽略),没有则不发送
- throttleTime:(duration, scheduler, config: { leading, trailing })
- 节流,可以做双击的判断 throttleTime(400, asyncScheduler,{ leading: false, trailing: true })
- subscribeOn:返回按照指定的 ScheduleLike 监听 source 的 Observable
- skip:source 的前几个值会被忽略
- skipLast:source complete 之前的几个值会被忽略
- skipWhile:传入的函数返回值为 true 时,source 的值都会被忽略
- take:返回 source 发出的前 n 个值,并 complete
- takeLast:返回 source complete 前几个值,并 complete
- takeWhite:传入的函数返回值为 true 时,source 的值都会被发出
- timeout:指定时间内,source 没有发出值则直接 error 一个转多个
- window:将 source 分为多个 Observable,分隔点为传入的 Observable 触发时
- windowCount:(windowSize, startWindowEvery) 将 source 分为多个 Observable,每 n 个值为一个
- windowTime:将 source 分为多个 Observable,每 time 时间分一次
- windowToggle:(openings, closingSelector) 将 source 分为多个 Observable,每 openings 和 closingSelector 对应的事件发生之间的值分为一个
- windowWhen:将 source 分为多个 Observable,closingSelector 每俩值之间的分为一个
多个转一个
高阶 Observable 处理
《高阶 Observable》是指返回值为 Observable 的 Observable(参考高阶函数),比如 fromEvent(‘click’).map(() => interval(1000)) 的意思就是每一次 click 都会创建一个 interval Observable,则一个点击监听的 Observable 会发送多个 interval Observable。
- combineAll:多个高阶 Observable,有任意 next 时,并且此时所有都发送过值时,发送 Array 类型的组合值
- concatAll:多个高阶 Observable,前一个 complete 之后,才会处理下一个 的 next
- mergeAll:多个高阶 Observable,任一 next 时,发送其值
- switchAll:多个高阶 Observable,后一个第一次 next 时,前一个的值就不管了
- zipAll:
多个 Observable 处理
对多个 Observable 进行统一处理的函数,不是 operators。这里因为跟上下文有关联所以放一起
- combineLatest:多个 Observable,任意 next,并且此时所有 observable 都发送过值时,发送 Array 类型的组合值
- concat:多个 Observable,前一个 complete 后,处理下一个 next 值
- exhaust:前一个 Observable 还未 complete 时,后一个 Observable 被丢弃
- merge:多个 Observable,任意 next 时,发送值
Observable 转换: xxxMap
这类 operators 处理的问题是将多个 Observable 合并成一个,规则如下
- concatMap:前一个 complete 再处理下一个
- exhaustMap:前一个未 complete 时,丢弃下一个。
- flatMap:同 mergeMap
- mergeMap:按所有 Observable next 发送的顺序,发出值
- switchMap:永远发出最新被创建的 Observable 的值
- expand:用前一个 Observable 的值递归构造新 Observable,最后 merge 成一个,用返回 Empty 或者 take/takeUntil 停止
xxxMapTo
这类 operators 处理的问题是将多个 Observable 合并成一个,与上面不同的是,传参是个固定值,
- concatMapTo
- mergeMapTo
- switchMapTo
上述多类函数的比较
// concat
const timer1 = interval(1000).pipe(take(10));
const timer2 = interval(2000).pipe(take(6));
const timer3 = interval(500).pipe(take(10));
const result = concat(timer1, timer2, timer3);
// concatAll
const clicks = fromEvent(document, 'click');
const higherOrder = clicks.pipe(
map(ev => interval(1000).pipe(take(4))),
);
## 这边如果直接 subscribe(highOrder),得到的就是 Interval Observable
const firstOrder = higherOrder.pipe(concatAll());
// concatMap
const clicks = fromEvent(document, 'click');
const result = clicks.pipe(
concatMap(ev => interval(1000).pipe(take(4)))
);
// concatMapTo
const clicks = fromEvent(document, 'click');
const result = clicks.pipe(
concatMapTo(interval(1000).pipe(take(4))),
);
Observable 值处理
缓存:buffer
- buffer:给定 Observable 触发时传 source 缓存的值,每次重新缓存
- bufferCount:
- (bufferSize, startBufferEvery),到 bufferSize 时开始缓存,每次新增 startBufferEvery 时更新缓存并发出新的 Observable
- bufferTime:
- (time):每 time 时间发送一次在过去 n 秒内 source 的新值
- (next, every):每 every 时间发送过去 next 时间内 source 的新值
- bufferToggle:
- (openings, closingSelector):发送在 openings 和对应的 closingSelector 之间 source 产生的值
- bufferWhen:
- (closingSelector):发送在 closingSelector 触发之前 source 产生的值
多播 Observable 相关
- publish:处理成 connectable Observable,需要先 connect 才会 next 值
- refCount:使 ConnectableObservable 像常规 Observable 一样,直接 subscribe 就会获得发出的值
- share:pipe(share( )),相当于 multicast(( ) => new Subject( )) 和 refCount( )
- shareReplay:
- publishBehaviour:subscribe 时先发送当前值
- publishLast:返回 connectable observable 序列,只发送 complete 前的最后一个值
- publishReplay:(bufferSize, windowTime, selectorOrScheduler, scheduler),subscribe 时发送 n 个
作用于 complete
- count:不传参统计 next 次数,传函数为参数,统计符合函数条件的值的个数
- max:过程中最大值
- min:过程中最小值
- reduce:在 complete 的时候发出最后的 reduce 结果
- distinct:对 source 所有发出的值进行去重,在 complete 的时候给出
- endWith:在 source complete 之后,再发一个值再 complete
- defaultIfEmpty:如果 source 在 complete 时都未发出任何值,则将这个值传出
- throwIfEmpty:如果 source 在 complete 时都未发出任何值,抛出 EmptyError
- elementAt:返回 source 发出的事件流的给定 index 的事件
- every:返回 source 发出的事件流的所有值是否符合该函数要求
- find:返回 source 发出事件流的所有值中第一个符合要求的值,并且 complete
- findIndex:返回 source 发出的所有值中第一个符合要求的 index,并且 complete
- first:传参则返回 source 发出的所有值中第一个符合要求的值,不传参返回第一个;并 complete、
- ignoreElements:忽略 source 的所有值,在 complete 或 error 的时候告知
- isEmpty:在 complete 的时候告知 source 是否发送过值,没有为 true
- last:在 complete 的时候给出 source 发送的最后一个值
- sequenceEqual:source 与 传入的 Observable 发出的值是否完全一致
- single:是否有且只有一个符合要求的值,不同情况返回值不一样
- toArray:在 complete 时,将 source 所有发出的值串成 Array 发出
作用于过程
- dematerialize:将 Observable of Notifications 转化为 actual Observable
- materialize:将 source 发出的所有值转化为 Notification,并带有类型说明(next 还是 error)
- mergeScan:缓存中间计算结果,每次可以根据之前的结果进行计算
- scan:与 mergeScan 差不多
- groupBy:对 source 发出事件流的所有值做分组,返回多个 GroupedObservable,每组一个
- map:对 source 的值做转换,将结果值转化为 Observable 发出
- mapTo:Observable 发出的值转化为固定值
- multicast:将 source 转化为多播的 Observable
- expand:用前一个 Observable 的值递归构造新 Observable,最后 merge 成一个,用返回 Empty 或者 take/takeUntil 停止
- pairwise:将 source 相邻的值发出,bufferCount(2,1)?
- pluck:将 source 发出的值中键为 key 的值取出,多个值就一直取
- repeat:返回多次 subscribe source 的 Observable,重复发送 complete 之前的所有值
- retry:source error 时,重新 subscribe,最大次数为 count
- repeatWhen:每次传入的 Observable 触发时,重复 source complete 之前的所有值,如果前一次重复还未结束,会进行新一次的重复
- retryWhen,传入的 Observable 触发时,重复 source error 之前的所有值
- startWith:指定 source 第一个发出的值
- tap:对 source 的所有值都做副作用处理,但是返回一个与 source 相同的 Observable
- timeInterval:返回当前值,和收到当前值与上一个值之间的时间间隔
- timestamp:返回值与当前值产生的事件
- withLatestFrom:source 有新值时,返回给定 Observable 的最新值
错误处理
operators
- catchError:错误处理
- finalize: 返回一个 source 的镜像 Observable,但是在 complete 或 error 时调用指定函数
- ignoreElements:忽略 source 的所有值,在 complete 或 error 的时候告知
- timeout:指定时间内,source 没有发出值则直接 error
- throwIfEmpty:如果 source 在 complete 时都未发出任何值,抛出 EmptyError
- retry:source error 时,重新 subscribe,最大次数为 count
- retryWhen,传入的 Observable 触发时,重复 source error 之前的所有值,最多执行 n 次
function
- onErrorResumeNext:source 出错或者 complete,执行传入的 Observable,source 不会触发 erro
函数总结
多个 Observable 转一个
- combineLatest
- Concat
- merge
- zip
Observable 创建
- defer
- forkJoin
- from
- fromEvent
- fromEventPattern
- generate:(init, endCondition, generateFunc) 创建一系列值,
- Identity:
- iif:(condition, trueResult, falseResult) 在 subscribe 时根据条件控制具体执行哪个 Observable
- interval:创建隔固定时间发送值的 Observable
- of
- pairs:将 object 转化为一系列 [key, value] 值的 Observable
- partition:将 source Observable 分为两个 Observable,一个全是符合条件的值,另一个是不符合的
- race:返回最快开始发送值的 Observable 的最新值
- range:(start, count, scheduler)
- scheduled:
- throwError:创建一个没有发出任何值直接 error 的 Observable
- timer:(dueTime, periodOrScheduler) 创建一个 dueTime 之后开始,每隔 period 之后发索引值的 Observable
- using:(resourceFactory, observableFactory)
其他
- isObservable
- noop
- onErrorResumeNext:source 出错或者 complete ,执行传入的 Observable,source 不会触发 error
一些实践场景
- 搜索框(防抖、取最新一次异步结果,去重)
- 动态拉数据渲染图表(n 个数据之后每次缓存 n 个数据)—— 《新手视角 - RxJS 之实时监控图表》
- 全局共享数据,使用 BehaviorSubject —— ex-child 项目中的使用
- 拖拽(mousedown、mousemoving、mouseup 等事件之间有联系的事件处理)
- 事件合并(PC 端、移动端 click 事件合并处理)
- 数据量比较大时的处理
- 《使用 React + Rxjs 实现一个虚拟滚动组件》
- 对先后/并发要求比较高的场景
个人体验
关于 why/when use 问题比较好的一篇文章 《What is RxJS? And Why You Should Know About It》
好用的地方
- 丰富的 operators API 使得少写很多代码,少创建很多临时变量(比 Spark 这种灵活方便很多)
-
RxJS allows you to solve hard problems with less code, promotes maintainability, readability, flexibility, and compostability.
- Observable / data push 模式在处理异步问题上,比 callback、Promise 都更简洁有效
- Promise 只能处理一个 value
- Promise 不能取消
-
- 可以和很多框架搭配使用,包括最新的 React Hooks
- 《像呼吸一样自然:React hooks + RxJS》
- 《更好用的 RxJS+React hooks 集成方案》
-
- 多事件源/事件处理地情景方便处理。在仅有的几次实践中比较认同的话:
- In the case of RxJS, the problem being solved is the ability to handle asynchronous calls with multiple events
- —— 《What is RxJS? And Why You Should Know About It》
比较难/烦的点
- 文档太差了,没有整体设计的阐述(一开始难理解)
- API 多但是搜索功能差,使用的时候比较难找到自己想要用的东西
- API 太多了,掌握(背诵)程度直接影响代码质量、写代码过程的便捷程度,学习曲线陡峭
- 管道流式处理的过程中,要注意自己 pipe 中的每一步想返回什么类型,有些返回 Observable,有些返回的是高阶 Observable 。大多数情况下是什么时候使用 map,什么时候使用 xxxMap。
个人练习代码
搜索框
useEffect(() => {
if (inputRef.current) {
fromEvent(inputRef.current!, "input")
.pipe(
debounceTime(300),
map((event: any) => event.target.value),
distinctUntilChanged(),
switchMap(async (value: any, _index) => {
try {
return await search(value);
} catch (e) {
return null;
}
})
)
.subscribe(console.log);
}
}, []);
拖拽事件
const [position, setPosition] = useState({ x: 0, y: 0 });
useEffect(() => {
const positionObservable = new BehaviorSubject(position);
positionObservable.subscribe(val => {
setPosition(val);
});
mouseDown
.pipe(
withLatestFrom(positionObservable),
map(([event, prev]: Array<any>) => {
return { startX: event.clientX, startY: event.clientY, prev };
}),
switchMap(({ startX, startY, prev }) => {
return mouseMove.pipe(
map((event: any) => {
return {
x: prev.x + event.clientX - startX,
y: prev.y + event.clientY - startY
};
}),
takeUntil(mouseUp)
);
})
)
.subscribe(pos => {
positionObservable.next(pos);
});
}, []);
拖拽事件 —— 抽出 Hook
const [positionState, setPosition, positionSubject] = useBehaviorSubject({
x: 0,
y: 0
});
useEffect(() => {
mouseDown
.pipe(
withLatestFrom(positionSubject),
map(([event, prev]: Array<any>) => {
return { startX: event.clientX, startY: event.clientY, prev };
}),
switchMap(({ startX, startY, prev }) => {
return mouseMove.pipe(
map((event: any) => {
return {
x: prev.x + event.clientX - startX,
y: prev.y + event.clientY - startY
};
}),
takeUntil(mouseUp)
);
})
)
.subscribe(pos => {
setPosition(pos);
});
},[]
export const useBehaviorSubject = initVal => {
const subjectRef = useRef<any>();
const [state, setState] = useState(initVal);
if (!subjectRef.current) {
subjectRef.current = new BehaviorSubject(initVal);
}
const triggerNewVal = useCallback(value => {
if (subjectRef.current) {
subjectRef.current!.next(value);
}
}, []);
useEffect(() => {
subjectRef.current.subscribe(value => {
setState(value);
});
return () => {
subscription.unsubscribe();
subjectRef.current.complete();
};
}, []);
return [state, triggerNewVal, subjectRef.current];
};