RxAndroid – Manejar errores con el operador Zip

Estoy tratando de encontrar una forma de ejecutar requestes en paralelo y manejarlas cuando finalice cada observación. A pesar de que todo está funcionando cuando todos los observables dan una respuesta, no veo una forma de manejar cada uno de los errores cuando todo está terminado.

Esta es una muestra del operador zip, que básicamente ejecuta 2 requestes en paralelo:

Observable.zip( getObservable1() .onErrorResumeNext { errorThrowable: Throwable -> Observable.error(ErrorEntity(Type.ONE, errorThrowable)) }.subscribeOn(Schedulers.io()), getObservable2() .onErrorResumeNext { errorThrowable: Throwable -> Observable.error(ErrorEntity(Type.TWO, errorThrowable)) }.subscribeOn(Schedulers.io()), BiFunction { value1: String, value2: String -> return@BiFunction value1 + value2 }) //execute requests should be on io() thread .subscribeOn(Schedulers.io()) //there are other tasks inside subscriber that need io() thread .observeOn(AndroidSchedulers.mainThread()) .subscribe( { result -> Snackbar.make(view, "Replace with your own action " + result, Snackbar.LENGTH_LONG) .setAction("Action", null).show() }, { error -> Log.d("TAG", "Error is : " + (error as ErrorEntity).error.message) } ) private fun getObservable1(): Observable<String> { return Observable.defer { throw Throwable("Error 1") } } private fun getObservable2(): Observable<String> { return Observable.defer { throw Throwable("Error 2") } } 

El problema con este enfoque es que no existe un mecanismo para unir cada error como lo hace BiFunction para el caso de éxito. Por lo tanto, el operador zip solo activará el primer error e ignorará los demás.

Salida:

 D/TAG: Error is : Error 1 

¿Hay alguna forma de recuperar todos los errores solo después de que se haya completado o haya cometido un error cada uno de los zip interiores observables?

Mi objective principal es ver qué requestes dieron un error y ejecutarlas solo después de que aparece un cuadro de dialog que le pregunta al usuario si desea volver a intentar las requestes fallidas.

Puede modelar sus observables usando classs de datos. P.ej

 sealed class Response { data class Success(val data: String) : Response() data class Error(val t: Throwable) : Response() } 

entonces puedes asignar tus observables a Response de esta manera:

 val first: Observable<Response> = observable1 .map<Response> { Response.Success(it) } .onErrorReturn { Response.Error(it) } val second: Observable<Response> = observable2 .map<Response> { Response.Success(it) } .onErrorReturn { Response.Error(it) } 

y puedes combinarlos:

 Observable.zip( first, second, BiFunction { t1: Response, t2: Response -> Pair(t1, t2) } ).subscribe({println(it)}) 

esto imprime:

(Error (t = java.lang.Exception: Error 1), Error (t = java.lang.Exception: Error 2))

También eche un vistazo a este artículo .

  • Cómo upload el valor de retorno de un constructor
  • ¿Cómo puedo fusionar una sola <Lista <Lista <T >>> en una Lista <T> con RxJava 2?
  • manejar onKeyDown usando RxAndroid
  • Observables opcionales en combinar
  • Cómo crear el controller de respuesta genérica para el error y validar la respuesta utilizando retrofit, rxjava y dagger
  • ¿Cómo se simula emitir 2 streams infinitas observables y tener otras observables que las fusionan y amortiguan cada 10 segundos?
  • Error "No se puede combinar Dex" al usar Room + Kotlin
  • RxJava Debounce onNext ()
  • ¿Por qué mi configuration de RxJava está bloqueando mi subprocess de interfaz de usuario? Trabajar con la callback BluetoothAdapter.startLeScan
  • rx-java2 Schedulers.io () steel invoca el método de mainThread
  • Android Architecture Components Room ViewModel CompleteableFormAction