Carga concurrente de files S3 a través de Koutlin Coroutines

Necesito upload muchos files a S3, tomaría horas completar ese trabajo secuencialmente. Eso es exactamente en lo que sobresalen las nuevas corotinas de Kotlin, así que quise darles una primera oportunidad en vez de juguetear con el service de ejecución basado en Thread.

Aquí está mi código (simplificado):

fun upload(superTiles: Map<Int, Map<Int, SuperTile>>) = runBlocking { val s3 = AmazonS3ClientBuilder.standard().withRegion("eu-west-1").build() for ((x, ys) in superTiles) { val jobs = mutableListOf<Defernetworking<Any>>() for ((y, superTile) in ys) { val job = async(CommonPool) { uploadTile(s3, x, y, superTile) } jobs.add(job) } jobs.map { it.await() } } } suspend fun uploadTile(s3: AmazonS3, x: Int, y: Int, superTile: SuperTile) { val json: String = "{}" val key = "$s3Prefix/x4/$z/$x/$y.json" s3.putObject(PutObjectRequest("my_bucket", ByteArrayInputStream(json.toByteArray()), metadata)) } 

El problema: el código sigue siendo muy lento y el logging revela que las requestes se siguen ejecutando secuencialmente: un trabajo finaliza antes de que se cree el siguiente. Solo en muy pocos casos (1 de cada 10) veo trabajos ejecutándose simultáneamente.

¿Por qué el código no se ejecuta mucho más rápido / concurrentemente? ¿Qué puedo hacer al respecto?

Las corotines de Kotlin se destacan cuando trabajas con API asíncrona , mientras que la API de AmazonS3.putObject que estás utilizando es una API síncrona de locking de la vieja escuela, por lo que obtienes solo la misma cantidad de subidas simultáneas que la cantidad de subprocesss en CommonPool que estás utilizando. No tiene sentido marcar su function uploadTile con suspend modificado, ya que no usa ninguna function de suspensión en su cuerpo.

El primer paso para get más performance en su tarea de carga es comenzar a usar API asíncrona para eso. Sugeriría mirar Amazon S3 TransferManager para ese bolso. Vea si eso resuelve su problema primero.

Las corotinas de Kotlin están diseñadas para ayudarlo a combinar sus API asincrónicas en flujos de trabajo lógicos fáciles de usar. Por ejemplo, es sencillo adaptar la API asíncrona de TransferManager para su uso con corutinas escribiendo la siguiente function de extensión:

 suspend fun Upload.await(): UploadResult = suspendCancellableCoroutine { cont -> addProgressListener { if (isDone) { // we know it should not actually wait when done try { cont.resume(waitForUploadResult()) } catch (e: Throwable) { cont.resumeWithException(e) } } } cont.invokeOnCompletion { abort() } } 

Esta extensión le permite escribir un código muy fluido que funciona con TransferManager y puede reescribir su function uploadTile para trabajar con TransferManager lugar de trabajar con el locking de la interfaz de AmazonS3 :

 suspend fun uploadTile(tm: TransferManager, x: Int, y: Int, superTile: SuperTile) { val json: String = "{}" val key = "$s3Prefix/x4/$z/$x/$y.json" tm.upload(PutObjectRequest("my_bucket", ByteArrayInputStream(json.toByteArray()), metadata)) .await() } 

Observe cómo esta nueva versión de uploadTile usa una function de suspensión que se definió anteriormente.

  • Exponenciación de Kleisli en Kotlin
  • AppCompatActivity, ViewModel y data binding
  • Swift si lo dejas y más en Kotlin
  • Kotlin: ¿cómo pasar una function como parámetro a otra?
  • function kotlin devuelve nulo
  • ¿Cómo evito expresiones de objects para funciones que devuelven una interfaz SAM?
  • Dagger 2 no puede acceder a Retrofit
  • Métodos de interfaz de anulación de Android kotlin dentro del método onCreateView ()
  • Variables "coroutine local" en kotlin
  • La desconnection no funciona en la aplicación Spring Boot (no es compatible con el método POST)
  • ¿Cómo se manejan las properties anuladas en los bloques de inicio?