Tag: rx java2

Mockito从来没有()不工作,然后rxjava2

我正在测试用Kotlin编写的一段相当简单的代码: o1.updateUser(params) .andThen(o2.reload()) 当updateUser失败时,我预计重装将不会被执行。 但是当使用Mockito 2和jUnit 5进行测试时,就会调用这个模拟。 嘲笑是这样做的: given(o1.updateUser(user)).willReturn(Completable.error(IllegalArgumentException()) given(o2.reload()).willReturn(Completable.complete()) 现在的问题是,代码的行为如预期的,这意味着在这种情况下,o2是从来没有实际调用,但单元测试Mockito声称有一个o1的调用。 测试断言如下所示: verify(o2, never()).reload() 我在这里做错了吗? 有没有解决的办法? 我是不是很了解操作员?

如何模拟返回Observable的被动库

所以我有知识库,它提供了客户端的Observable。 有没有办法我可以嘲笑这个存储库,所以我不需要从我的模拟器发送位置或使用真实的设备获得一些位置? 接口如下所示: interface RxLocationRepository { @SuppressLint("MissingPermission") fun onLocationUpdate(): Observable<Location> fun stopLocationUpdates() } 在我的客户端,我使用这样的: class LocationManager( val rxLocationRepository: RxLocationRepository){ private fun appendGeoEvent(location: Location) { val locationGeoEvent = LocationGeoEvent( accuracy = location.accuracy.toDouble(), latitude = location.latitude, longitude = location.longitude, timestampGeoEvent = location.time ) processGeoEvent(locationGeoEvent) } compositeDisposable.add(rxLocationRepository.onLocationUpdate() .subscribe(Consumer { location -> appendGeoEvent(location) })) …. 所以我发送这个获得的位置我的appendGeoEvent方法。 我可以使用例如Mockito,但我不知道如何嘲笑这个存储库,所以我可以使用假的位置。 另外,我想用Kotlin。

如何计算observable的执行时间

我正在寻找解决方案来衡量一些任务的执行情况 我想在某个地方开始计时 我想执行一些任务 在代码的不同部分,我希望能够停止计时器,我得到执行最终执行时间的结果 我猜这可能是真的使用EventBus来发布开始计数和停止事件。 所以我可以有一个功能: doSomething(){ //start counting } 在ohter函数中我想访问计数器并调用stop: otherFunctionWhichCanBeInvokedLater(){ //stop counter and get the duration time } 有没有很好的方法来实现这个在Rx? 我想为此使用Kotlin和RxJava2。

rxjava2 – 在线程池上执行任务的简单示例,在单个线程上订阅

我正在试验下面的任务,让我的头绕过RxJava: 给定一个URL列表 对线程池上的每个URL执行HTTP请求 为每个结果插入一些数据到SQLite数据库(这里没有多线程) 阻止方法直到完成 所以我在Kotlin试了一下: val ex = Executors.newFixedThreadPool(10) Observable.fromIterable((1..100).toList()) .observeOn(Schedulers.from(ex)) .map { Thread.currentThread().name } .subscribe { println(it + " " + Thread.currentThread().name } 我期望它打印 pool-1-thread-1 main pool-1-thread-2 main pool-1-thread-3 main pool-1-thread-4 main …. 但是它打印: pool-1-thread-1 pool-1-thread-1 pool-1-thread-1 pool-1-thread-1 pool-1-thread-1 pool-1-thread-1 任何人都可以纠正我对这是如何工作的误解? 为什么不使用线程池的所有线程? 我如何让我的订阅者在主线程上运行或阻塞直到完成?

经常调用回调方法来将事件转发到Observable?

所以我有一个自定义滑块视图的监听器。 当用户滑动滑块时,视图调用onSliderChanged(int percent) 。 我试图做一个网络调用滑块更改时,但我不想进行百万次网络调用,因为该方法是经常调用时滑动。 我怎样才能把这个监听器回调成一个Observable? 我知道一旦它可以观察,我可以使用去抖动,只有在一定的时间间隔后才能更新。 我试着做Observable.create()但我在回调方法,我不知道如何工作。 我使用Kotlin和RxJava2 btw。 override fun onSliderChanged(percent: Int) { // Either here or in the presenter I want to make this // callback reactive so I can debounce the callback presenter.onSliderChanged(percent) }

Odd TestObserver在订阅主题时的行为

考虑Kotlin中的以下RxJava 2片段: // 1. Create subject val subject = PublishSubject.create<Int>() // 2. Get observable val observable = subject.subscribeOn(Schedulers.io()) // 3. Subscribe val observer = observable.test() // 4. Trigger next subject.onNext(42) // 5. Await observer.awaitCount(1) // 6. Assert value observer.assertValue(42) 根据我的理解, observer在等待陈述5之后应该能够得到42 ,并且陈述6的陈述应该成功。 然而,实际发生的情况是: 5块直到超时,因为没有收到任何值,而6上的断言失败。 另外,如果我在3上放置一个断点,并在暂停后恢复执行,那么一切都将工作。 看起来像一个线程问题。 我显然在这里错过了一些东西。 什么才是正确的方式来消费热门可观察?

尝试使用Observable时获取Fata信号11

我正在尝试第一次尝试Kotlin,RxJava 2和新的android架构组件。 我试图从单独的线程上的房间执行删除方法,我得到ERROR: Fatal signal 11 (SIGSEGV), code 1, fault addr 0x0 in tid 12673 (RxCachedThreadS)执行该方法时, ERROR: Fatal signal 11 (SIGSEGV), code 1, fault addr 0x0 in tid 12673 (RxCachedThreadS) ,并强制关闭。 我正在尝试这两个选项: 1。 Single.fromCallable { deviceViewModel.delete(device.phone) } .subscribeOn(Schedulers.io()) .observeOn(AndroidSchedulers.mainThread()) .subscribe() 2。 Observable.just(Unit) .subscribeOn(Schedulers.io()) .map { deviceViewModel.delete(device.phone) } .subscribe() 这是我的DeviceViewModel.kt(使用AndroidViewModel) class DeviceViewModel constructor(application: Application) : AndroidViewModel(application) […]

即使数据没有更新,数值也会发出

我尝试哟使用Kotlin和RxJava 2的房间。 我有一个字符串主键的实体,我希望得到通知,当值更新,但我收到来自流动的新值,即使没有更新值,当我有任何价值。 实体 : @Entity class MarketSummaryModel(@PrimaryKey var marketName: String)… 道: @Dao interface MarketSummaryDao { @Insert(onConflict = OnConflictStrategy.REPLACE) fun insert(marketSummaryModel: MarketSummaryModel) @Query("select * from MarketSummaryModel where marketName = :marketName LIMIT 1") fun getByName(marketName: String): Flowable<MarketSummaryModel> }

重复发送对象到Rx上的子客户端

是否可以将相同的对象反复发送给Rx中的订阅者? 例如,这个代码(在Kotlin上): val exmp = listOf("А") var observable = exmp.toObservable() observable.subscribeBy( onNext = { it + "1" println(it) }, onError = { it.printStackTrace() }, onComplete = { println("Done!") } ) 我尝试将字符串值“A”重复发送到方法onNext()并获得“A111111”。 Rx库的方法重放()我明白开始发送日期再次为新的潜艇。 在从可观察的日期开始不变的循环for ,只是方法被调用几次

致命例外:当触发器处置时,RxCachedThreadScheduler-1。 为什么?

我有以下RxJava 2代码(在Kotlin中),它有一个Observable disposable = Observable.create<String>({ subscriber -> try { Thread.sleep(2000) subscriber.onNext("Test") subscriber.onComplete() } catch (exception: Exception) { subscriber.onError(exception) } }).subscribeOn(Schedulers.io()) .observeOn(AndroidSchedulers.mainThread()) .subscribe({ result -> Log.d("Test", "Completed $result") }, { error -> Log.e("Test", "Completed ${error.message}") }) 虽然它仍然是Thread.sleep(2000) ,我执行disposable?.dispose()调用,它会出错 FATAL EXCEPTION: RxCachedThreadScheduler-1 Process: com.elyeproj.rxstate, PID: 10202 java.lang.InterruptedException at java.lang.Thread.sleep(Native Method) at java.lang.Thread.sleep(Thread.java:371) at java.lang.Thread.sleep(Thread.java:313) at presenter.MainPresenter$loadData$1.subscribe(MainPresenter.kt:41) at […]