Tag: RX java

如何处理Retrofit Rx onError中不同难度的错误

我想知道你的方式来处理不同types的错误(如httpexception,没有互联网连接exception等)在改进Rx onError,而不使用instanceof像这里提出的: 如何在RxJava或在这里处理 Retrofit 2中的网络错误 : Handle改造2 RX中的错误 在kotlin中,我将简单地为每种可投掷项目做一些扩展函数来做我想做的任何事情。 但是我不得不在项目中使用Java。 任何不错的建议? 是这样的构建某种error handling程序的方法: public interface ErrorHandler { void handleError(Exception e); void handleError(HttpException e); void handleError(NullPointerException npe); } 好? 我知道这不是因为每次我需要处理另一个特定的错误,我不得不改变接口,所以这是违反开放原则。 但我找不出任何解决办法。 欢呼Wojtek

Rx:即使调用了onError,如何得到最后一个元素?

我正在使用RxJava,我需要做两件事情: 获取Observable发出的最后一个元素 确定是否调用了onError ,而onCompleted 我看过使用last和lastOrDefault (这实际上是我需要的行为),但我一直没有能够解决onError隐藏最后一个元素。 我可以使用Observable两次,一次获得last值,一次获得完成状态,但到目前为止,我只能通过创建我自己的Observer来完成这个任务: public class CacheLastObserver implements Observer { private final AtomicReference lastMessageReceived = new AtomicReference(); private final AtomicReference error = new AtomicReference(); @Override public void onCompleted() { // Do nothing } @Override public void onError(Throwable e) { error.set(e); } @Override public void onNext(T message) { lastMessageReceived.set(message); } public Optional getLastMessageReceived() […]

Kotlingenerics方法和inheritance

在我的代码中,我想在抽象类中创建一个方法,该方法返回一些Observable。 然后这个抽象类的实现将返回某些(指定)types的Observable。 不幸的是,Android Studio会在实现方法()中返回一个错误“types不匹配”: 预计:可观察 find:Observable 我的MockDrawerList.getList()返回Observable 请关注execute()和buildUseCaseObservable 抽象类 public abstract class UseCase(threadExecutor: ThreadExecutor, postExecutionThread: PostExecutionThread) { private val threadExecutor: ThreadExecutor private val postExecutionThread: PostExecutionThread private var subscription: Subscription = Subscriptions.empty() init { this.postExecutionThread = postExecutionThread this.threadExecutor = threadExecutor } protected abstract fun buildUseCaseObservable(): Observable public fun execute(useCaseSubsriber: Subscriber) { subscription = buildUseCaseObservable() .subscribeOn(Schedulers.from(threadExecutor)) .observeOn(postExecutionThread.getScheduler()) […]

Kotlintypes不匹配编译错误:需要成功,findMyError

我遇到了以下代码不能在kotlin中编译的问题。 // StateModel.kt sealed class StateModel class Loading : StateModel() data class Success(val data: T) : StateModel() data class MyError(val message: String) : StateModel() // StateModelTransformer.kt class StateModelTransformer : FlowableTransformer { override fun apply(upstream: Flowable): Publisher { return upstream .map { data -> Success(data) } .onErrorReturn { error -> MyError(error.message) // compile error, Type mismatch, […]

为什么我的RxJava设置阻止我的UI线程? 使用BluetoothAdapter.startLeScan回调

我正在努力寻找阻止我的UI线程的具体行动,我已经尝试了几个调度运算符,但我不知道如何使其工作。 我有一个用户界面的按钮,这onClicked是开始蓝牙扫描和更新textView字符串像一个日志(它显示了在这一刻发生的事情)。 所以这是我的MainActivity: lateinit var disposable: Disposable val textDataService = TextDataService() override fun onCreate(savedInstanceState: Bundle?) { super.onCreate(savedInstanceState) setContentView(R.layout.activity_scan_test) buttonScanTestStart.setOnClickListener { if (isBluetoothEnabled()) { textViewLog.text = “” buttonScanTestStop.visibility = View.VISIBLE buttonExportScanTestRaportSummary.visibility = View.GONE buttonExportScanTestRaportFull.visibility = View.GONE buttonScanTestStart.visibility = View.GONE disposable= Scanner() .discoverSingleDevice(this, ” “, textViewLog) .doOnError { setText(“General error: ${it.message ?: it::class.java}”, textViewLog) setLogText(“General error: ${it.message […]

IncompatibleClassChangeError:类’java.lang.VirtualMachineError’没有实现接口’java.lang.CharSequence’

我的代码有奇怪的问题,我无法解释发生的任何事情。 它是在棉花糖崩溃,但没有牛轧糖,这使得这更离奇。 我可以指向一个错误发生的地方,但是错误本身并不清楚。 我不知道什么是崩溃,以及如何防止它。 以下是详细信息: 当我尝试打电话给服务器时发生崩溃。 为此,我使用RxJava旁边的RxJava ,我在Kotlin中做这个。 这是我的代码: fun login(username: String, password: String, callback: Login.OnLoginListener) { RestClient.getInstance().service .loginUser( // ( if (success.status) callback.onLoginSuccess() else callback.onLoginError() ) }, { error -> GenericErrorHandler.handleError(error, callback.retrieveContext()) } ) } 这里是logcat输出: java.lang.IncompatibleClassChangeError: Class ‘java.lang.VirtualMachineError’ does not implement interface ‘java.lang.CharSequence’ in call to ‘java.lang.String java.lang.CharSequence.toString()’ (declaration of ‘java.lang.Throwable’ appears in […]

如何在RxJava2中用重试运算符记住状态

我有一个网络客户端,可以从中断恢复,但需要最后一条消息时,这样做是在重试。 Kotlin示例: fun requestOrResume(last: Message? = null): Flowable = Flowable.create({ emitter -> val connection = if (last != null) client.start() else client.resumeFrom(last.id) while (!emitter.isDisposed) { val msg = connection.nextMessage() emitter.onNext(msg) } }, BackpressureStrategy.MISSING) requestOrResume() .retryWhen { it.flatMap { Flowable.timer(5, SECONDS) } } // how to pass the resume data when there is a retry? 问题 […]

为什么只有最新的订阅者使用Kotlin和RxMVP才能在Android上的RxJava上进行下一个事件

鉴于以下情况: 以CheckBox和多个EditText为特色的View 该View使用杰克·沃顿的RxBinding来访问Observables 对象 fun observeUsername(): InitialValueObservable = RxTextView.textChanges(et_username) 对于EditText (有三个,用户名,密码和电子邮件)和 fun observeSignUpCheckBox(): InitialValueObservable = RxCompoundButton.checkedChanges(cb_sign_up) 为CheckBox Presenter每个EditText都有一个方法,就像 fun observeUsernameText(): Disposable { return view.observeUsernameText() .skipInitialValue() .map { username -> StringUtils.isValidUsername(username.toString()) } .subscribe({ view.setValidUsername(it) }) } 和CheckBox一个方法: fun observeSignUpCheckBox(): Disposable { return view.observeSignUpCheckBox() .subscribe({ checked -> Timber.d(“### view trigger”) }) } 所有这些方法都在Presenter的onCreate中调用,一切按预期工作。 现在的问题: 我在Presenter中添加了一个用于validation用户输入的新函数: fun observeInputFields(): […]

在java类中调用扩展函数作为任何RX操作符

我创建了一个扩展函数, fun Observable.subscribeWithErrorHandling(onNext: (T) -> Unit ,onError: ((throwable: Throwable) -> Unit)? = null): Subscription { //doing stuff } 在kotlin类中,我将能够以这种方式使用它 observable.subscribeWithErrorHandling(…) 现在,我想在我的java类中使用这个函数。 我已经看到,你可以静态调用它: MyExtensionFile.subscribeWithErrorHandling 但在我的情况下,你需要别的东西,因为它是一个RX流的中间部分。 这是我坚持的部分。 这甚至可能吗? 或没有办法做这样的事情,从Java代码?

尽管使用RxJava在另一个线程上订阅,但在使用Google的People API时仍然收到IllegalStateExceptionexception

注意:我使用Kotlin和RxKotlin来实现有用的扩展function。 我正在尝试使用Google的People API获取一些用户数据,并且将所有的AsyncTasks和东西都移植到了RxJava中。 一切工作除了这部分: private fun getGooglePerson(service: PeopleService?, account: GoogleSignInAccount) = Single.just(service?.people() ?.get(“people/me”) ?.setPersonFields(“emailAddresses,birthdays,genders,phoneNumbers”) ?.execute() ?: Person()) // If null, return an empty person (shouldn’t ever be null, though) .subscribeOn(Schedulers.io()) .observeOn(AndroidSchedulers.mainThread()) .subscribeBy( onError = {/*Do something*/}, onSuccess = {/*Do something*/} ) 具体来说,我得到一个IllegalStateException的消息, Calling this from your main thread can lead to deadlock 。 我感到困惑,因为我的印象是,调用subscribeOn应该将其上下的所有东西(除非一个observeOn遵循它)转移到另一个线程(在这种情况下, Schedulers.io() […]