Tag: RX java

使用RxJava和Handler重置Message.what值

Rxjava 1.1.8 这个想法是听消息与订阅,但message.what属性来总是0。 class RxHandler(looper: Looper) : Handler(looper) { val messagesObservable = PublishSubject<Message>() override fun handleMessage(msg: Message?) { super.handleMessage(msg) msg?.let { messagesObservable.onNext(it) } } } 然后订阅它: playbackHandler.messagesObservable .subscribe({ // it.what is always 0 }, { Timber.e(it, "Error on playback handler message handling") }) 接着: playbackHandler.sendEmptyMessage(1) 然后我得到message.what == 0预期: message.what == 1

使用RxJava获得与领域和改造的独特结果

我知道Dan Lew几乎回答了从不同来源获取数据的问题 .concat(/*…*/) .take(1) 但是,如果不是从我的本地存储的数据和改造的用户列表。 在显示结果之前,我需要对数据执行特定于数据库的操作,例如只显示不同的用户。 在这种情况下,简单地在我的网络请求和本地数据上使用concat运算符将不会返回相同的结果。 有没有用RxJava写这个习惯用法?

例外:尽管我在另一个线程上进行了subsribed,但不能在UI线程上调用blockingConnect

我有这样的调用: val connectionResult = googleApiClient.blockingConnect() 这是这个例外的原因: java.lang.IllegalStateException:不能在UI线程上调用blockingConnect at com.google.android.gms.common.internal.zzbp.zza(Unknown Source) at com.google.android.gms.common.api.internal.zzbd.blockingConnect(Unknown Source) at com.elstatgroup.elstat.NexoLocationManager.startLocationUpdatesGmsApi(NexoLocationManager.kt:510) at com.elstatgroup.elstat.NexoLocationManager.startLocationUpdates(NexoLocationManager.kt:498) at com.elstatgroup.elstat.NexoLocationManager.appendRSSIEvent(NexoLocationManager.kt:283) at com.elstatgroup.elstat.NexoLocationManager.appendRSSIEvent(NexoLocationManager.kt:273) at com.elstatgroup.elstat.NexoLocationTest.testAppendLocationEvents(NexoLocationTest.kt:111) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:498) at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50) at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12) at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47) at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17) at org.robolectric.RobolectricTestRunner$HelperTestRunner$1.evaluate(RobolectricTestRunner.java:497) at org.robolectric.internal.SandboxTestRunner$2.evaluate(SandboxTestRunner.java:228) at org.robolectric.internal.SandboxTestRunner.runChild(SandboxTestRunner.java:110) at org.robolectric.internal.SandboxTestRunner.runChild(SandboxTestRunner.java:37) at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290) at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71) at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288) at […]

如何使用RxJava收集异步响应

这是我的目标:收集来自第三方库的异步任务的响应 要求:使用RxJava2来完成 我被困在思考使用哪个操作员或操作员这样做,想法表示赞赏。 我的想法是: Flowable.fromIterable(list) .anOperatorCanOnNextTheResponse() .buffer() .subscribe(newList)

与RxKotlin获取NetworkOnMainThreadException

