可以重用操作符执行
鉴于下面的例子(科特林代码)
val subject = PublishSubject.create<Int>() val stream = subject.map { println("mapping") it * 2 } stream.forEach { println("A: $it") } stream.forEach { println("B: $it") } subject.onNext(1) subject.onCompleted()
输出将是
mapping A: 2 mapping B: 2
我想要实现的是源观察被映射一次,所有的订户得到的结果,但不执行映射操作,每一个…
喜欢这个
mapping A: 2 B: 2
在我的情况下,我有非常昂贵的计算正在进行的延迟和性能是至关重要的,我有一个热点观察源作为一个来源和大量的用户…
我们如何重用运算符执行? 和一般不同的映射操作?
您可以使用cache
将源observable的结果缓存到任何未来的订阅者:
val stream = subject.map { println("mapping") it * 2 }.cache()
如果你想要更缓慢地控制缓存的方式, replay
是值得研究的。
如果您不想缓存源可观察项的每个项目,但只能重新发布新项目,则可以使用autoConnect
publish
:
val stream = subject.map { println("mapping") it * 2 }.publish() .autoConnect()
其中给出了以下一系列事件:
stream.forEach { println("A: $it") } stream.forEach { println("B: $it") } subject.onNext(1) stream.forEach { println("C: $it") } subject.onNext(2) subject.onCompleted()
将打印:
mapping A: 2 B: 2 mapping A: 4 B: 4 C: 4
我找到了解决方案。 为了重新使用管道的执行,我们必须确保只有一个用户,并且这个用户将管道末端的所有排放物传播到所有用户的入口…这听起来很像主体!
如果我们只订购了100次,我们将有100根管道从可观察到的源头开始,而在这种情况下,我们有一根管道,最后分支到100根小管道。
fun <T> Observable<T>.hub(): Observable<T> { val hub = PublishSubject.create<T>() this.subscribe(hub) return hub }
现在我们可以做到这一点
val subject = PublishSubject.create<Int>() val stream = subject.map { println("mapping") it * 2 } val hub = stream.hub() hub.subscribe { println("A: $it") } hub.subscribe { println("B: $it") } subject.onNext(1) subject.onCompleted()
这将给这个
mapping A: 2 B: 2
问题解决了!