rx-java2 Schedulers.io () steel invoca el método de mainThread

Estoy tratando de recuperar los datos de la database usando la habitación con la forma Rx. Así es como estoy tratando de hacer eso

override fun onStart() { super.onStart() disposable.add(presenter.getAllBooks() .subscribeOn(Schedulers.io()) .observeOn(AndroidSchedulers.mainThread()) .subscribe({ println(it.size()) })) } 

este es el método getAllBooks() dentro del presentador

 fun getAllBooks() : Flowable<List<Book>> { val isMainThread = Looper.myLooper() == Looper.getMainLooper() if (!isMainThread) { updateBooks() return db.bookDao().allBooks } return Flowable.empty() } 

Aquí isMainThread variable es siempre true , también he intentado observeOn(Shcedulers.io()) , pero el mismo problema.

Este es un malentendido común con RxJava. getAllBooks ejecuta su contenido antes de que RxJava entre en escena.

 fun method() : Flowable<String> { println("method()") return Flowable.just("hello") } println("Preparing...") val f = method(); println("Subscribing...") f.subscribe({ println(it) }) 

Imprimirá

 Preparing... method() Subscribing... hello 

Tienes que ajustar tu acción en un Flowable para que suceda cuando se produzca una suscripción:

 disposable.add( Flowable.defer(() -> presenter.getAllBooks()) // <---------------------- .subscribeOn(Schedulers.io()) .observeOn(AndroidSchedulers.mainThread()) .subscribe({ println(it.size()) }) ) 
  • RxJava 2 requiere un tipo de retorno Observable diferente de RxJava 1 (Kotlin)
  • RxJava zipCon error IDE en Kotlin con Android Studio 3.0
  • Confundido sobre la asignación de la variable RxJava
  • Convert Maybe to Single de otra fuente si Maybe completa
  • Retrofit-Vertx con RxJava2 en Kotlin IllegalStateException message == null
  • EXCEPCIÓN FATAL: RxCachedThreadScheduler-1 cuando el gatillo se deshace. ¿Por qué?
  • Reescriba el código de Java en Kotlin utilizando la reference de function.
  • Convierte el código de RxJava a Kotlin correctamente
  • Observable.fromCallable () implementación con exception
  • Utilice la unidad de Kotlin (o cualquier otro object) escriba en el layout de Android
  • ¿Cómo se simula emitir 2 streams infinitas observables y tener otras observables que las fusionan y amortiguan cada 10 segundos?