与RxJava的PublishSubject竞争条件

将PublishSubject与blockingGet()一起使用时,似乎存在订阅者未收到事件的争用情况。 我在Kotlin中附加了一个基本的JUnit测试,它有两种方法。

rxTestBroken()用PublishSubject显示中断的行为。
rxTestOk()表明,一切正常行为主体,因为后者重播最后一个事件的情况下,用户没有订阅的时间。

这种竞争条件从哪里来,并使用BehaviorSubject正确的修复?

import io.reactivex.Single import io.reactivex.subjects.BehaviorSubject import io.reactivex.subjects.PublishSubject import io.reactivex.subjects.Subject import org.assertj.core.api.Assertions.assertThat import org.junit.Test import java.util.concurrent.TimeUnit class StateMachine(val stateSubject: Subject) { companion object { val STATE_IDLE = 1 val STATE_READY = 2 } val output = 10L var currentState = STATE_IDLE fun scheduleNextState(nextState: Int) { Thread(Runnable { currentState = nextState stateSubject.onNext(currentState) }).start() } fun start() = scheduleNextState(STATE_READY) fun stop() = scheduleNextState(STATE_IDLE) } class RxTest { fun stateOutput(stateSubject: Subject): Single { val stateMachine = StateMachine(stateSubject) val waitForIdle = stateSubject .startWith(stateMachine.currentState) .doOnNext { if (it != StateMachine.STATE_IDLE) { stateMachine.stop() } } .filter { it == StateMachine.STATE_IDLE } .firstOrError() val scanFile = stateSubject .doOnSubscribe { stateMachine.start() } .filter { when (it) { StateMachine.STATE_READY -> true StateMachine.STATE_IDLE -> false else -> throw RuntimeException("Wrong state $it") } } .firstOrError() .map { stateMachine.output.toInt() } .doFinally { stateMachine.stop() } return waitForIdle.flatMap { scanFile }.timeout(1, TimeUnit.SECONDS).onErrorReturnItem(-1) } @Test fun rxTestBroken() { for (i in 1..10000) { assertThat(stateOutput(PublishSubject.create()).blockingGet()) .withFailMessage("worked $i times") .isEqualTo(10) } } @Test fun rxTestOk() { for (i in 1..10000) { assertThat(stateOutput(BehaviorSubject.createDefault(StateMachine.STATE_IDLE)).blockingGet()) .withFailMessage("worked $i times") .isEqualTo(10) } } }