如何创建rx.Single的缓存/热版本?

RxJava v1.0.13引入了一种新的Observable: rx.Single 。 它非常适合请求 – 响应模型,但缺乏像doOnNext()这样的操作符的标准副作用。 所以,要做出多种事情就更困难了。

我的想法是将doOnNext()替换为同一个Single实例的多个订阅。 但这可能导致下层工作多次完成:每次订阅一次。

示例rx.Single实现:

private class WorkerSubscribe<SomeData>() : Single.OnSubscribe<SomeData> { override fun call(sub: SingleSubscriber<in SomeData>) { try { val result = fetchSomeData() sub.onSuccess(result) } catch(t: Throwable) { sub.onError(t) } } } val single = Single.create<SomeData>(WorkerSubscribe()) 

用法:

 single.subscribe({}, {}) single.subscribe({}, {}) // Data is fetched for the second time 

是否有可能创建一个Single的实例,即使多次调用single.subscribe(),但是缓存并返回相同的结果,它将不会多次获取SoData()。

您需要RxJava主题 : BehaviorSubjectAsyncSubject

我只是需要类似的行为,找到了一些解决办法

您可以将Single转换为Observable apply cache() ,然后将其转换回Single

 yourSingle.toObservable().cacheWithInitialCapacity(1).toSingle() 

我使用cacheWithInitialCapacity(1)而不是仅cache()作为优化 – Single将永远不会发出多个项目。

提供Transformer实施也是一个好主意

 public class SingleUtils { public static <T> Single.Transformer<T, T> cached() { return single -> single.toObservable() .cacheWithInitialCapacity(1) .toSingle(); } } 

所以你可以使用缓存,只要你想调用

 yourSingle.compose(SingleUtils.cached()) 

编辑:从rxJava 1.2.2开始,已经添加了( https://github.com/ReactiveX/RxJava/releases/tag/v1.2.2

这样实现( https://github.com/ReactiveX/RxJava/pull/4757

请检查cache()运算符 。 它应该缓存来自Observable的排放并将其复制到后续的Subscriber

您可以创建一个BehaviorSubject / ReplaySubject / AsyncSubject – 然后调用toSingle它。

我做了解决方法,我不满意,但它的工作原理:

 public class Network { private Object data; private Single<Object> dataSingle; public Single<Object> getData { if (data == null) { if (dataSingle == null) { dataSingle = Single.create(...) .doOnSuccess( data -> this.data = data;) .sibscribeOn(..); } return dataSingle; } else { return Single.just(data); } } }