Llamada asincrónica para cada elemento dentro de una colección

Tengo un problema que no pude resolver hasta ahora Soy nuevo en RxKotlin, así que podría ser fácil. Por favor, eche un vistazo al código:

override fun infos(): Stream<Info> = client.infoAboutItem(identifier) .map { val itemId = it.itemId ?: "" val item = client.itemForId(itemId) ClientInfo(client, it, source, item) as Info } .let { AccessStream(it) } 

Donde la stream es nuestra colección hecha a sí misma. El map es un método que le permite iterar sobre cada elemento dentro de esa colección.

El problema aquí es que

  client.itemForId(itemId) 

es una llamada http que devuelve un single que no es ideal.

Me gustaría crear una llamada asincrónica dentro del map que devuelva Item en lugar de Single y luego pasarlo a ClientInfo. Lo que he intentado hasta ahora fue utilizar la suscripción dentro del map y usar el método blockingGet () pero esto bloquea el hilo principal incluso si observo y me suscribo en un hilo diferente

Por lo tanto, implica hacer una llamada asincrónica para cada cosa de la colección.

Gracias por la ayuda

Puede tratar de devolver Observable<Stream<Info>> y luego se vería así:

  override fun infos(): Observable<Stream<Info>> = Observable.from(client.infoAboutItem(identifier)) .flatMapSingle { val itemId = it.itemId ?: "" client.itemForId(itemId) } .map { ClientInfo(client, it, source, item) as Info } .toList() .flatMap { AccessStream(it) } 

Debería envolver esas costosas operaciones en un observable y usar un map plano para comprimir esos datos en una Información del Cliente.

He escrito una pequeña muestra para mostrarlo.

 class SimpleTest { val testScheduler = TestScheduler() @Test fun test() { infos().observeOn(Schedulers.immediate()) .subscribe { logger("Output", it.toString()) } testScheduler.advanceTimeBy(10, TimeUnit.MINUTES) } fun infos(): Single<List<ClientInfo>> { return Observable.from(infoAboutItem("some_identifier")) .doOnNext { logger("Next", it.toString()) } .flatMap { aboutItem -> Observable.fromCallable { itemForId(aboutItem.itemId) } .subscribeOn(testScheduler) .map { ClientInfo(aboutItem = aboutItem, item = it) } } .doOnNext { logger("Next", it.toString()) } .toList() .toSingle() } data class ClientInfo( val id: String = UUID.randomUUID().toString(), val aboutItem: AboutItem, val item: Item ) data class AboutItem(val itemId: String = UUID.randomUUID().toString()) data class Item(val id: String = UUID.randomUUID().toString()) fun infoAboutItem(identifier: String): List<AboutItem> { return (1..10).map { AboutItem() } } fun itemForId(itemId: String): Item { val sleepTime = Random().nextInt(1000).toLong() Thread.sleep(sleepTime) return Item() } fun logger(tag: String, message: String): Unit { val formattedDate = Date(Schedulers.immediate().now()).format() System.out.println("$tag @ $formattedDate: $message") } fun Date.format(): String { return SimpleDateFormat("HH:mm:ss.SSS", Locale.US).format(this) } } 
  • Cómo comprimir algunos observables en lenguaje Kotlin con RxAndroid
  • RxJava2 Tal vez devuelva Observable vacío si no hay elemento
  • No se puede 'observar en' hilo principal con RxKotlin
  • Para una function de Kotlin utilizada como expresión, ¿hay una forma concisa de operar y devolver un valor?
  • Inyectar constructor y object complementario
  • ¿Cómo leer JSON desde Url usando kotlin Android?
  • RxAndroid, cómo detectar si observable ha finalizado la emisión
  • RxKotlin - Single.just () no se emite al suscribirse TestSubscriber
  • ¿Puedo crear un método de extensión Kotlin para agregar una suscripción rxJava a una suscripción compuesta?
  • RxKotlin collectInto () MutableList usando references de método
  • Confusión de la syntax de Kotlin lambda