Android RxJava / Kotlin – 将大量数据映射到网络呼叫

我试图实现一个Android应用程序,需要从后端服务获取大量的数据,并将其保存到数据库,以便以后工作。

下面的代码描述了这个过程:

itemsService .getAllItemIds() //This returns Single<List<Int>> from backend .subscribeOn(Schedulers.io()) .subscribe({ Observable.fromIterable(it) .map({ itemsService .getItemById(it) //This gets one item details from backend .subscribeOn(Schedulers.io()) .observeOn(AndroidSchedulers.mainThread()) .subscribe({ //Add item details to db }, { //Some error }) }) }, { //Some error }) 

我获得一个ID列表,然后映射每个这些ID到一个网络调用来获得完整的对象。

这适用于10个项目的测试集,但是生产集包含超过5万个ID。 它起初工作,节省的项目,但约5-10%的研磨停下来,应用程序死亡。

我假设这里的原因是Rx保持源和映射值之间的引用。

我的问题是:是否有办法将源排放“汇集”到一起,比方说,10? 或者也许还有一些我不知道的其他机制?

你没有提到到底是什么“停下来”的意思,但是有意义的是,你将在实际的情况下出现5万个项目的内存,因为你基本上会试图一次创建50,000个线程来获取每个项目的细节。

此外,不是使用运算符链接Observable ,而是在subscribe / map创建嵌套链,你可以在这里阅读为什么你不应该这样做。

关于限制工作一次10,有一个flatMap重载,最后它可能看起来像这样:

 itemsService .getAllItemIds() //This returns List<Int> from backend .flatMapIterable { t -> t } .flatMap({ itemsService .getItemById(it) //This gets one item details from backend .subscribeOn(Schedulers.io()) }, 10) //limit flat map parallelism by desired value .subscribeOn(Schedulers.io()) .observeOn(AndroidSchedulers.mainThread()) .subscribe({ //Add item details to db }, { //Some error })