Tag: RX java

暂停/恢复RX中的定时器/延时

我正在尝试使用rx-Java来暂停/恢复延迟的操作,而且令人惊讶的是,我找不到任何有关如何操作的细节。 显然,我知道如何通过创建一个特定的Timer线程,并跟踪时间,但我正在寻找一个更优雅和被动的方式。 我有三个不同的observable, playDetected ,一个用于pauseDetected ,一个用于stopDetected 。 我想在PLAY的某个延迟之后发出一些东西,但是当我的暂停observable发出时暂停,并且当我再次获得PLAY时恢复 我到目前为止:(它是用kotlin编写的,但是Java ,伪代码或者任何语言都可以用于答案) val playSubscription = playDetected .delay(DELAY, SECONDS, schedulers.computation) .subscribe { emitFinalEvent(it) } stopDetected.subscribe { playSubscription.unsubscribe() } 我的延迟工作,当我检测到一个STOP ,它成功地消除了延迟,使下一个PLAY可以再次启动它。 但是,如何暂停和恢复时,暂停pauseDetected发出的东西?

从正确的线程调用RxJava2可取消/一次性

我正在实现一个可观察的Resource从一个Resource发出线。 问题是,这个资源真的不喜欢被关闭从一个不同的线程,它创建(它杀死一只小狗,抛出异常,当这种情况发生)。 当我处理订阅时,从main线程调用资源Cancellable / Disposable ,而在Schedulers.io()上订阅observable。 这是Kotlin代码: fun lines(): Observable<String> = Observable.create { emitter -> val resource = NetworkResource() emitter.setCancellable { resource.close() // <– main thread 🙁 } try { while (!emitter.isDisposed) emitter.onNext(resource.readLine()) // <– blocked here! } catch (ioe: IOException) { emitter.tryOnError(ioe) // <– this also triggers the cancellable } } val disposable = […]

RxJava Observable to Completable,如何避免toBlocking()

我目前正在使用Kotlin Android上的RxJava,但我有一个问题,我不能解决不使用toBlocking()。 我有雇员服务的方法返回一个Observable>: fun all(): Observable<List<Employee>> 这是完全正确的,因为每当员工发生变化时,Observable就会发布新的员工列表。 但我想从员工生成一个PDF文件,显然不需要每次员工变动时都运行。 另外,我想从我的PDF生成器方法返回一个Completable对象。 我想为我的PDF添加一个标题,然后遍历员工并计算每位员工的工资,这也会返回一个Observable,这是我现在正在使用toBlocking的地方。 我目前的做法是这样的: private fun generatePdf(outputStream: OutputStream): Completable { return employeeService.all().map { employees -> try { addHeaderToPDF() for (i in employees) { val calculated = employeeService.calculateWage(i.id).toBlocking().first() // Print calculated to PDF…. } addFooterToPDF() return @map Completable.complete() } catch (e: Exception) { return @map Completable.error(e) } }.first().toCompletable() 有什么办法可以使这个代码更清洁使用RxJava? […]

在java类中调用扩展函数作为任何RX操作符

我创建了一个扩展函数, fun <T> Observable<T>.subscribeWithErrorHandling(onNext: (T) -> Unit ,onError: ((throwable: Throwable) -> Unit)? = null): Subscription { //doing stuff } 在kotlin类中,我将能够以这种方式使用它 observable.subscribeWithErrorHandling(…) 现在,我想在我的java类中使用这个函数。 我已经看到,你可以静态调用它: MyExtensionFile.subscribeWithErrorHandling 但在我的情况下,你需要别的东西,因为它是一个RX流的中间部分。 这是我坚持的部分。 这甚至可能吗? 或没有办法做这样的事情,从Java代码?

如何模拟返回Observable的被动库

所以我有知识库,它提供了客户端的Observable。 有没有办法我可以嘲笑这个存储库,所以我不需要从我的模拟器发送位置或使用真实的设备获得一些位置? 接口如下所示: interface RxLocationRepository { @SuppressLint("MissingPermission") fun onLocationUpdate(): Observable<Location> fun stopLocationUpdates() } 在我的客户端,我使用这样的: class LocationManager( val rxLocationRepository: RxLocationRepository){ private fun appendGeoEvent(location: Location) { val locationGeoEvent = LocationGeoEvent( accuracy = location.accuracy.toDouble(), latitude = location.latitude, longitude = location.longitude, timestampGeoEvent = location.time ) processGeoEvent(locationGeoEvent) } compositeDisposable.add(rxLocationRepository.onLocationUpdate() .subscribe(Consumer { location -> appendGeoEvent(location) })) …. 所以我发送这个获得的位置我的appendGeoEvent方法。 我可以使用例如Mockito,但我不知道如何嘲笑这个存储库,所以我可以使用假的位置。 另外,我想用Kotlin。

如何计算observable的执行时间

我正在寻找解决方案来衡量一些任务的执行情况 我想在某个地方开始计时 我想执行一些任务 在代码的不同部分,我希望能够停止计时器,我得到执行最终执行时间的结果 我猜这可能是真的使用EventBus来发布开始计数和停止事件。 所以我可以有一个功能: doSomething(){ //start counting } 在ohter函数中我想访问计数器并调用stop: otherFunctionWhichCanBeInvokedLater(){ //stop counter and get the duration time } 有没有很好的方法来实现这个在Rx? 我想为此使用Kotlin和RxJava2。

以递归方式将Rx选项组合成Obserbles

比方说,我有一个名为s_0的Single ,它可以从T类型发出一个元素t_0 ,也可以失败(在某些语言中,这可能是Single<T> )。 那是: s_0: — t_0 // Success OR s_0: — X // Failure T类型的实例有一个next()方法,它返回T类型的一个可选的Single ( Kotlin中的Single<T>? )。 这种行为导致一个能够发射一个T实例链的Single实例链,其中每个单独的s_i可以发出一个能够返回下一个单独的s_i+1的元素t_i+1 ,这将发出一个元素t_i+1等等,直到最后一个元素t_n-1没有返回单个或任何单数失败: s_0: — t_0 ↓ s_1: — t_1 ↓ s_2: — t_2 … ↓ s_n-1: — t_n-1 ↓ null OR s_0: — t_0 ↓ s_1: — t_1 ↓ s_2: — t_2 … ↓ s_i: […]

经常调用回调方法来将事件转发到Observable?

所以我有一个自定义滑块视图的监听器。 当用户滑动滑块时,视图调用onSliderChanged(int percent) 。 我试图做一个网络调用滑块更改时,但我不想进行百万次网络调用,因为该方法是经常调用时滑动。 我怎样才能把这个监听器回调成一个Observable? 我知道一旦它可以观察,我可以使用去抖动,只有在一定的时间间隔后才能更新。 我试着做Observable.create()但我在回调方法,我不知道如何工作。 我使用Kotlin和RxJava2 btw。 override fun onSliderChanged(percent: Int) { // Either here or in the presenter I want to make this // callback reactive so I can debounce the callback presenter.onSliderChanged(percent) }

Odd TestObserver在订阅主题时的行为

考虑Kotlin中的以下RxJava 2片段: // 1. Create subject val subject = PublishSubject.create<Int>() // 2. Get observable val observable = subject.subscribeOn(Schedulers.io()) // 3. Subscribe val observer = observable.test() // 4. Trigger next subject.onNext(42) // 5. Await observer.awaitCount(1) // 6. Assert value observer.assertValue(42) 根据我的理解, observer在等待陈述5之后应该能够得到42 ,并且陈述6的陈述应该成功。 然而,实际发生的情况是: 5块直到超时,因为没有收到任何值,而6上的断言失败。 另外,如果我在3上放置一个断点,并在暂停后恢复执行,那么一切都将工作。 看起来像一个线程问题。 我显然在这里错过了一些东西。 什么才是正确的方式来消费热门可观察?

RxJava:如何返回null的正确类型

我正在开发一个使用Firebase,Kotlin和RxJava的应用程序。 基本上,我需要做的是使用Firebase中的Auth注册用户,如果用户选择了一张照片,上传照片,然后将用户从Firebase中保存到数据库中。 到现在为止,我有这个 RxFirebaseAuth.createUserWithEmailAndPassword(auth, email, password) .map { authResult -> user.uid = authResult.user.uid authResult.user.uid } .flatMap<UploadTask.TaskSnapshot>( { uid -> if (imageUri != null) RxFirebaseStorage.putFile(mFirebaseStorage .getReference(STORAGE_IMAGE_REFERENCE) .child(uid), imageUri) else Maybe.empty<UploadTask.TaskSnapshot>() } ) .map { taskSnapshot -> user.photoUrl = taskSnapshot.downloadUrl!!.toString() } .map { RxFirebaseDatabase .setValue(mFirebaseDatabase.getReference("user") .child(user.uid), user).subscribe() } .doOnComplete { appLocalDataStore.saveUser(user) } .toObservable() 当用户选择一张照片时,它正常工作,但是当它没有被选中时,其他地图被忽略,因为我返回了Maybe.empty()。 我应该如何实现这个使用或不使用用户照片? 谢谢。