RxJava2 PublishSubject订阅服务器在使用SingleScheduler从多个线程调用时无法接收项目

我有以下的单元测试,我试图从不同的线程发送10个String ,并测试我从一个线程接收这些String 。 我的问题是,这个测试襟翼。 有时会成功,但有时我只会收到89项目,之后测试会挂起,直到闩锁超时。 我SingleScheduler以错误的方式使用SingleScheduler ? 我错过了别的吗?

 val consumerCallerThreadNames = mutableSetOf<String>() val messageCount = AtomicInteger(0) val latch = CountDownLatch(MESSAGE_COUNT) @Test fun someTest() { val msg = "foo" val subject = PublishSubject.create<String>() subject .observeOn(SingleScheduler()) .subscribe({ message -> consumerCallerThreadNames.add(Thread.currentThread().name) messageCount.incrementAndGet() latch.countDown() }, Throwable::printStackTrace) 1.rangeTo(MESSAGE_COUNT).forEach { Thread({ try { subject.onNext(msg) } catch (t: Throwable) { t.printStackTrace() } }).start() } latch.await(10, SECONDS) assertThat(consumerCallerThreadNames).hasSize(1) assertThat(messageCount.get()).isEqualTo(MESSAGE_COUNT) } companion object { val MESSAGE_COUNT = 10 } 

如果我重写这个使用单线程ExecutorService测试不再掀翻,所以问题是与Rx或我对Rx的知识缺乏。

RxJava有一个要求,打电话on*不会在同一时间发生。 这意味着你的代码不是线程安全的。

由于只有主题本身以并发方式使用,因此应该使用Subject<T>.toSerialized()方法序列化(本质上是Java的“同步”)主题本身。

val subject = PublishSubject.create<String>()变成val subject = PublishSubject.create<String>().toSerialized()