RxJava Observable.create envolver suscripciones observables

Utilicé Observable.create para poder notificar al suscriptor cuando ciertos datos estaban disponibles. Estoy un poco inseguro de suscribir observables dentro de mi método de creación. ¿Estas suscripciones anidadas me darán algún tipo de problema? No estoy completamente familiarizado con la creación de observables usando Observable.create, así que quería asegurarme de no hacer nada fuera de lo normal o hacer un uso indebido. ¡Gracias de antemano!

abstract class NetworkResource<ApiType, DbType> constructor(private val schedulerProvider: SchedulerProvider) { abstract fun fetchFromApi(): Single<ApiType> abstract fun fetchFromDb(): Observable<Optional<DbType>> abstract fun saveToDb(apiType: ApiType?) abstract fun shouldFetchFromApi(cache: DbType?): Boolean fun fetch(): Observable<Optional<DbType>> { return Observable.create<Optional<DbType>> { val subscriber = it fetchFromDb() .subscribe({ subscriber.onNext(it) if(shouldFetchFromApi(it.get())) { fetchFromApi() .observeOn(schedulerProvider.io()) .map { saveToDb(it) it } .observeOn(schedulerProvider.ui()) .flatMapObservable { fetchFromDb() } .subscribe({ subscriber.onNext(it) subscriber.onComplete() }) } else { subscriber.onComplete() } }) } } } 

Sí, causará problemas.

En primer lugar, no es idiomático anidar Observable esta manera, uno de los puntos fuertes del enfoque reactivo, es componer Observables y, por lo tanto, tener un solo flujo limpio. de esta manera, estás rompiendo la cadena, y el resultado inmediato es un código entrelazado que es más difícil de leer, y más código para conectar los events de notificación, básicamente es como envolver los methods de callback asíncrona con Observable .
aquí ya que tiene componentes reactivos, puede simplemente componerlos en lugar de tratarlos con un enfoque de callback.

En segundo lugar, como resultado de romper la cadena, la más corta e inmediata, cancelar la suscripción del Observable externo no afectará automáticamente al Observable interno. Lo mismo ocurre con el bash de agregar subscribeOn() y con un escenario diferente donde la contrapresión es importante, también se aplica.

una alternativa de composition podría ser algo como esto:

 fun fetch2(): Observable<Optional<DbType>> { return fetchFromDb() .flatMap { if (shouldFetchFromApi(it.get())) { fetchFromApi() .observeOn(schedulerProvider.io()) .doOnSuccess { saveToDb(it) } .observeOn(schedulerProvider.ui()) .flatMapObservable { fetchFromDb() } } else { Observable.empty() } } } 

si por algún motivo desea que, en cualquier caso, el primer resultado de fetchFromDb() se emita por separado, también puede hacerlo utilizando publish() con el selector:

  fun fetch2(): Observable<Optional<DbType>> { return fetchFromDb() .publish { Observable.merge(it, it.flatMap { if (shouldFetchFromApi(it.get())) { fetchFromApi() .observeOn(schedulerProvider.io()) .doOnSuccess { saveToDb(it) } .observeOn(schedulerProvider.ui()) .flatMapObservable { fetchFromDb() } } else { Observable.empty() } }) } } 
  • Enfrentando problemas con la implementación de Rx Java con la architecture de flujo en Android kotlin
  • RxAndroid - Manejar errores con el operador Zip
  • Excepción causada por: java.lang.ClassNotFoundException: org.reactivestreams.Publisher
  • La function de extensión no crea un nuevo object Observable
  • Convierte el código de RxJava a Kotlin correctamente
  • Excepción: blockingConnect no se debe invocar en el subprocess UI a pesar de que subsubscribí en otro subprocess
  • Observable.fromCallable () implementación con exception
  • Descargar un file desde un control remoto y savelo en un dispositivo Android
  • ¿Cómo se simula emitir 2 streams infinitas observables y tener otras observables que las fusionan y amortiguan cada 10 segundos?
  • ¿Comportamiento incorrecto de Maybe <List <T >> en Room?
  • RxJava 2 requiere un tipo de retorno Observable diferente de RxJava 1 (Kotlin)