如何有条件地在RxJava流中添加一个异步操作?

这里是我想要做的简化版本(使用Kotlin和RxJava)

makeServerCall() .doOnNext { doStuff(it) } //TODO: if it == 0, call asyncOperation() and wait for its callback to fire //before running the rest of the stream. Otherwise immediately run the rest //of the stream .flatMap { observable1(it) observable2(it) Observable.merge( getSpotSearchObservable(observable1), getSpotSearchObservable(observable2) } .subscribeBy(onNext = { allDone() view? }) 

如何挤压调用asyncOperation()并使流的其余部分等待其回调触发,但是只有在满足某些条件时才会触发。 这看起来似乎可能是Rx的一个微不足道的操作,但是没有什么明显的解决方案。

FlatMap吧!

 .flatMap { if (it == 0) { return@flatMap asyncOperation() .ignoreElements() .andThen(Observable.just(0)) } return@flatMap Observable.just(it) } .flatMap { observable1(it) observable2(it) Observable.merge( getSpotSearchObservable(observable1), getSpotSearchObservable(observable2) ) }