Spring WebFlux: Reactive MongoDB

I'm new to Spring Reactor, so I'd like to refactor this simple Spring Data method (in Kotlin):

    fun save(user: User): Mono<User> {
        if (findByEmail(user.email).block() != null) {
            throw UserAlreadyExistsException()
        }
        user.password = passwordEncoder.encode(user.password)
        return userRepository.save(user)
    }

Thanks

Something like this should work:

    open fun save(req: ServerRequest): Mono<ServerResponse> {
        logger.info { "${req.method()} ${req.path()}" }
        return req.bodyToMono<User>().flatMap { user ->
            // You might need to "work out" this if, since I don't know what you are doing
            if (null != findByEmail(user.email).block()) {
                throw UserAlreadyExistsException()
            }
            user.password = passwordEncoder.encode(user.password)
            repository.save(user).flatMap { saved ->
                logger.debug { "Entity saved successfully! Result: $saved" }
                ServerResponse.created(URI.create("${req.path()}/${saved.id}")).build()
            }
        }
    }

Note that I'm using MicroUtils/kotlin-logging. Remove the logging statements if you're not familiar with it or simply don't want them.

Basically, you first need to "consume" (i.e. subscribe to) the request body in order to access what is inside the ServerRequest.

Alternatively, instead of throwing an exception, you can handle that scenario with the stream itself. Something like:

    open fun ...
        return ServerResponse.ok()
            // Keep doing stuff here...if something is wrong
            .switchIfEmpty(ServerResponse.notFound().build())
    }

You can adapt the example to your User type in case you really want to pass that in rather than the ServerRequest.
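To make the "handle it in the stream" idea concrete, here is a minimal sketch that runs with reactor-core alone. The in-memory map, the function names `findNameByEmail` and `describe`, and the plain strings standing in for `ServerResponse.ok()` and `ServerResponse.notFound()` are all assumptions made for illustration; they are not part of the answer above.

```kotlin
import reactor.core.publisher.Mono

// Hypothetical in-memory stand-in for a reactive repository.
val usersByEmail = mapOf("ann@example.com" to "Ann")

// Emits the user's name, or completes empty when the email is unknown.
fun findNameByEmail(email: String): Mono<String> =
    usersByEmail[email]?.let { Mono.just(it) } ?: Mono.empty<String>()

// Plain strings stand in for ServerResponse.ok() / ServerResponse.notFound().
fun describe(email: String): Mono<String> =
    findNameByEmail(email)
        .map { name -> "200 OK: $name" }
        // Only used when the upstream Mono completes without a value.
        .switchIfEmpty(Mono.just("404 Not Found"))

fun main() {
    println(describe("ann@example.com").block()) // 200 OK: Ann
    println(describe("bob@example.com").block()) // 404 Not Found
}
```

The missing-user case never raises an exception here: an empty Mono simply flows into the fallback branch.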

(Forgive me if the Kotlin syntax is wrong or if I'm doing things in a Java style :o)

    fun save(user: User): Mono<User> {
        // We'll prepare several helpful Monos and finally combine them.
        // As long as we don't subscribe to them, nothing happens.

        // First we want to short-circuit if the user is found (by email).
        // The Mono below will onError in that case, or be empty.
        val failExistingUser: Mono<User> = findByEmail(user.email)
            .map<User> { throw UserAlreadyExistsException() }

        // Later we'll need to encode the password. This is likely to be
        // a blocking call that takes some time, so we isolate that call
        // in a Mono that executes on an elastic scheduler. Note this
        // does not execute immediately, since it's not subscribed to yet...
        // (The original answer used Schedulers.elastic(), which newer
        // Reactor versions deprecate in favor of boundedElastic().)
        val encodedPassword: Mono<String> = Mono
            .fromCallable { passwordEncoder.encode(user.password) }
            .subscribeOn(Schedulers.boundedElastic())

        // Lastly the save part. We want to combine the original User with
        // the result of the encoded password. zipWith replaces the two-argument
        // and(other, combinator) from Reactor 3.0, which later versions dropped.
        val saveUser: Mono<User> = user.toMono() // this is a Kotlin extension
            .zipWith(encodedPassword) { u, p ->
                u.password = p
                u
            }
            // Once this is done and the user has been updated, save it.
            .flatMap { updatedUser -> userRepository.save(updatedUser) }

        // saveUser above is now a Mono that represents the completion of
        // password encoding, user update and DB save.

        // What we return is a combination of our first and last Monos.
        // When something subscribes to this combination:
        //  - if the user is found, the combination errors
        //  - otherwise, it subscribes to saveUser, which triggers the rest of the process
        return failExistingUser.switchIfEmpty(saveUser)
    }
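The comments above rely on the fact that nothing runs until something subscribes. A small sketch of that behavior, assuming a hypothetical `slowEncode` function as a stand-in for `passwordEncoder.encode(...)` and a counter added purely for observation:

```kotlin
import reactor.core.publisher.Mono
import java.util.concurrent.atomic.AtomicInteger

// Counts how many times the (hypothetical) expensive call actually runs.
val encodeCalls = AtomicInteger()

// Hypothetical stand-in for an expensive call such as passwordEncoder.encode(...).
fun slowEncode(raw: String): String {
    encodeCalls.incrementAndGet()
    return "encoded-$raw"
}

fun main() {
    // Assembling the Mono does not invoke slowEncode.
    val encoded: Mono<String> = Mono.fromCallable { slowEncode("secret") }
    println("calls after assembly: ${encodeCalls.get()}") // calls after assembly: 0

    // block() subscribes, which is what finally triggers the callable.
    val value = encoded.block()
    println("value = $value, calls after subscription: ${encodeCalls.get()}")
    // value = encoded-secret, calls after subscription: 1
}
```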

A shorter version, without intermediate variables or comments:

    fun save(user: User): Mono<User> {
        return findByEmail(user.email)
            .map<User> { throw UserAlreadyExistsException() }
            .switchIfEmpty(user.toMono())
            .zipWith(
                Mono.fromCallable { passwordEncoder.encode(user.password) }
                    .subscribeOn(Schedulers.boundedElastic())
            ) { u, p -> u.password = p; u }
            .flatMap { updatedUser -> userRepository.save(updatedUser) }
    }

You can use the hasElement() function on Mono. Take a look at these extension functions for Mono:

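    // Errors when the source completes without emitting a value.
    // Note: the source is subscribed to twice (once by hasElement(), once when returned).
    inline fun <T : Any> Mono<T>.errorIfEmpty(crossinline onError: () -> Throwable): Mono<T> {
        return this.hasElement()
            .flatMap { if (it) this else Mono.error<T>(onError()) }
    }

    // Errors when the source emits a value; an empty source stays empty.
    // flatMap only runs when a value exists, so the block() call from the
    // original version is not needed.
    inline fun <T : Any> Mono<T>.errorIfNotEmpty(crossinline onError: (T) -> Throwable): Mono<T> {
        return this.flatMap { Mono.error<T>(onError(it)) }
    }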

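The problem with switchIfEmpty is that it always evaluates the expression passed as its argument. Code written like this will always create a Foo object, even when the Mono is not empty: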

 mono.switchIfEmpty(Foo()) 

You can write your own extension that takes a lazy expression as its parameter:

    inline fun <T : Any> Mono<T>.switchIfEmpty(crossinline default: () -> Mono<T>): Mono<T> {
        return this.hasElement()
            .flatMap { if (it) this else default() }
    }
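Reactor also has a built-in way to get the same laziness without a custom extension: wrap the fallback in `Mono.defer`. The sketch below contrasts the eager and deferred forms; the `fallback` function and its call counter are hypothetical names used only to make the difference observable.

```kotlin
import reactor.core.publisher.Mono
import java.util.concurrent.atomic.AtomicInteger

// Counts how often the fallback expression is actually evaluated.
val fallbackCalls = AtomicInteger()

// Hypothetical fallback; imagine it creates an expensive object.
fun fallback(): Mono<String> {
    fallbackCalls.incrementAndGet()
    return Mono.just("fallback")
}

fun main() {
    val source = Mono.just("value")

    // Eager: fallback() runs while the pipeline is assembled,
    // even though source is not empty.
    val eager = source.switchIfEmpty(fallback()).block()
    println("eager: $eager, fallback() calls: ${fallbackCalls.get()}")
    // eager: value, fallback() calls: 1

    fallbackCalls.set(0)

    // Deferred: the lambda is only invoked if source turns out to be empty.
    val deferred = source.switchIfEmpty(Mono.defer { fallback() }).block()
    println("deferred: $deferred, fallback() calls: ${fallbackCalls.get()}")
    // deferred: value, fallback() calls: 0
}
```

Unlike the hasElement()-based extension, this form subscribes to the source only once.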

Here are two more extension functions. You can use them to check whether a password is correct:

    // Errors with throwable(value) when the emitted value matches the predicate.
    inline fun <T : Any> Mono<T>.errorIf(
        crossinline predicate: (T) -> Boolean,
        crossinline throwable: (T) -> Throwable
    ): Mono<T> {
        return this.flatMap { if (predicate(it)) Mono.error<T>(throwable(it)) else Mono.just(it) }
    }

    // Errors with throwable(value) when the emitted value does NOT match the predicate.
    inline fun <T : Any> Mono<T>.errorIfNot(
        crossinline predicate: (T) -> Boolean,
        crossinline throwable: (T) -> Throwable
    ): Mono<T> {
        return this.errorIf(predicate = { !predicate(it) }, throwable = throwable)
    }
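As a rough usage sketch for the password check mentioned above: the `Credentials` class, `BadPasswordException`, and `checkPassword` are hypothetical names, and comparing hashes with `==` is a simplification (a real application would more likely delegate to its password encoder). The extension is repeated so the example is self-contained.

```kotlin
import reactor.core.publisher.Mono

// Same shape as the errorIfNot extension above, repeated for self-containment.
inline fun <T : Any> Mono<T>.errorIfNot(
    crossinline predicate: (T) -> Boolean,
    crossinline throwable: (T) -> Throwable
): Mono<T> = this.flatMap { if (predicate(it)) Mono.just(it) else Mono.error<T>(throwable(it)) }

// Hypothetical types, introduced only for this sketch.
data class Credentials(val email: String, val passwordHash: String)
class BadPasswordException(email: String) : RuntimeException("Wrong password for $email")

// Passes the credentials through when the hash matches, errors otherwise.
fun checkPassword(stored: Mono<Credentials>, presentedHash: String): Mono<Credentials> =
    stored.errorIfNot(
        predicate = { it.passwordHash == presentedHash },
        throwable = { BadPasswordException(it.email) }
    )

fun main() {
    val stored = Mono.just(Credentials("ann@example.com", "hash-1"))

    // Matching hash: the value flows through unchanged.
    println(checkPassword(stored, "hash-1").block()?.email) // ann@example.com

    // Mismatch: the error is handled inside the stream instead of being thrown.
    val outcome = checkPassword(stored, "hash-2")
        .map { "ok" }
        .onErrorResume(BadPasswordException::class.java) { e -> Mono.just("rejected: ${e.message}") }
        .block()
    println(outcome) // rejected: Wrong password for ann@example.com
}
```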