将通道桥接到一个序列
该代码基于Coroutines指南示例:扇出
val inputProducer = produce<String>(CommonPool) { (0..inputArray.size).forEach { send(inputArray[it]) } } val resultChannel = Channel<Result>(10) repeat(threadCount) { launch(CommonPool) { inputProducer.consumeEach { resultChannel.send(getResultFromData(it)) } } }
创建一个Sequence<Result>
的正确方法是什么?
您可以从ReceiveChannel
获取通道.iterator()
,然后将该通道迭代器包装到Sequence<T>
,实现其正常Iterator<T>
,以阻止在每个请求中等待结果:
fun <T> ReceiveChannel<T>.asSequence(context: CoroutineContext) = Sequence { val iterator = iterator() object : AbstractIterator<T>() { override fun computeNext() = runBlocking(context) { if (!iterator.hasNext()) done() else setNext(iterator.next()) } } } val resultSequence = resultChannel.asSequence(CommonPool)