Fan-out / fan-in – canal de resultado de cierre

Estoy produciendo elementos, consumiendo desde múltiples rutinas conjuntas y presionando back to resultChannel. El productor está cerrando su canal después del último elemento.

El código nunca termina ya que resultChannel nunca se cierra. ¿Cómo detectar y finalizar correctamente la iteración, de modo que hasNext() devuelve false ?

 val inputData = (0..99).map { "Input$it" } val threads = 10 val bundleProducer = produce<String>(CommonPool, threads) { inputData.forEach { item -> send(item) println("Producing: $item") } println("Producing finished") close() } val resultChannel = Channel<String>(threads) repeat(threads) { launch(CommonPool) { bundleProducer.consumeEach { println("CONSUMING $it") resultChannel.send("Result ($it)") } } } val iterator = object : Iterator<String> { val iterator = resultChannel.iterator() override fun hasNext() = runBlocking { iterator.hasNext() } override fun next() = runBlocking { iterator.next() } }.asSequence() println("Starting interation...") val result = iterator.toList() println("finish: ${result.size}") 

Puede ejecutar una corutina que espera a que los consumidores finalicen y luego cierra el resultChannel .

Primero, reescriba el código que inicia a los consumidores para save los Job s:

 val jobs = (1..threads).map { launch(CommonPool) { bundleProducer.consumeEach { println("CONSUMING $it") resultChannel.send("Result ($it)") } } } 

Y luego ejecuta otra coroutine que cierra el canal una vez que todos los Job están terminados:

 launch(CommonPool) { jobs.forEach { it.join() } resultChannel.close() } 
  • No se puede ejecutar kotlin coroutine (no existe tal exception de método
  • Spring 5 and Kotlin 1.1 Coroutines: Type rx.Scheduler no presente
  • Corutinas de testing unitaria en el hilo de UI
  • Kotlin Coroutines con time de espera
  • reference no resuelta: lanzamiento
  • Oyente dentro del productor
  • ¿Cuál es la diferencia entre CoroutineContext y Job en kotlinx.coroutines?
  • cómo limitar kotlin coroutines máxima concurrency
  • Variables "coroutine local" en kotlin
  • No se puede detectar el lanzamiento de exception después de que se cancela Kotlin coroutine
  • Kotlin coroutine no espera a ser hecho