在Kotlin中,我如何整合Kovenant承诺和Elasticsearch异步响应?

我在我的Kotlin应用程序中使用Kovenant,我打电话给Elasticsearch,它有自己的异步API。 我宁愿使用承诺,但最好的我可以想出如下:

task { esClient.prepareSearch("index123") .setQuery(QueryBuilders.matchAllQuery()) .execute().actionGet() } then { ... } success { ... } fail { ... } 

这使得Kovenant异步任务线程,然后Elasticsearch使用其池中的线程,然后actionGet()同步阻止Elasticsearch取回结果。 在阻塞其他人的时候产生新的线程似乎很愚蠢。 有没有一种方法可以更紧密地集成线程调度?

注意: 这个问题是由作者故意编写和回答的( 自我回答问题 ),所以在SO中共享有趣问题的解决方案。

您可以使用Kovenant Deferred类来创建一个承诺,而不像您在示例中那样通过异步task进行调度。 该模型基本上是:

  1. 创建一个延迟实例
  2. 挂钩到异步处理程序和解决或拒绝基于异步回调延期
  3. deferred.promise返回给调用者

在代码中,这看起来像:

 fun doSearch(): Promise<SearchResponse, Throwable> { val deferred = deferred<Response, Throwable>() esClient.prepareSearch("index") .setQuery(QueryBuilders.matchAllQuery()) .execute(object: ActionListener<T> { override fun onResponse(response: T) { deferred.resolve(response) } override fun onFailure(e: Throwable) { deferred.reject(e) }) return deferred.promise } 

一个可重用的方法是首先创建一个适配器,该适配器可以适应Elasticsearch希望ActionListener一般工作的承诺:

 fun <T: Any> promiseResult(deferred: Deferred<T, Exception>): ActionListener<T> { return object: ActionListener<T> { override fun onResponse(response: T) { deferred.resolve(response) } override fun onFailure(e: Throwable) { deferred.reject(wrapThrowable(e)) } } } class WrappedThrowableException(cause: Throwable): Exception(cause.message, cause) fun wrapThrowable(rawEx: Throwable): Exception = if (rawEx is Exception) rawEx else WrappedThrowableException(rawEx) 

注意: wrapThrowable()方法在那里将Throwable改为Exception因为Kovenant的当前版本( 3.3.0 )有一些方法期望promise的拒绝类型从Exception例如bind() )和你如果使用unwrap()代替嵌套的promise,可以保留Throwable

现在使用这个适配器函数一般地扩展Elasticsearch ActionRequestBuilder ,它几乎是你唯一会调用execute()东西; 创建一个新的promise()扩展函数:

 fun <Request: ActionRequest<*>, Response: ActionResponse, RequestBuilder: ActionRequestBuilder<*, *, *, *>, Client: ElasticsearchClient<*>> ActionRequestBuilder<Request, Response, RequestBuilder, Client>.promise(): Promise<Response, Exception> { val deferred = deferred<Response, Exception>() this.execute(promiseResult(deferred)) return deferred.promise } 

现在你可以调用promise()而不是execute()

 esClient.prepareSearch("index") .setQuery(QueryBuilders.matchAllQuery()) .promise() 

并开始链接你的承诺

 esClient.admin().indices().prepareCreate("index1").setSettings("...").promise() .bind { esClient.admin().cluster().prepareHealth() .setWaitForGreenStatus() .promise() } bind { esClient.prepareIndex("index1", "type1") .setSource(...) .promise() } bind { esClient.prepareSearch("index1") .setQuery(QueryBuilders.matchAllQuery()) .promise() } then { searchResults -> // ... use searchResults }.success { // ... }.fail { // ... } } 

当你有嵌套的promise时,你应该熟悉bind()unwrap() ,而不需要更深的嵌套。 如果你不想包含kovenant-functional你可以在上面的例子中使用kovenant-functional

由于Elasticsearch客户端中所有请求对象的一致性,您在Elasticsearch中的每个调用都将能够使用promise()而不是execute()