不明白如何使通量订阅在kotlin工作

我是新来的反应式编程。 我期待看到

test provider started Beat 1000 Beat 2000 

在日志中,但是只有test provider started并且没有Beaton complete消息。 看起来我想念一些东西

 @Service class ProviderService { @PostConstruct fun start(){ val hb: Flux = Flux.interval(Duration.ofSeconds(1)).map { HeartBeat(it) } val provider = Provider("test", hb) } } //////////////////////// open class Provider(name: String, heartBests: Flux) { companion object { val log = LoggerFactory.getLogger(Provider::class.java)!! } init { log.info("$name provider started") heartBests.doOnComplete { log.info("on complete") } heartBests.doOnEach { onBeat(it.get().number) } } fun onBeat(n: Number){ log.info("Beat $n") } } ///// class HeartBeat(val number: Number) 

在你的代码lambda’doOnComplete’从来没有被调用,因为你创建了无限的流。 方法’doOnEach’作为’map’是中间操作(如流中的map ),它不会打电话。 而且还有另一个错误,反应表明“流利模式”。

试试这个简单的例子:

 import reactor.core.publisher.Flux import java.time.Duration fun main(args: Array) { val flux = Flux.interval(Duration.ofSeconds(1)).map { HeartBeat(it) } println("start") flux.take(3) .doOnEach { println("on each $it") } .map { println("before map");HeartBeat(it.value * 2) } .doOnNext { println("on next $it") } .doOnComplete { println("on complete") } .subscribe { println("subscribe $it") } Thread.sleep(5000) } data class HeartBeat(val value: Long) 

这里有三个很常见的错误:

  • doOnEach这样的操作符返回一个新增的Flux实例,所以你需要(重新)分配给一个variables或者使用一个流畅的样式
  • 没有任何反应,直到你subscribe() (或它的一个变种, blockXXXsubscribe引擎盖下,例如…)
  • 这样的管道是完全异步的,并且由于时间interval源, interval而在单独的Thread上运行。 因此,即使已订阅,控制也会立即在init返回,可能导致主线程和应用程序退出。