将无限流的无限流转化为无限流 – 反应X.

如何在反应x(理想的RxJava或RxJs例子)可以实现这一点?

a |-a-------------------a-----------a-----------a---- s1 |-xxxxxx -| (subscribe) s2 |-xxxxx-| (subscribe) s2 |-xxxxx-| (subscribe) ... sn S |-xxxxxxx-------xxxxxxx-------------xxxxxx- (subsribe) 

a是触发事件的有限流sn的无限事件流,每个事件应该是无限流S一部分,同时能够订阅每个sn流(为了进行求和操作),但同时保持流S为无穷。

编辑:更具体地说,我提供了我在Kotlin寻找的实现。 每10秒发射一个事件映射到共享有限的4个事件流。 元流是flatMap -ed到正常的无限流。 我使用doAfterNext额外订阅每个有限的流并打印结果。

 /** Creates a finite stream with events * $ch-1 - $ch-4 */ fun createFinite(ch: Char): Observable<String> = Observable.interval(1, TimeUnit.SECONDS) .take(4) .map({ "$ch-$it" }).share() fun main(args: Array<String>) { var ch = 'A' Observable.interval(10, TimeUnit.SECONDS).startWith(0) .map { createFinite(ch++) } .doAfterNext { it .count() .subscribe({ c -> println("I am done. Total event count is $c") }) } .flatMap { it } .subscribe { println("Just received [$it] from the infinite stream ") } // Let main thread wait forever CountDownLatch(1).await() } 

但是我不确定这是否是“纯粹的RX”方式。

你不清楚你想如何计算。 如果你正在做一个总数,那么就没有必要做内部订阅:

 AtomicLong counter = new AtomicLong() Observable.interval(10, TimeUnit.SECONDS).startWith(0) .map { createFinite(ch++) } .flatMap { it } .doOnNext( counter.incrementAndget() ) .subscribe { println("Just received [$it] from the infinite stream ") } 

另一方面,如果需要为每个中间可观察项提供一个计数,则可以在flatMap()内移动计数并打印出计数并在完成时重置计数:

 AtomicLong counter = new AtomicLong() Observable.interval(10, TimeUnit.SECONDS).startWith(0) .map { createFinite(ch++) } .flatMap { it .doOnNext( counter.incrementAndget() .doOnCompleted( { long ctr = counter.getAndSet(0) println("I am done. Total event count is $ctr") } ) .subscribe { println("Just received [$it] from the infinite stream ") } 

这不是很有用,但是这种报告往往会打破正常的流程。

Interesting Posts