Tag: RX java

重复发送对象到Rx上的子客户端

是否可以将相同的对象反复发送给Rx中的订阅者? 例如,这个代码(在Kotlin上): val exmp = listOf("А") var observable = exmp.toObservable() observable.subscribeBy( onNext = { it + "1" println(it) }, onError = { it.printStackTrace() }, onComplete = { println("Done!") } ) 我尝试将字符串值“A”重复发送到方法onNext()并获得“A111111”。 Rx库的方法重放()我明白开始发送日期再次为新的潜艇。 在从可观察的日期开始不变的循环for ,只是方法被调用几次

致命例外:当触发器处置时,RxCachedThreadScheduler-1。 为什么?

我有以下RxJava 2代码(在Kotlin中),它有一个Observable disposable = Observable.create<String>({ subscriber -> try { Thread.sleep(2000) subscriber.onNext("Test") subscriber.onComplete() } catch (exception: Exception) { subscriber.onError(exception) } }).subscribeOn(Schedulers.io()) .observeOn(AndroidSchedulers.mainThread()) .subscribe({ result -> Log.d("Test", "Completed $result") }, { error -> Log.e("Test", "Completed ${error.message}") }) 虽然它仍然是Thread.sleep(2000) ,我执行disposable?.dispose()调用,它会出错 FATAL EXCEPTION: RxCachedThreadScheduler-1 Process: com.elyeproj.rxstate, PID: 10202 java.lang.InterruptedException at java.lang.Thread.sleep(Native Method) at java.lang.Thread.sleep(Thread.java:371) at java.lang.Thread.sleep(Thread.java:313) at presenter.MainPresenter$loadData$1.subscribe(MainPresenter.kt:41) at […]

可以重用操作符执行

鉴于下面的例子(科特林代码) val subject = PublishSubject.create<Int>() val stream = subject.map { println("mapping") it * 2 } stream.forEach { println("A: $it") } stream.forEach { println("B: $it") } subject.onNext(1) subject.onCompleted() 输出将是 mapping A: 2 mapping B: 2 我想要实现的是源观察被映射一次,所有的订户得到的结果,但不执行映射操作,每一个… 喜欢这个 mapping A: 2 B: 2 在我的情况下,我有非常昂贵的计算正在进行的延迟和性能是至关重要的,我有一个热点观察源作为一个来源和大量的用户… 我们如何重用运算符执行? 和一般不同的映射操作?

如何有条件地在RxJava流中添加一个异步操作?

这里是我想要做的简化版本(使用Kotlin和RxJava) makeServerCall() .doOnNext { doStuff(it) } //TODO: if it == 0, call asyncOperation() and wait for its callback to fire //before running the rest of the stream. Otherwise immediately run the rest //of the stream .flatMap { observable1(it) observable2(it) Observable.merge( getSpotSearchObservable(observable1), getSpotSearchObservable(observable2) } .subscribeBy(onNext = { allDone() view? }) 如何挤压调用asyncOperation()并使流的其余部分等待其回调触发,但是只有在满足某些条件时才会触发。 这看起来似乎可能是Rx的一个微不足道的操作,但是没有什么明显的解决方案。

无法使API调用urlfetch.Fetch在既不是原始请求线程也不是由ThreadManager创建的线程的线程中

我目前正在玩kotlin,Spring社交和Google应用引擎。 我试图每秒钟追回一些追随者。 这是我的代码: @Service class SubscribeService { var subscriber : Disposable? = null var twitter : Twitter = TwitterTemplate( "XXXAUTHXXX", "XXXAUTHXXX", "XXXAUTHXXX", "XXXAUTHXXX") fun subscribe() { subscriber = Observable.interval(1, TimeUnit.SECONDS).subscribe {println(twitter.userOperations().getUserProfile("azeaze").followersCount)} } fun unsubscribe() { subscriber?.dispose() } } 当我执行订阅功能时,我得到这个异常: [INFO] io.reactivex.exceptions.OnErrorNotImplementedException: Can't make API call urlfetch.Fetch in a thread that is neither the original request […]

如何在RxJava 2和Kotlin中将null传递给可空类型的Observable

我像这样初始化我的变量: val user: BehaviorSubject<User?> user = BehaviorSubject.create() 但我不能这样做。 IDE会引发错误: – user.onNext(null) 这样做,IDE说你永远不会是空的: – user.filter( u -> u!=null)

Kotlin:如何将使用Thread.sleep的测试转换为RxJava TestScheduler

