RxJava Live实时反应队列(带开关)
在kotlin上开发一个android应用程序。
我需要建立一个系统,以便能够从一个实时队列中进行工作(并观察流中的工作结果)。
但是我也必须能够根据像networkIsAvailable (Observable<Boolean>)
这样的一对外部因素(也是以流的形式出现)切换“队列处理”。
我不能使用Observable.fromIterable()
因为这会立即创建迭代,这个队列将会调整,并且项目可能会被删除。
我需要一些可以完成项目的循环,检查以确保我们应该继续前进,然后弹出队列中的第一项并执行此操作。
我不确定如何在订阅中做这样的循环?
队列也可以是空的,当切换开启时,事情应该重新开始。
也许我应该推出这个决定(关于是否处理队列中的下一个项目)到Subject<Boolean>?
然后订阅那个重新启动过程的主题?
例子:
打开—-处理队列顶部,处理队列顶部(先前被轮询掉)—关闭 – 不再处理
再次开启 – 队列进程顶部,队列空 – 停止
将项目添加到队列 – 进程 – 停止队列空
打开处理 – 将项目添加到队列 – 直到它重新打开时才进行处理
打开 – 处理顶部项目
你可以使用RxJava2Extensions中的 valve()
运算符。
public PublishProcessor<Boolean> flowControl = PublishProcessor.create(); public void start() { Flowable./*...*/ .compose(FlowableTransformers.valve(flowControl)) .subscribe(/*...*/);
在这种情况下, flowControl.onNext(true)
将启动流,并且flowControl.onNext(false)
将停止并排队。
(任何其他Rx2类型必须转换为Flowable来使用它。)