Pruebas unitarias Rxjava observables que tienen un retraso

Quiero poder probar un Observable que tiene una emisión retrasada, pero sin esperar el time de demora. ¿Hay alguna forma de hacer esto?

Actualmente estoy usando CountDownHatch para retrasar la afirmación, y eso funciona bien, pero aumenta el time de ejecución de la testing.

Ejemplo:

 val myObservable: PublishSubject<Boolean> = PublishSubject.create<Boolean>() fun myObservable(): Observable<Boolean> = myObservable.delay(3, TimeUnit.SECONDS) @Test fun testMyObservable() { val testObservable = myObservable().test() myObservable.onNext(true) // val lock = CountDownLatch(1) // lock.await(3100, TimeUnit.MILLISECONDS) testObservable.assertValue(true) } 

TestScheduler es perfecto para esto. Tiene un método práctico advanceTimeBy(long, TimeUnit) que le permite controlar el time. Y Observable.delay tiene un método de sobrecarga que toma un Scheduler .

Así que solo use la Scheduler.computation() pnetworkingeterminada Scheduler.computation() en la function myObservable , y use TestScheduler para probar la unidad.

Pude encontrar una solución completa gracias a la respuesta de @ aaron-he.

Utiliza una combinación de TestScheduler para adelantar el time y también RxJavaPlugins para anular el planificador pnetworkingeterminado con testScheduler. Esto permitió probar la function myObservable() sin modificaciones o sin necesidad de pasar un Scheduler .

 @Before fun setUp() = RxJavaPlugins.reset() @After fun tearDown() = RxJavaPlugins.reset() @Test fun testMyObservable() { val testScheduler = TestScheduler() RxJavaPlugins.setComputationSchedulerHandler { testScheduler } val testObservable = myObservable().test() myObservable.onNext(true) testScheduler.advanceTimeBy(2, TimeUnit.SECONDS) testObservable.assertEmpty() testScheduler.advanceTimeBy(2, TimeUnit.SECONDS) testObservable.assertValue(true) } 
  • Inferencia de tipo Observable.combineLatest en kotlin
  • ¿Cómo escalar dinámicamente el rebote de la stream de emisión de ráfagas?
  • Cambio observable a condición cumplida - RxJava2
  • No se puede llamar al operador desde () en Observable en Android Kotlin
  • Buscando con RxJava no funciona
  • Sitio de Android con RxJava manejar resultado de consulta vacío
  • ¿Cómo crear un intervalo infinito observable que emitirá un nuevo object cada intervalo de time?
  • OnErrorNotImplementedException utilizando RxJava2 y Retrofit2 Mosby MVI
  • No se puede comprimir Rxjava Observables
  • RxJava2 Publicado
  • ¿Cómo puedo indicar explícitamente la finalización de un Flowable en RxJava?