Kotlin系列的并行操作?

在斯卡拉,人们可以很容易地做一个平行的地图,forEach等,与:

collection.par.map(..) 

Kotlin有没有相同的东西?

Kotlin标准库不支持并行操作。 但是,由于Kotlin使用标准的Java集合类,所以可以使用Java 8流API对Kotlin集合执行并行操作。

Kotlin的stdlib还没有官方的支持,但是你可以定义一个扩展函数来模仿par.map

 fun <T, R> Iterable<T>.pmap( numThreads: Int = Runtime.getRuntime().availableProcessors() - 2, exec: ExecutorService = Executors.newFixedThreadPool(numThreads), transform: (T) -> R): List<R> { // default size is just an inlined version of kotlin.collections.collectionSizeOrDefault val defaultSize = if (this is Collection<*>) this.size else 10 val destination = Collections.synchronizedList(ArrayList<R>(defaultSize)) for (item in this) { exec.submit { destination.add(transform(item)) } } exec.shutdown() exec.awaitTermination(1, TimeUnit.DAYS) return ArrayList<R>(destination) } 

( github源码 )

这是一个简单的使用示例

 val result = listOf("foo", "bar").pmap { it+"!" }.filter { it.contains("bar") } 

如果需要,它允许通过提供线程数量甚至特定的java.util.concurrent.Executor来调整线程。 例如

 listOf("foo", "bar").pmap(4, transform = { it + "!" }) 

请注意,这种方法只允许并行map操作,不会影响任何下游位。 例如,第一个例子中的filter将运行单线程。 但是,在许多情况下,只是数据转换(即map )需要并行化。 此外,将上述方法扩展到Kotlin收集API的其他元素将是直接的。

从Kotlin 1.1开始,并行操作在协程上也可以很好地表达。 这是列表中的pmap

 fun <A, B>List<A>.pmap(f: suspend (A) -> B): List<B> = runBlocking { map { async(CommonPool) { f(it) } }.map { it.await() } } 

请注意,协程仍然是一个实验性功能。

目前没有。 如果你看看官方Kotlin与Scala的比较,你会看到

以后可能会添加到Kotlin的东西:

  • 并行收集