我怎样才能暂停一个事件流经一个可观察的?

我正在尝试创建一个可以暂停的Observable,这样一来,物品就会停止通过可观察物体,直到它变为未被暂停为止。

在这一点上,我希望它恢复处理所有未处理的项目。 我的数据源来自课堂之外,所以我现在看起来像这样:

class Agent { val publisher = PublishSubject.create<Event>() val subscription = createSubscription() fun trackEvent(e: Event) { publisher.onNext(e) } fun pause() { // ??? } fun resume() { // ??? } private fun createSubscription(): Subscription { return publisher .map { stringify(it) } .buffer(10L, TimeUnit.SECONDS, 500) // capture 500 events or 10 seconds worth, whichever comes first. .map { /* create HttpPost request */ } .flatMap { /* send request to server */ } .subscribe { println("Received response: $it") } } } 

我的目标是pause功能将停止事件甚至去服务器(但将坚持到他们直到最终的resume )。 在resume发生的时候,我们会发送事件。 (显然,如果我们在暂停状态下有太多的事件,我们会增加一些额外的背压帮助。

我已经尝试了缓冲和窗口的各种用途,以使这项工作,但它从来没有实际上暂停观察。 相反,发生以下两件事之一:

  1. 该事件被完全丢弃(在取消订阅,过滤器等的情况下)
  2. 事件流经,好像什么也没有发生。

我能做些什么来支持这个用例吗? 或者,我是否应该在以上两个结果之一会发生什么?

诀窍是使用另一个BehaviorSubject作为附加缓冲的关闭事件:

 val publisher = PublishSubject.create<Event>() fun trackEvent(e: Event) { publisher.onNext(e) isPaused.onNext(isPaused.value) } val isActive = BehaviorSubject.create(true) fun pause() { isActive.onNext(false) } fun resume() { isActive.onNext(true) } private fun createSubscription(): Subscription { return publisher .buffer(10L, TimeUnit.SECONDS, 500) // -> Observable<List<Event>> .buffer({ isActive.filter { it } }) // -> Observable<List<List<Event>>> .flatMap { Observable.from(it) } // -> Observable<List<Event>> .map { /* create HttpPost request */ } .flatMap { /* send request to server */ } .subscribe { println("Received response: $it") } } 

第一个buffer调用会将传入的事件放入具有指定大小的桶中 ,或者经过一段时间。 第二个buffer将关闭由observable发出的事件,指示Agent未暂停( isActive.filter { it } )的当前isActive在每个事件上都发出一个值,因为isActiveBehaviorSubject它会向每个新的订阅者isActive它的最后一个值。 在第一次buffer调用发出的每个桶中 ,它将立即继续或等待Agent恢复。