Tag: 项目反应堆

使用Flux而不是for循环,有什么好处?

我试图让我的脑袋围绕反应式编程,所以我想问问在这里使用Flux是否有任何好处: override fun notifyObserversOnMessage(message: Message) { Flux.fromStream(observers.stream()) .map { observer -> Mono.just(observer.reactOnMessage(message)) } .subscribe() } 代替: override fun notifyObserversOnMessage(message: Message) { for (observer in observers) { observer.reactOnMessage(message) } } 这是否取决于每个观察员正在做的工作,如果这是IO还是不?

Reactor Flux <MyObject> to Mono <List <MyObject >>

如何将Flux<MyObject>直接转换为Mono<List<MyObject>> ? 我期待从RxJava等同于Single<List<MyObject>> single = observable.toList() 。 通过阻塞运算符,我可以这样做: val just: Mono<List<MyObject>> = Mono.just(flux.toIterable().toList()) 但是在不合缝的声明的时候被执行。

如何在弹簧反应堆中将两台出版商结合在一起

我实现了我的一个虚拟反应仓库。 我正在与更新方法斗争: @Override public Mono<User> updateUser(int id, Mono<User> updateMono) { return //todo with getUser } @Override public Mono<User> getUser(int id) { return Mono.justOrEmpty(this.users.get(id)); } 从一方面我有传入发布者Mono<User> updateMono ,另一方面,我有Mono.justOrEmpty(this.users.get(id))期间另一个出版商。 如何将它们结合在一起,进行更新,并只返回一个发布者? 我唯一想到的是: @Override public Mono<User> updateUser(int id, Mono<User> updateMono) { return getUser(id).doOnNext(user -> { updateMono.subscribe(update -> { users.put(id, new User(id, update.getName(), update.getAge())); System.out.format("Updated user with id %d to […]

如何用Spring WebFlux返回404

我有一个这样的控制器(在Kotlin中): @RestController @RequestMapping("/") class CustomerController (private val service: CustomerService) { @GetMapping("/{id}") fun findById(@PathVariable id: String, @RequestHeader(value = IF_NONE_MATCH) versionHeader: String?): Mono<HttpEntity<KundeResource>> = return service.findById(id) .switchIfEmpty(Mono.error(NotFoundException())) .map { // ETag stuff … ok().eTag("…").body(…) } } 我想知道是否有一个更好的方法比抛出一个异常是用@ResponseStatus(code = NOT_FOUND)注解@ResponseStatus(code = NOT_FOUND)

在junit测试中,反应器switchifempty的行为不像预期的那样

我正在为下面提供的方法编写测试。 ` class ScrapedRecipeCache @Autowired constructor(private val cache: RecipeScrapingCacheService, private val recipeService: RecipeService) : ScrapedRecipeProvider { override fun provide(request: ScrapingRequest): Flux<ScrapedRecipe> = cache.retrieve(request.link) .doOnNext { println(it) } .flatMap { (link, _, recipeHash, error) -> recipeService.findByHash(recipeHash) .map { ScrapedRecipe(it, link, error)} .switchIfEmpty(cache.remove(request.link).then(Mono.empty())) } .flux() } `测试看起来如下: private val recipeFetched = Recipe("Tortellini", RecipeDifficulty.EASY, 15.0) val cacheContents = […]

如何在Spring WebFlux中记录请求和响应主体

我想用Kotlin在Spring WebFlux的REST API中集中记录请求和响应。 到目前为止我已经尝试过这种方法 @Bean fun apiRouter() = router { (accept(MediaType.APPLICATION_JSON) and "/api").nest { "/user".nest { GET("/", userHandler::listUsers) POST("/{userId}", userHandler::updateUser) } } }.filter { request, next -> logger.info { "Processing request $request with body ${request.bodyToMono<String>()}" } next.handle(request).doOnSuccess { logger.info { "Handling with response $it" } } } 这里的请求方法和路径日志成功,但身体是Mono ,所以我应该如何登录? 应该是相反的方式,我必须订阅请求正文Mono ,并在回调登录? 另一个问题是ServerResponse接口在这里不能访问响应主体。 我怎样才能在这里? 我试过的另一种方法是使用WebFilter @Bean […]

用Spring 5 WebFlux框架解码ByteArray

我试图用kotlin来使用新的Spring WebFlux框架。 而我无法找到这个代码(myService)的错误: fun foo(): Flux<ByteArray> { val client = WebClient.create("http://byte-array-service") return client .get() .uri("/info") .accept(MediaType.APPLICATION_OCTET_STREAM) .exchange() .flatMapMany { r -> r.bodyToFlux(ByteArray::class.java) } } 这个方法返回Flux 7893字节,我知道并不是byte-array-service发送的所有字节。 如果我使用旧式的休息模板一切正常 fun foo(): Flux<ByteArray> { val rt = RestTemplate() rt.messageConverters.add( ByteArrayHttpMessageConverter()) val headers = HttpHeaders() headers.accept = listOf(MediaType.APPLICATION_OCTET_STREAM) val entity = HttpEntity<String>(headers) val r = rt.exchange("http://byte-array-service/info", HttpMethod.GET,entity, ByteArray::class.java) return […]

WebFlux功能:如何检测一个空的通量并返回404?

我有以下简化的处理函数(Spring WebFlux和使用Kotlin的功能性API)。 但是,我需要一个提示如何检测一个空的Flux,然后使用noContent()为404,当Flux为空时。 fun findByLastname(request: ServerRequest): Mono<ServerResponse> { val lastnameOpt = request.queryParam("lastname") val customerFlux = if (lastnameOpt.isPresent) { service.findByLastname(lastnameOpt.get()) } else { service.findAll() } // How can I detect an empty Flux and then invoke noContent() ? return ok().body(customerFlux, Customer::class.java) }

Spring WebFlux:Reactive MongoDB

我是Spring Reactor的新手,所以我想重构这个简单的spring数据(在kotlin)方法: fun save(user: User): Mono<User> { if (findByEmail(user.email).block() != null) { throw UserAlreadyExistsException() } user.password = passwordEncoder.encode(user.password) return userRepository.save(user) } 谢谢