RxJava操作符用于切换方法

即时通讯新的与Android项目上的Rxjava,在这里我的代码

class RadioListRepositoryImpl(private val apiServices: ApiServices, private val chlDao: ChannelDao) : RadioListRepository { private val results: MutableList<DataResponse> init { results = ArrayList<DataResponse>() } override fun getData(): Observable<DataResponse> { return dataFromMemory().switchIfEmpty(dataFromNetwork()) } override fun dataFromMemory(): Observable<DataResponse> { val cacheDateExp = DateTime().minusHours(6) if(chlDao.isCacheExpired(cacheDateExp).isNotEmpty()){ Logger.d("Get data from cache SQLITE") val chList: MutableList<DataResponse> = ArrayList() val cache = chlDao.loadAll() repeat(cache.size){ i -> val ch = DataResponse() ch.channelId = cache[i].channelId ch.channelTitle = cache[i].title chList.add(ch) } return Observable.from(chList) }else{ chlDao.deleteAll() return Observable.empty<DataResponse>() } } override fun dataFromNetwork(): Observable<DataResponse> { val dttime = DateTime() return apiServices.getChannelList() .concatMap { dataListResponseModel -> Observable.from(dataListResponseModel.radio) } .doOnNext { channelDataResponse -> results.add(channelDataResponse) } .doOnNext { channelDataResponse -> Logger.d("Put data to cache") val c: ChannelEntitiy = ChannelEntitiy() c.channelId = channelDataResponse.channelId c.title = channelDataResponse.channelTitle chlDao.insert(c) } } 

}

我的其他类访问方法getData(),我想如果内存中的数据是空的(sqlite的),然后从网络获取数据。

但是我想要的是,如果从内存中的数据是空的,然后从网络插入到内存的数据,然后getData()方法返回dataFromMemory()

我可以使用另一个Rx操作符来处理它以简化我的代码吗?

当你想从多个来源获取数据时, concatfirst应该适合你。

 // Our sources (left as an exercise for the reader) Observable<Data> memory = ...; Observable<Data> disk = ...; Observable<Data> network = ...; // Retrieve the first source with data Observable<Data> source = Observable .concat(memory, disk, network) .first(); 

concat()多个Observables并连接它们的序列。 first()只发出序列中的第一个项目。 因此,如果使用concat().first() ,它会检索多个源发出的第一个项目。

这种模式的关键在于, concat()仅在需要时订阅每个子Observable。 如果缓存数据,没有不必要的慢速查询,因为first()会提前停止序列。 换句话说,如果内存返回结果,那么我们不会去打扰磁盘或网络。 相反,如果内存和磁盘都没有数据,则会发出新的网络请求。

请注意, concat()源代码Observables的顺序很重要,因为它是逐个检查它们的。

那么如果你想保存每个数据源的数据,只需要用doOnNext()修改一下你的数据源。

 Observable<Data> networkWithSave = network.doOnNext(data -> { saveToDisk(data); cacheInMemory(data); }); Observable<Data> diskWithCache = disk.doOnNext(data -> { cacheInMemory(data); });