Tag: 响应编程

如何创建rx.Single的缓存/热版本?

RxJava v1.0.13引入了一种新的Observable: rx.Single 。 它非常适合请求 – 响应模型,但缺乏像doOnNext()这样的操作符的标准副作用。 所以,要做出多种事情就更困难了。 我的想法是将doOnNext()替换为同一个Single实例的多个订阅。 但这可能导致下层工作多次完成:每次订阅一次。 示例rx.Single实现: private class WorkerSubscribe<SomeData>() : Single.OnSubscribe<SomeData> { override fun call(sub: SingleSubscriber<in SomeData>) { try { val result = fetchSomeData() sub.onSuccess(result) } catch(t: Throwable) { sub.onError(t) } } } val single = Single.create<SomeData>(WorkerSubscribe()) 用法: single.subscribe({}, {}) single.subscribe({}, {}) // Data is fetched for the second time […]

暂停/恢复RX中的定时器/延时

我正在尝试使用rx-Java来暂停/恢复延迟的操作,而且令人惊讶的是,我找不到任何有关如何操作的细节。 显然,我知道如何通过创建一个特定的Timer线程,并跟踪时间,但我正在寻找一个更优雅和被动的方式。 我有三个不同的observable, playDetected ,一个用于pauseDetected ,一个用于stopDetected 。 我想在PLAY的某个延迟之后发出一些东西,但是当我的暂停observable发出时暂停,并且当我再次获得PLAY时恢复 我到目前为止:(它是用kotlin编写的,但是Java ,伪代码或者任何语言都可以用于答案) val playSubscription = playDetected .delay(DELAY, SECONDS, schedulers.computation) .subscribe { emitFinalEvent(it) } stopDetected.subscribe { playSubscription.unsubscribe() } 我的延迟工作,当我检测到一个STOP ,它成功地消除了延迟,使下一个PLAY可以再次启动它。 但是,如何暂停和恢复时,暂停pauseDetected发出的东西?