Llamada asincrónica para cada elemento dentro de una colección

Tengo un problema que no pude resolver hasta ahora Soy nuevo en RxKotlin, así que podría ser fácil. Por favor, eche un vistazo al código:

override fun infos(): Stream<Info> = client.infoAboutItem(identifier) .map { val itemId = it.itemId ?: "" val item = client.itemForId(itemId) ClientInfo(client, it, source, item) as Info } .let { AccessStream(it) } 

Donde la stream es nuestra colección hecha a sí misma. El map es un método que le permite iterar sobre cada elemento dentro de esa colección.

El problema aquí es que

  client.itemForId(itemId) 

es una llamada http que devuelve un single que no es ideal.

Me gustaría crear una llamada asincrónica dentro del map que devuelva Item en lugar de Single y luego pasarlo a ClientInfo. Lo que he intentado hasta ahora fue utilizar la suscripción dentro del map y usar el método blockingGet () pero esto bloquea el hilo principal incluso si observo y me suscribo en un hilo diferente

Por lo tanto, implica hacer una llamada asincrónica para cada cosa de la colección.

Gracias por la ayuda

Puede tratar de devolver Observable<Stream<Info>> y luego se vería así:

  override fun infos(): Observable<Stream<Info>> = Observable.from(client.infoAboutItem(identifier)) .flatMapSingle { val itemId = it.itemId ?: "" client.itemForId(itemId) } .map { ClientInfo(client, it, source, item) as Info } .toList() .flatMap { AccessStream(it) } 

Debería envolver esas costosas operaciones en un observable y usar un map plano para comprimir esos datos en una Información del Cliente.

He escrito una pequeña muestra para mostrarlo.

 class SimpleTest { val testScheduler = TestScheduler() @Test fun test() { infos().observeOn(Schedulers.immediate()) .subscribe { logger("Output", it.toString()) } testScheduler.advanceTimeBy(10, TimeUnit.MINUTES) } fun infos(): Single<List<ClientInfo>> { return Observable.from(infoAboutItem("some_identifier")) .doOnNext { logger("Next", it.toString()) } .flatMap { aboutItem -> Observable.fromCallable { itemForId(aboutItem.itemId) } .subscribeOn(testScheduler) .map { ClientInfo(aboutItem = aboutItem, item = it) } } .doOnNext { logger("Next", it.toString()) } .toList() .toSingle() } data class ClientInfo( val id: String = UUID.randomUUID().toString(), val aboutItem: AboutItem, val item: Item ) data class AboutItem(val itemId: String = UUID.randomUUID().toString()) data class Item(val id: String = UUID.randomUUID().toString()) fun infoAboutItem(identifier: String): List<AboutItem> { return (1..10).map { AboutItem() } } fun itemForId(itemId: String): Item { val sleepTime = Random().nextInt(1000).toLong() Thread.sleep(sleepTime) return Item() } fun logger(tag: String, message: String): Unit { val formattedDate = Date(Schedulers.immediate().now()).format() System.out.println("$tag @ $formattedDate: $message") } fun Date.format(): String { return SimpleDateFormat("HH:mm:ss.SSS", Locale.US).format(this) } } 
  • ¿Puedo crear un método de extensión Kotlin para agregar una suscripción rxJava a una suscripción compuesta?
  • La biblioteca de Kotlin 'rxkotlin-0.21.0.jar' tiene un formatting no compatible. Actualice la biblioteca o el complemento
  • Kotlin con stack RxKotlinFX da un error de class de acceso
  • La suscripción de rx kotlin no funciona, no recibe artículos
  • ¿Cómo leer JSON desde Url usando kotlin Android?
  • Comportamiento con Kotlin Higher-Order Functions e interfaces de método único?
  • Cómo comprimir algunos observables en lenguaje Kotlin con RxAndroid
  • RxJava - ¿Entradas de keyboard de contrapresión?
  • Repetir acciones en estado con RxJava
  • ¿Cómo puedo agregar de manera condicional una operación asincrónica en medio de una transmisión de RxJava?
  • RxJava (Kotlin), Observable.amb y PublishSubject no están disparando