如何在RxJava2中默默跳过异常?

我有这样的数据流:

Observable .fromFuture( CompletableFuture.supplyAsync { // First remote call returns Future<List<Type>> listOf(1, 2, 3, 57005, 5) }, Schedulers.computation() ) .flatMap { it.toObservable() } // I turn that list into a stream of single values to process them one by one .map { CompletableFuture.supplyAsync { // This remote call may fail if it does not like the input. I want to skip that failures and continue the stream like the fail never occurred. if (it == 0xDEAD) { throw IOException("Dead value!") } it } } .flatMap { Observable.fromFuture(it) // Turn that Futures into a stream of Observables once again } .doOnNext { println(it) // Debug } .blockingSubscribe() 

我用CompletableFuture.supplyAsync替换了业务逻辑(实际上返回Future )。 而且,是的,这是Kotlin,但我想你有这个意图。

当我评论“死”价值( 57005 )的输出是:

 1 4 9 25 

但是,如果这个“死”值出现在流中,它会失败:

 1 4 9 Exception in thread "main" java.lang.RuntimeException: java.util.concurrent.ExecutionException: java.io.IOException: Dead value! at io.reactivex.internal.util.ExceptionHelper.wrapOrThrow(ExceptionHelper.java:45) at io.reactivex.internal.operators.observable.ObservableBlockingSubscribe.subscribe(ObservableBlockingSubscribe.java:86) at io.reactivex.Observable.blockingSubscribe(Observable.java:5035) at by.dev.madhead.rx.TestKt.main(test.kt:41) Caused by: java.util.concurrent.ExecutionException: java.io.IOException: Dead value! at java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357) at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1895) ... 

我是一个新手在RX,所以很快Google搜索解决方案: onExceptionResumeNextObservable.fromFuture(it) – > Observable.fromFuture(it).onExceptionResumeNext { Observable.empty<Int>() } 。 但现在我的应用程序永远挂起(在产生我期望的输出之后)。 看起来像流永远不会结束。

我应该“关闭” Observable莫名其妙或什么? 或者更一般地说,使用RX工作是一个好方法吗? 我应该以另一种方式重新思考吗?

吞下这样的例外:

  Observable.fromFuture(it).onErrorResumeNext(Observable.empty())