Invoque RxJava2 cancelable / desechable del hilo correcto

Estoy implementando un observable que emmite líneas de un Resource .

El problema es que a este recurso realmente no le gusta que lo cierren desde un hilo diferente sobre el que fue creado (mata a un cachorro y lanza una exception cuando esto sucede).

Cuando dispongo de la suscripción, se invoca el recurso Cancellable / Disposable desde el hilo main , mientras que el observable se suscribió en Schedulers.io() .

Aquí está el código de Kotlin:

 fun lines(): Observable<String> = Observable.create { emitter -> val resource = NetworkResource() emitter.setCancellable { resource.close() // <-- main thread :( } try { while (!emitter.isDisposed) emitter.onNext(resource.readLine()) // <-- blocked here! } catch (ioe: IOException) { emitter.tryOnError(ioe) // <-- this also triggers the cancellable } } val disposable = lines() .subscribeOn(Schedulers.io()) .observeOn(AndroidSchedulers.mainThread()) .subscribe { Log.i(TAG, "Line: $it" } disposable.dispose() // <-- main thread :) 

Pregunta : ¿Es posible invocar el Cancellable desde el hilo correcto *, teniendo en count que el hilo de suscripción está bloqueado en resource.readLine() ?

* Corregir el hilo que significa el de subscribeOn(Schedures.io()) .

EDITAR : me temo que esta pregunta no tiene una respuesta correcta, a less que resource.close() se haga seguro para subprocesss o se implemente algún tipo de sondeo en resource.dataReady para que el hilo no se bloquee.

Schedulers.io() administra un grupo de subprocesss, por lo que puede o no utilizar el mismo subprocess para disponer de su recurso. Deberá usar un planificador personalizado y el operador unsubscribeOn() para asegurarse de que su Observable esté suscrito y anulado en el mismo hilo. Algo como:

 Scheduler customScheduler = Schedulers.from(Executors.newSingleThreadExecutor()); val disposable = lines() .unsubscribeOn(customScheduler) .subscribeOn(customScheduler) .observeOn(AndroidSchedulers.mainThread()) .subscribe { Log.i(TAG, "Line: $it" } 

Si no te importa retrasar la llamada a NetworkResource#close un poco, ¿por qué no solo

  fun lines(): Observable<String> = Observable.create { emitter -> val resource = NetworkResource() try { while (!emitter.isDisposed) { emitter.onNext(resource.readLine()) } resource.close() } catch (ioe: IOException) { emitter.tryOnError(ioe) } } 

Pero todavía hay un problema con esto: en el caso de una IOException nadie llamará a NetworkResource#close (también en tu ejemplo de la pregunta, creo).

Intenta arreglar esto:

  fun lines(): Observable<String> = Observable.create { emitter -> val resource = NetworkResource() try { while (!emitter.isDisposed) { emitter.onNext(resource.readLine()) } } catch (ioe: IOException) { emitter.tryOnError(ioe) } finally { resource.close() // try-catch here, too? } } 

o usando la function "Kotlin-Try-With-Resources"

  fun lines(): Observable<String> = Observable.create { emitter -> NetworkResource().use { resource -> try { while (!emitter.isDisposed) { emitter.onNext(resource.readLine()) } } catch (ioe: IOException) { emitter.tryOnError(ioe) } } } 

Espero que esto ayude. Te deseo un buen fin de semana.

¿Qué tal tomar un path alternativo?

a) haciendo que NetworkResource sea NetworkResource subprocesss (si tiene el control del código fuente)

o

b) envolviendo NetworkResource con un "agente" ? Con "agente" me refiero a un proxy que utiliza internamente un hilo dedicado que hace toda la interacción con NetworkResource (build, leer, cerrar, …)

  • RxJava 2 requiere un tipo de retorno Observable diferente de RxJava 1 (Kotlin)
  • Observable.combineLa causa de error más reciente después de actualizar a RxJava 2.xx - no se puede inferir el tipo
  • Sitio de Android con RxJava manejar resultado de consulta vacío
  • Inferencia de tipo Observable.combineLatest en kotlin
  • El método RXjava2 en fromCallable no se puede exceder
  • RxJava y Retrofit usando Kotlin
  • Cómo escribir la testing adecuada para el repository de interfaz reactiva que devuelve Observable solo cuando hay algún evento, cómo simular el desencadenamiento de ese evento
  • ¿Cómo puedo indicar explícitamente la finalización de un Flowable en RxJava?
  • kotlin consiguiendo un suscriptor para observar un observable usando RxJava2
  • Cómo contar el time de ejecución de lo observable
  • RxJava2 Publicado