rxjava2 – ejemplo simple de ejecutar tareas en un grupo de subprocesss, suscribirse en un solo hilo

Estoy experimentando con la siguiente tarea para entender RxJava:

  • Dada una list de URL
  • Realice una request HTTP para cada URL en un grupo de subprocesss
  • Para cada resultado, inserte algunos datos en una database SQLite (no multiprocess aquí)
  • Bloquee el método hasta que finalice

Así que lo probé en Kotlin:

val ex = Executors.newFixedThreadPool(10) Observable.fromIterable((1..100).toList()) .observeOn(Schedulers.from(ex)) .map { Thread.currentThread().name } .subscribe { println(it + " " + Thread.currentThread().name } 

Esperaba que se imprimiera

 pool-1-thread-1 main pool-1-thread-2 main pool-1-thread-3 main pool-1-thread-4 main .... 

Sin embargo, imprime:

 pool-1-thread-1 pool-1-thread-1 pool-1-thread-1 pool-1-thread-1 pool-1-thread-1 pool-1-thread-1 

¿Alguien puede corregir mis malentendidos sobre cómo funciona esto? ¿Por qué no utiliza todos los subprocesss del grupo de subprocesss? ¿Cómo puedo hacer que mi suscriptor se ejecute en el hilo o bloque principal hasta que se complete?

Rx no se entiende como un service de ejecución en paralelo, use la API de las aplicaciones de Java para eso. Los events Rx son síncronos y fluirán a través de la secuencia posteriormente. Al build la secuencia, observe en On solicitará una secuencia una vez y procesará las emisiones una por una en ese hilo.

También esperaba que la subscribe se ejecutara en el hilo principal. observeOn cambia los subprocesss y todos los events descendentes suceden en ese subprocess. Si desea cambiar al hilo principal, deberá insert otro observeOn justo antes de subscribe .

Para hacer que el código dentro de tu bloque de map funcione en paralelo, debes envolverlo en un observable con tu propio progtwigdor:

 val ex = Executors.newFixedThreadPool(10) val scheduler = Schedulers.from(ex) Observable.fromIterable((1..100).toList()) .flatMap { Observable .fromCallable { Thread.currentThread().name } .subscribeOn(scheduler) } .subscribe { println(it + " " + Thread.currentThread().name) } 

En este caso, verá el resultado:

 pool-1-thread-1 pool-1-thread-1 pool-1-thread-2 pool-1-thread-1 pool-1-thread-3 pool-1-thread-1 pool-1-thread-4 pool-1-thread-1 ... 

Puede consultar el artículo RxJava – Lograr la paralelización que brinda explicaciones de este comportamiento.

Además, RxJava 2.0.5 introdujo la API ParallelFlowable

  • ¿Cómo especificar la versión de RxJava al usar RxKotlin?
  • Referencia de constructor de Kotlin con generics
  • Reescriba el código de Java en Kotlin utilizando la reference de function.
  • Cómo señalar un observable para producir más datos
  • ¿Cómo escalar dinámicamente el rebote de la stream de emisión de ráfagas?
  • ¿Por qué mi configuration de RxJava está bloqueando mi subprocess de interfaz de usuario? Trabajar con la callback BluetoothAdapter.startLeScan
  • Comportamiento impar de TestObserver al suscribirse a un Asunto
  • Convierte el código de RxJava a Kotlin correctamente
  • Cómo recordar el estado con los operadores de rebashs en RxJava2
  • Completable.andThen resultados en "other is null"
  • kotlin consiguiendo un suscriptor para observar un observable usando RxJava2