Llamada asincrónica para cada elemento dentro de una colección

Tengo un problema que no pude resolver hasta ahora Soy nuevo en RxKotlin, así que podría ser fácil. Por favor, eche un vistazo al código:

override fun infos(): Stream<Info> = client.infoAboutItem(identifier) .map { val itemId = it.itemId ?: "" val item = client.itemForId(itemId) ClientInfo(client, it, source, item) as Info } .let { AccessStream(it) } 

Donde la stream es nuestra colección hecha a sí misma. El map es un método que le permite iterar sobre cada elemento dentro de esa colección.

El problema aquí es que

  client.itemForId(itemId) 

es una llamada http que devuelve un single que no es ideal.

Me gustaría crear una llamada asincrónica dentro del map que devuelva Item en lugar de Single y luego pasarlo a ClientInfo. Lo que he intentado hasta ahora fue utilizar la suscripción dentro del map y usar el método blockingGet () pero esto bloquea el hilo principal incluso si observo y me suscribo en un hilo diferente

Por lo tanto, implica hacer una llamada asincrónica para cada cosa de la colección.

Gracias por la ayuda

Puede tratar de devolver Observable<Stream<Info>> y luego se vería así:

  override fun infos(): Observable<Stream<Info>> = Observable.from(client.infoAboutItem(identifier)) .flatMapSingle { val itemId = it.itemId ?: "" client.itemForId(itemId) } .map { ClientInfo(client, it, source, item) as Info } .toList() .flatMap { AccessStream(it) } 

Debería envolver esas costosas operaciones en un observable y usar un map plano para comprimir esos datos en una Información del Cliente.

He escrito una pequeña muestra para mostrarlo.

 class SimpleTest { val testScheduler = TestScheduler() @Test fun test() { infos().observeOn(Schedulers.immediate()) .subscribe { logger("Output", it.toString()) } testScheduler.advanceTimeBy(10, TimeUnit.MINUTES) } fun infos(): Single<List<ClientInfo>> { return Observable.from(infoAboutItem("some_identifier")) .doOnNext { logger("Next", it.toString()) } .flatMap { aboutItem -> Observable.fromCallable { itemForId(aboutItem.itemId) } .subscribeOn(testScheduler) .map { ClientInfo(aboutItem = aboutItem, item = it) } } .doOnNext { logger("Next", it.toString()) } .toList() .toSingle() } data class ClientInfo( val id: String = UUID.randomUUID().toString(), val aboutItem: AboutItem, val item: Item ) data class AboutItem(val itemId: String = UUID.randomUUID().toString()) data class Item(val id: String = UUID.randomUUID().toString()) fun infoAboutItem(identifier: String): List<AboutItem> { return (1..10).map { AboutItem() } } fun itemForId(itemId: String): Item { val sleepTime = Random().nextInt(1000).toLong() Thread.sleep(sleepTime) return Item() } fun logger(tag: String, message: String): Unit { val formattedDate = Date(Schedulers.immediate().now()).format() System.out.println("$tag @ $formattedDate: $message") } fun Date.format(): String { return SimpleDateFormat("HH:mm:ss.SSS", Locale.US).format(this) } } 
  • Kotlin con stack RxKotlinFX da un error de class de acceso
  • Confusión de la syntax de Kotlin lambda
  • Inyectar constructor y object complementario
  • Cómo pasar nulo a un Observable con tipo anulable en RxJava 2 y Kotlin
  • RxJava - ¿Entradas de keyboard de contrapresión?
  • ¿Cómo leer JSON desde Url usando kotlin Android?
  • Repetir acciones en estado con RxJava
  • RxKotlin collectInto () MutableList usando references de método
  • Comportamiento con Kotlin Higher-Order Functions e interfaces de método único?
  • RxJava- Gire Observable en Iterator, Stream o Sequence
  • OnComplete nunca se llamó con toSortedList () y groupBy ()