Rx:即使调用了onError,如何得到最后一个元素?
我正在使用RxJava,我需要做两件事情:
- 获取
Observable
发出的最后一个元素 - 确定是否调用了
onError
,而onCompleted
我看过使用last
和lastOrDefault
(这实际上是我需要的行为),但我一直没有能够解决onError
隐藏最后一个元素。 我可以使用Observable两次,一次获得last
值,一次获得完成状态,但到目前为止,我只能通过创建我自己的Observer
来完成这个任务:
public class CacheLastObserver implements Observer { private final AtomicReference lastMessageReceived = new AtomicReference(); private final AtomicReference error = new AtomicReference(); @Override public void onCompleted() { // Do nothing } @Override public void onError(Throwable e) { error.set(e); } @Override public void onNext(T message) { lastMessageReceived.set(message); } public Optional getLastMessageReceived() { return Optional.ofNullable(lastMessageReceived.get()); } public Optional getError() { return Optional.ofNullable(error.get()); } }
我用自己的Observer
没有问题,但感觉Rx应该能够更好地满足这个“获得完成之前发射的最后一个元素”的用例。 任何想法如何做到这一点?
尝试这个:
source.materialize().buffer(2).last()
在错误情况下,最后的排放将是两个项目的列表,即作为Notification
的最后一个值以及错误通知。 没有错误,第二项将是完成通知。
还要注意的是,如果没有值发出,那么结果将是一个项目的列表,作为终端通知。
你有没有尝试onErrorResumeNext在这里你可以看到其余的或error handling操作符https://github.com/ReactiveX/RxJava/wiki/Error-Handling-Operators
我用这种方法来解决你的问题。
public class ExampleUnitTest { @Test public void testSample() throws Exception { Observable.just(1, 2, 3, 4, 5) .map(number -> { if (number == 4) throw new NullPointerException(); else return number; }) .onErrorResumeNext(t -> Observable.empty()) .lastOrDefault(15) .subscribe(lastEmittedNumber -> System.out.println("onNext: " + lastEmittedNumber)); } }
它会发射onNext: 3
希望它有帮助。
我解决了:
source.materialize().withPrevious().last()
在哪里withPrevious
是(Kotlin):
fun Observable .withPrevious(): Observable> = this.scan(Pair(null, null)) { previous, current -> Pair(previous.second, current) } .skip(1) .map { it as Pair }