Kotlin:阻塞具有非阻塞I / O的协程

我正在尝试使用Kotlin协同处理非阻塞I / O。 情景如下:

  1. 从线程1上运行的异步回调接收数据。
  2. 等待线程2中的这些数据,然后将其消耗。

我现在的代码看起来像这样(为简洁起见而简化):

private var latch = CountDownLatch(1) private var data: Any? = null // Async callback from non-blocking I/O fun onReceive(data: Any) { currentData = data latch.countDown() } // Wait and consume data fun getData(): Any? { latch.await() latch = CountDownLatch(1) return currentData } fun processData() { launch(CommonPool) { while (true) { val data = getData() // Consume data } } } 

据我所知,Kotlin协程应该能够帮助我摆脱CountDownLatch。 看完这个(真棒)指南后 ,我所能想到的是这样的:

 // Wait and consume data fun getData() = async(CommonPool) { latch.await() latch = CountDownLatch(1) currentData } fun processData() { launch(CommonPool) { while (true) { runBlocking { val data = getData().await() // Consume data } } } } 

我也尝试过与管道 ,有类似的结果。 我显然不理解如何使用这些功能。

你没有说如果在onReceive()接收到的数据可以并行处理。 这是主要的问题。 如果是的话,你可以在onReceive() 。 如果这是不允许的,让每个调用onReceive()CommonPool上启动一个任务,没有任何协程。 如果它们应该按顺序处理,那么最简单的方法就是在内部启动一个带有循环的线程:

 fun onReceive(data: Any) { queue.put(data); } .... // loop in a thread while(true) { data = queue.take(); processData(data); } 

同样,协程是不需要的。

一般来说,协程是语法糖来表示异步程序,就好像它是同步的。 我不认为你的程序是使用协程的情况。