Tag: rx java2

HttpException未被onError()捕获

我正在使用Retrofit2和RxJava2向后端服务器发送请求。 当答案是200或201时,一切正常。 当服务器的响应是409或503并引发HttpException ,它不会被Observable的onError()捕获,并且应用程序崩溃。 我正在做的请求是这样的: @POST("users/sign-up") fun register(@Body register: RegisterBody): Observable<User> 我做请求的代码片段是这样的( applySchedulers()只适用于subscribeOn()和observeOn() ): api.register(body) .compose(applySchedulers()) .subscribe( { user -> onNextRegister(user.id, email.toString(), password.toString()) }, { error -> handleRegistrationError(error) }) 引发的异常如下所示: io.reactivex.exceptions.CompositeException: 2 exceptions occurred. ComposedException 1 : retrofit2.adapter.rxjava2.HttpException: HTTP 503 Unavailable ComposedException 2 : kotlin.NotImplementedError: An operation is not implemented: not implemented 即使我为Observable实现onError() ,如何防止应用程序崩溃? 请注意, […]

RxJava2:onComplete不用flatMapIterable调用

这是简短的代码片段: val subject = BehaviorSubject.createDefault(emptyList<Int>()) subject.onNext(Arrays.asList(1, 2, 3)) subject.flatMapIterable { list: List<Int> -> list } .subscribeBy( onNext = { l("on next", it) }, onComplete = { l("on complete") } ) 为什么onComplete不会在这里打电话? 我应该怎么做这个代码? 因为在原始代码中我不能使用.toList()方法。

输入自己的值来组合最新

我有这个最好的流动性: private val subject: BehaviorSubject<Int> = BehaviorSubject.create() Flowable.combineLatest( materialA, materialB, subject.toFlowable(LATEST), Function3 { ma, mb, index -> if(index == 0) … else if(index ==1) … else … } subject.onNext(99) 基本上,我想观察这两个材料的数据库变化,并结合他们向用户显示一些数据。 有时,我需要使用最新的输出,并做一个不同的组合(计算量大)。 这就是为什么我把这个index作为一个来源。 这是一个有效的方法,还是有更好的选择? PS。 我需要能够随时要求一个不同的组合(这是我没有使用Flowable.just()),我想避免保存在obs链之外的状态。

RxJava Observable.create包装可观察订阅

我使用了Observable.create,所以当某些数据可用时,我可以通知用户。 我有点不确定订阅在我的创建方法内的observables。 这些嵌套的订阅会给我什么问题吗? 我不完全熟悉使用Observable.create创建observables,所以我想确保我没有做任何不寻常的事情或滥用它。 先谢谢你! abstract class NetworkResource<ApiType, DbType> constructor(private val schedulerProvider: SchedulerProvider) { abstract fun fetchFromApi(): Single<ApiType> abstract fun fetchFromDb(): Observable<Optional<DbType>> abstract fun saveToDb(apiType: ApiType?) abstract fun shouldFetchFromApi(cache: DbType?): Boolean fun fetch(): Observable<Optional<DbType>> { return Observable.create<Optional<DbType>> { val subscriber = it fetchFromDb() .subscribe({ subscriber.onNext(it) if(shouldFetchFromApi(it.get())) { fetchFromApi() .observeOn(schedulerProvider.io()) .map { saveToDb(it) it } .observeOn(schedulerProvider.ui()) […]

例外:尽管我在另一个线程上进行了subsribed,但不能在UI线程上调用blockingConnect

我有这样的调用: val connectionResult = googleApiClient.blockingConnect() 这是这个例外的原因: java.lang.IllegalStateException:不能在UI线程上调用blockingConnect at com.google.android.gms.common.internal.zzbp.zza(Unknown Source) at com.google.android.gms.common.api.internal.zzbd.blockingConnect(Unknown Source) at com.elstatgroup.elstat.NexoLocationManager.startLocationUpdatesGmsApi(NexoLocationManager.kt:510) at com.elstatgroup.elstat.NexoLocationManager.startLocationUpdates(NexoLocationManager.kt:498) at com.elstatgroup.elstat.NexoLocationManager.appendRSSIEvent(NexoLocationManager.kt:283) at com.elstatgroup.elstat.NexoLocationManager.appendRSSIEvent(NexoLocationManager.kt:273) at com.elstatgroup.elstat.NexoLocationTest.testAppendLocationEvents(NexoLocationTest.kt:111) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:498) at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50) at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12) at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47) at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17) at org.robolectric.RobolectricTestRunner$HelperTestRunner$1.evaluate(RobolectricTestRunner.java:497) at org.robolectric.internal.SandboxTestRunner$2.evaluate(SandboxTestRunner.java:228) at org.robolectric.internal.SandboxTestRunner.runChild(SandboxTestRunner.java:110) at org.robolectric.internal.SandboxTestRunner.runChild(SandboxTestRunner.java:37) at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290) at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71) at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288) at […]

