Cómo controlar el flujo sin .flatMap, que rompe una stream reactiva que impide que operadores como distinctUntilChanged trabajen en toda la secuencia

Quiero manejar una cadena de lógica observable diferente para diferentes implementaciones de State . Esto se puede lograr fácilmente con un tipo de datos / algebraico sellado / union + .flatMap() , pero esto interrumpe la secuencia, donde los operadores como .distinctUntilChanged() solo funcionan dentro de la function .flatMap() , no en toda la transmisión en sí misma .

 sealed class State { object Loading : State() data class Loaded(val value: Int) : State() } @Test fun distinctTest() { val relay = PublishRelay.create<State>() relay.flatMap { fun handle(state: State): Observable<*> = when (state) { State.Loading -> Observable.just(state) .distinctUntilChanged() .doOnNext { println("loading") } is State.Loaded -> Observable.just(state) .distinctUntilChanged() .doOnNext { println(it.value) } } handle(it) } .subscribe() relay.accept(State.Loading) relay.accept(State.Loaded(1)) relay.accept(State.Loaded(2)) relay.accept(State.Loaded(3)) relay.accept(State.Loaded(3)) //desinetworking: loading, 1, 2, 3 //actual: loading, 1, 2, 3, 3 } 

Tenga en count que este es un ejemplo simplificado. Mientras estoy imprimiendo aquí, en realidad quiero realizar diferentes acciones (renderizar la UI de manera diferente) en function del tipo de State de implementación.

Esto se puede lograr con un sujeto / retransmisión, pero eso crearía una secuencia desconectada y mutable, que también me gustaría evitar.

¿Podría dividir el Observable en múltiples observables con cada uno de ellos obteniendo events de un solo tipo? A continuación, puede realizar algunas operaciones en estos observables, antes de fusionarlos nuevamente.

No puedo probar esto ahora, por lo que puede necesitar algunos ajustes. De todos modos, espero que entiendas la idea aquí:

 @Test fun distinctTest() { val relay = PublishRelay.create<State>() val loadingObs = relay.filter { it is State.Loading } .distinctUntilChanged() .doOnNext { println("loading") } val loadedObs = relay.filter { it is State.Loaded } .distinctUntilChanged() .doOnNext { println(it.value) } val merged = loadingObs.mergeWith(loadedObs) merged.subscribe() relay.accept(State.Loading) relay.accept(State.Loaded(1)) relay.accept(State.Loaded(2)) relay.accept(State.Loaded(3)) relay.accept(State.Loaded(3)) // Hopefully prints this: loading, 1, 2, 3 } 
  • Observable.fromCallable () implementación con exception
  • Inferencia de tipo Observable.combineLatest en kotlin
  • ¿Cómo soluciono el error de inferencia de tipo en un Completable transformado utilizando RxLifecycle.bindToLifecycle ()?
  • ¿Cómo especificar la versión de RxJava al usar RxKotlin?
  • Asunto de RxJava: escuche el tipo diferente que el que emite
  • Problema de encadenamiento Completable after flatMapCompletable
  • Java genérico para Kotlin genérico. Retorno genérico del método
  • Cómo upload el valor de retorno de un constructor
  • kotlin consiguiendo un suscriptor para observar un observable usando RxJava2
  • ¿Por qué tengo un Log de salida no deseado al fusionar 2 observables en otro Observable, que los almacena temporalmente cada 10 segundos?
  • RxJava manejando múltiples devoluciones de llamada dentro de un observable