Rx-Kotlin awaitTerminalEvent nunca se sube a Completo

Estoy tratando de entender mejor cómo hacer testings unitarias con Rx-Kotlin, pero no he podido establecer el tema con éxito como "completado". Como resultado, siempre estoy esperando el time de espera de 5 segundos (el onComplete debe ser inmediato) y luego fallo en assertComplete.

Mi comprensión de awaitTerminalEvent es que solo debe bloquear hasta que se llame a onComplete. También he examinado TestScheduler, pero no creo que deba ser requerido aquí.

Cualquier ayuda o documentation que pueda llevarme en la dirección correcta sería muy apreciada.

@Test fun testObservable() { val subject = BehaviorSubject.create<Int>() subject.onNext(0) TestSubscriber<Int>().apply { subject.subscribe({ System.out.println(it) subject.onNext(1) subject.onComplete() }) this.awaitTerminalEvent(5, TimeUnit.SECONDS) this.assertComplete() this.assertValue(1) } } 

Estás usando la herramienta incorrecta de la manera incorrecta …

  • TestSubscriber es para probar Flowable , debe usar aquí TestObserver .
  • debe suscribirse con TestObserver (o TestSubscriber in TestSubscriber ), para poder monitorear las emisiones y poder esperar el evento terminal y afirmar los valores. En su código, TestSubscriber no está conectado a ninguna secuencia, por lo que nunca recibirá ningún evento.

tratando de imitar tu código, podría ser algo como esto:

  @Test fun testObservable() { val subject = BehaviorSubject.create<Int>() subject.onNext(0) TestObserver<Int>().apply { subject.doOnNext { System.out.println(it) subject.onNext(1) subject.onComplete() } .subscribe(this) this.awaitTerminalEvent(5, TimeUnit.SECONDS) this.assertComplete() this.assertValue(1) } } 

como puede ver, estoy usando TestObserver la suscripción se hace con el object TestObserver , y el sujeto onNext() , onComplete() movió a doOnNext() . la testing fallará ya que tiene dos valores emitidos, mientras que la testing solo afirma para el valor '1' individual.

En general, es un poco incorrecto, que usando subject para emitir de nuevo en onNext() y luego call onComplete() , puede suscribirse antes y luego emitir afuera. algo como esto:

 TestObserver<Int>().apply { subject.subscribe(this) subject.onNext(1) subject.onComplete() .... }