Tag: 响应式编程

JavaRX:如何立即返回缓存值并行执行网络请求

在等待接收网络请求时,如何立即产生先前的缓存值? val suggestionsCache = mutableMapOf<String, List<Suggestion>>() override fun requestSuggestions(queryString: String, queryLocation: Location): Observable<List<Suggestion>> { return Observable.merge( getCached(queryString), placesService .requestSuggestions(queryString, queryLocation) .doOnNext { cache(it, queryString) }) } private fun getCached(queryString: String): Observable<List<Suggestion>?>? { return if (suggestionsCache.containsKey(queryString)) Observable.just(suggestionsCache[queryString]) else Observable.never<List<Suggestion>>() } private fun cache(suggestions: List<Suggestion>, queryString: String) { suggestionsCache[queryString] = suggestions } 这是关于requestSuggestions方法。 如果可能的话,它应该从缓存中立即得到结果,并且仍然对placeService运行一个请求,这样用户将立即看到建议,但是在请求终止之后也会得到一个更新的列表。 上面的代码在订阅时不会产生任何元素。 我错过了什么?

合并Observables列表,并等到所有完成

TL; DR如何将Task.whenAll(List<Task>)转换为RxJava ? 我现有的代码使用Bolts建立一个异步任务列表,并等到所有这些任务完成之后再执行其他步骤。 本质上,它建立一个List<Task>并返回一个单独的Task ,当完成列表中的所有任务完成后,标记为已完成,按照Bolts站点上的示例 。 我期待用RxJava取代Bolts ,我假设这种方法建立一个异步任务列表(大小不是事先知道的),并把它们全部包装成一个单一的Observable是可能的,但我不知道如何。 我试着看着merge , zip , concat等等……但不能在List<Observable> ,我会建立起来,因为他们似乎都只能在两个Observables一次工作,如果我正确理解文档。 我正在努力学习RxJava并且RxJava还是很新的,所以请原谅我,如果这是一个明显的问题或者在文档中解释的话; 我试过搜索。 任何帮助将不胜感激。

如何在RxJava2中默默跳过异常?

我有这样的数据流: Observable .fromFuture( CompletableFuture.supplyAsync { // First remote call returns Future<List<Type>> listOf(1, 2, 3, 57005, 5) }, Schedulers.computation() ) .flatMap { it.toObservable() } // I turn that list into a stream of single values to process them one by one .map { CompletableFuture.supplyAsync { // This remote call may fail if it does not like […]

重复发送对象到Rx上的子客户端

是否可以将相同的对象反复发送给Rx中的订阅者? 例如,这个代码(在Kotlin上): val exmp = listOf("А") var observable = exmp.toObservable() observable.subscribeBy( onNext = { it + "1" println(it) }, onError = { it.printStackTrace() }, onComplete = { println("Done!") } ) 我尝试将字符串值“A”重复发送到方法onNext()并获得“A111111”。 Rx库的方法重放()我明白开始发送日期再次为新的潜艇。 在从可观察的日期开始不变的循环for ,只是方法被调用几次