在Rx中设置调度程序的顺序
我对Rx很新,只是想知道subscribeOn
的顺序如何影响Observable
//This will not print anything Observable.just("whatever") .flatMap { s -> Observable.just(s.length) } .subscribeOn(Schedulers.newThread()) .subscribe(::println) //This prints the length Observable.just("whatever") .subscribeOn(Schedulers.newThread()) .flatMap { s -> Observable.just(s.length) } .subscribe(::println)
发生了什么事情,为什么?
RxJava调度程序使用守护程序线程,如果Java的“主”线程退出,它可能会停止或根本不能运行。 当我运行这两个设置,有时第二个不打印任何东西,有时它, 你看到的东西是概率的。
这实际上取决于newThread
反应并执行上游代码的速度。 对于第二种情况,由于在主线程上发生了更长的订阅集合,在订阅之后执行返回的时间, newThread
可能有足够的时间来触发打印。
如果你把Thread.sleep(1000)
你会看到两个打印的价值。
其实他们都打印结果。 唯一的区别是时机。 这是我认为是对所观察行为的解释 – 在第一种情况下, flatMap
直接订阅main
线程和println
在新线程上的结果。 产生一个新的线程和println
都是密集的操作需要时间来执行和main
线程退出之前。
在第二种情况下, flatMap
订阅一个新线程, println
订阅main
线程上的flatMap
。 因此,由于依赖后产生一个新的线程, main
线程保持更多的占用,我们看到main
退出前的结果。
您可以尝试在最后一行中使用下面的代码执行相同的操作,以查看不同之处。
Observable.timer(3, TimeUnit.SECONDS).toBlocking().subscribe();