RxJava Observable.create包装可观察订阅

我使用了Observable.create,所以当某些数据可用时,我可以通知用户。 我有点不确定订阅在我的创建方法内的observables。 这些嵌套的订阅会给我什么问题吗? 我不完全熟悉使用Observable.create创建observables,所以我想确保我没有做任何不寻常的事情或滥用它。 先谢谢你!

abstract class NetworkResource<ApiType, DbType> constructor(private val schedulerProvider: SchedulerProvider) { abstract fun fetchFromApi(): Single<ApiType> abstract fun fetchFromDb(): Observable<Optional<DbType>> abstract fun saveToDb(apiType: ApiType?) abstract fun shouldFetchFromApi(cache: DbType?): Boolean fun fetch(): Observable<Optional<DbType>> { return Observable.create<Optional<DbType>> { val subscriber = it fetchFromDb() .subscribe({ subscriber.onNext(it) if(shouldFetchFromApi(it.get())) { fetchFromApi() .observeOn(schedulerProvider.io()) .map { saveToDb(it) it } .observeOn(schedulerProvider.ui()) .flatMapObservable { fetchFromDb() } .subscribe({ subscriber.onNext(it) subscriber.onComplete() }) } else { subscriber.onComplete() } }) } } } 

是的,这会导致一个问题。

首先,这样嵌套Observable并不是惯用的,反应方法的优点之一就是组成Observables ,从而具有单一的清洁流。 用这种方式,你打破了链条,直接的结果是交织在一起的代码更难以阅读,更多的代码来连接通知事件,基本上就像使用Observable封装异步回调方法。
在这里,因为你已经有了被动组件,你可以简单地将它们组合起来,而不是用回调方法来处理它们。

其次,由于断链,最重要和最直接的一个 – 取消外部Observable不会自动影响内部Observable 。 尝试添加subscribeOn()以及在背压很重要的情况下也是如此。

一个组合的选择可能是这样的:

 fun fetch2(): Observable<Optional<DbType>> { return fetchFromDb() .flatMap { if (shouldFetchFromApi(it.get())) { fetchFromApi() .observeOn(schedulerProvider.io()) .doOnSuccess { saveToDb(it) } .observeOn(schedulerProvider.ui()) .flatMapObservable { fetchFromDb() } } else { Observable.empty() } } } 

如果由于某种原因,你想在任何情况fetchFromDb()一个fetchFromDb()结果被分开发射,你也可以使用带有选择器的publish()来完成:

  fun fetch2(): Observable<Optional<DbType>> { return fetchFromDb() .publish { Observable.merge(it, it.flatMap { if (shouldFetchFromApi(it.get())) { fetchFromApi() .observeOn(schedulerProvider.io()) .doOnSuccess { saveToDb(it) } .observeOn(schedulerProvider.ui()) .flatMapObservable { fetchFromDb() } } else { Observable.empty() } }) } }