El método RXjava2 en fromCallable no se puede exceder

Soy nuevo en el uso de rxjava y estoy tratando de ejecutar una function en segundo plano usando rxjava2 pero el método no se llama código que estoy usando. Indíqueme si es la forma correcta de ejecutar una function en segundo plano:

Observable.fromCallable<OrderItem>(Callable { saveToDb(existingQty, newOty, product_id) }).doOnSubscribe { object : Observable<OrderItem>() { override fun subscribeActual(emitter: Observer<in OrderItem>?) { try { val orderItem = saveToDb(existingQty, newOty, product_id) emitter?.onNext(orderItem) emitter?.onComplete() } catch (e: Exception) { emitter?.onError(e) } } } } .subscribeOn(Schedulers.io()) .observeOn(AndroidSchedulers.mainThread()).doOnSubscribe { object : Observer<OrderItem> { override fun onComplete() { } override fun onNext(t: OrderItem) { } override fun onError(e: Throwable) { } override fun onSubscribe(d: Disposable) { } } } 

Lo estás haciendo mal. Se llama al operador doOnSubscribe() cuando observable se suscribe mediante el método subscribe() y no se ha suscrito el método observable mediante el método subscribe() .

Ha llamado saveToDb método saveToDb en callable , entonces ¿por qué lo está llamando en doOnSubscribe ? no tiene sentido.

Deberías haber escrito el siguiente código:

 Observable.fromCallable { saveToDb(existingQty, newOty, product_id) } .subscribeOn(Schedulers.io()) .observeOn(AndroidSchedulers.mainThread()) .subscribe({ orderItem -> // set values to UI }, { e -> // handle exception if any }, { // on complete }) 

para trabajar con tu lógica

DoOnSubscribe significa "hacer cuando alguien se suscribe". Pero no hay subscribe en su código. Tal vez quieras usar subsribe lugar de doOnSubscribe

  • Fusionar observables dependientes
  • Problema de encadenamiento Completable after flatMapCompletable
  • RxJava2: onComplete no llamado con flatMapIterable
  • ¿Cómo soluciono el error de inferencia de tipo en un Completable transformado utilizando RxLifecycle.bindToLifecycle ()?
  • JsonArray a la class de datos de Kotlin con Retrofit (se esperaba BEGIN_OBJECT pero era BEGIN_ARRAY)
  • RxJava2 observable no procesando en Siguiente cuando hay un cambio
  • Obtiene N últimos objects emitidos por observables en RxJava2
  • ¿Por qué mi configuration de RxJava está bloqueando mi subprocess de interfaz de usuario? Trabajar con la callback BluetoothAdapter.startLeScan
  • Excepción causada por: java.lang.ClassNotFoundException: org.reactivestreams.Publisher
  • ¿Cómo puedo fusionar una sola <Lista <Lista <T >>> en una Lista <T> con RxJava 2?
  • ¿Cómo se coordina una list de ejecuciones Completables con RxJava?