Kotlin:如何将使用Thread.sleep的测试转换为RxJava TestScheduler

我正在写一个工具测试,它检查是否当我缓存东西到一个接收缓冲区,并经过一段时间(10秒)这个主题插入缓冲值到我的房间数据库。

当我使用Thread.sleep(syncTimeInterval)时,测试是正确的。 我想用TestScheduler编写这个相同的测试。

这里是Thread.sleep版本(通过测试):

@Test fun testMultipleLogs() { val loadAllCloudCallBefore = appDatabase.logCloudCallDao().loadAll() val loadAllLogNewSessionBefore = appDatabase.logNewSessionDao().loadAll() assertEquals(0, loadAllCloudCallBefore.size) assertEquals(0, loadAllLogNewSessionBefore.size) Observable.interval(1, TimeUnit.SECONDS) .take(20) .subscribe { logManager.logNewSession() } Observable.interval(1, TimeUnit.SECONDS) .take(20) .subscribe { logManager.logCloudCall("url", "callgoup") } Observable.interval(1, TimeUnit.SECONDS) .take(20) .subscribe { logManager.logNewSession() } Observable.interval(1, TimeUnit.SECONDS) .take(20) .subscribe { logManager.logCloudCall("url", "callgoup") } Observable.interval(1, TimeUnit.SECONDS) .take(20) .subscribe { logManager.logNewSession() } Observable.interval(1, TimeUnit.SECONDS) .take(20) .subscribe { logManager.logCloudCall("url", "callgoup") } Thread.sleep(30000) val loadAllCloudCallAfter = appDatabase.logCloudCallDao().loadAll() val loadAllLogNewSessionAfter = appDatabase.logNewSessionDao().loadAll() assertEquals(60, loadAllCloudCallAfter.size) assertEquals(60, loadAllLogNewSessionAfter.size) } 

在这里,这个测试不通过,预期时间由TestScheduler提前的大小为0(不是60)

 @Test fun testMultipleLogs() { var testScheduler: TestScheduler = TestScheduler() val loadAllCloudCallBefore = appDatabase.logCloudCallDao().loadAll() val loadAllLogNewSessionBefore = appDatabase.logNewSessionDao().loadAll() assertEquals(0, loadAllCloudCallBefore.size) assertEquals(0, loadAllLogNewSessionBefore.size) Observable.interval(1, TimeUnit.SECONDS, testScheduler) .take(20) .subscribe { logManager.logNewSession() } Observable.interval(1, TimeUnit.SECONDS, testScheduler) .take(20) .subscribe { logManager.logCloudCall("url", "callgoup") } Observable.interval(1, TimeUnit.SECONDS, testScheduler) .take(20) .subscribe { logManager.logNewSession() } Observable.interval(1, TimeUnit.SECONDS, testScheduler) .take(20) .subscribe { logManager.logCloudCall("url", "callgoup") } Observable.interval(1, TimeUnit.SECONDS, testScheduler) .take(20) .subscribe { logManager.logNewSession() } Observable.interval(1, TimeUnit.SECONDS, testScheduler) .take(20) .subscribe { logManager.logCloudCall("url", "callgoup") } testScheduler.advanceTimeBy(21, TimeUnit.SECONDS) val loadAllCloudCallAfter = appDatabase.logCloudCallDao().loadAll() val loadAllLogNewSessionAfter = appDatabase.logNewSessionDao().loadAll() assertEquals(60, loadAllCloudCallAfter.size) assertEquals(60, loadAllLogNewSessionAfter.size) } 

我如何正确测试这个案例? 有没有办法?

UPDATE

LogManager中的功能如下所示:

  fun logCloudCall(url: String, callGroup: String) { val logCloudCall = LogCloudCall(url = url, callGroup = callGroup, date = Converter.GENERAL_DATE_FORMAT.format(Date())) Log.v("LogManager", logCloudCall.toString()) addLog(logCloudCall) } fun logNewSession() { val logNewSession = LogNewSession( date = Converter.GENERAL_DATE_FORMAT.format(Date())) Log.v("LogManager", logNewSession.toString()) addLog(logNewSession) } fun addLog(logEvent: LogEvent) { source.onNext(logEvent) } 

这是我在LogManager中使用的机制:

  val source = PublishSubject.create<LogEvent>().toSerialized() var logRepository: LogRepository init { logRepository = LogRepositoryImpl(context) configureSubject() } fun configureSubject() { source .buffer(10, TimeUnit.SECONDS) .subscribe { bufferedData -> proceedValues(bufferedData) } } 

以下测试通过:

 @Test fun foo() { val testScheduler = TestScheduler() var count = 0 Observable.interval(1, TimeUnit.SECONDS, testScheduler) .take(20) .subscribe { count++ } testScheduler.advanceTimeBy(21, SECONDS) assert(count == 20) } 

也就是说,你的测试代码看起来是正确的,但结果是不正确的。 这里唯一未知的是logManager代码。 那个班有线程吗? 这也许可以解释为什么伯爵仍然是0 :你可能有一个竞争条件。


这可能是由于buffer调用。 buffer内部使用一个计算Scheduler

 public final Observable<List<T>> buffer(long timespan, TimeUnit unit) { return buffer(timespan, unit, Schedulers.computation(), Integer.MAX_VALUE); } 

这可能会导致你看到的线程问题。