Tag: RX java

RxAndroid – 重试观察点击

我在Android应用程序中使用rxAndroid和rxKotlin来异步处理网络请求。 现在我想重试一个失败的网络请求,只有点击Snackbar按钮后。 我的代码现在: val citiesService = ApiFactory.citiesService citiesService.cities() .subscribeOn(Schedulers.newThread()) // fetch List<String> .flatMap { Observable.from(it) } // convert to sequence of String .flatMap { city -> citiesService.coordinates(city) // fetch DoubleArray .map { City(city, it) } // convert to City(String, DoubleArray) } .toList() .observeOn(AndroidSchedulers.mainThread()) .doOnNext { listView.setOnItemClickListener { adapterView, view, position, id -> onItemClick(it[position]) } } […]

合并Observables列表,并等到所有完成

TL; DR如何将Task.whenAll(List<Task>)转换为RxJava ? 我现有的代码使用Bolts建立一个异步任务列表,并等到所有这些任务完成之后再执行其他步骤。 本质上,它建立一个List<Task>并返回一个单独的Task ,当完成列表中的所有任务完成后,标记为已完成,按照Bolts站点上的示例 。 我期待用RxJava取代Bolts ,我假设这种方法建立一个异步任务列表(大小不是事先知道的),并把它们全部包装成一个单一的Observable是可能的,但我不知道如何。 我试着看着merge , zip , concat等等……但不能在List<Observable> ,我会建立起来,因为他们似乎都只能在两个Observables一次工作,如果我正确理解文档。 我正在努力学习RxJava并且RxJava还是很新的,所以请原谅我,如果这是一个明显的问题或者在文档中解释的话; 我试过搜索。 任何帮助将不胜感激。

如何创建rx.Single的缓存/热版本?

RxJava v1.0.13引入了一种新的Observable: rx.Single 。 它非常适合请求 – 响应模型,但缺乏像doOnNext()这样的操作符的标准副作用。 所以,要做出多种事情就更困难了。 我的想法是将doOnNext()替换为同一个Single实例的多个订阅。 但这可能导致下层工作多次完成:每次订阅一次。 示例rx.Single实现: private class WorkerSubscribe<SomeData>() : Single.OnSubscribe<SomeData> { override fun call(sub: SingleSubscriber<in SomeData>) { try { val result = fetchSomeData() sub.onSuccess(result) } catch(t: Throwable) { sub.onError(t) } } } val single = Single.create<SomeData>(WorkerSubscribe()) 用法: single.subscribe({}, {}) single.subscribe({}, {}) // Data is fetched for the second time […]

歌林 – 项目清单项目列表

我有一个对象的列表,我想要转换为以下类型的列表对象 data class Value(val x : Int, val y: Int) 到以下类型的对象: data class Collection(val xs : List<Int>, val ys: List<Int>) 我正在寻找的东西类似于RxJava中的collect运算符,或者如果可能的话,这是一个更好的方法。 任何人有想法?

Rx concatWith()只返回第一个Flowable结果