我正在写一个工具测试,它检查是否当我缓存东西到一个接收缓冲区,并经过一段时间(10秒)这个主题插入缓冲值到我的房间数据库。 当我使用Thread.sleep(syncTimeInterval)时,测试是正确的。 我想用TestScheduler编写这个相同的测试。 这里是Thread.sleep版本(通过测试): @Test fun testMultipleLogs() { val loadAllCloudCallBefore = appDatabase.logCloudCallDao().loadAll() val loadAllLogNewSessionBefore = appDatabase.logNewSessionDao().loadAll() assertEquals(0, loadAllCloudCallBefore.size) assertEquals(0, loadAllLogNewSessionBefore.size) Observable.interval(1, TimeUnit.SECONDS) .take(20) .subscribe { logManager.logNewSession() } Observable.interval(1, TimeUnit.SECONDS) .take(20) .subscribe { logManager.logCloudCall("url", "callgoup") } Observable.interval(1, TimeUnit.SECONDS) .take(20) .subscribe { logManager.logNewSession() } Observable.interval(1, TimeUnit.SECONDS) .take(20) .subscribe { logManager.logCloudCall("url", "callgoup") } Observable.interval(1, TimeUnit.SECONDS) .take(20) .subscribe { […]

当我退出应用程序时,为什么在Android应用程序中使用Kotlin / rxJava编写android.os.TransactionTooLargeException? (的OnExit /的onPause)?

我正在Kotlin / rxJava上开发一个应用程序,现在我一直在为一个错误而苦苦挣扎。 每次我退出应用程序时,都会弹出一个窗口,提示:“不幸的是,AppName已经停止。” 我在控制台中得到这个错误: java.lang.RuntimeException:android.os.TransactionTooLargeException:数据包大小896824字节 **enter code here** 当我使用API​​ 23或更低的电话错误不会发生,但与24和25我得到错误和应用程序“崩溃”后,我退出了该应用程序。 而且,对于我作为开发者,当然对于用户来说,这也变得非常烦人。 每当我尝试以任何方式退出应用程序时都会发生错误。 如果我点击一个链接,把我带到浏览器,从我的应用程序添加/发送联系人到手机上的联系人列表或通过Gmail,Google +,蓝牙等共享错误发生。 所以会发生的是,在所有这些问题上,我必须退出应用程序(按住home按钮)或从应用程序(联系人,Gmail等)和应用程序崩溃打开另一个应用程序。 我还可以提到,当我从我的应用程序添加一个人到电话联系人列表中的所有信息被发送,我可以添加此人。 有没有人有任何线索,为什么这个快乐? 如果这是必要的,请随时问我更多的细节,我已经有一段时间了这个bug,这是非常恼人的。 整个错误: 04-12 08:46:18.128 23287-23287/com.****.***** E/AndroidRuntime: FATAL EXCEPTION: main Process: com.******.*******, PID: 23287 java.lang.RuntimeException: android.os.TransactionTooLargeException: data parcel size 896824 bytes at android.app.ActivityThread$StopInfo.run(ActivityThread.java:3781) at android.os.Handler.handleCallback(Handler.java:751) at android.os.Handler.dispatchMessage(Handler.java:95) at android.os.Looper.loop(Looper.java:154) at android.app.ActivityThread.main(ActivityThread.java:6119) at java.lang.reflect.Method.invoke(Native Method) at com.android.internal.os.ZygoteInit$MethodAndArgsCaller.run(ZygoteInit.java:886) at com.android.internal.os.ZygoteInit.main(ZygoteInit.java:776) […]

Mockito通缉,但没有调用

我是新手写测试和使用Mockito。 我已经阅读了Stackoverflow中的类似主题,并进行了建议的更改,确保所考虑的类/接口/方法处于打开状态。 我试图遵循这一点 嘲笑构造函数注入依赖 这是我到目前为止的测试 class RegistrationPresenterTest { @Test fun testRegisterSuccess() { val mockService = mock<IHerokuInteractor>() val mockLocal = mock<ILocalStorageInteractor>() val mockView = mock<RegisterView>() val mockRegistrationResponse = HerokuRegisterResponse("hash") val mockPair = ImeiPair("imei","hash") val presenter = RegisterPresenterImpl(mockLocal,mockService) whenever(mockService.register(any())).thenReturn(Observable.just(mockRegistrationResponse)) whenever(mockLocal.clearPreferences()).thenReturn(Observable.just(true)) whenever(mockLocal.putImeiPair(any())).thenReturn(Observable.just(true)) //whenever(presenter.writeImeiPairLocally(any())) How do I specify parameters since it uses a parameter from the register method? presenter.bindView(mockView) […]

如何用RxAndroid压缩Kotlin语言中的几个观察值

大家! 我有一些问题。 我是RxJava / RxKotlin / RxAndroid的初学者,并不了解某些功能。 例如: import rus.pifpaf.client.data.catalog.models.Category import rus.pifpaf.client.data.main.MainRepository import rus.pifpaf.client.data.main.models.FrontDataModel import rus.pifpaf.client.data.product.models.Product import rx.Observable import rx.Single import rx.lang.kotlin.observable import java.util.* class MainInteractor { private var repository: MainRepository = MainRepository() fun getFrontData() { val cats = getCategories() val day = getDayProduct() val top = getTopProducts() return Observable.zip(cats, day, top, MainInteractor::convert) } private […]