Kotlin过程集合并行?

我有一个对象的集合,我需要执行一些转换。 目前我正在使用:

var myObjects: List<MyObject> = getMyObjects() myObjects.forEach{ myObj -> someMethod(myObj) } 

它工作正常,但我希望通过并行运行someMethod()来加速它,而不是等待每个对象完成,然后再开始下一个对象。

Kotlin有没有办法做到这一点? 也许与doAsyncTask什么的?

我知道一年前这个问题是不可能的, 但是现在Kotlin拥有doAsyncTask这样的协同程序,我很好奇,如果任何协程都可以帮助的话

你可以使用RxJava来解决这个问题。

 List<MyObjects> items = getList() Observable.from(items).flatMap(object : Func1<MyObjects, Observable<String>>() { fun call(item: MyObjects): Observable<String> { return someMethod(item) } }).subscribeOn(Schedulers.io()).observeOn(AndroidSchedulers.mainThread()).subscribe(object : Subscriber<String>() { fun onCompleted() { } fun onError(e: Throwable) { } fun onNext(s: String) { // do on output of each string } }) 

通过在Schedulers.io()上订阅,在后台线程上Schedulers.io()一些方法。

Java Stream在Kotlin中使用简单:

 tasks.stream().parallel().forEach { computeNotSuspend(it) } 

但是,如果您使用的是Android,如果您希望应用程序与低于24的API兼容,则无法使用Java 8。

您也可以按照您的建议使用协程。 但截至目前(2017年8月)这不是真正的语言组成部分,您需要安装外部库。 有很好的例子指导 。

  runBlocking<Unit> { val deferreds = tasks.map { async(CommonPool) { compute(it) } } deferreds.forEach { it.await() } } 

请注意,协程是使用非阻塞多线程实现的,这意味着它们可以比传统的多线程更快。 我在下面的代码中对Stream并行和协程进行了基准测试,在这种情况下,我的机器上的协同进程比以前快了7倍。 然而,你必须自己做一些工作,以确保你的代码是“挂起”(非锁定),这可能是非常棘手的。 在我的例子中,我只是调用delay ,这是图书馆提供的suspend功能。 非阻塞式多线程并不总是比传统的多线程更快。 如果你有很多线程什么都不做,只能等待IO,这可能会更快,这是我的基准测试。

我的基准代码:

 import kotlinx.coroutines.experimental.CommonPool import kotlinx.coroutines.experimental.async import kotlinx.coroutines.experimental.delay import kotlinx.coroutines.experimental.launch import kotlinx.coroutines.experimental.runBlocking import java.util.* import kotlin.system.measureNanoTime import kotlin.system.measureTimeMillis class SomeTask() { val durationMS = random.nextInt(1000).toLong() companion object { val random = Random() } } suspend fun compute(task: SomeTask): Unit { delay(task.durationMS) //println("done ${task.durationMS}") return } fun computeNotSuspend(task: SomeTask): Unit { Thread.sleep(task.durationMS) //println("done ${task.durationMS}") return } fun main(args: Array<String>) { val n = 100 val tasks = List(n) { SomeTask() } val timeCoroutine = measureNanoTime { runBlocking<Unit> { val deferreds = tasks.map { async(CommonPool) { compute(it) } } deferreds.forEach { it.await() } } } println("Coroutine ${timeCoroutine / 1_000_000} ms") val timePar = measureNanoTime { tasks.stream().parallel().forEach { computeNotSuspend(it) } } println("Stream parallel ${timePar / 1_000_000} ms") } 

我的4核心计算机上的输出:

 Coroutine: 1037 ms Stream parallel: 7150 ms 

如果在两个compute函数中取消注释println ,将会看到在非阻塞协程代码中,任务按照正确的顺序进行处理,而不是使用Streams进行处理。

是的,这可以使用协程来完成。 以下函数在集合的所有元素上并行应用一个操作:

 fun <A>Collection<A>.forEachParallel(f: suspend (A) -> Unit): Unit = runBlocking { map { async(CommonPool) { f(it) } }.forEach { it.await() } } 

虽然定义本身有点神秘,但您可以轻松应用它,就像您期望的那样:

 myObjects.forEachParallel { myObj -> someMethod(myObj) } 

并行映射可以用类似的方式实现,见https://stackoverflow.com/a/45794062/1104870