Prueba de intervalo infinito RxJava

Tengo una vista simple que muestra monedas como list. La aplicación recupera monedas del service web cada 1 segundo. Así que tengo el siguiente método en mi ViewModel:

fun onViewAppeanetworking() { currenciesViewStateUpdates.onNext(CurrenciesViewState(true, null)) Flowable.interval(1, TimeUnit.SECONDS) .flatMapSingle { _ -> currenciesService.calculateCurrencies(Currency("EUR", 10.0)) } .doOnError { error -> Log.d("LOG", error.message) } .observeOn(AndroidSchedulers.mainThread()) .subscribe { currencies -> currenciesViewStateUpdates.onNext(CurrenciesViewState(false, currencies)) } } 

Ahora necesito probar la unidad este comportamiento:

 @Test fun viewAppeanetworkingAndCurrenciesRequestSucceeded_currenciesDisplayed() { val currencies = listOf(Currency("", 0.0)) Mockito.`when`(currenciesServiceMock.calculateCurrencies(anyCurrency())).thenReturn(Single.just(currencies)) viewModel.getViewStateUpdates().subscribe(testObserver) viewModel.onViewAppeanetworking() testObserver.assertSubscribed() testObserver.assertNoErrors() testObserver.assertValueCount(2) assertFalse(testObserver.values()[1].loading) assertNotNull(testObserver.values()[1].currencies) testObserver.values()[1].currencies?.let { assertTrue(it.isNotEmpty()) } } 

Todos los RxSchedulers configurados para inmediato. Problema: con la testing aguarda por siempre cuando observable termina la suscripción, pero nunca terminará porque es un intervalo infinito. ¿Cómo puedo probarlo?

Tendrá problemas para ejecutar sus testings de la manera que ha diseñado.

  1. Convierta sus progtwigdores en parameters de class, para que puedan ser inyectados de la manera adecuada, tanto durante la testing como en la producción.
  2. Use planificadores de testing en lugar de Schedulers.immediate() porque puede terminar en un punto muerto.
  3. Siempre haga que sus operaciones temporizadas ( interval() , timeout() , etc.) operen en progtwigdores explícitos, generalmente los que ha inyectado.
  • RXJava ... emiten un observable cada segundo
  • RxKotlin - matriz de observadores dynamics
  • Combina Rx Singles en Observables recursivamente
  • Implementando la búsqueda que empuja los resultados a la list tan pronto como estén disponibles usando rxJava
  • Llamar a un RxJava Single In Kotlin Lambda
  • Operador RxJava para el método de conmutación
  • Función rxjava :: zip devuelve un resultado vacío
  • Cómo probar el observador?
  • Datos de caching de request HTTP en RxJava
  • Cómo llenar una vista de list usando Kotlin, retroadaptación y RXjava
  • Cómo escribir la testing adecuada para el repository de interfaz reactiva que devuelve Observable solo cuando hay algún evento, cómo simular el desencadenamiento de ese evento