Kotlin&Vertx&Mongo:如何管理异步CRUDfunction?
朋友们! 我是Vertx和Mongo的绿手,现在我面临一个棘手的问题。 以下是代码段。
这里是Mongo客户端的包装类。
// MongoDatabase.kt import io.vertx.core.json.JsonObject import io.vertx.core.logging.LoggerFactory import io.vertx.kotlin.core.json.JsonObject import io.vertx.rxjava.core.Vertx import io.vertx.rxjava.ext.mongo.MongoClient import kotlin.reflect.KClass import kotlin.reflect.full.declaredFunctions class MongoDatabase (val tClass: KClass, vertx: Vertx, config: JsonObject, databaseName: String) : Database { private val mongo = MongoClient.createShared(vertx, config, databaseName) private val logger = LoggerFactory.getLogger(MongoDatabase::class.java) // I use a self-made annotation to record the mongo collection name private val tableName = (tClass.annotations.find { it is Entity } as Entity).tableName companion object { inline fun getInstance(vertx: Vertx, config: JsonObject, databaseName: String): Database = MongoDatabase(T::class, vertx, config, databaseName) } private fun toJsonObject(element: T): JsonObject { return tClass.declaredFunctions.find { it.name == "toJsonObject" }!!.call(element) as JsonObject } suspend override fun create(element: T): T? { var isSucceed = false mongo.rxInsert(tableName, toJsonObject(element)) .doOnError { logger.error("$tableName insert failed: ${it.message}") } .subscribe { logger.info("$tableName insert succeed: $it"); isSucceed = true } return if (isSucceed) element else null } suspend override fun delete( id: String): Boolean { var isSucceed = false mongo.rxFindOneAndDelete(tableName, JsonObject("_id" to id)) .doOnError { logger.error("$tableName delete failed: ${it.message}") } .subscribe { logger.info("$tableName delete succeed: $it"); isSucceed = true } return isSucceed } suspend override fun update(id:String, element: T): T? { var isSucceed = false mongo.rxUpdate(tableName, JsonObject("_id" to id), toJsonObject(element)) .doOnError { logger.error("$tableName insert failed: ${it.message}") } .subscribe { logger.info("$tableName insert succeed: $it"); isSucceed = true } return if (isSucceed) element else null } // No implementation yet. suspend override fun get(pair: Pair): List { return emptyList() } }
这是Verticle代码。
package com.ra import io.reactivex.rxkotlin.subscribeBy import io.reactivex.rxkotlin.toObservable import io.vertx.core.logging.LoggerFactory import io.vertx.kotlin.core.json.JsonObject import io.vertx.rxjava.core.AbstractVerticle import kotlinx.coroutines.experimental.async import kotlinx.coroutines.experimental.runBlocking import java.time.LocalDateTime import java.util.* class MainVerticle : AbstractVerticle() { private val logger = LoggerFactory.getLogger(MainVerticle::class.java) private lateinit var database: Database override fun start() = runBlocking { database = MongoDatabase.getInstance(vertx, JsonObject(), "Main Pool") val datas = listOf( Article(UUID.randomUUID().toString(), "title1", listOf("tag1", "tag2"), LocalDateTime.of(2018, 1, 1, 0, 0, 0), "content"), Article(UUID.randomUUID().toString(), "title2", listOf("tag1", "tag2"), LocalDateTime.of(2018, 1, 1, 0, 0, 0), "content"), Article(UUID.randomUUID().toString(), "title3", listOf("tag1", "tag2"), LocalDateTime.of(2018, 1, 1, 0, 0, 0), "content") ) datas.toObservable() .subscribeBy( onError = { it.printStackTrace() }, onNext = { async { database.create(it) } }, onComplete = { logger.info("datas creation complete!") }) /** * Jan 10, 2018 10:35:57 AM com.ra.MongoDatabase * INFO: article delete succeed: null * Jan 10, 2018 10:35:58 AM com.ra.MongoDatabase * INFO: article insert succeed: null * Jan 10, 2018 10:35:58 AM com.ra.MongoDatabase * INFO: article insert succeed: null * Jan 10, 2018 10:35:58 AM com.ra.MongoDatabase * INFO: article insert succeed: null */ datas.forEach { database.create(it) } /** * Jan 10, 2018 10:32:56 AM com.ra.MongoDatabase * INFO: article delete succeed: null * Jan 10, 2018 10:32:56 AM com.ra.MongoDatabase * INFO: article insert succeed: null * Jan 10, 2018 10:32:56 AM com.ra.MongoDatabase * INFO: article insert succeed: null * Jan 10, 2018 10:32:56 AM com.ra.MongoDatabase * INFO: article insert succeed: null **/ database.delete(datas[1]._id) } }
正如你所看到的,
- 结果是完全错误的 – 它先删除然后插入。
- 当然,在Mongodb中的结果也是错误的 – 它根本没有删除。
- 更糟糕的是,记录器中的结果都是空的,但我不知道为什么…
你们可以复制代码来尝试,如果你能得到相同的结果。
所以这是我的问题:
- 为什么我的代码错了?
- 如何管理异步function? 例如,我想(添加三条)=>(Update Article where id = 1)|| (删除id = 0的文章),如何使用kotlin-coroutine或rxjava来解决?
PS: ||
意味着一起