Tag: 反应流

Reactor Flux <MyObject> to Mono <List <MyObject >>

如何将Flux<MyObject>直接转换为Mono<List<MyObject>> ? 我期待从RxJava等同于Single<List<MyObject>> single = observable.toList() 。 通过阻塞运算符,我可以这样做: val just: Mono<List<MyObject>> = Mono.just(flux.toIterable().toList()) 但是在不合缝的声明的时候被执行。

将publishOn与自定义Publisher配合使用时的ReactiveStreams NPE

当我使用Reactive Streams( https://github.com/reactor/reactor-core )与publishOn函数结合使用自定义Publisher时,我总是得到一个NPE。 我的代码有什么问题? 我是否以错误的方式使用Publisher ? Flux.from(MyPublisher()) .publishOn(Schedulers.single()) .subscribe { println("<– $it received") } class MyPublisher : Publisher<Int> { override fun subscribe(sub: Subscriber<in Int>) { while (true) { Thread.sleep(300) sub.onNext(1) } } } 例外是: Exception in thread "main" java.lang.NullPointerException at reactor.core.publisher.FluxPublishOn$PublishOnSubscriber.onNext(FluxPublishOn.java:212) at org.guenhter.kotlin.hello.MyPublisher.subscribe(HelloWorld.kt:18) at reactor.core.publisher.FluxSource.subscribe(FluxSource.java:52) at reactor.core.publisher.FluxPublishOn.subscribe(FluxPublishOn.java:96) at reactor.core.publisher.Flux.subscribe(Flux.java:6447) at reactor.core.publisher.Flux.subscribeWith(Flux.java:6614) at reactor.core.publisher.Flux.subscribe(Flux.java:6440) at […]

Spring WebFlux:Reactive MongoDB

我是Spring Reactor的新手,所以我想重构这个简单的spring数据(在kotlin)方法: fun save(user: User): Mono<User> { if (findByEmail(user.email).block() != null) { throw UserAlreadyExistsException() } user.password = passwordEncoder.encode(user.password) return userRepository.save(user) } 谢谢