RxJava在Flowable和Observable之间使用Window和Groupby的不同输出

我使用的是RxJava2,代码如下: val whitespaceRegex = Regex("\\s+") val queryRegex = Regex("query=([^&]+)", RegexOption.IGNORE_CASE) val dateTimeFormatter = DateTimeFormatter.ISO_OFFSET_DATE_TIME @JvmStatic fun main(args: Array<String>) { val cnt = AtomicLong() val templateStr = "|date| /ignored/ query=|query|" val random = ThreadLocalRandom.current() var curDate = ZonedDateTime.of(LocalDate.of(2016, Month.JANUARY, 1), LocalTime.MIDNIGHT, ZoneId.of("UTC")) val generator = Flowable.generate<String> { emitter -> // normally these are read from a […]

如何使用RxJava收集异步响应

这是我的目标:收集来自第三方库的异步任务的响应 要求:使用RxJava2来完成 我被困在思考使用哪个操作员或操作员这样做,想法表示赞赏。 我的想法是: Flowable.fromIterable(list) .anOperatorCanOnNextTheResponse() .buffer() .subscribe(newList)

RxJava2发布

有什么区别 ObservableTransformer { Observable.merge( it.ofType(x).compose(transformerherex), it.ofType(y).compose(transformerherey) ) } 和 ObservableTransformer { it.publish{ shared -> Observable.merge( shared.ofType(x).compose(transformerherex), shared.ofType(y).compose(transformerherey) ) } } 当我使用这两个运行我的代码时,我得到了相同的结果。 这里发布什么?

RxJava2 PublishSubject订阅服务器在使用SingleScheduler从多个线程调用时无法接收项目

我有以下的单元测试,我试图从不同的线程发送10个String ,并测试我从一个线程接收这些String 。 我的问题是,这个测试襟翼。 有时会成功,但有时我只会收到8或9项目,之后测试会挂起,直到闩锁超时。 我SingleScheduler以错误的方式使用SingleScheduler ? 我错过了别的吗? val consumerCallerThreadNames = mutableSetOf<String>() val messageCount = AtomicInteger(0) val latch = CountDownLatch(MESSAGE_COUNT) @Test fun someTest() { val msg = "foo" val subject = PublishSubject.create<String>() subject .observeOn(SingleScheduler()) .subscribe({ message -> consumerCallerThreadNames.add(Thread.currentThread().name) messageCount.incrementAndGet() latch.countDown() }, Throwable::printStackTrace) 1.rangeTo(MESSAGE_COUNT).forEach { Thread({ try { subject.onNext(msg) } catch (t: Throwable) { t.printStackTrace() } […]

Observable.combineLatest更新到RxJava 2.xx后导致错误 – 不能推断类型

最近几天,我正试图将我的项目从RxJava 1.xx迁移到RxJava 2.xx我有这个简单的方法。 如果我使用rxjava 1.xx(rx.Observable)中的Observable,一切都可以。 然而,当我用“新”观察者(io.reactivex.Observable)替换它时,我得到一个错误:“类型推断失败,请明确指定” fun <T1, T2, T3, R> combineLatestValue3Nullable(observable1: Observable<T1?>, observable2: Observable<T2?>, observable3: Observable<T3?>, merge: (T1, T2, T3?) -> R): Observable<R?> { return Observable.combineLatest(observable1, observable2, observable3) { value1, value2, value3 -> var result: R? = null if (value1 != null && value2 != null) { result = merge(value1, value2, value3) } result […]