Rx concatWith()只返回第一个Flowable结果

我已经公布了他们正在单独工作的所有方法,但是我遇到了第一个问题,在那里我concatWith()两个流动

return userFavouriteStores() .concatWith(userOtherStores()) .doOnNext(new Consumer<List<StoreModel>>() { @Override public void accept(@io.reactivex.annotations.NonNull List<StoreModel> storeModels) throws Exception { Log.i("storeModels", "" + storeModels); } }) public Flowable<List<StoreModel>> userFavouriteStores() { return userStores() .map(UserStores::favoriteStores) .flatMap(storeId -> storeDao.storesWithIds(storeId)) .map(stores -> { // TODO Konvert to Kotlin map {} List<StoreModel> result = new ArrayList<>(stores.size()); for (se.ica.handla.repositories.local.Store store : stores) { result.add(store.toStoreModel(StoreModel.Source.Favourite)); } return result; }); } public Flowable<List<StoreModel>> userOtherStores() { return userStores().map(UserStores::otherStores) .flatMap(storeId -> storeDao.storesWithIds(storeId)) .map(stores -> { List<StoreModel> result = new ArrayList<>(stores.size()); for (Store store : stores) { result.add(store.toStoreModel(StoreModel.Source.Other)); } return result; });} 

更新的方法:userStores()用于收藏夹和其他商店,

  private Flowable<UserStores> userStores() { return apiIcaSeResource .userStores() .toFlowable(); } @GET("user/stores") Single<UserStores> userStores(); 

接下来的评论,以及其他信息,你没有特别的问题concat() ,我假设它是工作,这只是不是你想要在这里实现的工具。

concat()不会连接两个列表到一个单一的列表,但rathe会首先发出所有项目的第一个Flowable,然后由第二个Flowable发出的项目(因此你必须有onComplete所以concat会知道什么时候Flowable是结束,我问了什么开始时)。

为了将列表结合在一起,我建议将两个商店Obesrvable s(收藏夹/其他)拉链,然后简单地组合列表以具有单一的组合列表输出。
除此之外,正如您所指出的那样,由于两个存储Observable都来自userStores() ,您将两次调用网络请求,这绝对没有必要。 你可以使用publish()来解决这个问题,它将把网络结果共享和多播给两个Observable ,从而产生单一的网络请求。

总结一下,我宁愿建议在这里使用Single,因为你没有背压Flowable ,所以不能Flowable 。 像下面的实现:

 Observable<List<StoreModel>> publish = userStores() .toObservable() .publish(userStores -> Single.zip( userFavouriteStores(userStores.singleOrError()), userOtherStores(userStores.singleOrError()), (favoriteStores, otherStores) -> { favoriteStores.addAll(otherStores); return favoriteStores; } ) .toObservable() );