在Rx中设置调度程序的顺序

我对Rx很新,只是想知道subscribeOn的顺序如何影响Observable

 //This will not print anything Observable.just("whatever") .flatMap { s -> Observable.just(s.length) } .subscribeOn(Schedulers.newThread()) .subscribe(::println) //This prints the length Observable.just("whatever") .subscribeOn(Schedulers.newThread()) .flatMap { s -> Observable.just(s.length) } .subscribe(::println) 

发生了什么事情,为什么?

RxJava调度程序使用守护程序线程,如果Java的“主”线程退出,它可能会停止或根本不能运行。 当我运行这两个设置,有时第二个不打印任何东西,有时它, 你看到的东西是概率的。

这实际上取决于newThread反应并执行上游代码的速度。 对于第二种情况,由于在主线程上发生了更长的订阅集合,在订阅之后执行返回的时间, newThread可能有足够的时间来触发打印。

如果你把Thread.sleep(1000)你会看到两个打印的价值。

其实他们都打印结果。 唯一的区别是时机。 这是我认为是对所观察行为的解释 – 在第一种情况下, flatMap直接订阅main线程和println在新线程上的结果。 产生一个新的线程和println都是密集的操作需要时间来执行和main线程退出之前。

在第二种情况下, flatMap订阅一个新线程, println订阅main线程上的flatMap 。 因此,由于依赖后产生一个新的线程, main线程保持更多的占用,我们看到main退出前的结果。

您可以尝试在最后一行中使用下面的代码执行相同的操作,以查看不同之处。

 Observable.timer(3, TimeUnit.SECONDS).toBlocking().subscribe();