Rx concatWith()只返回第一个Flowable结果
我已经公布了他们正在单独工作的所有方法,但是我遇到了第一个问题,在那里我concatWith()两个流动
return userFavouriteStores() .concatWith(userOtherStores()) .doOnNext(new Consumer<List<StoreModel>>() { @Override public void accept(@io.reactivex.annotations.NonNull List<StoreModel> storeModels) throws Exception { Log.i("storeModels", "" + storeModels); } }) public Flowable<List<StoreModel>> userFavouriteStores() { return userStores() .map(UserStores::favoriteStores) .flatMap(storeId -> storeDao.storesWithIds(storeId)) .map(stores -> { // TODO Konvert to Kotlin map {} List<StoreModel> result = new ArrayList<>(stores.size()); for (se.ica.handla.repositories.local.Store store : stores) { result.add(store.toStoreModel(StoreModel.Source.Favourite)); } return result; }); } public Flowable<List<StoreModel>> userOtherStores() { return userStores().map(UserStores::otherStores) .flatMap(storeId -> storeDao.storesWithIds(storeId)) .map(stores -> { List<StoreModel> result = new ArrayList<>(stores.size()); for (Store store : stores) { result.add(store.toStoreModel(StoreModel.Source.Other)); } return result; });}
更新的方法:userStores()用于收藏夹和其他商店,
private Flowable<UserStores> userStores() { return apiIcaSeResource .userStores() .toFlowable(); } @GET("user/stores") Single<UserStores> userStores();
接下来的评论,以及其他信息,你没有特别的问题concat()
,我假设它是工作,这只是不是你想要在这里实现的工具。
concat()
不会连接两个列表到一个单一的列表,但rathe会首先发出所有项目的第一个Flowable,然后由第二个Flowable发出的项目(因此你必须有onComplete
所以concat会知道什么时候Flowable
是结束,我问了什么开始时)。
为了将列表结合在一起,我建议将两个商店Obesrvable
s(收藏夹/其他)拉链,然后简单地组合列表以具有单一的组合列表输出。
除此之外,正如您所指出的那样,由于两个存储Observable
都来自userStores()
,您将两次调用网络请求,这绝对没有必要。 你可以使用publish()
来解决这个问题,它将把网络结果共享和多播给两个Observable
,从而产生单一的网络请求。
总结一下,我宁愿建议在这里使用Single,因为你没有背压Flowable
,所以不能Flowable
。 像下面的实现:
Observable<List<StoreModel>> publish = userStores() .toObservable() .publish(userStores -> Single.zip( userFavouriteStores(userStores.singleOrError()), userOtherStores(userStores.singleOrError()), (favoriteStores, otherStores) -> { favoriteStores.addAll(otherStores); return favoriteStores; } ) .toObservable() );