暂停/恢复RX中的定时器/延时

我正在尝试使用rx-Java来暂停/恢复延迟的操作,而且令人惊讶的是,我找不到任何有关如何操作的细节。

显然,我知道如何通过创建一个特定的Timer线程,并跟踪时间,但我正在寻找一个更优雅和被动的方式。

我有三个不同的observable, playDetected ,一个用于pauseDetected ,一个用于stopDetected 。 我想在PLAY的某个延迟之后发出一些东西,但是当我的暂停observable发出时暂停,并且当我再次获得PLAY时恢复

我到目前为止:(它是用kotlin编写的,但是Java ,伪代码或者任何语言都可以用于答案)

 val playSubscription = playDetected .delay(DELAY, SECONDS, schedulers.computation) .subscribe { emitFinalEvent(it) } stopDetected.subscribe { playSubscription.unsubscribe() } 

我的延迟工作,当我检测到一个STOP ,它成功地消除了延迟,使下一个PLAY可以再次启动它。 但是,如何暂停和恢复时,暂停pauseDetected发出的东西?

以下是我如何做到的:

 playDetected .doOnNext { if (trackIsDifferent(it)) resetTimer() trackPlaying.set(it.track) } .switchMap { state -> interval(1, SECONDS, schedulers.computation) .doOnNext { currentTimer.incrementAndGet() } .takeUntil(merge(pauseDetected, stopDetected.doOnNext { resetTimer() })) .filter { currentTimer.get() == DELAY } .map { state } }.subscribe { emitFinalEvent(it)) } 

有:

 private val trackPlaying = AtomicReference<Track>() private val currentTimer = AtomicLong() private fun resetTimer() { currentTimer.set(0) } private fun trackIsDifferent(payload: StateWithTrack) = payload.track != trackPlaying.get() 

前一段时间,我也在寻找RX“定时器”解决方案,但没有达到我的预期。 所以你可以找到我自己的解决方案:

 AtomicLong elapsedTime = new AtomicLong(); AtomicBoolean resumed = new AtomicBoolean(); AtomicBoolean stopped = new AtomicBoolean(); public Flowable<Long> startTimer() { //Create and starts timper resumed.set(true); stopped.set(false); return Flowable.interval(1, TimeUnit.SECONDS) .takeWhile(tick -> !stopped.get()) .filter(tick -> resumed.get()) .map(tick -> elapsedTime.addAndGet(1000)); } public void pauseTimer() { resumed.set(false); } public void resumeTimer() { resumed.set(true); } public void stopTimer() { stopped.set(true); } public void addToTimer(int seconds) { elapsedTime.addAndGet(seconds * 1000); } 

在延迟的情况下,只是有延迟的过载interval功能。