Pruebas unitarias Rxjava observables que tienen un retraso

Quiero poder probar un Observable que tiene una emisión retrasada, pero sin esperar el time de demora. ¿Hay alguna forma de hacer esto?

Actualmente estoy usando CountDownHatch para retrasar la afirmación, y eso funciona bien, pero aumenta el time de ejecución de la testing.

Ejemplo:

 val myObservable: PublishSubject<Boolean> = PublishSubject.create<Boolean>() fun myObservable(): Observable<Boolean> = myObservable.delay(3, TimeUnit.SECONDS) @Test fun testMyObservable() { val testObservable = myObservable().test() myObservable.onNext(true) // val lock = CountDownLatch(1) // lock.await(3100, TimeUnit.MILLISECONDS) testObservable.assertValue(true) } 

TestScheduler es perfecto para esto. Tiene un método práctico advanceTimeBy(long, TimeUnit) que le permite controlar el time. Y Observable.delay tiene un método de sobrecarga que toma un Scheduler .

Así que solo use la Scheduler.computation() pnetworkingeterminada Scheduler.computation() en la function myObservable , y use TestScheduler para probar la unidad.

Pude encontrar una solución completa gracias a la respuesta de @ aaron-he.

Utiliza una combinación de TestScheduler para adelantar el time y también RxJavaPlugins para anular el planificador pnetworkingeterminado con testScheduler. Esto permitió probar la function myObservable() sin modificaciones o sin necesidad de pasar un Scheduler .

 @Before fun setUp() = RxJavaPlugins.reset() @After fun tearDown() = RxJavaPlugins.reset() @Test fun testMyObservable() { val testScheduler = TestScheduler() RxJavaPlugins.setComputationSchedulerHandler { testScheduler } val testObservable = myObservable().test() myObservable.onNext(true) testScheduler.advanceTimeBy(2, TimeUnit.SECONDS) testObservable.assertEmpty() testScheduler.advanceTimeBy(2, TimeUnit.SECONDS) testObservable.assertValue(true) } 
  • No se puede llamar al operador desde () en Observable en Android Kotlin
  • Asunto de RxJava: escuche el tipo diferente que el que emite
  • Llamar a un RxJava Single In Kotlin Lambda
  • ¿Cómo especificar la versión de RxJava al usar RxKotlin?
  • Obtiene N últimos objects emitidos por observables en RxJava2
  • RxJava 2 requiere un tipo de retorno Observable diferente de RxJava 1 (Kotlin)
  • Cómo hacer un event handling errores en rxjava2 en android
  • Cómo recordar el estado con los operadores de rebashs en RxJava2
  • Excepción: blockingConnect no se debe invocar en el subprocess UI a pesar de que subsubscribí en otro subprocess
  • Aplicando transformación a cada elemento en Single <List <T >>
  • Retrofit 2, Rx 2 y llamadas asincrónicas