如何创建rx.Single的缓存/热版本?
RxJava v1.0.13引入了一种新的Observable: rx.Single 。 它非常适合请求 – 响应模型,但缺乏像doOnNext()这样的操作符的标准副作用。 所以,要做出多种事情就更困难了。
我的想法是将doOnNext()替换为同一个Single实例的多个订阅。 但这可能导致下层工作多次完成:每次订阅一次。
示例rx.Single实现:
private class WorkerSubscribe<SomeData>() : Single.OnSubscribe<SomeData> { override fun call(sub: SingleSubscriber<in SomeData>) { try { val result = fetchSomeData() sub.onSuccess(result) } catch(t: Throwable) { sub.onError(t) } } } val single = Single.create<SomeData>(WorkerSubscribe())
用法:
single.subscribe({}, {}) single.subscribe({}, {}) // Data is fetched for the second time
是否有可能创建一个Single的实例,即使多次调用single.subscribe(),但是缓存并返回相同的结果,它将不会多次获取SoData()。
您需要RxJava主题 : BehaviorSubject
或AsyncSubject
我只是需要类似的行为,找到了一些解决办法
您可以将Single
转换为Observable
apply cache()
,然后将其转换回Single
。
yourSingle.toObservable().cacheWithInitialCapacity(1).toSingle()
我使用cacheWithInitialCapacity(1)
而不是仅cache()
作为优化 – Single
将永远不会发出多个项目。
提供Transformer
实施也是一个好主意
public class SingleUtils { public static <T> Single.Transformer<T, T> cached() { return single -> single.toObservable() .cacheWithInitialCapacity(1) .toSingle(); } }
所以你可以使用缓存,只要你想调用
yourSingle.compose(SingleUtils.cached())
编辑:从rxJava 1.2.2开始,已经添加了( https://github.com/ReactiveX/RxJava/releases/tag/v1.2.2 )
请检查cache()
运算符 。 它应该缓存来自Observable
的排放并将其复制到后续的Subscriber
。
您可以创建一个BehaviorSubject / ReplaySubject / AsyncSubject – 然后调用toSingle它。
我做了解决方法,我不满意,但它的工作原理:
public class Network { private Object data; private Single<Object> dataSingle; public Single<Object> getData { if (data == null) { if (dataSingle == null) { dataSingle = Single.create(...) .doOnSuccess( data -> this.data = data;) .sibscribeOn(..); } return dataSingle; } else { return Single.just(data); } } }