介绍
RxJS 中的 subject 是一种特殊的混合体,可以同时充当可观察者和观察者。 这样,数据可以被推送到主题中,而主题的订阅者将依次接收推送的数据。
主题对于多播或当数据源不容易转换为可观察对象时很有用。 很容易过度使用主题,并且通常,如 这个出色的帖子 中所示,当可以以其他方式创建可观察源时,可以避免主题。
在这篇文章中,您将了解主题、行为主题和回放主题。
先决条件
如果您想继续阅读本文,您将需要:
- 熟悉 RxJS observables 和 observers。
本教程已使用 rxjs v7.3.0 进行了验证。
使用主题
创建主题从 RxJS 的 Subject 的新实例开始:
const mySubject = new Rx.Subject();
可以创建多个订阅,并且在内部主题将保留订阅列表:
const subscription1 = mySubject.subscribe(x => console.log(`${x} ${x}`));
const subscription2 = mySubject.subscribe(x => console.log(x.toUpperCase()));
可以使用 next 方法将数据推送到主题中:
mySubject.next('Hello!');
运行此脚本将产生以下输出:
OutputHello! Hello! HELLO!
对于 subscription1,此代码将获取输入并显示两次。 对于 subscription2,此代码将获取输入并应用 toUpperCase()。
当数据被推送到一个主题中时,它会遍历其内部订阅列表,并将 next 数据放入每个订阅列表中。
将数据推送到订阅
这是一个演示数据如何推送到订阅的示例:
const mySubject = new Rx.Subject();
mySubject.next(1);
const subscription1 = mySubject.subscribe(x => {
console.log('From subscription 1:', x);
});
mySubject.next(2);
const subscription2 = mySubject.subscribe(x => {
console.log('From subscription 2:', x);
});
mySubject.next(3);
subscription1.unsubscribe();
mySubject.next(4);
在此示例中,以下是将在控制台中打印的结果:
OutputFrom subscription 1: 2 From subscription 1: 3 From subscription 2: 3 From subscription 2: 4
请注意,迟到的订阅如何丢失一些已推送到主题中的数据。 稍后我们将看到如何使用 行为主题 或 重放主题 来解决这个问题。
向所有订阅多播数据
主体的真正力量通过 multicasting 发挥作用,其中主体作为观察者传递给可观察对象,这意味着,当可观察对象发出时,数据被多播到所有主体的订阅:
这是一个示例,其中 trickleWords 可观察对象每 750 毫秒发出一个单词。
const mySubject = new Rx.Subject();
const words = ['Hot Dog', 'Pizza', 'Hamburger'];
const trickleWords = Rx.Observable.zip(
Rx.Observable.from(words),
Rx.Observable.interval(750),
word => word
);
const subscription1 = mySubject.subscribe(x => {
console.log(x.toUpperCase());
});
const subscription2 = mySubject.subscribe(x => {
console.log(
x
.toLowerCase()
.split('')
.reverse()
.join('')
);
});
trickleWords.subscribe(mySubject);
在发出所有值后,这将产生以下输出:
OutputHOT DOG god toh PIZZA azzip HAMBURGER regrubmah
对于 subscription1,words 的数组已修改为 toUpperCase()。 对于 subscription2,words 的数组已用 toLowerCase() 和“reverse()”进行了修改。
使用 asObservable
asObservable 运算符可用于将主体转换为可观察对象。 当您想公开来自主题的数据时,这可能很有用,但同时要防止数据无意中被推入主题:
const mySubject = new Rx.Subject();
const myObservable = mySubject.asObservable();
mySubject.next('Hello');
myObservable.next('World!');
这将产生以下输出:
OutputTypeError: myObservable.next is not a function
myObservable 不具备 next、error 或 complete。
处理错误
当主题完成或出错时,所有内部订阅也会完成或出错:
const mySubject = new Rx.Subject();
const subscription1 = mySubject.subscribe(null, error =>
console.log('From subscription 1:', error.message)
);
const subscription2 = mySubject.subscribe(null, error =>
console.log('From subscription 2:', error.message)
);
mySubject.error(new Error('Error!'));
这将产生以下输出:
OutputFrom subscription 1: Error! From subscription 2: Error!
已生成错误消息。
使用重播主题
如前所述,迟到的主题订阅将错过之前发出的数据。 重播主题可以通过保留将发送给新订阅的先前值的缓冲区来帮助解决这个问题。
这是重播主题的使用示例,其中保留了 2 个先前值的缓冲区并在新订阅上发出:
const mySubject = new Rx.ReplaySubject(2);
mySubject.next(1);
mySubject.next(2);
mySubject.next(3);
mySubject.next(4);
mySubject.subscribe(x => {
console.log('From subscription 1:', x);
});
mySubject.next(5);
mySubject.subscribe(x => {
console.log('From subscription 2:', x);
});
这将产生以下输出:
OutputFrom subscription 1: 3 From subscription 1: 4 From subscription 1: 5 From subscription 2: 4 From subscription 2: 5
已存储 2 个值的缓冲区。
使用行为主体
Behavior subject 类似于重播主题,但如果之前没有发出任何值,则只会重新发出最后发出的值或默认值:
const mySubject = new Rx.BehaviorSubject('Hello!');
mySubject.subscribe(x => {
console.log('From subscription 1:', x);
});
mySubject.next(5);
mySubject.subscribe(x => {
console.log('From subscription 2:', x);
});
这将产生以下输出:
OutputFrom subscription 1: Hello! From subscription 1: 5 From subscription 2: 5
发出了默认的 Hello! 值。
结论
在这篇文章中,您了解了主题、行为主题和回放主题。
继续学习 RxJS 缓冲运算符简介 、RxJS:From 运算符 和 如何使用 takeUntil RxJS 运算符以声明方式管理订阅 。