Kotlin&Vertx&Mongo:如何管理异步CRUDfunction?

朋友们! 我是Vertx和Mongo的绿手,现在我面临一个棘手的问题。 以下是代码段。

这里是Mongo客户端的包装类。

// MongoDatabase.kt import io.vertx.core.json.JsonObject import io.vertx.core.logging.LoggerFactory import io.vertx.kotlin.core.json.JsonObject import io.vertx.rxjava.core.Vertx import io.vertx.rxjava.ext.mongo.MongoClient import kotlin.reflect.KClass import kotlin.reflect.full.declaredFunctions class MongoDatabase (val tClass: KClass, vertx: Vertx, config: JsonObject, databaseName: String) : Database { private val mongo = MongoClient.createShared(vertx, config, databaseName) private val logger = LoggerFactory.getLogger(MongoDatabase::class.java) // I use a self-made annotation to record the mongo collection name private val tableName = (tClass.annotations.find { it is Entity } as Entity).tableName companion object { inline fun getInstance(vertx: Vertx, config: JsonObject, databaseName: String): Database = MongoDatabase(T::class, vertx, config, databaseName) } private fun toJsonObject(element: T): JsonObject { return tClass.declaredFunctions.find { it.name == "toJsonObject" }!!.call(element) as JsonObject } suspend override fun create(element: T): T? { var isSucceed = false mongo.rxInsert(tableName, toJsonObject(element)) .doOnError { logger.error("$tableName insert failed: ${it.message}") } .subscribe { logger.info("$tableName insert succeed: $it"); isSucceed = true } return if (isSucceed) element else null } suspend override fun delete( id: String): Boolean { var isSucceed = false mongo.rxFindOneAndDelete(tableName, JsonObject("_id" to id)) .doOnError { logger.error("$tableName delete failed: ${it.message}") } .subscribe { logger.info("$tableName delete succeed: $it"); isSucceed = true } return isSucceed } suspend override fun update(id:String, element: T): T? { var isSucceed = false mongo.rxUpdate(tableName, JsonObject("_id" to id), toJsonObject(element)) .doOnError { logger.error("$tableName insert failed: ${it.message}") } .subscribe { logger.info("$tableName insert succeed: $it"); isSucceed = true } return if (isSucceed) element else null } // No implementation yet. suspend override fun get(pair: Pair): List { return emptyList() } } 

这是Verticle代码。

 package com.ra import io.reactivex.rxkotlin.subscribeBy import io.reactivex.rxkotlin.toObservable import io.vertx.core.logging.LoggerFactory import io.vertx.kotlin.core.json.JsonObject import io.vertx.rxjava.core.AbstractVerticle import kotlinx.coroutines.experimental.async import kotlinx.coroutines.experimental.runBlocking import java.time.LocalDateTime import java.util.* class MainVerticle : AbstractVerticle() { private val logger = LoggerFactory.getLogger(MainVerticle::class.java) private lateinit var database: Database
override fun start() = runBlocking { database = MongoDatabase.getInstance(vertx, JsonObject(), "Main Pool") val datas = listOf( Article(UUID.randomUUID().toString(), "title1", listOf("tag1", "tag2"), LocalDateTime.of(2018, 1, 1, 0, 0, 0), "content"), Article(UUID.randomUUID().toString(), "title2", listOf("tag1", "tag2"), LocalDateTime.of(2018, 1, 1, 0, 0, 0), "content"), Article(UUID.randomUUID().toString(), "title3", listOf("tag1", "tag2"), LocalDateTime.of(2018, 1, 1, 0, 0, 0), "content") ) datas.toObservable() .subscribeBy( onError = { it.printStackTrace() }, onNext = { async { database.create(it) } }, onComplete = { logger.info("datas creation complete!") }) /** * Jan 10, 2018 10:35:57 AM com.ra.MongoDatabase * INFO: article delete succeed: null * Jan 10, 2018 10:35:58 AM com.ra.MongoDatabase * INFO: article insert succeed: null * Jan 10, 2018 10:35:58 AM com.ra.MongoDatabase * INFO: article insert succeed: null * Jan 10, 2018 10:35:58 AM com.ra.MongoDatabase * INFO: article insert succeed: null */ datas.forEach { database.create(it) } /** * Jan 10, 2018 10:32:56 AM com.ra.MongoDatabase * INFO: article delete succeed: null * Jan 10, 2018 10:32:56 AM com.ra.MongoDatabase * INFO: article insert succeed: null * Jan 10, 2018 10:32:56 AM com.ra.MongoDatabase * INFO: article insert succeed: null * Jan 10, 2018 10:32:56 AM com.ra.MongoDatabase * INFO: article insert succeed: null **/ database.delete(datas[1]._id) } }

正如你所看到的,

  1. 结果是完全错误的 – 它先删除然后插入。
  2. 当然,在Mongodb中的结果也是错误的 – 它根本没有删除。
  3. 更糟糕的是,记录器中的结果都是空的,但我不知道为什么…

你们可以复制代码来尝试,如果你能得到相同的结果。

所以这是我的问题:

  1. 为什么我的代码错了?
  2. 如何管理异步function? 例如,我想(添加三条)=>(Update Article where id = 1)|| (删除id = 0的文章),如何使用kotlin-coroutine或rxjava来解决?

PS: || 意味着一起