Odd TestObserver在订阅主题时的行为

考虑Kotlin中的以下RxJava 2片段:

// 1. Create subject val subject = PublishSubject.create<Int>() // 2. Get observable val observable = subject.subscribeOn(Schedulers.io()) // 3. Subscribe val observer = observable.test() // 4. Trigger next subject.onNext(42) // 5. Await observer.awaitCount(1) // 6. Assert value observer.assertValue(42) 

根据我的理解, observer在等待陈述5之后应该能够得到42 ,并且陈述6的陈述应该成功。

然而,实际发生的情况是: 5块直到超时,因为没有收到任何值,而6上的断言失败。

另外,如果我在3上放置一个断点,并在暂停后恢复执行,那么一切都将工作。 看起来像一个线程问题。

我显然在这里错过了一些东西。 什么才是正确的方式来消费热门可观察?

通过解除subscribeOn ,你把另一个线程上的Subject的实际订阅,然后可能需要更长一点,这样subject.onNext(42)仍然找到一个退订的Subject

除了在PublishSubject上使用subscribeOn没有实际用途,您可以通过循环等待订阅:

 while (!subject.hasObservers()) { Thread.sleep(1); }