Autumn

RxJS 学习笔记

20 January 2020  —  Tech

在字节跳动新社区 EX 前端组实习期间的产物,主要是介绍一下如何入门比较好以及 RxJS 和 React Hooks 的结合使用。

建议学习顺序

  1. 官网 overview
  2. 自己写经典场景的代码
  3. 对比一下别人的实现
  4. 看看 operators
  5. 有兴趣看看源码啥的?

整体概念

基本概念

宝石图

Multicasted Observables & Reference counting

Observable 可通过 Subject 向多个 subscribers(监听器)发送消息,这成为 multicasted(多播)Observable,但是 unicast(单播的) Observable,即普通的 Observable 只有一个 Observer。 注意:这里的【一个 】Observer 意思在于,同一个 Observable 被两次 subscribe,第二次监听的是一个用同样方法构建出来的新的 Observer。比如对 fromEvent(document, ‘click’).pipe(mapTo(1),scan((acc, one) => acc + one, 0)) subscribe 两次,两次的计数是独立的。

multicast returns an Observable that looks like a normal Observable, but works like a Subject when it comes to subscribing. multicast returns a ConnectableObservable, which is simply an Observable with the connect() method.

The connect() method is important to determine exactly when the shared Observable execution will start. Because connect() does source.subscribe(subject) under the hood, connect() returns a Subscription, which you can unsubscribe from in order to cancel the shared Observable execution.

import { from, Subject } from "rxjs";
import { multicast } from "rxjs/operators";
const source = from([1, 2, 3]);
const subject = new Subject();
const multicasted = source.pipe(multicast(subject));

// These are, under the hood, `subject.subscribe({...})`:
multicasted.subscribe({
  next: v => console.log(`observerA: ${v}`)
});
multicasted.subscribe({
  next: v => console.log(`observerB: ${v}`)
});

// This is, under the hood, `source.subscribe(subject)`:
multicasted.connect();

但是在 subscribe 之后显式调用 connect 有点弱智,所以就有了 refCount,监听器数量从 0 到 1,自动 connect,变为 0 时完全 unsubscribe。

const source = interval(500);
const subject = new Subject();
const refCounted = source.pipe(multicast(subject), refCount());
let subscription1, subscription2;

// This calls `connect()`, because
// it is the first subscriber to `refCounted`
console.log("observerA subscribed");
subscription1 = refCounted.subscribe({
  next: v => console.log(`observerA: ${v}`)
});

// This is when the shared Observable execution will stop, because
// `refCounted` would have no more subscribers after this
setTimeout(() => {
  console.log("observerA unsubscribed");
  subscription1.unsubscribe();
}, 2000);

Subject

  1. BehaviorSubject 有监听器加入时,先发送当前值
  2. ReplaySubject 会记录多个值,并且在有新监听器加入时,把最近的 n 个值(可配置)一起发送过去。
  3. AsyncSubject 只在 complete 的时候,发送最近一次的值

Operators 总结

触发条件控制

用 Observable 做条件限制

用值做条件限制

多个转一个

高阶 Observable 处理

《高阶 Observable》是指返回值为 Observable 的 Observable(参考高阶函数),比如 fromEvent(‘click’).map(() => interval(1000)) 的意思就是每一次 click 都会创建一个 interval Observable,则一个点击监听的 Observable 会发送多个 interval Observable。

多个 Observable 处理

对多个 Observable 进行统一处理的函数,不是 operators。这里因为跟上下文有关联所以放一起

Observable 转换: xxxMap

这类 operators 处理的问题是将多个 Observable 合并成一个,规则如下

xxxMapTo

这类 operators 处理的问题是将多个 Observable 合并成一个,与上面不同的是,传参是个固定值,

上述多类函数的比较

// concat
const timer1 = interval(1000).pipe(take(10));
const timer2 = interval(2000).pipe(take(6));
const timer3 = interval(500).pipe(take(10));
const result = concat(timer1, timer2, timer3);

// concatAll
const clicks = fromEvent(document, 'click');
const higherOrder = clicks.pipe(
  map(ev => interval(1000).pipe(take(4))),
);
## 这边如果直接 subscribe(highOrder),得到的就是 Interval Observable
const firstOrder = higherOrder.pipe(concatAll());

// concatMap
const clicks = fromEvent(document, 'click');
const result = clicks.pipe(
  concatMap(ev => interval(1000).pipe(take(4)))
);

// concatMapTo
const clicks = fromEvent(document, 'click');
const result = clicks.pipe(
  concatMapTo(interval(1000).pipe(take(4))),
);

Observable 值处理

缓存:buffer

多播 Observable 相关

作用于 complete

作用于过程

