扇出/扇入结果通道

我正在生产项目,从多个协同例程中消费,并推回到resultChannel。 制片人正在关闭最后一个项目后的频道。

代码永远不会结束,因为resultChannel永远不会被关闭。 如何检测并正确完成迭代,所以hasNext()返回false

 val inputData = (0..99).map { "Input$it" } val threads = 10 val bundleProducer = produce<String>(CommonPool, threads) { inputData.forEach { item -> send(item) println("Producing: $item") } println("Producing finished") close() } val resultChannel = Channel<String>(threads) repeat(threads) { launch(CommonPool) { bundleProducer.consumeEach { println("CONSUMING $it") resultChannel.send("Result ($it)") } } } val iterator = object : Iterator<String> { val iterator = resultChannel.iterator() override fun hasNext() = runBlocking { iterator.hasNext() } override fun next() = runBlocking { iterator.next() } }.asSequence() println("Starting interation...") val result = iterator.toList() println("finish: ${result.size}") 

One Solution collect form web for “扇出/扇入结果通道”

您可以运行等待消费者完成的协程,然后关闭resultChannel

首先,重写启动消费者的代码以保存Job s:

 val jobs = (1..threads).map { launch(CommonPool) { bundleProducer.consumeEach { println("CONSUMING $it") resultChannel.send("Result ($it)") } } } 

然后运行另一个关闭通道的协程,一旦所有的Job完成:

 launch(CommonPool) { jobs.forEach { it.join() } resultChannel.close() } 
  • 在Kotlin协程被取消之后,抛不出异常抛出
  • 我怎样才能用Kotlin DSL为Gradle配置Kotlin协程?
  • kotlin中的“协同本地”变量
  • 在Android服务中的Kotlin协程
  • Kotlin协同吞咽exception
  • 当使用kotlin协同程序时,如何对一个调用暂停function的函数进行unit testing?
  • 取消孩子如何在Kotlin协同工作?
  • 为什么在Kotlin中使用方法引用来暂停函数是不可能的?
  • Kotlin协程和Spring Framework 5反应类型
  • Kotlin协程中的“+”?
  • Kotlin协同工具使用生产和mockito来嘲笑生产工作
  • Kotlin language will be the best programming language for Android.