监制器内的生产者

我正在尝试为听众创建producer 。 我的代码看起来像这样

 suspend fun foo() = produce{ someEvent.addListener { this.send(it) } } 

但是我得到错误Suspension functions can be called only within coroutineSuspension functions can be called only within coroutine ,这是有道理的。 我的问题是。 有没有办法使用协程来实现这个模式?

有几种方法来实现它,取决于你想要实现的:

如果你只想收到最近的事件,那么你应该使用一个混合的频道,并offer一个成功的方法:

 fun foo() = produce<T>(capacity = Channel.CONFLATED) { someEvent.addListener { offer(it) } } 

如果收到所有事件至关重要,那么您的选择取决于事件生产者的行为。 在这里思考的关键问题是如果您的活动制作者开始生产大量“不间断”活动,会发生什么情况。 大多数“同步”事件制作者,根据经验,不支持明确的背压信号,但是仍然支持隐式背压信号 – 如果他们的听众很慢或阻塞线程,他们将放慢速度。 所以,通常,以下解决方案对于同步事件生成器来说是完美的:

 fun foo() = produce<T>() { someEvent.addListener { runBlocking { send(it) } } } 

您也可以指定一些正capacity = xxx作为参数,以便在produce一批事件的情况下produce构建器作为性能优化,而您不想阻止生产者,而是让消费者处理它自己的步伐。

在极少数情况下,当你的制作者不理解一个隐含的阻塞反压信号(当它是某种多线程装置剧烈地产生没有内部同步的事件时),那么你可以使用带有无限容量的通道,但要注意如果生产者超过了消费者,你就有可能冒出内存的风险:

 fun foo() = produce<T>(capacity = Channel.UNLIMITED) { someEvent.addListener { offer(it) } } 

如果您的生产商支持明确的背压信号 (如功能反应流),那么您应该使用一个特殊的适配器将其背压信号正确传输到协同程序或从协程中传输。 kotlinx.coroutines库有许多带有各种反应性库的开箱集成模块。 看到这里 。

注意:不应该suspend修饰符来标记你的foo函数。 foo调用并不会暂停调用者。 它立即(同步)启动一个生产者协同程序。

要了解更多关于协程和不同类型的渠道,我强烈建议学习kotlinx.coroutines指南 。