Convirtiendo un Observable en un Flujo con contrapresión en RxJava2

Estoy observando las líneas producidas por un NetworkResource , envolviéndolo en un Observable.create . Aquí está el código, falta try / catch y cancelación por simplicidad:

 fun linesOf(resource: NetworkResource): Observable<String> = Observable.create { emitter -> while (!emitter.isDisposed) { val line = resource.readLine() Log.i(TAG, "Emitting: $line") emitter.onNext(line) } } 

El problema es que más adelante quiero convertirlo en un Flowable usando observable.toFlowable(LATEST) para agregar una contrapresión en caso de que mi consumidor no pueda mantener el ritmo, pero dependiendo de cómo lo haga, el consumidor deja de recibir los artículos después del artículo 128.

A) de esta manera todo funciona:

 val resource = ... linesOf(resource) .subscribeOn(Schedulers.io()) .observeOn(AndroidSchedulers.mainThread()) .toFlowable(BackpressureStrategy.LATEST) .subscribe { Log.i(TAG, "Consuming: $it") } 

B) aquí el consumidor se queda atascado después de 128 artículos (pero la emisión continúa):

 val resource = ... linesOf(resource) .toFlowable(BackpressureStrategy.LATEST) .subscribeOn(Schedulers.io()) .observeOn(AndroidSchedulers.mainThread()) .subscribe { Log.i(TAG, "Consuming: $it") } // <-- stops after 128 

En la opción A) todo funciona sin problemas, y puedo ver el Emitting: ... log al lado del Consuming: ... log.

En la opción B) puedo ver el Emitting: ... post de logging que emite felizmente nuevas líneas, pero dejo de ver el post Consuming: ... registrar después del elemento 128, aunque la emisión continúa.

Pregunta : ¿Alguien puede ayudarme a entender por qué sucede esto?

En primer lugar, está utilizando el tipo incorrecto y el operador incorrecto. El uso de Flowable elimina la necesidad de conversión. Al usar Flowable.generate obtiene una contrapresión:

 Flowable.generate(emitter -> { String line = resource.readLine(); if (line == null) { emitter.onComplete(); } else { emitter.onNext(line); } }); 

En segundo lugar, la razón por la que se cuelga su versión se debe a un mismo punto muerto de agrupación causado por subscribeOn . Las requestes de flujo descendente están progtwigdas detrás de su ciclo de emisión ansioso y no pueden tener efecto, deteniendo la emisión en los 128 elementos pnetworkingeterminados. Use Flowable.subscribeOn(scheduler, false) para evitar este caso.

  • EXCEPCIÓN FATAL: RxCachedThreadScheduler-1 cuando el gatillo se deshace. ¿Por qué?
  • El suscriptor RxJava2 PublishSubject no puede recibir elementos cuando se le llama desde varios hilos usando SingleScheduler
  • Problema de encadenamiento Completable after flatMapCompletable
  • ¿Cómo especificar la versión de RxJava al usar RxKotlin?
  • Aplicando transformación a cada elemento en Single <List <T >>
  • HttpException no capturado por onError ()
  • Espere hasta que dos observables emitan verdadero
  • Ingrese los valores propios para combinarlos
  • No se puede 'observar en' hilo principal con RxKotlin
  • ¿Por qué tengo un Log de salida no deseado al fusionar 2 observables en otro Observable, que los almacena temporalmente cada 10 segundos?
  • Proguard: ¿Qué regla puedo agregar para evitar no puedo encontrar la class referenceda?