RxJava – 随时接受更多观测数据的合并Observable?

我遇到需要一个Observable实现,它持有一个或多个Observable并将它们合并。 但是这里有一个启发:我想随时添加更多的可观察对象,我想这也可以支持将它们移除。

为了真正有效,所有订阅者都必须接收来自新订阅者的通知,这些订阅者是在订阅后添加的。 除非所有合并的Observable都很冷,并调用onComplete() ,那么我想可以让订阅取消订阅,即使添加了更多的Observable。 这是更多的合并多个无限的热Observables,并能够在任何时候添加更多。

 MergableObservable<MyEvent> allSources = new MergableObservable<>(); //later in application Observable<MyEvent> eventSource1 = ... allSources.add(eventSource1); //and later again Observable<MyEvent> eventSource2 = ... allSources.add(eventSource2 ); //and so on Observable<MyEvent> eventSource3 = ... allSources.add(eventSource3); 

我知道有合并操作符,但我需要一个可变的结构。 我是否缺少已经存在的东西? 除非这种情况绝对适合,否则我宁愿不使用主题。

你不能避免主题,因为你想手动推新的Observable源,而不是“自然地”生成它们。

 Subject<Observable<T>, Observable<T>> o = PublishSubject.<Observable<T>>().toSerialized(); ConcurrentHashSet<Observable<T>> live = ... o.flatMap(v -> v.takeWhile(x -> live.containsKey(v))).subscribe(...); Observable<T> inner = ... live.add(inner); o.onNext(inner); //... live.remove(inner); 

也许主题是处理这个问题的最合理的方法。 如果有人知道更好的办法,我愿意接受其他答案。 这种杀死我没有一个内置的方式来做到这一点。 我想这将是一个共同的需要。

Kotlin实现

 class MergingObservable<T> { private val subject: SerializedSubject<T, T> = PublishSubject<T>().toSerialized() fun toObservable(): Observable<T> = subject operator fun plusAssign(observable: Observable<T>) { add(observable) } fun add(observable: Observable<T>): Subscription = observable.subscribe(subject) }