Tag: rx java2

使用函数引用重写Kotlin中的Java代码会发生SAMtypes冲突

我有一个示例Java代码使用方法引用,我想重写到Kotlin。 Java版本使用方法参考,解决方案简短明了。 但另一方面,我不能在Kotlin中使用方法引用。 我设法编写的唯一版本是下面介绍的一个。 看起来像Function3 { s: String, b: Boolean, i: Int -> combine(s, b, i) }可以用更简洁的方式编写(如果可能的话,方法引用将是完美的)。 我是Kotlin新手,所以我会感激任何线索。 Java的 import io.reactivex.Observable; public class TestJava { Observable strings() { return Observable.just(“test”); } Observable booleans() { return Observable.just(true); } Observable integers() { return Observable.just(1); } void test() { Observable.combineLatest(strings(), booleans(), integers(), this::combine); } double combine(String s, boolean […]

rxjava2:使用连接的Completable并在IO线程中观察它们

起初,我知道网络操作不应该从主线程调用。 这就是为什么我在Schedulers.io()上观察Completable的原因! 我试图连接两个完整的。 这两个完整的使用网络,这就是为什么我订阅Schedulers.io()。 如果我使用concatWith(或者和然后)代码失败与NetworkOnMainThreadException。 这里是kotlin代码: val singleSubject = SingleSubject.create(); completalbe1.concatWith(completable2) .subscribeOn(Schedulers.io()) .observeOn(Schedulers.io()) .subscribe({ singleSubject.onSuccess(“ok”) }, { error -> Log.e(tag, error.message, error)//here i got exception singleSubject.onError(error) }) return singleSubject 如果我重写代码没有完整的链接 – 一切都好。 这是工作代码: val singleSubject = SingleSubject.create(); completable1 .subscribeOn(Schedulers.io()) .observeOn(Schedulers.io()) .subscribe({ completable2 .subscribeOn(Schedulers.io()) .observeOn(Schedulers.io()) .subscribe({ singleSubject.onSuccess(“ok”) }, { error -> Log.e(tag, error.message, error) singleSubject.onError(error) }) […]

exception:java.lang.ClassNotFoundException:org.reactivestreams.Publisher

在IntelliJ IDEA 2017.1.2(Build#IC-171.4249.39)中使用RxJava和Kotlin开发的基于JavaFX Gradle的应用程序中,出现exception: 线程“JavaFX Application Thread”中的exceptionjava.lang.NoClassDefFoundError:org / reactivestreams / Publisher 每次这样的代码 return Completable.complete() 被执行。 在类似问题的建议之后为什么我得到NoClassDefFoundError:org / reactivestreams / Publisher ,我试图添加包括反应流到我的build.gradle脚本的dependencies块 dependencies { compile ‘org.jetbrains.kotlin:kotlin-stdlib:1.1.2’ compile ‘org.reactivestreams:reactive-streams:1.0.0’ compile ‘io.reactivex.rxjava2:rxkotlin:2.0.0’ } 但问题依然存在。 依赖关系树看起来像这样: compile – Dependencies for source set ‘main’ (deprecated, use ‘implementation ‘ instead). +— org.jetbrains.kotlin:kotlin-stdlib:1.1.2 | \— org.jetbrains:annotations:13.0 +— org.reactivestreams:reactive-streams:1.0.0 \— io.reactivex.rxjava2:rxkotlin:2.0.0 +— io.reactivex.rxjava2:rxjava:2.0.7 […]

错误在http请求返回错误时处理UI线程

我正在使用Fuel和Rxjava进行网络通话。 我已经将我的基本URL设置为localhost,而不是在服务任何东西。 我想能够处理网络错误,所以我可以在用户界面上显示某种错误信息。 这是我的GET请求的一个例子 fun getRandom(take: Int, responseHandler: (result: WikiResult) -> Unit?) { Urls.getRandomURl(take) .httpGet() .timeout(timeout) .timeoutRead(readTimeout) .rx_object(WikipediaDataDeserializer()) .subscribeOn(Schedulers.newThread()) .observeOn(AndroidSchedulers.mainThread()) .subscribe({ result -> val statusCode = result.component2()?.response?.statusCode when(statusCode) { -1 -> e(statusCode.toString(), result.component2()?.cause.toString()) else -> { val (data, _) = result responseHandler.invoke(data as WikiResult) } } }, { error -> e(error.cause.toString()) }) } 在我的片段上,我在异步任务中调用上面的函数 private […]

如何在RxJava2中错误地终止开关图的序列

我正试图在RxJava2和Kotlin中实现一个刷新令牌流,并且在处理错误时遇到了问题。 有几个请求需要按顺序完成,但是如果有一些错误,顺序会有所不同。 基本上,如果我尝试使用我的刷新令牌并接收400 – Bad Request响应,因为令牌无效,我需要终止流并且不执行下一个switchMap (理想情况下,我想返回最终的Observable )。 但我不知道如何做到这一点。 如果我使用onErrorReturn ,我只是将返回的结果传递给下一个切换映射。 而doOnError只是在请求失败的时候执行这个步骤,但是整个序列还在继续。 fun refreshToken(): Observable { // try to use the refresh token to obtain an access token return authRepository.refreshToken(token) .switchMap { response -> // process response here userRepository.getUser() // fetch user details }.doOnError { // TODO – return final result, do not jump to […]

如何在RxJava 2和Android中延迟onError()?

我试图从我的应用程序中的Web服务器加载一些数据。 而且由于操作的异步性,无法预先知道需要多长时间才能完成。 为了提醒用户操作“正在进行”,我正在使用一个加载指示器。 这是一个使用kotlin和RxJava 2(我希望很清楚): fun loadData(){ showLoader() // show loading indicator Single.fromCallable { // http request logic goes here }.delay(1000, TimeUnit.MILLISECONDS) .subscribeOn(Schedulers.io()) .observeOn(AndroidSchedulers.mainThread()) .subscribeWith(object : DisposableSingleObserver() { override fun onSuccess(data: String) { // do something hideLoader() // on success, hide indicator } override fun onError(e: Throwable) { displayErrorMessage() hideLoader() // on error hide indicator […]

如何在RxJava2中用重试运算符记住状态

我有一个网络客户端,可以从中断恢复,但需要最后一条消息时,这样做是在重试。 Kotlin示例: fun requestOrResume(last: Message? = null): Flowable = Flowable.create({ emitter -> val connection = if (last != null) client.start() else client.resumeFrom(last.id) while (!emitter.isDisposed) { val msg = connection.nextMessage() emitter.onNext(msg) } }, BackpressureStrategy.MISSING) requestOrResume() .retryWhen { it.flatMap { Flowable.timer(5, SECONDS) } } // how to pass the resume data when there is a retry? 问题 […]

在android布局中使用kotlin的Unit(或任何其他对象)types

研究Kotlin的Unittypes,这是一个基本上代表Java的void的object 。 在某些情况下,我想在onClick数据绑定中使用PublishSubject来基本上只是点击点击。 Android的XML看起来像这样: viewModel.navigateSubject.onNext(Unit)}” /> 在layout我介绍了这种types: […] 然而,以这种方式对象types的调用当然是不可能的,有没有人find一种方法来使用android xml Unittypes?

与RxJava的PublishSubject竞争条件

将PublishSubject与blockingGet()一起使用时,似乎存在订阅者未收到事件的争用情况。 我在Kotlin中附加了一个基本的JUnit测试,它有两种方法。 rxTestBroken()用PublishSubject显示中断的行为。 rxTestOk()表明,一切正常行为主体,因为后者重播最后一个事件的情况下,用户没有订阅的时间。 这种竞争条件从哪里来,并使用BehaviorSubject正确的修复? import io.reactivex.Single import io.reactivex.subjects.BehaviorSubject import io.reactivex.subjects.PublishSubject import io.reactivex.subjects.Subject import org.assertj.core.api.Assertions.assertThat import org.junit.Test import java.util.concurrent.TimeUnit class StateMachine(val stateSubject: Subject) { companion object { val STATE_IDLE = 1 val STATE_READY = 2 } val output = 10L var currentState = STATE_IDLE fun scheduleNextState(nextState: Int) { Thread(Runnable { currentState = nextState stateSubject.onNext(currentState) }).start() […]

在RxKotlin / RxJava中用BehaviorSubject自动创建热可观察对象

目前我正在使用RxKotlin在Kotlin上建立一个项目。 我与Rx的背景主要是基于RxJS。 我经常用于在Typescript中创建hot observables模式的模式看起来是这样的: private dataStore: IFoo; private dataStoreSubject: BehaviorSubject = new BehaviorSubject(this.dataStore); public dataStoreObservable: Observable = Observable.from(this.dataStoreSubject); public getNetworkData(): Observable { return this.http.get() .map((response: IResponse) => { this.dataStore = response; this.dataStoreSubject.next(this.dataStore); return this.dataStore; }); } 这将允许我暴露一个Observable ,而不暴露Subject和后续的subject.next(); 方法。 我的问题是:在RxKotlin或RxJava中建立类似逻辑最常用的方法是什么?