在Kotlin中,我如何整合Kovenant承诺和Elasticsearch异步响应?
我在我的Kotlin应用程序中使用Kovenant,我打电话给Elasticsearch,它有自己的异步API。 我宁愿使用承诺,但最好的我可以想出如下:
task { esClient.prepareSearch("index123") .setQuery(QueryBuilders.matchAllQuery()) .execute().actionGet() } then { ... } success { ... } fail { ... }
这使得Kovenant异步任务线程,然后Elasticsearch使用其池中的线程,然后actionGet()
同步阻止Elasticsearch取回结果。 在阻塞其他人的时候产生新的线程似乎很愚蠢。 有没有一种方法可以更紧密地集成线程调度?
注意: 这个问题是由作者故意编写和回答的( 自我回答问题 ),所以在SO中共享有趣问题的解决方案。
您可以使用Kovenant Deferred
类来创建一个承诺,而不像您在示例中那样通过异步task
进行调度。 该模型基本上是:
- 创建一个延迟实例
- 挂钩到异步处理程序和解决或拒绝基于异步回调延期
- 将
deferred.promise
返回给调用者
在代码中,这看起来像:
fun doSearch(): Promise<SearchResponse, Throwable> { val deferred = deferred<Response, Throwable>() esClient.prepareSearch("index") .setQuery(QueryBuilders.matchAllQuery()) .execute(object: ActionListener<T> { override fun onResponse(response: T) { deferred.resolve(response) } override fun onFailure(e: Throwable) { deferred.reject(e) }) return deferred.promise }
一个可重用的方法是首先创建一个适配器,该适配器可以适应Elasticsearch希望ActionListener
一般工作的承诺:
fun <T: Any> promiseResult(deferred: Deferred<T, Exception>): ActionListener<T> { return object: ActionListener<T> { override fun onResponse(response: T) { deferred.resolve(response) } override fun onFailure(e: Throwable) { deferred.reject(wrapThrowable(e)) } } } class WrappedThrowableException(cause: Throwable): Exception(cause.message, cause) fun wrapThrowable(rawEx: Throwable): Exception = if (rawEx is Exception) rawEx else WrappedThrowableException(rawEx)
注意: wrapThrowable()
方法在那里将Throwable
改为Exception
因为Kovenant的当前版本( 3.3.0
)有一些方法期望promise的拒绝类型从Exception
( 例如bind()
)和你如果使用unwrap()
代替嵌套的promise,可以保留Throwable
。
现在使用这个适配器函数一般地扩展Elasticsearch ActionRequestBuilder
,它几乎是你唯一会调用execute()
东西; 创建一个新的promise()
扩展函数:
fun <Request: ActionRequest<*>, Response: ActionResponse, RequestBuilder: ActionRequestBuilder<*, *, *, *>, Client: ElasticsearchClient<*>> ActionRequestBuilder<Request, Response, RequestBuilder, Client>.promise(): Promise<Response, Exception> { val deferred = deferred<Response, Exception>() this.execute(promiseResult(deferred)) return deferred.promise }
现在你可以调用promise()
而不是execute()
:
esClient.prepareSearch("index") .setQuery(QueryBuilders.matchAllQuery()) .promise()
并开始链接你的承诺
esClient.admin().indices().prepareCreate("index1").setSettings("...").promise() .bind { esClient.admin().cluster().prepareHealth() .setWaitForGreenStatus() .promise() } bind { esClient.prepareIndex("index1", "type1") .setSource(...) .promise() } bind { esClient.prepareSearch("index1") .setQuery(QueryBuilders.matchAllQuery()) .promise() } then { searchResults -> // ... use searchResults }.success { // ... }.fail { // ... } }
当你有嵌套的promise时,你应该熟悉bind()
和unwrap()
,而不需要更深的嵌套。 如果你不想包含kovenant-functional
你可以在上面的例子中使用kovenant-functional
。
由于Elasticsearch客户端中所有请求对象的一致性,您在Elasticsearch中的每个调用都将能够使用promise()
而不是execute()
。
- kotlin翻新 – 背景粘性并发标记扫描GC释放65326(1448KB)AllocSpace对象,62(1736KB)LOS对象,
- Kotlin – 运算符'=='不能应用于'可编辑'! 和'String'比较字符串