Cómo controlar el flujo sin .flatMap, que rompe una stream reactiva que impide que operadores como distinctUntilChanged trabajen en toda la secuencia

Quiero manejar una cadena de lógica observable diferente para diferentes implementaciones de State . Esto se puede lograr fácilmente con un tipo de datos / algebraico sellado / union + .flatMap() , pero esto interrumpe la secuencia, donde los operadores como .distinctUntilChanged() solo funcionan dentro de la function .flatMap() , no en toda la transmisión en sí misma .

 sealed class State { object Loading : State() data class Loaded(val value: Int) : State() } @Test fun distinctTest() { val relay = PublishRelay.create<State>() relay.flatMap { fun handle(state: State): Observable<*> = when (state) { State.Loading -> Observable.just(state) .distinctUntilChanged() .doOnNext { println("loading") } is State.Loaded -> Observable.just(state) .distinctUntilChanged() .doOnNext { println(it.value) } } handle(it) } .subscribe() relay.accept(State.Loading) relay.accept(State.Loaded(1)) relay.accept(State.Loaded(2)) relay.accept(State.Loaded(3)) relay.accept(State.Loaded(3)) //desinetworking: loading, 1, 2, 3 //actual: loading, 1, 2, 3, 3 } 

Tenga en count que este es un ejemplo simplificado. Mientras estoy imprimiendo aquí, en realidad quiero realizar diferentes acciones (renderizar la UI de manera diferente) en function del tipo de State de implementación.

Esto se puede lograr con un sujeto / retransmisión, pero eso crearía una secuencia desconectada y mutable, que también me gustaría evitar.

¿Podría dividir el Observable en múltiples observables con cada uno de ellos obteniendo events de un solo tipo? A continuación, puede realizar algunas operaciones en estos observables, antes de fusionarlos nuevamente.

No puedo probar esto ahora, por lo que puede necesitar algunos ajustes. De todos modos, espero que entiendas la idea aquí:

 @Test fun distinctTest() { val relay = PublishRelay.create<State>() val loadingObs = relay.filter { it is State.Loading } .distinctUntilChanged() .doOnNext { println("loading") } val loadedObs = relay.filter { it is State.Loaded } .distinctUntilChanged() .doOnNext { println(it.value) } val merged = loadingObs.mergeWith(loadedObs) merged.subscribe() relay.accept(State.Loading) relay.accept(State.Loaded(1)) relay.accept(State.Loaded(2)) relay.accept(State.Loaded(3)) relay.accept(State.Loaded(3)) // Hopefully prints this: loading, 1, 2, 3 } 
  • ¿Cómo puedo indicar explícitamente la finalización de un Flowable en RxJava?
  • RxJava2 + Retrofit pantalla negra en request de datos
  • Encadenando Observables para evitar suscripciones múltiples
  • Problema de encadenamiento Completable after flatMapCompletable
  • ¿Cómo iterar sobre Single <List> y asignar a otra list?
  • Confundido sobre la asignación de la variable RxJava
  • RxJava zipCon error IDE en Kotlin con Android Studio 3.0
  • Cómo manejar el event handling errores en un solo lugar en rxjava usando wrapper
  • ¿Cómo se usa Flowable.generate de Kotlin?
  • Cómo pasar nulo a un Observable con tipo anulable en RxJava 2 y Kotlin
  • ¿Cómo crear un intervalo infinito observable que emitirá un nuevo object cada intervalo de time?