并行S3文件通过Kotlin协程上传

我需要上传许多文件到S3,这将需要数小时才能顺利完成这项工作。 这正是Kotlin的新协程所擅长的,所以我想给他们一个尝试,而不是再次用一些基于线程的执行服务来摆弄。

这是我的(简体)代码:

fun upload(superTiles: Map<Int, Map<Int, SuperTile>>) = runBlocking { val s3 = AmazonS3ClientBuilder.standard().withRegion("eu-west-1").build() for ((x, ys) in superTiles) { val jobs = mutableListOf<Deferred<Any>>() for ((y, superTile) in ys) { val job = async(CommonPool) { uploadTile(s3, x, y, superTile) } jobs.add(job) } jobs.map { it.await() } } } suspend fun uploadTile(s3: AmazonS3, x: Int, y: Int, superTile: SuperTile) { val json: String = "{}" val key = "$s3Prefix/x4/$z/$x/$y.json" s3.putObject(PutObjectRequest("my_bucket", ByteArrayInputStream(json.toByteArray()), metadata)) } 

问题是:代码仍然非常慢,日志显示请求仍然按顺序执行:在下一个创建之前,作业完成。 只有极少数情况下(十分之一)我看到同时进行的工作。

为什么代码运行速度不快? 我能做些什么呢?

当您使用异步 API时,Kotlin协同工作非常出色,而您正在使用的AmazonS3.putObject API是一个老派的阻塞同步API,因此只能获得与您所使用的CommonPool中的线程数量一样多的并发上传。 在suspend修改时标记uploadTile函数没有任何价值,因为它没有在其正文中使用任何suspend uploadTile函数。

在上传任务中获得更多吞吐量的第一步是开始使用异步API。 我建议看看这个钱包的Amazon S3 TransferManager 。 看看是否让你的问题首先解决。

Kotlin协同程序旨在帮助您将异步API合并到易于使用的逻辑工作流程中。 例如,通过编写以下扩展函数,可以直接使用TransferManager异步API与协程一起使用:

 suspend fun Upload.await(): UploadResult = suspendCancellableCoroutine { cont -> addProgressListener { if (isDone) { // we know it should not actually wait when done try { cont.resume(waitForUploadResult()) } catch (e: Throwable) { cont.resumeWithException(e) } } } cont.invokeOnCompletion { abort() } } 

此扩展使您能够编写非常流利的代码与TransferManager ,您可以重写您的uploadTile函数来使用TransferManager而不是使用拦截AmazonS3接口:

 suspend fun uploadTile(tm: TransferManager, x: Int, y: Int, superTile: SuperTile) { val json: String = "{}" val key = "$s3Prefix/x4/$z/$x/$y.json" tm.upload(PutObjectRequest("my_bucket", ByteArrayInputStream(json.toByteArray()), metadata)) .await() } 

请注意,这个新版本的uploadTile如何使用挂起函数await上面定义的。

Interesting Posts