cómo limitar kotlin coroutines máxima concurrency

Tengo una secuencia (de File.walkTopDown) y necesito ejecutar una operación de larga duración en cada uno de ellos. Me gustaría usar las mejores prácticas / corutinas de Kotlin, pero o bien no obtengo paralelismo, ni demasiado paralelismo, ni acerto un error IO de "demasiados files abiertos".

File("/Users/me/Pictures/").walkTopDown() .onFail { file, ex -> println("ERROR: $file caused $ex") } .filter { ... only big images... } .map { file -> async { // I *think* I want async and not "launch"... ImageProcessor.fromFile(file) } } 

Parece que esto no funciona en paralelo, y mi CPU multinúcleo nunca supera el valor de 1 CPU. ¿Hay alguna manera con coroutines de ejecutar trabajos con "Número de operaciones paralelas" de Trabajos diferidos?

Miré Multithreading usando Kotlin Coroutines, que primero crea TODOS los trabajos y luego los une, pero eso significa completar el recorrido del tree de Secuencia / file completamente antes del paso de combinación de procesamiento pesado, y eso parece … ¡dudoso! Dividirlo en un process de recostackción y process significa que la recostackción podría ir mucho más allá del procesamiento.

 val jobs = ... the Sequence above... .toSet() println("Found ${jobs.size}") jobs.forEach { it.await() } 

    El problema con su primer fragment es que no se ejecuta en absoluto; recuerde, Sequence es flojo y debe usar una operación de terminal como toSet() o forEach() . Además, debe limitar el número de subprocesss que se pueden usar para esa tarea mediante la construcción de un newFixedThreadPoolContext context de newFixedThreadPoolContext y usarlo en modo async :

     val pictureContext = newFixedThreadPoolContext(nThreads = 10, name = "reading pictures in parallel") File("/Users/me/Pictures/").walkTopDown() .onFail { file, ex -> println("ERROR: $file caused $ex") } .filter { ... only big images... } .map { file -> async(pictureContext) { ImageProcessor.fromFile(file) } } .toList() .forEach { it.await() } 

    Editar: tienes que usar un operador de terminal ( toList ) antes de esperar los resultados

    Lo tengo trabajando con un canal. Pero tal vez estoy siendo networkingundante a tu manera?

     val pipe = ArrayChannel<Defernetworking<ImageFile>>(20) launch { while (!(pipe.isEmpty && pipe.isClosedForSend)) { imageFiles.add(pipe.receive().await()) } println("pipe closed") } File("/Users/me/").walkTopDown() .onFail { file, ex -> println("ERROR: $file caused $ex") } .forEach { pipe.send(async { ImageFile.fromFile(it) }) } pipe.close()