Rx:即使调用了onError,如何得到最后一个元素?

我正在使用RxJava,我需要做两件事情:

  • 获取Observable发出的最后一个元素
  • 确定是否调用了onError ,而onCompleted

我看过使用lastlastOrDefault (这实际上是我需要的行为),但我一直没有能够解决onError隐藏最后一个元素。 我可以使用Observable两次,一次获得last值,一次获得完成状态,但到目前为止,我只能通过创建我自己的Observer来完成这个任务:

 public class CacheLastObserver implements Observer { private final AtomicReference lastMessageReceived = new AtomicReference(); private final AtomicReference error = new AtomicReference(); @Override public void onCompleted() { // Do nothing } @Override public void onError(Throwable e) { error.set(e); } @Override public void onNext(T message) { lastMessageReceived.set(message); } public Optional getLastMessageReceived() { return Optional.ofNullable(lastMessageReceived.get()); } public Optional getError() { return Optional.ofNullable(error.get()); } } 

我用自己的Observer没有问题,但感觉Rx应该能够更好地满足这个“获得完成之前发射的最后一个元素”的用例。 任何想法如何做到这一点?

尝试这个:

 source.materialize().buffer(2).last() 

在错误情况下,最后的排放将是两个项目的列表,即作为Notification的最后一个值以及错误通知。 没有错误,第二项将是完成通知。

还要注意的是,如果没有值发出,那么结果将是一个项目的列表,作为终端通知。

你有没有尝试onErrorResumeNext在这里你可以看到其余的或error handling操作符https://github.com/ReactiveX/RxJava/wiki/Error-Handling-Operators

我用这种方法来解决你的问题。

 public class ExampleUnitTest { @Test public void testSample() throws Exception { Observable.just(1, 2, 3, 4, 5) .map(number -> { if (number == 4) throw new NullPointerException(); else return number; }) .onErrorResumeNext(t -> Observable.empty()) .lastOrDefault(15) .subscribe(lastEmittedNumber -> System.out.println("onNext: " + lastEmittedNumber)); } } 

它会发射onNext: 3

希望它有帮助。

我解决了:

 source.materialize().withPrevious().last() 

在哪里withPrevious是(Kotlin):

 fun  Observable.withPrevious(): Observable> = this.scan(Pair(null, null)) { previous, current -> Pair(previous.second, current) } .skip(1) .map { it as Pair }