Prueba de intervalo infinito RxJava

Tengo una vista simple que muestra monedas como list. La aplicación recupera monedas del service web cada 1 segundo. Así que tengo el siguiente método en mi ViewModel:

fun onViewAppeanetworking() { currenciesViewStateUpdates.onNext(CurrenciesViewState(true, null)) Flowable.interval(1, TimeUnit.SECONDS) .flatMapSingle { _ -> currenciesService.calculateCurrencies(Currency("EUR", 10.0)) } .doOnError { error -> Log.d("LOG", error.message) } .observeOn(AndroidSchedulers.mainThread()) .subscribe { currencies -> currenciesViewStateUpdates.onNext(CurrenciesViewState(false, currencies)) } } 

Ahora necesito probar la unidad este comportamiento:

 @Test fun viewAppeanetworkingAndCurrenciesRequestSucceeded_currenciesDisplayed() { val currencies = listOf(Currency("", 0.0)) Mockito.`when`(currenciesServiceMock.calculateCurrencies(anyCurrency())).thenReturn(Single.just(currencies)) viewModel.getViewStateUpdates().subscribe(testObserver) viewModel.onViewAppeanetworking() testObserver.assertSubscribed() testObserver.assertNoErrors() testObserver.assertValueCount(2) assertFalse(testObserver.values()[1].loading) assertNotNull(testObserver.values()[1].currencies) testObserver.values()[1].currencies?.let { assertTrue(it.isNotEmpty()) } } 

Todos los RxSchedulers configurados para inmediato. Problema: con la testing aguarda por siempre cuando observable termina la suscripción, pero nunca terminará porque es un intervalo infinito. ¿Cómo puedo probarlo?

Tendrá problemas para ejecutar sus testings de la manera que ha diseñado.

  1. Convierta sus progtwigdores en parameters de class, para que puedan ser inyectados de la manera adecuada, tanto durante la testing como en la producción.
  2. Use planificadores de testing en lugar de Schedulers.immediate() porque puede terminar en un punto muerto.
  3. Siempre haga que sus operaciones temporizadas ( interval() , timeout() , etc.) operen en progtwigdores explícitos, generalmente los que ha inyectado.
  • Obteniendo una IllegalStateException al usar la API People de Google a pesar de suscribirse a otro hilo usando RxJava
  • ¿Cuándo no usar el Observable de RxJava?
  • Observable con el valor de LatestFrom
  • ¿Puedo crear un método de extensión Kotlin para agregar una suscripción rxJava a una suscripción compuesta?
  • No se llama a ninguno de los suscriptores de RxJava onNext / onError / onComplete al encadenar Observables creado desde Observable.create ()
  • ¿Expresar "súper" generics en los types funcionales de Kotlin?
  • Observable.fromCallable () implementación con exception
  • Cómo get el valor emitido desde el primer observable
  • ¿Por qué mi configuration de RxJava está bloqueando mi subprocess de interfaz de usuario? Trabajar con la callback BluetoothAdapter.startLeScan
  • Para una function de Kotlin utilizada como expresión, ¿hay una forma concisa de operar y devolver un valor?
  • Aplicando transformación a cada elemento en Single <List <T >>