Kotlin:如何异步等待一个相同的方法列表?

我有几百个Java类实例,都需要在10分钟内完成.calculate()方法或死亡。 他们会抓住CPU和内存,所以我想只允许5(线程?)一次。 我相信我很接近,但来自Java背景我还不够熟悉kotlin协同程序(vs java ExecutorServices)来进行编译。

// ...my logic to create a stream of identical class type instances // that all have a vanilla blocking .calculate():Double method... // which I now want to (maybe?) map to Jobs listOf(MyClass(1), MyClass(2), MyClass(1000)) .map { launch(CommonPool) { val errorRate: Double? = it?.calculate() println("${it?.javaClass?.simpleName} $errorRate") // desired output errorRate } } .collect(Collectors.toList<Job>()) jobs.forEach { println(it.join()) } 

然后我觉得我需要用非阻塞计算包装计算? 还是阻塞,但超时有限? 应该“runBlocking”在那里? 在上面的代码中更好的lambda?

 fun MyClass.calculateTimeLimited(): Double = runBlocking { withTimeout(TIMEOUT) { this.calculate() // <-- doesn't compile! "this" is "CoroutineScope" 

我不知道你是否知道,但有这个伟大的文件: 例如协程 。 我链接了关于取消和超时的特定部分。 以下是我的执行你的问题:

 import kotlinx.coroutines.experimental.CommonPool import kotlinx.coroutines.experimental.newSingleThreadContext import java.util.* import kotlin.system.measureNanoTime internal val random = Random() fun createRandArray(n: Long): Array<Int> { return createRandTArray(n).toTypedArray() } fun createRandTArray(n: Long): IntArray { return random.ints(n, 0, n.toInt()).toArray() } var size: Long = 1_000_000 var nRepeats: Int = 11 var nDrops: Int = 1 fun <T> benchmark(name: String, buildSortable: (Long) -> T, sort: (T) -> Any) { val arrays = List(nRepeats) { buildSortable(size) } val timesMS = arrays.map { measureNanoTime { sort(it) } / 1_000_000 }.drop(nDrops) // for JVM optimization warmup // println(timesMS) println("[$name] Average sort time for array of size $size: ${timesMS.average() } ms") } fun main(args: Array<String>) { size = 1_000_000 nRepeats = 6 benchmark("Array<Int>", buildSortable = { size -> createRandTArray(size).toTypedArray() }, sort = ::mergeSort) benchmark("ListLike Array<Int>", buildSortable = { size -> SortingArray(createRandTArray(size).toTypedArray()) }, sort = { array -> mergeSortLL(array) }) // oddly ::mergeSortLL is refused benchmark("ListLike IntArray", buildSortable = { size -> createRandTArray(size).asComparableList() }, sort = { array -> mergeSortLL(array) }) // oddly ::mergeSortLL is refused benchmark("IntArray", buildSortable = { size -> createRandTArray(size) }, sort = ::mergeSortIA) benchmark("IntArray multi-thread (CommonPool) with many array copies", buildSortable = { size -> createRandTArray(size) }, sort = { mergeSortCorou(it) }) val origContext = corouContext corouContext = newSingleThreadContext("single thread") benchmark("IntArray multi-thread (one thread!) with many array copies", buildSortable = { size -> createRandTArray(size) }, sort = { mergeSortCorou(it) }) corouContext = origContext benchmark("Java int[]", buildSortable = { size -> createRandTArray(size) }, sort = { MergeSorter.sort(it) }) } 

我得到了输出:

  Hit the timeout (CancellationException). Out of 100 computations, 45 completed. 

你可以玩timeOut值(目前为500毫秒)。 每个作业在0到1000毫秒之间都有不同的随机执行时间,所以在超时之前,大约有一半的时间会被执行。

但是,对于您的具体问题,您可能需要更多的时间来实现这一点。 你的计算必须是可以取消的 。 请在上面链接的文件中仔细阅读取消部分。 基本上你的计算要么必须调用kotlinx.coroutines一个suspend函数(这是我从调用delay所做的),或者使用yieldisActive


编辑 :相关评论取消任何工作(不可撤销/不可中止):

不,这里没有魔法。 使计算真正可以取消是非常困难的,不管你使用什么框架。 臭名昭着的Java有Thread.stop() ,这似乎做你想要的,但已被弃用 。

我尝试使用协程来解决在超时之后停止提交新作业的更简单的问题,但是在超时之前启动的作业可以远远超出超时而不被取消/中断。 我花了一些时间,并找不到一个简单的协程的解决方案。 这可以使用标准的Java并发结构来完成。

我的(抱歉不能在我的机器上测试):

 val results = streamOfInstances.asSeqence().map { async(CommonPool) { val errorRate: Double? = it?.calculate() println("${it?.javaClass?.simpleName} $errorRate") errorRate } } runBlocking { results.forEach { println(it.await()) } } 

与您的代码的主要区别:

  • 自从我收集结果以来,我使用async而不是launch
  • 阻塞操作( join()await() )在runBlocking{}内部
  • 我使用Kotlin的map ,而不是JDK8 API