RxJava推迟了可观察到的火灾

我有一个使用RxJava observables设置的事件序列。 基本上,我使用observable.delay(time, timeUnit, scheduler)函数合并了由Observable.just(Events.*)创建的不同延迟创建的不同事件。 然后将它们发布到PublishSubject (下面的代码中的events )并订阅该PublishSubject以观察下面的代码中的序列( observeEvents()函数)。 它曾经工作得很好,但最近我在我的设备上看到了一个非常奇怪的行为(OnePlus One与android 5.0.2)(并没有在模拟器上看到它)。 基本上事件会混淆起来,延迟较高的事件可能会在延迟较小的事件之前发生,延迟较小的事件可能会发生在队列的末尾,有时候所有的事件都会以正确的顺序出现。 前三个赛事经常混合在一起。 有时候根本没有观察到一些事件。 这里会发生什么?

代码在Kotlin:

 var computationScheduler = Schedulers.computation() private val events: PublishSubject = PublishSubject.create() private val userActionSubject: PublishSubject = PublishSubject.create() Observable.merge( event0(), event1(), event2(), userActionOrEvent3(), userActionOrEvent4()) .subscribe({ // Weird timings are observed here already events.onNext(it) }, { e -> events.onError(e) })) private fun userActionOrEvent4(): Observable { return Observable.amb(Observable.just(Events.Event4) .delay(12800, TimeUnit.MILLISECONDS, computationScheduler), userActionSubject.asObservable().subscribeOn(computationScheduler)) .take(1) } private fun userActionOrEvent3(): Observable { return Observable.amb(Observable.just(Events.Event3) .delay(2800, TimeUnit.MILLISECONDS, computationScheduler), userActionSubject.asObservable().subscribeOn(computationScheduler)) .take(1) } private fun event2() = Observable.just(Events.Event2) .delay(1800, TimeUnit.MILLISECONDS, computationScheduler) private fun event1() = Observable.just(Events.Event1) .delay(200, TimeUnit.MILLISECONDS, computationScheduler) private fun event0() = Observable.just(Events.Event0) .subscribeOn(computationScheduler) open fun observeEvents(): Observable = events.asObservable().observeOn(AndroidSchedulers.mainThread()) open fun onUserAction() { userActionSubject.onNext(Events.Action) } 

这是因为你正在使用merge()而不是concat

这篇文章将向你解释区别https://medium.com/fueled-android/rxify-a-simple-spell-for-complex-rxjava-operators-part-2-b82b379f5c7f#.ekppat50o

原来问题是由于计算Schedulers.computation() ,当我改变Schedulers.computation() Schedulers.newThread()事件开始在预期的时间触发。