OnErrorNotImplementedException utilizando RxJava2 y Retrofit2 Mosby MVI

Recibo una exception OnErrorNotImplementedException y la aplicación falla, a pesar de manejar el error en sentido descendente (?).

Excepción

E/AndroidRuntime: FATAL EXCEPTION: RxCachedThreadScheduler-1 Process: pl.netlandgroup.smartsab, PID: 9920 io.reactivex.exceptions.OnErrorNotImplementedException: HTTP 401 Unauthorized at io.reactivex.internal.functions.Functions$OnErrorMissingConsumer.accept(Functions.java:704) at io.reactivex.internal.functions.Functions$OnErrorMissingConsumer.accept(Functions.java:701) at io.reactivex.internal.observers.LambdaObserver.onError(LambdaObserver.java:74) at io.reactivex.internal.operators.observable.ObservableDoOnEach$DoOnEachObserver.onError(ObservableDoOnEach.java:119) at io.reactivex.internal.operators.observable.ObservableDoOnEach$DoOnEachObserver.onError(ObservableDoOnEach.java:119) at io.reactivex.internal.operators.observable.ObservableSubscribeOn$SubscribeOnObserver.onError(ObservableSubscribeOn.java:63) at retrofit2.adapter.rxjava2.BodyObservable$BodyObserver.onNext(BodyObservable.java:56) at retrofit2.adapter.rxjava2.BodyObservable$BodyObserver.onNext(BodyObservable.java:37) at retrofit2.adapter.rxjava2.CallExecuteObservable.subscribeActual(CallExecuteObservable.java:43) at io.reactivex.Observable.subscribe(Observable.java:10838) at retrofit2.adapter.rxjava2.BodyObservable.subscribeActual(BodyObservable.java:34) at io.reactivex.Observable.subscribe(Observable.java:10838) at io.reactivex.internal.operators.observable.ObservableSubscribeOn$SubscribeTask.run(ObservableSubscribeOn.java:96) at io.reactivex.Scheduler$DisposeTask.run(Scheduler.java:452) at io.reactivex.internal.schedulers.ScheduledRunnable.run(ScheduledRunnable.java:61) at io.reactivex.internal.schedulers.ScheduledRunnable.call(ScheduledRunnable.java:52) at java.util.concurrent.FutureTask.run(FutureTask.java:237) at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:272) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1133) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:607) at java.lang.Thread.run(Thread.java:761) Caused by: retrofit2.adapter.rxjava2.HttpException: HTTP 401 Unauthorized at retrofit2.adapter.rxjava2.BodyObservable$BodyObserver.onNext(BodyObservable.java:54) at retrofit2.adapter.rxjava2.BodyObservable$BodyObserver.onNext(BodyObservable.java:37) at retrofit2.adapter.rxjava2.CallExecuteObservable.subscribeActual(CallExecuteObservable.java:43) at io.reactivex.Observable.subscribe(Observable.java:10838) at retrofit2.adapter.rxjava2.BodyObservable.subscribeActual(BodyObservable.java:34) at io.reactivex.Observable.subscribe(Observable.java:10838) at io.reactivex.internal.operators.observable.ObservableSubscribeOn$SubscribeTask.run(ObservableSubscribeOn.java:96) at io.reactivex.Scheduler$DisposeTask.run(Scheduler.java:452) at io.reactivex.internal.schedulers.ScheduledRunnable.run(ScheduledRunnable.java:61) at io.reactivex.internal.schedulers.ScheduledRunnable.call(ScheduledRunnable.java:52) at java.util.concurrent.FutureTask.run(FutureTask.java:237) at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:272) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1133) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:607) at java.lang.Thread.run(Thread.java:761) 

Repositorio de actualización:

 class RetrofitRepository (retrofit: Retrofit) { val apiService: ApiService = retrofit.create(ApiService::class.java) var size: Int = 0 fun getMapResponse(pageIndex: Int = 0): Observable<MapResponse> { return apiService.getMapResponse(pageIndex = pageIndex) .doOnError { Log.d("error", it.message) } .doOnNext { Log.d("currThread", Thread.currentThread().name) } } fun getItemsFormResponses(): Observable<List<Item>> { val list = mutableListOf<Observable<List<Item>>>() val resp0 = getMapResponse() resp0.subscribe { size = it.totalCount } var accum = 0 do { list.add(getMapResponse(accum).map { it.items }) accum++ } while (list.size*200 < size) return Observable.merge(list) } } 

Este resultado es observado por Interactor:

 class MapInteractor @Inject constructor(private val repository: RetrofitRepository) { fun getMapItems(): Observable<MapViewState> { return repository.getItemsFormResponses() .map { if(it.isEmpty()) { return@map MapViewState.EmptyResult() } else { val mapItems = it.map { it.toMapItem() } return@map MapViewState.MapResult(mapItems) } } .doOnNext { Log.d("currThread", Thread.currentThread().name) } .startWith(MapViewState.Loading()) .onErrorReturn { MapViewState.Error(it) } } } 

El onErrorReturn { MapViewState.Error(it) } emite correctamente (justo antes del locking de la aplicación, puedo ver lo que se representa correctamente en la pantalla). ¿Cómo puedo evitar esta exception mientras sigo manteniendo la architecture MVI?

EDITAR

La respuesta proporcionada por dimsuz fue la solución correcta, aunque para lograr fusionar y devolver un Observable con todos los elementos, tuvo que modificarse a esto:

 fun getMapItems(): Observable<List<Item>> { return getMapResponse().map { val size = it.totalCount val list = mutableListOf<Observable<List<Item>>>() var accum = 0 do { list.add(getMapResponse(accum++).map { it.items }) } while (list.size*200 < size) return@map list.zip { it.flatten() } }.mergeAll() } 

Creo que el error se arroja en getItemsFromResponse() en la línea de:

  val resp0 = getMapResponse() resp0.subscribe { size = it.totalCount } 

Aquí se suscribe, pero no maneja el caso de error. En realidad, este código es incorrecto, porque rompe la cadena Rx en dos piezas independientes, no debería hacer eso.

Lo que debes hacer es algo como esto:

 fun getItemsFormResponses(): Observable<List<Item>> { return getMapResponse().map { resp0 -> val size = resp0.totalCount val list = mutableListOf<Observable<List<Item>>>() var accum = 0 do { list.add(getMapResponse(accum).map { it.items }) accum++ } while (list.size*200 < size) return Observable.merge(list) } } 

Es decir, extraer el size extendiendo la cadena con un operador, en lugar de romperlo con subscribe() .

  • Problema de encadenamiento Completable after flatMapCompletable
  • ¿Comportamiento incorrecto de Maybe <List <T >> en Room?
  • Convierte el código de RxJava a Kotlin correctamente
  • RxJava manejando múltiples devoluciones de llamada dentro de un observable
  • Comportamiento impar de TestObserver al suscribirse a un Asunto
  • HttpException no capturado por onError ()
  • Prueba RxJava2 Flowable Query Room
  • ¿Cómo crear un intervalo infinito observable que emitirá un nuevo object cada intervalo de time?
  • Cómo manejar el event handling errores en un solo lugar en rxjava usando wrapper
  • Convirtiendo un Observable en un Flujo con contrapresión en RxJava2
  • Cómo escribir la testing adecuada para el repository de interfaz reactiva que devuelve Observable solo cuando hay algún evento, cómo simular el desencadenamiento de ese evento