Cómo controlar el flujo sin .flatMap, que rompe una stream reactiva que impide que operadores como distinctUntilChanged trabajen en toda la secuencia

Quiero manejar una cadena de lógica observable diferente para diferentes implementaciones de State . Esto se puede lograr fácilmente con un tipo de datos / algebraico sellado / union + .flatMap() , pero esto interrumpe la secuencia, donde los operadores como .distinctUntilChanged() solo funcionan dentro de la function .flatMap() , no en toda la transmisión en sí misma .

 sealed class State { object Loading : State() data class Loaded(val value: Int) : State() } @Test fun distinctTest() { val relay = PublishRelay.create<State>() relay.flatMap { fun handle(state: State): Observable<*> = when (state) { State.Loading -> Observable.just(state) .distinctUntilChanged() .doOnNext { println("loading") } is State.Loaded -> Observable.just(state) .distinctUntilChanged() .doOnNext { println(it.value) } } handle(it) } .subscribe() relay.accept(State.Loading) relay.accept(State.Loaded(1)) relay.accept(State.Loaded(2)) relay.accept(State.Loaded(3)) relay.accept(State.Loaded(3)) //desinetworking: loading, 1, 2, 3 //actual: loading, 1, 2, 3, 3 } 

Tenga en count que este es un ejemplo simplificado. Mientras estoy imprimiendo aquí, en realidad quiero realizar diferentes acciones (renderizar la UI de manera diferente) en function del tipo de State de implementación.

Esto se puede lograr con un sujeto / retransmisión, pero eso crearía una secuencia desconectada y mutable, que también me gustaría evitar.

¿Podría dividir el Observable en múltiples observables con cada uno de ellos obteniendo events de un solo tipo? A continuación, puede realizar algunas operaciones en estos observables, antes de fusionarlos nuevamente.

No puedo probar esto ahora, por lo que puede necesitar algunos ajustes. De todos modos, espero que entiendas la idea aquí:

 @Test fun distinctTest() { val relay = PublishRelay.create<State>() val loadingObs = relay.filter { it is State.Loading } .distinctUntilChanged() .doOnNext { println("loading") } val loadedObs = relay.filter { it is State.Loaded } .distinctUntilChanged() .doOnNext { println(it.value) } val merged = loadingObs.mergeWith(loadedObs) merged.subscribe() relay.accept(State.Loading) relay.accept(State.Loaded(1)) relay.accept(State.Loaded(2)) relay.accept(State.Loaded(3)) relay.accept(State.Loaded(3)) // Hopefully prints this: loading, 1, 2, 3 } 
  • RxJava salida diferente entre Flowable y Observable con window y Groupby
  • ¿Cómo se usa Flowable.generate de Kotlin?
  • No se puede llamar al operador desde () en Observable en Android Kotlin
  • El suscriptor RxJava2 PublishSubject no puede recibir elementos cuando se le llama desde varios hilos usando SingleScheduler
  • Cómo contar el time de ejecución de lo observable
  • ¿Cómo continuar el procesamiento después de que ocurra un error en RxJava 2?
  • kotlin consiguiendo un suscriptor para observar un observable usando RxJava2
  • RxJava Debounce onNext ()
  • HttpException no capturado por onError ()
  • Pruebas unitarias Rxjava observables que tienen un retraso
  • Utilice la unidad de Kotlin (o cualquier otro object) escriba en el layout de Android