Kotlin: Cómo convertir la testing que usa Thread.sleep a RxJava TestScheduler

Estoy escribiendo una testing instrumental, comtesting si cuando guardo algo en un búfer Rx y después de algún intervalo (10 segundos), este tema inserta valores almacenados en mi database Room.

La testing es correcta cuando uso Thread.sleep (syncTimeInterval). Quiero escribir esta misma testing usando TestScheduler.

Aquí está con la versión Thread.sleep (que pasa la testing):

@Test fun testMultipleLogs() { val loadAllCloudCallBefore = appDatabase.logCloudCallDao().loadAll() val loadAllLogNewSessionBefore = appDatabase.logNewSessionDao().loadAll() assertEquals(0, loadAllCloudCallBefore.size) assertEquals(0, loadAllLogNewSessionBefore.size) Observable.interval(1, TimeUnit.SECONDS) .take(20) .subscribe { logManager.logNewSession() } Observable.interval(1, TimeUnit.SECONDS) .take(20) .subscribe { logManager.logCloudCall("url", "callgoup") } Observable.interval(1, TimeUnit.SECONDS) .take(20) .subscribe { logManager.logNewSession() } Observable.interval(1, TimeUnit.SECONDS) .take(20) .subscribe { logManager.logCloudCall("url", "callgoup") } Observable.interval(1, TimeUnit.SECONDS) .take(20) .subscribe { logManager.logNewSession() } Observable.interval(1, TimeUnit.SECONDS) .take(20) .subscribe { logManager.logCloudCall("url", "callgoup") } Thread.sleep(30000) val loadAllCloudCallAfter = appDatabase.logCloudCallDao().loadAll() val loadAllLogNewSessionAfter = appDatabase.logNewSessionDao().loadAll() assertEquals(60, loadAllCloudCallAfter.size) assertEquals(60, loadAllLogNewSessionAfter.size) } 

Y aquí, esta testing no pasa, el tamaño después de esperar el time adelantado por TestScheduler es 0 (no 60)

 @Test fun testMultipleLogs() { var testScheduler: TestScheduler = TestScheduler() val loadAllCloudCallBefore = appDatabase.logCloudCallDao().loadAll() val loadAllLogNewSessionBefore = appDatabase.logNewSessionDao().loadAll() assertEquals(0, loadAllCloudCallBefore.size) assertEquals(0, loadAllLogNewSessionBefore.size) Observable.interval(1, TimeUnit.SECONDS, testScheduler) .take(20) .subscribe { logManager.logNewSession() } Observable.interval(1, TimeUnit.SECONDS, testScheduler) .take(20) .subscribe { logManager.logCloudCall("url", "callgoup") } Observable.interval(1, TimeUnit.SECONDS, testScheduler) .take(20) .subscribe { logManager.logNewSession() } Observable.interval(1, TimeUnit.SECONDS, testScheduler) .take(20) .subscribe { logManager.logCloudCall("url", "callgoup") } Observable.interval(1, TimeUnit.SECONDS, testScheduler) .take(20) .subscribe { logManager.logNewSession() } Observable.interval(1, TimeUnit.SECONDS, testScheduler) .take(20) .subscribe { logManager.logCloudCall("url", "callgoup") } testScheduler.advanceTimeBy(21, TimeUnit.SECONDS) val loadAllCloudCallAfter = appDatabase.logCloudCallDao().loadAll() val loadAllLogNewSessionAfter = appDatabase.logNewSessionDao().loadAll() assertEquals(60, loadAllCloudCallAfter.size) assertEquals(60, loadAllLogNewSessionAfter.size) } 

¿Cómo puedo probar este caso correctamente? ¿Hay alguna manera?

ACTUALIZAR

Las funciones en LogManager se ven así:

  fun logCloudCall(url: String, callGroup: String) { val logCloudCall = LogCloudCall(url = url, callGroup = callGroup, date = Converter.GENERAL_DATE_FORMAT.format(Date())) Log.v("LogManager", logCloudCall.toString()) addLog(logCloudCall) } fun logNewSession() { val logNewSession = LogNewSession( date = Converter.GENERAL_DATE_FORMAT.format(Date())) Log.v("LogManager", logNewSession.toString()) addLog(logNewSession) } fun addLog(logEvent: LogEvent) { source.onNext(logEvent) } 

Y este es el mecanismo que uso en mi inicio de LogManager:

  val source = PublishSubject.create<LogEvent>().toSerialized() var logRepository: LogRepository init { logRepository = LogRepositoryImpl(context) configureSubject() } fun configureSubject() { source .buffer(10, TimeUnit.SECONDS) .subscribe { buffenetworkingData -> proceedValues(buffenetworkingData) } } 

Los siguientes pases de testing:

 @Test fun foo() { val testScheduler = TestScheduler() var count = 0 Observable.interval(1, TimeUnit.SECONDS, testScheduler) .take(20) .subscribe { count++ } testScheduler.advanceTimeBy(21, SECONDS) assert(count == 20) } 

Es decir, su código de testing parece correcto, pero el resultado no es correcto. Lo único desconocido aquí es el código logManager . ¿Hay algún subprocess en esa class? Eso puede explicar por qué el recuento todavía es 0 : es posible que tenga una condición de carrera.


Probablemente se deba a la llamada de buffer . buffer utiliza internamente el Scheduler cálculo:

 public final Observable<List<T>> buffer(long timespan, TimeUnit unit) { return buffer(timespan, unit, Schedulers.computation(), Integer.MAX_VALUE); } 

Esto probablemente dará como resultado el problema de enhebrado que estás viendo.

  • No se pudo deducir Kotlin y RxJava
  • ¿Cómo puedo indicar explícitamente la finalización de un Flowable en RxJava?
  • JsonArray a la class de datos de Kotlin con Retrofit (se esperaba BEGIN_OBJECT pero era BEGIN_ARRAY)
  • ¿Cómo escalar dinámicamente el rebote de la stream de emisión de ráfagas?
  • Completable.andThen resultados en "other is null"
  • Sitio de Android con RxJava manejar resultado de consulta vacío
  • Room - SELECT query, get o default
  • Cómo controlar el flujo sin .flatMap, que rompe una stream reactiva que impide que operadores como distinctUntilChanged trabajen en toda la secuencia
  • Convert Maybe to Single de otra fuente si Maybe completa
  • Problema de encadenamiento Completable after flatMapCompletable
  • RxJava2 cómo separar diferentes implementaciones de emisor observable