Rx-Kotlin awaitTerminalEvent永远不会完成

我想更好地了解如何使用Rx-Kotlin进行单元测试,但是我还没有能够成功地将主题设置为“完成”。 因此,我总是等待5秒的时间(onComplete应该立即),然后在assertComplete上失败。

我对awaitTerminalEvent的理解是,它应该只阻塞,直到onComplete被调用。 我也看了一下TestScheduler,但是我不认为应该在这里需要。

任何帮助或文件,可以带领我在正确的方向将不胜感激。

@Test fun testObservable() { val subject = BehaviorSubject.create<Int>() subject.onNext(0) TestSubscriber<Int>().apply { subject.subscribe({ System.out.println(it) subject.onNext(1) subject.onComplete() }) this.awaitTerminalEvent(5, TimeUnit.SECONDS) this.assertComplete() this.assertValue(1) } } 

您错误地使用了错误的工具…

  • TestSubscriber用于测试TestSubscriber ,你应该在这里使用TestObserver
  • 您应该使用TestObserver (或者TestSubscriber中的TestObserver订阅,以便监控排放,并能够等待终端事件和断言值。 在你的代码中, TestSubscriber并没有附加到任何流,所以它永远不会得到任何事件。

试图模仿你的代码,它可能是这样的:

  @Test fun testObservable() { val subject = BehaviorSubject.create<Int>() subject.onNext(0) TestObserver<Int>().apply { subject.doOnNext { System.out.println(it) subject.onNext(1) subject.onComplete() } .subscribe(this) this.awaitTerminalEvent(5, TimeUnit.SECONDS) this.assertComplete() this.assertValue(1) } } 

正如你所看到的,我正在使用TestObserver ,订阅是通过TestObserver对象完成的, onNext()onComplete()移动到了doOnNext() 。 测试将失败,因为您有两个发射值,而测试仅对单个“1”值声明。

一般来说,这是错误的,你使用主题onNext()再次发射,然后调用onComplete() ,你可以订阅之前,然后发射到外面。 像这样的东西:

 TestObserver<Int>().apply { subject.subscribe(this) subject.onNext(1) subject.onComplete() .... }