Carga concurrente de files S3 a través de Koutlin Coroutines

Necesito upload muchos files a S3, tomaría horas completar ese trabajo secuencialmente. Eso es exactamente en lo que sobresalen las nuevas corotinas de Kotlin, así que quise darles una primera oportunidad en vez de juguetear con el service de ejecución basado en Thread.

Aquí está mi código (simplificado):

fun upload(superTiles: Map<Int, Map<Int, SuperTile>>) = runBlocking { val s3 = AmazonS3ClientBuilder.standard().withRegion("eu-west-1").build() for ((x, ys) in superTiles) { val jobs = mutableListOf<Defernetworking<Any>>() for ((y, superTile) in ys) { val job = async(CommonPool) { uploadTile(s3, x, y, superTile) } jobs.add(job) } jobs.map { it.await() } } } suspend fun uploadTile(s3: AmazonS3, x: Int, y: Int, superTile: SuperTile) { val json: String = "{}" val key = "$s3Prefix/x4/$z/$x/$y.json" s3.putObject(PutObjectRequest("my_bucket", ByteArrayInputStream(json.toByteArray()), metadata)) } 

El problema: el código sigue siendo muy lento y el logging revela que las requestes se siguen ejecutando secuencialmente: un trabajo finaliza antes de que se cree el siguiente. Solo en muy pocos casos (1 de cada 10) veo trabajos ejecutándose simultáneamente.

¿Por qué el código no se ejecuta mucho más rápido / concurrentemente? ¿Qué puedo hacer al respecto?

Las corotines de Kotlin se destacan cuando trabajas con API asíncrona , mientras que la API de AmazonS3.putObject que estás utilizando es una API síncrona de locking de la vieja escuela, por lo que obtienes solo la misma cantidad de subidas simultáneas que la cantidad de subprocesss en CommonPool que estás utilizando. No tiene sentido marcar su function uploadTile con suspend modificado, ya que no usa ninguna function de suspensión en su cuerpo.

El primer paso para get más performance en su tarea de carga es comenzar a usar API asíncrona para eso. Sugeriría mirar Amazon S3 TransferManager para ese bolso. Vea si eso resuelve su problema primero.

Las corotinas de Kotlin están diseñadas para ayudarlo a combinar sus API asincrónicas en flujos de trabajo lógicos fáciles de usar. Por ejemplo, es sencillo adaptar la API asíncrona de TransferManager para su uso con corutinas escribiendo la siguiente function de extensión:

 suspend fun Upload.await(): UploadResult = suspendCancellableCoroutine { cont -> addProgressListener { if (isDone) { // we know it should not actually wait when done try { cont.resume(waitForUploadResult()) } catch (e: Throwable) { cont.resumeWithException(e) } } } cont.invokeOnCompletion { abort() } } 

Esta extensión le permite escribir un código muy fluido que funciona con TransferManager y puede reescribir su function uploadTile para trabajar con TransferManager lugar de trabajar con el locking de la interfaz de AmazonS3 :

 suspend fun uploadTile(tm: TransferManager, x: Int, y: Int, superTile: SuperTile) { val json: String = "{}" val key = "$s3Prefix/x4/$z/$x/$y.json" tm.upload(PutObjectRequest("my_bucket", ByteArrayInputStream(json.toByteArray()), metadata)) .await() } 

Observe cómo esta nueva versión de uploadTile usa una function de suspensión que se definió anteriormente.

  • Android - Error al convertir Bytecode en Dex con la versión min SDK
  • Kotlin, ¿cuándo delegar por map?
  • Kotlin con JPA: infierno de constructor pnetworkingeterminado
  • ¿Cuál es la diferencia entre plus y plusAssign en la sobrecarga del operador de kotlin?
  • Error: ejecución fallida para la tarea ': app: javaPreCompileDebug'. > java.io.IOException: no se pudo eliminar annotationProcessors.json
  • Error de la aplicación porque la instancia de la aplicación lateinit no se ha inicializado
  • Usar Kotlin en un module de biblioteca sin usarlo en el module de la aplicación
  • Al utilizar tanto publishOn como subscribeOn en un flujo, no ocurre nada
  • kotlin: Genérico a diferentes types
  • Android Spinner getDropDownView repite elementos después de la selección
  • Cuál es la diferencia entre launch / join y async / await en Kotlin coroutines