Spring WebFlux:Reactive MongoDB

我是Spring Reactor的新手,所以我想重构这个简单的spring数据(在kotlin)方法:

fun save(user: User): Mono { if (findByEmail(user.email).block() != null) { throw UserAlreadyExistsException() } user.password = passwordEncoder.encode(user.password) return userRepository.save(user) } 

谢谢

像这样的东西应该工作:

  open fun save(req: ServerRequest): Mono { logger.info { "${req.method()} ${req.path()}" } return req.bodyToMono().flatMap { // You might need to "work out" this if since I don't know what you are doing if (null != findByEmail(it.email).block()) { throw UserAlreadyExistsException() } it.password = passwordEncoder.encode(it.password) repository.save(it).flatMap { logger.debug { "Entity saved successfully! Result: $it" } ServerResponse.created(URI.create("${req.path()}/${it.id}")).build() } } } 

注意我正在使用MicroUtils / kotlin-logging 。 删除日志语句,如果你不知道或只是不想要他们。

基本上,您需要先“消费”(又名订阅 ),以便访问ServerRequest中的内容。

或者,也可以不用抛出exception,而是使用实际的流来处理该场景。 就像是:

 open fun ... return ServerResponse.ok() // Keep doing stuff here...if something is wrong .switchIfEmpty(ServerResponse.notFound().build()) } 

您可以将示例调整为您的Usertypes,以防您真的想传递它,而不是ServerRequest

(原谅我,如果Kotlin的语法是错误的,如果我做的事情在Java风格:o)

 fun save(user: User): Mono { //we'll prepare several helpful Monos and finally combine them. //as long as we don't subscribe to them, nothing happens. //first we want to short-circuit if the user is found (by email). //the mono below will onError in that case, or be empty Mono failExistingUser = findByEmail(user.email) .map(u -> { throw new UserAlreadyExistsException(); }); //later we'll need to encode the password. This is likely to be //a blocking call that takes some time, so we isolate that call //in a Mono that executes on the Elastic Scheduler. Note this //does not execute immediately, since it's not subscribed to yet... Mono encodedPassword = Mono .fromCallable(() -> passwordEncoder.encode(user.password)) .subscribeOn(Schedulers.elastic()); //lastly the save part. We want to combine the original User with //the result of the encoded password. Mono saveUser = user.toMono() //this is a Kotlin extension .and(encodedPassword, (u, p) -> { u.password = p; return u; }) //Once this is done and the user has been updated, save it .flatMap(updatedUser -> userRepository.save(updatedUser)); //saveUser above is now a Mono that represents the completion of //password encoding, user update and DB save. //what we return is a combination of our first and last Monos. //when something subscribes to this combination: // - if the user is found, the combination errors // - otherwise, it subscribes to saveUser, which triggers the rest of the process return failExistingUser.switchIfEmpty(saveUser); } 

没有中间variables的缩写版本也没有评论:

 fun save(user: User): Mono { return findByEmail(u.email) .map(u -> { throw new UserAlreadyExistsException(); }) .switchIfEmpty(user.toMono()) .and(Mono.fromCallable(() -> passwordEncoder.encode(user.password)) .subscribeOn(Schedulers.elastic()), (u, p) -> { u.password = p; return u; }) .flatMap(updatedUser -> userRepository.save(updatedUser)); } 

你可以在Mono中使用hasElement()函数。 看看这个扩展function,以单声道:

 inline fun  Mono.errorIfEmpty(crossinline onError: () -> Throwable): Mono { return this.hasElement() .flatMap { if (it) this else Mono.error(onError()) } } inline fun  Mono.errorIfNotEmpty(crossinline onError: (T) -> Throwable): Mono { return this.hasElement() .flatMap { if (it) Mono.error(onError.invoke(this.block()!!)) else this } } 

switchIfEmpty的问题在于,它总是评估在参数中传递的expression式 – 编写这样的代码将始终产生Foo对象:

 mono.switchIfEmpty(Foo()) 

你可以编写你自己的扩展名来传递参数中的懒expression式:

 inline fun  Mono.switchIfEmpty(crossinline default: () -> Mono): Mono { return this.hasElement() .flatMap { if (it) this else default() } } 

这里有两个扩展function – 您可以使用它们来检查密码是否正确:

 inline fun  Mono.errorIf(crossinline predicate: (T) -> Boolean, crossinline throwable: (T) -> Throwable): Mono { return this.flatMap { if (predicate(it)) Mono.error(throwable(it)) else Mono.just(it) } } inline fun  Mono.errorIfNot(crossinline predicate: (T) -> Boolean, crossinline throwable: (T) -> Throwable): Mono { return this.errorIf(predicate = { !predicate(it) }, throwable = throwable) }