将通道桥接到一个序列

该代码基于Coroutines指南示例:扇出

val inputProducer = produce<String>(CommonPool) { (0..inputArray.size).forEach { send(inputArray[it]) } } val resultChannel = Channel<Result>(10) repeat(threadCount) { launch(CommonPool) { inputProducer.consumeEach { resultChannel.send(getResultFromData(it)) } } } 

创建一个Sequence<Result>的正确方法是什么?

您可以从ReceiveChannel获取通道.iterator() ,然后将该通道迭代器包装到Sequence<T> ,实现其正常Iterator<T> ,以阻止在每个请求中等待结果:

 fun <T> ReceiveChannel<T>.asSequence(context: CoroutineContext) = Sequence { val iterator = iterator() object : AbstractIterator<T>() { override fun computeNext() = runBlocking(context) { if (!iterator.hasNext()) done() else setNext(iterator.next()) } } } val resultSequence = resultChannel.asSequence(CommonPool)