我试图使用RxKotlin做网络请求,但不断得到一个NetworkOnMainThreadException我在主线程订阅,所以我不知道为什么它没有把它从UI线程。 这里是我订阅Observable的地方 weatherInteractor.getWeather(lat, lng) .subscribeOn(Schedulers.io()) .observeOn(AndroidSchedulers.mainThread()) .subscribe( {response -> try { val jsonData = response.body().string() val currentWeather = getCurrentWeatherData(jsonData) view!!.displayCurrentWeather(currentWeather) } catch (e: JSONException) { Log.d("Present JSON Exception", e.message) } catch (e: IOException) { Log.d("Present IO Exception", e.message) } }, { error -> error.printStackTrace() } ) } 这里是我创建我的Observable的地方 fun getWeather(lat: Double, lng: Double): Observable<Response> { […]

CountdownTimer完成时如何通知Observable

我有一个自定义的Android TextView,通过CountDownTimer显示游戏中剩余的时间 class CountdownTextView(context: Context, attrs: AttributeSet) : TextView(context, attrs) { private lateinit var countDownTimer: CountDownTimer private lateinit var onFinishObservable: Observable<Unit> fun setTime(initTime: Int) { this.text = "$initTime:00" countDownTimer = object : CountDownTimer((initTime *1000).toLong(), 1000) { override fun onTick(millisUntilFinished: Long) { val minutes = millisUntilFinished / 60000 val seconds = (millisUntilFinished % 60000) / 1000 […]

RxJava如何从订阅创建Observable

我正在寻找一种方法来处理结果subscribe后创建Observable。 鉴于我从productRepo.list()这是改造返回Observable<Response<ProductResponse>>这个Observable。 productRepo .list() .retry(3) .subscribe { response -> if (response.isSuccessful) { response.body().apply { cache.saveProducts(data) } } } 这样做的目的是将结果保存到本地数据库cache 。 这加上另一个非常相似的调用填充本地数据库从API的远程数据。 两个调用完成后,我想从cache加载数据。 我不想以任何方式将这两种观察结合起来。 之后只想运行一些任务。 我希望这个处理作为Rx调用图中的一个单元,以便它同时执行Call1和Call2,一旦Call1和Call2完成运行Task3。 这种情况下最好的方法是什么? 我真的更喜欢每个呼叫的用户是分开的。 flatMap是最好的选择吗?

Observable.combineLatest更新到RxJava 2.xx后导致错误 – 不能推断类型

最近几天,我正试图将我的项目从RxJava 1.xx迁移到RxJava 2.xx我有这个简单的方法。 如果我使用rxjava 1.xx(rx.Observable)中的Observable,一切都可以。 然而,当我用“新”观察者(io.reactivex.Observable)替换它时,我得到一个错误:“类型推断失败,请明确指定” fun <T1, T2, T3, R> combineLatestValue3Nullable(observable1: Observable<T1?>, observable2: Observable<T2?>, observable3: Observable<T3?>, merge: (T1, T2, T3?) -> R): Observable<R?> { return Observable.combineLatest(observable1, observable2, observable3) { value1, value2, value3 -> var result: R? = null if (value1 != null && value2 != null) { result = merge(value1, value2, value3) } result […]

异步调用集合中的每个项目

我有一个问题,迄今为止我无法解决,我是RxKotlin的新手,所以可能很简单。 请看代码: override fun infos(): Stream<Info> = client.infoAboutItem(identifier) .map { val itemId = it.itemId ?: "" val item = client.itemForId(itemId) ClientInfo(client, it, source, item) as Info } .let { AccessStream(it) } 流哪里是我们自制的收藏。 地图是一种方法,允许您迭代该集合中的每个项目。 这里的问题是 client.itemForId(itemId) 是一个http调用,返回一个不理想的Single。 我想创建一个异步调用内部映射,将返回项目而不是单一,然后将其传递给ClientInfo。 到目前为止我尝试过的事情是使用在地图内部的订阅和使用blockingGet()方法,但是这阻止了主线程,即使我观察和订阅不同的线程 所以它涉及到对集合中的每一件事进行异步调用。 感谢帮助

境界把交易变成可观察的

有时我想在使用Retrofit的服务调用之后执行写入操作。 由于我的默认服务器调用在io线程上执行,我只想使用Realm写入操作的相同线程。 我正在使用它: serviceInteractor.uploadMeasure(measure) .filter { uploaded: Boolean -> uploaded } // filter only the ones successfully uploaded .flatMap { realmInteractor.markMeasureSent(measure.timeStamp) } .subscribe( { success -> /* do nothing */ }, { inDebug { it.printStackTrace() } } ) 我的境界可观察如下: fun markMeasureSent(measureId: Long?): Observable<Unit> { if (measureId == null) Observable.just(Unit) return Observable.fromCallable { Realm.getDefaultInstance().use { it.executeTransaction […]