Llamada asincrónica para cada elemento dentro de una colección

Tengo un problema que no pude resolver hasta ahora Soy nuevo en RxKotlin, así que podría ser fácil. Por favor, eche un vistazo al código:

override fun infos(): Stream<Info> = client.infoAboutItem(identifier) .map { val itemId = it.itemId ?: "" val item = client.itemForId(itemId) ClientInfo(client, it, source, item) as Info } .let { AccessStream(it) } 

Donde la stream es nuestra colección hecha a sí misma. El map es un método que le permite iterar sobre cada elemento dentro de esa colección.

El problema aquí es que

  client.itemForId(itemId) 

es una llamada http que devuelve un single que no es ideal.

Me gustaría crear una llamada asincrónica dentro del map que devuelva Item en lugar de Single y luego pasarlo a ClientInfo. Lo que he intentado hasta ahora fue utilizar la suscripción dentro del map y usar el método blockingGet () pero esto bloquea el hilo principal incluso si observo y me suscribo en un hilo diferente

Por lo tanto, implica hacer una llamada asincrónica para cada cosa de la colección.

Gracias por la ayuda

Puede tratar de devolver Observable<Stream<Info>> y luego se vería así:

  override fun infos(): Observable<Stream<Info>> = Observable.from(client.infoAboutItem(identifier)) .flatMapSingle { val itemId = it.itemId ?: "" client.itemForId(itemId) } .map { ClientInfo(client, it, source, item) as Info } .toList() .flatMap { AccessStream(it) } 

Debería envolver esas costosas operaciones en un observable y usar un map plano para comprimir esos datos en una Información del Cliente.

He escrito una pequeña muestra para mostrarlo.

 class SimpleTest { val testScheduler = TestScheduler() @Test fun test() { infos().observeOn(Schedulers.immediate()) .subscribe { logger("Output", it.toString()) } testScheduler.advanceTimeBy(10, TimeUnit.MINUTES) } fun infos(): Single<List<ClientInfo>> { return Observable.from(infoAboutItem("some_identifier")) .doOnNext { logger("Next", it.toString()) } .flatMap { aboutItem -> Observable.fromCallable { itemForId(aboutItem.itemId) } .subscribeOn(testScheduler) .map { ClientInfo(aboutItem = aboutItem, item = it) } } .doOnNext { logger("Next", it.toString()) } .toList() .toSingle() } data class ClientInfo( val id: String = UUID.randomUUID().toString(), val aboutItem: AboutItem, val item: Item ) data class AboutItem(val itemId: String = UUID.randomUUID().toString()) data class Item(val id: String = UUID.randomUUID().toString()) fun infoAboutItem(identifier: String): List<AboutItem> { return (1..10).map { AboutItem() } } fun itemForId(itemId: String): Item { val sleepTime = Random().nextInt(1000).toLong() Thread.sleep(sleepTime) return Item() } fun logger(tag: String, message: String): Unit { val formattedDate = Date(Schedulers.immediate().now()).format() System.out.println("$tag @ $formattedDate: $message") } fun Date.format(): String { return SimpleDateFormat("HH:mm:ss.SSS", Locale.US).format(this) } } 
  • ¿Cómo especificar la versión de RxJava al usar RxKotlin?
  • RxJava2 Tal vez devuelva Observable vacío si no hay elemento
  • RxJava- Gire Observable en Iterator, Stream o Sequence
  • Kotlin con stack RxKotlinFX da un error de class de acceso
  • RxKotlin flattenAsObservable (): no coincide con el método de reference
  • Obligatorio <Objeto> y encontrado <Objeto>?
  • ¿Cómo escalar dinámicamente el rebote de la stream de emisión de ráfagas?
  • Múltiples requestes de modificación2 usando Flowable en Kotlin
  • RxKotlin - Single.just () no se emite al suscribirse TestSubscriber
  • Cómo comprimir algunos observables en lenguaje Kotlin con RxAndroid
  • No se puede 'observar en' hilo principal con RxKotlin