如何限制kotlin协同程序的最大并发性
我有一个序列(从File.walkTopDown),我需要在他们每个人上运行一个长时间运行的操作。 我想使用Kotlin的最佳实践/协同程序,但是我要么没有并行性,要么太多的并行性,并打“太多打开的文件”IO错误。
File("/Users/me/Pictures/").walkTopDown() .onFail { file, ex -> println("ERROR: $file caused $ex") } .filter { ... only big images... } .map { file -> async { // I *think* I want async and not "launch"... ImageProcessor.fromFile(file) } }
这似乎并不是并行运行,而我的多核CPU从来没有超过1 CPU的价值。 有没有一种方法与协程运行“NumberOfCores并行操作”价值的延期工作?
我使用Kotlin Coroutines来查看multithreading ,首先创建所有作业然后加入它们,但这意味着完成序列/文件树遍历繁重的处理加入步骤之前,似乎… iffy! 将其拆分成一个收集和一个流程步骤意味着收集可以在处理之前运行。
val jobs = ... the Sequence above... .toSet() println("Found ${jobs.size}") jobs.forEach { it.await() }
你的第一个问题是它根本不能运行 – 记住, Sequence
是懒惰的,你必须使用终端操作,如toSet()
或forEach()
。 此外,您需要通过构建一个newFixedThreadPoolContext
上下文来限制可用于该任务的线程数,并在async
使用它:
val pictureContext = newFixedThreadPoolContext(nThreads = 10, name = "reading pictures in parallel") File("/Users/me/Pictures/").walkTopDown() .onFail { file, ex -> println("ERROR: $file caused $ex") } .filter { ... only big images... } .map { file -> async(pictureContext) { ImageProcessor.fromFile(file) } } .toList() .forEach { it.await() }
编辑:您必须使用终端操作员( toList
),等待结果
我得到了一个频道的工作。 但是,也许我对你的方式是多余的?
val pipe = ArrayChannel>(20) launch { while (!(pipe.isEmpty && pipe.isClosedForSend)) { imageFiles.add(pipe.receive().await()) } println("pipe closed") } File("/Users/me/").walkTopDown() .onFail { file, ex -> println("ERROR: $file caused $ex") } .forEach { pipe.send(async { ImageFile.fromFile(it) }) } pipe.close()