我怎样才能明确表示RxJava中Flowable的完成?
我正在尝试创建一个包装Iterable
。 我周期性地将元素推到我的Iterable
但是看起来完成事件是隐含的。 我不知道如何表示处理完成。 例如在我的代码中:
// note that this code is written in Kotlin val iterable = LinkedBlockingQueue<Int>() iterable.addAll(listOf(1, 2, 3)) val flowable = Flowable.fromIterable(iterable) .subscribeOn(Schedulers.computation()) .observeOn(Schedulers.computation()) flowable.subscribe(::println, {it.printStackTrace()}, {println("completed")}) iterable.add(4) Thread.sleep(1000) iterable.add(5) Thread.sleep(1000)
这打印:
1 2 3 4完成
我检查了Flowable
接口的来源,但是似乎我不能明确指出Flowable
是完整的。 我怎么能这样做? 在我的程序中,我发布了一些在它们之间有一些延迟的事件,我想明确何时complete
事件流程。
澄清 :我有一个漫长的运行过程,发出事件。 我收集他们在一个队列中,我公开一个方法返回一个Flowable包裹我的队列。 问题是创建Flowable时队列中可能已经有元素了。 我只会处理事件一次,并且知道事件流何时停止,所以我知道何时需要完成Flowable。
使用.fromIterable
是为您的用例创建.fromIterable
的错误方法。
我实际上并没有清楚这是什么用例,但你可能想要使用PublishSubject
Flowable.create()
或PublishSubject
val flowable = Flowable.create<Int>( { it.onNext(1) it.onNext(2) it.onComplete() }, BackpressureStrategy.MISSING) val publishSubject = PublishSubject.create<Int>() val flowableFromSubject = publishSubject.toFlowable(BackpressureStrategy.MISSING) //This data will be dropepd unless something is subscribed to the flowable. publishSubject.onNext(1) publishSubject.onNext(2) publishSubject.onComplete()
当然,如何处理背压将取决于数据来源的性质。
就像akarnokd所建议的那样, ReplayProcessor正是你想要的。 用processor.onNext(item)
替换iterable.add(item)
,并在完成时调用processor.onComplete()
。