RxJava Live实时反应队列(带开关)

在kotlin上开发一个android应用程序。

我需要建立一个系统,以便能够从一个实时队列中进行工作(并观察流中的工作结果)。

但是我也必须能够根据像networkIsAvailable (Observable<Boolean>)这样的一对外部因素(也是以流的形式出现)切换“队列处理”。

我不能使用Observable.fromIterable()因为这会立即创建迭代,这个队列将会调整,并且项目可能会被删除。

我需要一些可以完成项目的循环,检查以确保我们应该继续前进,然后弹出队列中的第一项并执行此操作。

我不确定如何在订阅中做这样的循环?

队列也可以是空的,当切换开启时,事情应该重新开始。

也许我应该推出这个决定(关于是否处理队列中的下一个项目)到Subject<Boolean>? 然后订阅那个重新启动过程的主题?

例子:

打开—-处理队列顶部,处理队列顶部(先前被轮询掉)—关闭 – 不再处理

再次开启 – 队列进程顶部,队列空 – 停止

将项目添加到队列 – 进程 – 停止队列空

打开处理 – 将项目添加到队列 – 直到它重新打开时才进行处理

打开 – 处理顶部项目

你可以使用RxJava2Extensions中的 valve()运算符。

 public PublishProcessor<Boolean> flowControl = PublishProcessor.create(); public void start() { Flowable./*...*/ .compose(FlowableTransformers.valve(flowControl)) .subscribe(/*...*/); 

在这种情况下, flowControl.onNext(true)将启动流,并且flowControl.onNext(false)将停止并排队。

(任何其他Rx2类型必须转换为Flowable来使用它。)