Odd TestObserver在订阅主题时的行为
考虑Kotlin中的以下RxJava 2片段:
// 1. Create subject val subject = PublishSubject.create<Int>() // 2. Get observable val observable = subject.subscribeOn(Schedulers.io()) // 3. Subscribe val observer = observable.test() // 4. Trigger next subject.onNext(42) // 5. Await observer.awaitCount(1) // 6. Assert value observer.assertValue(42)
根据我的理解, observer
在等待陈述5之后应该能够得到42
,并且陈述6的陈述应该成功。
然而,实际发生的情况是: 5块直到超时,因为没有收到任何值,而6上的断言失败。
另外,如果我在3上放置一个断点,并在暂停后恢复执行,那么一切都将工作。 看起来像一个线程问题。
我显然在这里错过了一些东西。 什么才是正确的方式来消费热门可观察?
通过解除subscribeOn
,你把另一个线程上的Subject
的实际订阅,然后可能需要更长一点,这样subject.onNext(42)
仍然找到一个退订的Subject
。
除了在PublishSubject
上使用subscribeOn
没有实际用途,您可以通过循环等待订阅:
while (!subject.hasObservers()) { Thread.sleep(1); }