错误处理

operators

function

函数总结

多个 Observable 转一个

Observable 创建

其他

一些实践场景

  1. 搜索框(防抖、取最新一次异步结果,去重)
  2. 动态拉数据渲染图表(n 个数据之后每次缓存 n 个数据)—— 《新手视角 - RxJS 之实时监控图表》
  3. 全局共享数据,使用 BehaviorSubject —— ex-child 项目中的使用
  4. 拖拽(mousedown、mousemoving、mouseup 等事件之间有联系的事件处理)
  5. 事件合并(PC 端、移动端 click 事件合并处理)
  6. 数据量比较大时的处理
  7. 《使用 React + Rxjs 实现一个虚拟滚动组件》
  8. 对先后/并发要求比较高的场景

个人体验

关于 why/when use 问题比较好的一篇文章 《What is RxJS? And Why You Should Know About It》

好用的地方

  1. 丰富的 operators API 使得少写很多代码,少创建很多临时变量(比 Spark 这种灵活方便很多)
  1. Observable / data push 模式在处理异步问题上,比 callback、Promise 都更简洁有效
  2. Promise 只能处理一个 value
  3. Promise 不能取消

-

  1. 可以和很多框架搭配使用,包括最新的 React Hooks
  2. 《像呼吸一样自然:React hooks + RxJS》
  3. 《更好用的 RxJS+React hooks 集成方案》

-

  1. 多事件源/事件处理地情景方便处理。在仅有的几次实践中比较认同的话:

比较难/烦的点

  1. 文档太差了,没有整体设计的阐述(一开始难理解)
  2. API 多但是搜索功能差,使用的时候比较难找到自己想要用的东西
  3. API 太多了,掌握(背诵)程度直接影响代码质量、写代码过程的便捷程度,学习曲线陡峭
  4. 管道流式处理的过程中,要注意自己 pipe 中的每一步想返回什么类型,有些返回 Observable,有些返回的是高阶 Observable 。大多数情况下是什么时候使用 map,什么时候使用 xxxMap。

个人练习代码

搜索框

useEffect(() => {
    if (inputRef.current) {
      fromEvent(inputRef.current!, "input")
        .pipe(
          debounceTime(300),
          map((event: any) => event.target.value),
          distinctUntilChanged(),
          switchMap(async (value: any, _index) => {
            try {
              return await search(value);
            } catch (e) {
              return null;
            }
          })
        )
        .subscribe(console.log);
    }
}, []);

拖拽事件

const [position, setPosition] = useState({ x: 0, y: 0 });
useEffect(() => {
  const positionObservable = new BehaviorSubject(position);
  positionObservable.subscribe(val => {
    setPosition(val);
  });

  mouseDown
    .pipe(
      withLatestFrom(positionObservable),
      map(([event, prev]: Array<any>) => {
        return { startX: event.clientX, startY: event.clientY, prev };
      }),
      switchMap(({ startX, startY, prev }) => {
        return mouseMove.pipe(
          map((event: any) => {
            return {
              x: prev.x + event.clientX - startX,
              y: prev.y + event.clientY - startY
            };
          }),
          takeUntil(mouseUp)
        );
      })
    )
    .subscribe(pos => {
      positionObservable.next(pos);
    });
}, []);

拖拽事件 —— 抽出 Hook

const [positionState, setPosition, positionSubject] = useBehaviorSubject({
    x: 0,
    y: 0
});

useEffect(() => {
    mouseDown
      .pipe(
        withLatestFrom(positionSubject),
        map(([event, prev]: Array<any>) => {
          return { startX: event.clientX, startY: event.clientY, prev };
        }),
        switchMap(({ startX, startY, prev }) => {
          return mouseMove.pipe(
            map((event: any) => {
              return {
                x: prev.x + event.clientX - startX,
                y: prev.y + event.clientY - startY
              };
            }),
            takeUntil(mouseUp)
          );
        })
      )
      .subscribe(pos => {
        setPosition(pos);
      });
},[]

export const useBehaviorSubject = initVal => {
  const subjectRef = useRef<any>();
  const [state, setState] = useState(initVal);

  if (!subjectRef.current) {
    subjectRef.current = new BehaviorSubject(initVal);
  }

  const triggerNewVal = useCallback(value => {
    if (subjectRef.current) {
      subjectRef.current!.next(value);
    }
  }, []);

  useEffect(() => {
    subjectRef.current.subscribe(value => {
      setState(value);
    });
    return () => {
      subscription.unsubscribe();
      subjectRef.current.complete();
    };
  }, []);

  return [state, triggerNewVal, subjectRef.current];
};
winter is coming. © CHRISTINE Z