Kotlin系列的并行操作?
在斯卡拉,人们可以很容易地做一个平行的地图,forEach等,与:
collection.par.map(..)
Kotlin有没有相同的东西?
Kotlin标准库不支持并行操作。 但是,由于Kotlin使用标准的Java集合类,所以可以使用Java 8流API对Kotlin集合执行并行操作。
Kotlin的stdlib还没有官方的支持,但是你可以定义一个扩展函数来模仿par.map
:
fun <T, R> Iterable<T>.pmap( numThreads: Int = Runtime.getRuntime().availableProcessors() - 2, exec: ExecutorService = Executors.newFixedThreadPool(numThreads), transform: (T) -> R): List<R> { // default size is just an inlined version of kotlin.collections.collectionSizeOrDefault val defaultSize = if (this is Collection<*>) this.size else 10 val destination = Collections.synchronizedList(ArrayList<R>(defaultSize)) for (item in this) { exec.submit { destination.add(transform(item)) } } exec.shutdown() exec.awaitTermination(1, TimeUnit.DAYS) return ArrayList<R>(destination) }
( github源码 )
这是一个简单的使用示例
val result = listOf("foo", "bar").pmap { it+"!" }.filter { it.contains("bar") }
如果需要,它允许通过提供线程数量甚至特定的java.util.concurrent.Executor
来调整线程。 例如
listOf("foo", "bar").pmap(4, transform = { it + "!" })
请注意,这种方法只允许并行map
操作,不会影响任何下游位。 例如,第一个例子中的filter
将运行单线程。 但是,在许多情况下,只是数据转换(即map
)需要并行化。 此外,将上述方法扩展到Kotlin收集API的其他元素将是直接的。
从Kotlin 1.1开始,并行操作在协程上也可以很好地表达。 这是列表中的pmap
:
fun <A, B>List<A>.pmap(f: suspend (A) -> B): List<B> = runBlocking { map { async(CommonPool) { f(it) } }.map { it.await() } }
请注意,协程仍然是一个实验性功能。
目前没有。 如果你看看官方Kotlin与Scala的比较,你会看到
以后可能会添加到Kotlin的东西:
- 并行收集