OnErrorNotImplementedException utilizando RxJava2 y Retrofit2 Mosby MVI
Recibo una exception OnErrorNotImplementedException y la aplicación falla, a pesar de manejar el error en sentido descendente (?).
Excepción
- Cambie Flowable <List <Obj1 >> a Flowable <List <Obj2 >> en la habitación
- Utilice la unidad de Kotlin (o cualquier otro object) escriba en el layout de Android
- RxJava zipCon error IDE en Kotlin con Android Studio 3.0
- Cómo hacer que la function regrese Observable
- Observable.fromCallable () implementación con exception
E/AndroidRuntime: FATAL EXCEPTION: RxCachedThreadScheduler-1 Process: pl.netlandgroup.smartsab, PID: 9920 io.reactivex.exceptions.OnErrorNotImplementedException: HTTP 401 Unauthorized at io.reactivex.internal.functions.Functions$OnErrorMissingConsumer.accept(Functions.java:704) at io.reactivex.internal.functions.Functions$OnErrorMissingConsumer.accept(Functions.java:701) at io.reactivex.internal.observers.LambdaObserver.onError(LambdaObserver.java:74) at io.reactivex.internal.operators.observable.ObservableDoOnEach$DoOnEachObserver.onError(ObservableDoOnEach.java:119) at io.reactivex.internal.operators.observable.ObservableDoOnEach$DoOnEachObserver.onError(ObservableDoOnEach.java:119) at io.reactivex.internal.operators.observable.ObservableSubscribeOn$SubscribeOnObserver.onError(ObservableSubscribeOn.java:63) at retrofit2.adapter.rxjava2.BodyObservable$BodyObserver.onNext(BodyObservable.java:56) at retrofit2.adapter.rxjava2.BodyObservable$BodyObserver.onNext(BodyObservable.java:37) at retrofit2.adapter.rxjava2.CallExecuteObservable.subscribeActual(CallExecuteObservable.java:43) at io.reactivex.Observable.subscribe(Observable.java:10838) at retrofit2.adapter.rxjava2.BodyObservable.subscribeActual(BodyObservable.java:34) at io.reactivex.Observable.subscribe(Observable.java:10838) at io.reactivex.internal.operators.observable.ObservableSubscribeOn$SubscribeTask.run(ObservableSubscribeOn.java:96) at io.reactivex.Scheduler$DisposeTask.run(Scheduler.java:452) at io.reactivex.internal.schedulers.ScheduledRunnable.run(ScheduledRunnable.java:61) at io.reactivex.internal.schedulers.ScheduledRunnable.call(ScheduledRunnable.java:52) at java.util.concurrent.FutureTask.run(FutureTask.java:237) at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:272) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1133) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:607) at java.lang.Thread.run(Thread.java:761) Caused by: retrofit2.adapter.rxjava2.HttpException: HTTP 401 Unauthorized at retrofit2.adapter.rxjava2.BodyObservable$BodyObserver.onNext(BodyObservable.java:54) at retrofit2.adapter.rxjava2.BodyObservable$BodyObserver.onNext(BodyObservable.java:37) at retrofit2.adapter.rxjava2.CallExecuteObservable.subscribeActual(CallExecuteObservable.java:43) at io.reactivex.Observable.subscribe(Observable.java:10838) at retrofit2.adapter.rxjava2.BodyObservable.subscribeActual(BodyObservable.java:34) at io.reactivex.Observable.subscribe(Observable.java:10838) at io.reactivex.internal.operators.observable.ObservableSubscribeOn$SubscribeTask.run(ObservableSubscribeOn.java:96) at io.reactivex.Scheduler$DisposeTask.run(Scheduler.java:452) at io.reactivex.internal.schedulers.ScheduledRunnable.run(ScheduledRunnable.java:61) at io.reactivex.internal.schedulers.ScheduledRunnable.call(ScheduledRunnable.java:52) at java.util.concurrent.FutureTask.run(FutureTask.java:237) at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:272) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1133) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:607) at java.lang.Thread.run(Thread.java:761)
Repositorio de actualización:
class RetrofitRepository (retrofit: Retrofit) { val apiService: ApiService = retrofit.create(ApiService::class.java) var size: Int = 0 fun getMapResponse(pageIndex: Int = 0): Observable<MapResponse> { return apiService.getMapResponse(pageIndex = pageIndex) .doOnError { Log.d("error", it.message) } .doOnNext { Log.d("currThread", Thread.currentThread().name) } } fun getItemsFormResponses(): Observable<List<Item>> { val list = mutableListOf<Observable<List<Item>>>() val resp0 = getMapResponse() resp0.subscribe { size = it.totalCount } var accum = 0 do { list.add(getMapResponse(accum).map { it.items }) accum++ } while (list.size*200 < size) return Observable.merge(list) } }
Este resultado es observado por Interactor:
class MapInteractor @Inject constructor(private val repository: RetrofitRepository) { fun getMapItems(): Observable<MapViewState> { return repository.getItemsFormResponses() .map { if(it.isEmpty()) { return@map MapViewState.EmptyResult() } else { val mapItems = it.map { it.toMapItem() } return@map MapViewState.MapResult(mapItems) } } .doOnNext { Log.d("currThread", Thread.currentThread().name) } .startWith(MapViewState.Loading()) .onErrorReturn { MapViewState.Error(it) } } }
El onErrorReturn { MapViewState.Error(it) }
emite correctamente (justo antes del locking de la aplicación, puedo ver lo que se representa correctamente en la pantalla). ¿Cómo puedo evitar esta exception mientras sigo manteniendo la architecture MVI?
EDITAR
La respuesta proporcionada por dimsuz fue la solución correcta, aunque para lograr fusionar y devolver un Observable con todos los elementos, tuvo que modificarse a esto:
fun getMapItems(): Observable<List<Item>> { return getMapResponse().map { val size = it.totalCount val list = mutableListOf<Observable<List<Item>>>() var accum = 0 do { list.add(getMapResponse(accum++).map { it.items }) } while (list.size*200 < size) return@map list.zip { it.flatten() } }.mergeAll() }
- Kotlin no puede inferir el tipo cuando utiliza la reference de método en Flowable
- RxJava Debounce onNext ()
- Sitio de Android con RxJava manejar resultado de consulta vacío
- ¿Cómo escalar dinámicamente el rebote de la stream de emisión de ráfagas?
- ¿Cómo puedo fusionar una sola <Lista <Lista <T >>> en una Lista <T> con RxJava 2?
- Llamar a un RxJava Single In Kotlin Lambda
- llamar parte de la secuencia una vez con múltiples suscriptores?
- ¿Hay alguna manera de cambiar mi método a la stream Observable que será una cadena de modificadores?
Creo que el error se arroja en getItemsFromResponse()
en la línea de:
val resp0 = getMapResponse() resp0.subscribe { size = it.totalCount }
Aquí se suscribe, pero no maneja el caso de error. En realidad, este código es incorrecto, porque rompe la cadena Rx en dos piezas independientes, no debería hacer eso.
Lo que debes hacer es algo como esto:
fun getItemsFormResponses(): Observable<List<Item>> { return getMapResponse().map { resp0 -> val size = resp0.totalCount val list = mutableListOf<Observable<List<Item>>>() var accum = 0 do { list.add(getMapResponse(accum).map { it.items }) accum++ } while (list.size*200 < size) return Observable.merge(list) } }
Es decir, extraer el size
extendiendo la cadena con un operador, en lugar de romperlo con subscribe()
.
- Kotlin || El elemento Recycleview no está inflando: código inalcanzable en cada método reemplazado de RecycleView.Adapter
- Convierta un ArrayMap en ArrayList – Kotlin