rxjava2 – ejemplo simple de ejecutar tareas en un grupo de subprocesss, suscribirse en un solo hilo

Estoy experimentando con la siguiente tarea para entender RxJava:

  • Dada una list de URL
  • Realice una request HTTP para cada URL en un grupo de subprocesss
  • Para cada resultado, inserte algunos datos en una database SQLite (no multiprocess aquí)
  • Bloquee el método hasta que finalice

Así que lo probé en Kotlin:

val ex = Executors.newFixedThreadPool(10) Observable.fromIterable((1..100).toList()) .observeOn(Schedulers.from(ex)) .map { Thread.currentThread().name } .subscribe { println(it + " " + Thread.currentThread().name } 

Esperaba que se imprimiera

 pool-1-thread-1 main pool-1-thread-2 main pool-1-thread-3 main pool-1-thread-4 main .... 

Sin embargo, imprime:

 pool-1-thread-1 pool-1-thread-1 pool-1-thread-1 pool-1-thread-1 pool-1-thread-1 pool-1-thread-1 

¿Alguien puede corregir mis malentendidos sobre cómo funciona esto? ¿Por qué no utiliza todos los subprocesss del grupo de subprocesss? ¿Cómo puedo hacer que mi suscriptor se ejecute en el hilo o bloque principal hasta que se complete?

Rx no se entiende como un service de ejecución en paralelo, use la API de las aplicaciones de Java para eso. Los events Rx son síncronos y fluirán a través de la secuencia posteriormente. Al build la secuencia, observe en On solicitará una secuencia una vez y procesará las emisiones una por una en ese hilo.

También esperaba que la subscribe se ejecutara en el hilo principal. observeOn cambia los subprocesss y todos los events descendentes suceden en ese subprocess. Si desea cambiar al hilo principal, deberá insert otro observeOn justo antes de subscribe .

Para hacer que el código dentro de tu bloque de map funcione en paralelo, debes envolverlo en un observable con tu propio progtwigdor:

 val ex = Executors.newFixedThreadPool(10) val scheduler = Schedulers.from(ex) Observable.fromIterable((1..100).toList()) .flatMap { Observable .fromCallable { Thread.currentThread().name } .subscribeOn(scheduler) } .subscribe { println(it + " " + Thread.currentThread().name) } 

En este caso, verá el resultado:

 pool-1-thread-1 pool-1-thread-1 pool-1-thread-2 pool-1-thread-1 pool-1-thread-3 pool-1-thread-1 pool-1-thread-4 pool-1-thread-1 ... 

Puede consultar el artículo RxJava – Lograr la paralelización que brinda explicaciones de este comportamiento.

Además, RxJava 2.0.5 introdujo la API ParallelFlowable

  • ¿Por qué tengo un Log de salida no deseado al fusionar 2 observables en otro Observable, que los almacena temporalmente cada 10 segundos?
  • Java genérico para Kotlin genérico. Retorno genérico del método
  • Utilice la unidad de Kotlin (o cualquier otro object) escriba en el layout de Android
  • ¿Cómo puedo fusionar una sola <Lista <Lista <T >>> en una Lista <T> con RxJava 2?
  • Prueba RxJava2 Flowable Query Room
  • Retrofit-Vertx con RxJava2 en Kotlin IllegalStateException message == null
  • Asunto de RxJava: escuche el tipo diferente que el que emite
  • ¿Cómo puedo unir dos RxJava2 Obvervables por key?
  • Cambie Flowable <List <Obj1 >> a Flowable <List <Obj2 >> en la habitación
  • ¿Cómo crear un intervalo infinito observable que emitirá un nuevo object cada intervalo de time?
  • Room - SELECT query, get o default