可以重用操作符执行

鉴于下面的例子(科特林代码)

val subject = PublishSubject.create<Int>() val stream = subject.map { println("mapping") it * 2 } stream.forEach { println("A: $it") } stream.forEach { println("B: $it") } subject.onNext(1) subject.onCompleted() 

输出将是

 mapping A: 2 mapping B: 2 

我想要实现的是源观察被映射一次,所有的订户得到的结果,但不执行映射操作,每一个…

喜欢这个

 mapping A: 2 B: 2 

在我的情况下,我有非常昂贵的计算正在进行的延迟和性能是至关重要的,我有一个热点观察源作为一个来源和大量的用户…

我们如何重用运算符执行? 和一般不同的映射操作?

您可以使用cache将源observable的结果缓存到任何未来的订阅者:

 val stream = subject.map { println("mapping") it * 2 }.cache() 

如果你想要更缓慢地控制缓存的方式, replay是值得研究的。

如果您不想缓存源可观察项的每个项目,但只能重新发布新项目,则可以使用autoConnect publish

 val stream = subject.map { println("mapping") it * 2 }.publish() .autoConnect() 

其中给出了以下一系列事件:

 stream.forEach { println("A: $it") } stream.forEach { println("B: $it") } subject.onNext(1) stream.forEach { println("C: $it") } subject.onNext(2) subject.onCompleted() 

将打印:

 mapping A: 2 B: 2 mapping A: 4 B: 4 C: 4 

我找到了解决方案。 为了重新使用管道的执行,我们必须确保只有一个用户,并且这个用户将管道末端的所有排放物传播到所有用户的入口…这听起来很像主体!

如果我们只订购了100次,我们将有100根管道从可观察到的源头开始,而在这种情况下,我们有一根管道,最后分支到100根小管道。

 fun <T> Observable<T>.hub(): Observable<T> { val hub = PublishSubject.create<T>() this.subscribe(hub) return hub } 

现在我们可以做到这一点

 val subject = PublishSubject.create<Int>() val stream = subject.map { println("mapping") it * 2 } val hub = stream.hub() hub.subscribe { println("A: $it") } hub.subscribe { println("B: $it") } subject.onNext(1) subject.onCompleted() 

这将给这个

 mapping A: 2 B: 2 

问题解决了!