El método RXjava2 en fromCallable no se puede exceder

Soy nuevo en el uso de rxjava y estoy tratando de ejecutar una function en segundo plano usando rxjava2 pero el método no se llama código que estoy usando. Indíqueme si es la forma correcta de ejecutar una function en segundo plano:

Observable.fromCallable<OrderItem>(Callable { saveToDb(existingQty, newOty, product_id) }).doOnSubscribe { object : Observable<OrderItem>() { override fun subscribeActual(emitter: Observer<in OrderItem>?) { try { val orderItem = saveToDb(existingQty, newOty, product_id) emitter?.onNext(orderItem) emitter?.onComplete() } catch (e: Exception) { emitter?.onError(e) } } } } .subscribeOn(Schedulers.io()) .observeOn(AndroidSchedulers.mainThread()).doOnSubscribe { object : Observer<OrderItem> { override fun onComplete() { } override fun onNext(t: OrderItem) { } override fun onError(e: Throwable) { } override fun onSubscribe(d: Disposable) { } } } 

Lo estás haciendo mal. Se llama al operador doOnSubscribe() cuando observable se suscribe mediante el método subscribe() y no se ha suscrito el método observable mediante el método subscribe() .

Ha llamado saveToDb método saveToDb en callable , entonces ¿por qué lo está llamando en doOnSubscribe ? no tiene sentido.

Deberías haber escrito el siguiente código:

 Observable.fromCallable { saveToDb(existingQty, newOty, product_id) } .subscribeOn(Schedulers.io()) .observeOn(AndroidSchedulers.mainThread()) .subscribe({ orderItem -> // set values to UI }, { e -> // handle exception if any }, { // on complete }) 

para trabajar con tu lógica

DoOnSubscribe significa "hacer cuando alguien se suscribe". Pero no hay subscribe en su código. Tal vez quieras usar subsribe lugar de doOnSubscribe

  • RxJava 1.x .zip () no funciona en RxJava 2.0
  • Confundido sobre la asignación de la variable RxJava
  • Observables opcionales en combinar
  • Tipo de devolución diferente en RxJava 2 (actualización desde RxJava1)
  • Cómo hacer un event handling errores en rxjava2 en android
  • RxJava Observable.create envolver suscripciones observables
  • Enfrentando problemas con la implementación de Rx Java con la architecture de flujo en Android kotlin
  • ¿Cómo puedo fusionar una sola <Lista <Lista <T >>> en una Lista <T> con RxJava 2?
  • Convert Maybe to Single de otra fuente si Maybe completa
  • RxJava zipCon error IDE en Kotlin con Android Studio 3.0
  • EXCEPCIÓN FATAL: RxCachedThreadScheduler-1 cuando el gatillo se deshace. ¿Por qué?