将publishOn与自定义Publisher配合使用时的ReactiveStreams NPE

当我使用Reactive Streams( https://github.com/reactor/reactor-core )与publishOn函数结合使用自定义Publisher时,我总是得到一个NPE。 我的代码有什么问题? 我是否以错误的方式使用Publisher

 Flux.from(MyPublisher()) .publishOn(Schedulers.single()) .subscribe { println("<-- $it received") } class MyPublisher : Publisher<Int> { override fun subscribe(sub: Subscriber<in Int>) { while (true) { Thread.sleep(300) sub.onNext(1) } } } 

例外是:

 Exception in thread "main" java.lang.NullPointerException at reactor.core.publisher.FluxPublishOn$PublishOnSubscriber.onNext(FluxPublishOn.java:212) at org.guenhter.kotlin.hello.MyPublisher.subscribe(HelloWorld.kt:18) at reactor.core.publisher.FluxSource.subscribe(FluxSource.java:52) at reactor.core.publisher.FluxPublishOn.subscribe(FluxPublishOn.java:96) at reactor.core.publisher.Flux.subscribe(Flux.java:6447) at reactor.core.publisher.Flux.subscribeWith(Flux.java:6614) at reactor.core.publisher.Flux.subscribe(Flux.java:6440) at reactor.core.publisher.Flux.subscribe(Flux.java:6404) at reactor.core.publisher.Flux.subscribe(Flux.java:6347) at org.guenhter.kotlin.hello.HelloWorldKt.main(HelloWorld.kt:11) 

Publisher由“反应流”标准定义,并有许多要求。 其中一个要求是Subscriber.onSubscribe必须在任何其他方法之前被调用才能遵循该协议。

既然你没有这样做,这意味着有些东西可能没有被正确初始化,造成NPE在反应堆类内部。

然而,即使你解决了这个问题,标准被设计成被动的 ,这意味着当用户要求时它只发出数据。 既然你会发送数据,无论这可能会导致一个例外。 使用Flux.create创建一个可以正确处理请求的发射器,而不是创建自己的发布器实现。