RxAndroid – Manejar errores con el operador Zip

Estoy tratando de encontrar una forma de ejecutar requestes en paralelo y manejarlas cuando finalice cada observación. A pesar de que todo está funcionando cuando todos los observables dan una respuesta, no veo una forma de manejar cada uno de los errores cuando todo está terminado.

Esta es una muestra del operador zip, que básicamente ejecuta 2 requestes en paralelo:

Observable.zip( getObservable1() .onErrorResumeNext { errorThrowable: Throwable -> Observable.error(ErrorEntity(Type.ONE, errorThrowable)) }.subscribeOn(Schedulers.io()), getObservable2() .onErrorResumeNext { errorThrowable: Throwable -> Observable.error(ErrorEntity(Type.TWO, errorThrowable)) }.subscribeOn(Schedulers.io()), BiFunction { value1: String, value2: String -> return@BiFunction value1 + value2 }) //execute requests should be on io() thread .subscribeOn(Schedulers.io()) //there are other tasks inside subscriber that need io() thread .observeOn(AndroidSchedulers.mainThread()) .subscribe( { result -> Snackbar.make(view, "Replace with your own action " + result, Snackbar.LENGTH_LONG) .setAction("Action", null).show() }, { error -> Log.d("TAG", "Error is : " + (error as ErrorEntity).error.message) } ) private fun getObservable1(): Observable<String> { return Observable.defer { throw Throwable("Error 1") } } private fun getObservable2(): Observable<String> { return Observable.defer { throw Throwable("Error 2") } } 

El problema con este enfoque es que no existe un mecanismo para unir cada error como lo hace BiFunction para el caso de éxito. Por lo tanto, el operador zip solo activará el primer error e ignorará los demás.

Salida:

 D/TAG: Error is : Error 1 

¿Hay alguna forma de recuperar todos los errores solo después de que se haya completado o haya cometido un error cada uno de los zip interiores observables?

Mi objective principal es ver qué requestes dieron un error y ejecutarlas solo después de que aparece un cuadro de dialog que le pregunta al usuario si desea volver a intentar las requestes fallidas.

Puede modelar sus observables usando classs de datos. P.ej

 sealed class Response { data class Success(val data: String) : Response() data class Error(val t: Throwable) : Response() } 

entonces puedes asignar tus observables a Response de esta manera:

 val first: Observable<Response> = observable1 .map<Response> { Response.Success(it) } .onErrorReturn { Response.Error(it) } val second: Observable<Response> = observable2 .map<Response> { Response.Success(it) } .onErrorReturn { Response.Error(it) } 

y puedes combinarlos:

 Observable.zip( first, second, BiFunction { t1: Response, t2: Response -> Pair(t1, t2) } ).subscribe({println(it)}) 

esto imprime:

(Error (t = java.lang.Exception: Error 1), Error (t = java.lang.Exception: Error 2))

También eche un vistazo a este artículo .

  • consultas de múltiples dominios de Android con RXJava
  • Prueba RxJava2 Flowable Query Room
  • Cómo contar el time de ejecución de lo observable
  • RxJava 2 requiere un tipo de retorno Observable diferente de RxJava 1 (Kotlin)
  • Problema de encadenamiento Completable after flatMapCompletable
  • Obtiene N últimos objects emitidos por observables en RxJava2
  • Kotlin no puede inferir el tipo cuando utiliza la reference de método en Flowable
  • No se pudo deducir Kotlin y RxJava
  • ¿Cómo puedo unir dos RxJava2 Obvervables por key?
  • EXCEPCIÓN FATAL: RxCachedThreadScheduler-1 cuando el gatillo se deshace. ¿Por qué?
  • Escuchar posts y escribir commands en un flujo observable