Repeating a login form request with RxJava and Retrofit

I want to build a login form with RxJava and Retrofit. But if the request to the server fails, unsubscribe is called from the bottom of the Rx chain.

I have UI methods implemented with the rxAndroidBinding lib:

    fun validEmail(): Observable<CharSequence>    // last well-formatted login
    fun validPassword(): Observable<CharSequence> // last password of length
    fun clicks(): Observable<Unit>                // clicks on the login button

I have implemented the authorization method with Retrofit: `fun authorize(email: String, pass: String): Observable<Unit>`

I want to make the request when both inputs are valid and the login button is clicked:

```
val validPair = rx.Observable.combineLatest(iView.validEmail(), iView.validPassword(), ::ValidLoginPair)
    .doOnNext { iView.setLoginButtonEnabled(true) }

subscription = rx.Observable.combineLatest(validPair, iView.clicks(), { pair, unit -> pair })
    .doOnNext { iView.setProgress(true) }
    .flatMap {
        model.get().authorize(it.email.toString(), it.password.toString())
            .observeOn(AndroidSchedulers.mainThread())
            .doOnError {
                it.printStackTrace()
                iView.setProgress(false)
                iView.showError("NetWorkError", it.message ?: "Unknown error")
            }
    }
    .observeOn(AndroidSchedulers.mainThread())
    .subscribe({
        iView.setProgress(false)
        iView.onLogin()
    }) {
        it.printStackTrace()
        iView.setProgress(false)
    }
```

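If I get a 200 response, it works. But if onError is called, all the UI event subscriptions are unsubscribed, so a second click does nothing. What am I doing wrong? Why does Retrofit work this way?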


Versions:

    compile 'io.reactivex:rxandroid:1.2.1'
    compile 'com.squareup.retrofit2:retrofit:2.1.0'
    compile 'io.reactivex:rxjava:1.1.6'

Stack trace at the point of unsubscription:

    java.lang.Thread.State: WAITING
        at com.jakewharton.rxbinding.widget.TextViewAfterTextChangeEventOnSubscribe$2.onUnsubscribe(TextViewAfterTextChangeEventOnSubscribe.java:40)
        at rx.android.MainThreadSubscription.unsubscribe(MainThreadSubscription.java:72)
        at rx.internal.util.SubscriptionList.unsubscribeFromAll(SubscriptionList.java:124)
        at rx.internal.util.SubscriptionList.unsubscribe(SubscriptionList.java:113)
        at rx.Subscriber.unsubscribe(Subscriber.java:98)
        at rx.internal.util.SubscriptionList.unsubscribeFromAll(SubscriptionList.java:124)
        at rx.internal.util.SubscriptionList.unsubscribe(SubscriptionList.java:113)
        at rx.Subscriber.unsubscribe(Subscriber.java:98)
        at rx.internal.util.SubscriptionList.unsubscribeFromAll(SubscriptionList.java:124)
        at rx.internal.util.SubscriptionList.unsubscribe(SubscriptionList.java:113)
        at rx.Subscriber.unsubscribe(Subscriber.java:98)
        at rx.internal.util.SubscriptionList.unsubscribeFromAll(SubscriptionList.java:124)
        at rx.internal.util.SubscriptionList.unsubscribe(SubscriptionList.java:113)
        at rx.Subscriber.unsubscribe(Subscriber.java:98)
        at rx.internal.operators.OnSubscribeCombineLatest$LatestCoordinator.cancel(OnSubscribeCombineLatest.java:178)
        at rx.internal.operators.OnSubscribeCombineLatest$LatestCoordinator.unsubscribe(OnSubscribeCombineLatest.java:165)
        at rx.internal.util.SubscriptionList.unsubscribeFromAll(SubscriptionList.java:124)
        at rx.internal.util.SubscriptionList.unsubscribe(SubscriptionList.java:113)
        at rx.Subscriber.unsubscribe(Subscriber.java:98)
        at rx.internal.operators.OnSubscribeCombineLatest$LatestCoordinator.cancel(OnSubscribeCombineLatest.java:178)
        at rx.internal.operators.OnSubscribeCombineLatest$LatestCoordinator.unsubscribe(OnSubscribeCombineLatest.java:165)
        at rx.internal.util.SubscriptionList.unsubscribeFromAll(SubscriptionList.java:124)
        at rx.internal.util.SubscriptionList.unsubscribe(SubscriptionList.java:113)
        at rx.Subscriber.unsubscribe(Subscriber.java:98)
        at rx.internal.util.SubscriptionList.unsubscribeFromAll(SubscriptionList.java:124)
        at rx.internal.util.SubscriptionList.unsubscribe(SubscriptionList.java:113)
        at rx.Subscriber.unsubscribe(Subscriber.java:98)
        at rx.internal.operators.OperatorMerge$MergeSubscriber.checkTerminate(OperatorMerge.java:814)
        at rx.internal.operators.OperatorMerge$MergeSubscriber.emitLoop(OperatorMerge.java:573)
        at rx.internal.operators.OperatorMerge$MergeSubscriber.emit(OperatorMerge.java:562)
        at rx.internal.operators.OperatorMerge$InnerSubscriber.onError(OperatorMerge.java:846)
        at rx.internal.operators.OperatorDoOnEach$1.onError(OperatorDoOnEach.java:72)
        at rx.internal.operators.OperatorObserveOn$ObserveOnSubscriber.checkTerminated(OperatorObserveOn.java:276)
        at rx.internal.operators.OperatorObserveOn$ObserveOnSubscriber.call(OperatorObserveOn.java:219)
        at rx.android.schedulers.LooperScheduler$ScheduledAction.run(LooperScheduler.java:107)
        at android.os.Handler.handleCallback(Handler.java:739)
        at android.os.Handler.dispatchMessage(Handler.java:95)
        at android.os.Looper.loop(Looper.java:135)
        at android.app.ActivityThread.main(ActivityThread.java:5254)
        at java.lang.reflect.Method.invoke(Method.java:-1)
        at java.lang.reflect.Method.invoke(Method.java:372)
        at com.android.internal.os.ZygoteInit$MethodAndArgsCaller.run(ZygoteInit.java:903)
        at com.android.internal.os.ZygoteInit.main(ZygoteInit.java:698)

From the ReactiveX documentation:

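"An Observable may make zero or more OnNext notifications, each representing a single emitted item, and it may then follow those emission notifications by either an OnCompleted or an OnError notification, but not both. Upon issuing an OnCompleted or OnError notification, it may not thereafter issue any further notifications."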

Retrofit calls onError if a network error or an API error (a status code outside the 200-300 range) occurs. Once onError has been called, the Observable is terminated.

Sorry for using Java in my example, but one solution would be:

    authorize(username, password)
        .flatMap(ignored -> Observable.just(true))
        .onErrorReturn(e -> false)

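This ensures that the Observable is not terminated when an error occurs. Instead, it emits false; on success, it emits true. Apply it to the authorize call itself, inside the flatMap, so the error never reaches the outer chain that holds the UI event subscriptions. You can replace the Boolean with another class that carries more data (perhaps the error message).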
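As a rough sketch of such a class, the following plain-Java type could stand in for the Boolean. The name `LoginResult` and all of its methods are hypothetical, invented here for illustration; nothing like it ships with RxJava or Retrofit.

```java
import java.util.Objects;

/**
 * Hypothetical result type (not part of RxJava or Retrofit): represents
 * either a successful login or the error that made the request fail.
 */
final class LoginResult {
    private final Throwable error; // null means the request succeeded

    private LoginResult(Throwable error) {
        this.error = error;
    }

    /** Result for a request that completed normally. */
    static LoginResult success() {
        return new LoginResult(null);
    }

    /** Result for a failed request; the error is kept so the UI can show it. */
    static LoginResult failure(Throwable error) {
        return new LoginResult(Objects.requireNonNull(error, "error"));
    }

    boolean isSuccess() {
        return error == null;
    }

    /** Message to display, with the same fallback text as the question's code. */
    String errorMessage() {
        if (error == null) {
            throw new IllegalStateException("a successful result has no error");
        }
        String message = error.getMessage();
        return message != null ? message : "Unknown error";
    }
}
```

Assuming RxJava 1's `map` and `onErrorReturn` operators, the inner call might then read `authorize(username, password).map(ignored -> LoginResult.success()).onErrorReturn(LoginResult::failure)`, and the subscriber would check `isSuccess()` to decide between the login and error paths.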

Note: onErrorReturn consumes the error. doOnError does not consume it, so onError will still be called.