我已经公布了他们正在单独工作的所有方法,但是我遇到了第一个问题,在那里我concatWith()两个流动 return userFavouriteStores() .concatWith(userOtherStores()) .doOnNext(new Consumer<List<StoreModel>>() { @Override public void accept(@io.reactivex.annotations.NonNull List<StoreModel> storeModels) throws Exception { Log.i("storeModels", "" + storeModels); } }) public Flowable<List<StoreModel>> userFavouriteStores() { return userStores() .map(UserStores::favoriteStores) .flatMap(storeId -> storeDao.storesWithIds(storeId)) .map(stores -> { // TODO Konvert to Kotlin map {} List<StoreModel> result = new ArrayList<>(stores.size()); for (se.ica.handla.repositories.local.Store store : stores) { result.add(store.toStoreModel(StoreModel.Source.Favourite)); } […]

RxJava如何将一个列表的项目分组到Map <Key,List <Value >>

我有一个这样的结构类“内阁” class Cabinet { var title: String var floor: Int var cabinetNumber: Int } 而另一个班级“标签” class Tabs { var floor: Int var cabinetList: List<Cabinet> } 所以在RxJava的帮助下,我试图创建Observable<List<Tabs>> 。 首先我得到一个内阁列表,所以我必须得到所有的“橱柜”“标签”结构。 fun getCabinets(): Observable<List<Tabs>> { return getCabs() //This metod returns List<Cabinet> .observeOn(Schedulers.io)) .filter { it.title != null } … ??? //There I don't get what to do next […]

RxJava – 随时接受更多观测数据的合并Observable?

我遇到需要一个Observable实现,它持有一个或多个Observable并将它们合并。 但是这里有一个启发:我想随时添加更多的可观察对象,我想这也可以支持将它们移除。 为了真正有效,所有订阅者都必须接收来自新订阅者的通知,这些订阅者是在订阅后添加的。 除非所有合并的Observable都很冷,并调用onComplete() ,那么我想可以让订阅取消订阅,即使添加了更多的Observable。 这是更多的合并多个无限的热Observables,并能够在任何时候添加更多。 MergableObservable<MyEvent> allSources = new MergableObservable<>(); //later in application Observable<MyEvent> eventSource1 = … allSources.add(eventSource1); //and later again Observable<MyEvent> eventSource2 = … allSources.add(eventSource2 ); //and so on Observable<MyEvent> eventSource3 = … allSources.add(eventSource3); 我知道有合并操作符,但我需要一个可变的结构。 我是否缺少已经存在的东西? 除非这种情况绝对适合,否则我宁愿不使用主题。

RxJava:在另一个异步生产中取消订阅异步观察

什么是最好的(干净)的方式来取消订阅一个异步Observable这是在另一个异步之一的flatMap产生? 说,我有一个演示文稿,显示用户的聊天列表。 每个聊天项目应该呈现自己的最新消息。 这里是一个原始的代码来说明: fun AsyncInsideAsync() { /** * A chat that 'transmits' latest message */ class Chat(val name: Int) { val lastMessage: PublishSubject<String> = PublishSubject.create() fun postMessage(message: Int) { lastMessage.onNext("Chat: $name, Message: $message") } } // List of chats for user. val chats: PublishSubject<Chat> = PublishSubject.create() /////////////////////////////////// // ORIGINAL SEQUENCE // /////////////////////////////////// val sequence […]

使用Retrofit 2.0和RxJava获取响应状态代码

我正在尝试升级到Retrofit 2.0,并在我的Android项目中添加RxJava。 我正在做一个API调用,并希望检索错误代码的情况下,从服务器的错误响应。 Observable<MyResponseObject> apiCall(@Body body); 在RxJava调用中: myRetrofitObject.apiCall(body).subscribe(new Subscriber<MyResponseObject>() { @Override public void onCompleted() { } @Override public void onError(Throwable e) { } @Override public void onNext(MyResponseObject myResponseObject) { //On response from server } }); 在1.9版本中,RetrofitError仍然存在,我们可以通过以下方式获得状态: error.getResponse().getStatus() 如何使用RxJava进行Retrofit 2.0操作?

合并来自不同Observable的数据,并根据数据可用性选择不同的获取策略

我有一种情况,通过获取一个Item对象,三件事情之一应该发生: 如果该项目不存在于缓存(又名为空)中,则从api1和api2加载数据,合并数据并返回一个Observable 。 如果该项目存在于缓存中,但缺少api2 ,则从api2加载数据,并将其与高速缓存中已有的数据合并 如果整个数据在缓存中可用,只需返回它。 到目前为止,这是我设法提出的最好的: val cacheObservable = cache.fetchItem(itemId); val api1Observable = api1.fetchItem(itemId); val api2Observable = api2.fetchItem(itemId); val resultObservable = cacheObservable!!.flatMap { item: Item? -> if (item == null) { /* Get the item data, and the full text separately, then combine them */ Observable.zip(api1Observable, api2Observable, { itemData, itemText -> itemData.apply { text […]