RxJava如何从订阅创建Observable

我正在寻找一种方法来处理结果subscribe后创建Observable。

鉴于我从productRepo.list()这是改造返回Observable<Response<ProductResponse>>这个Observable。

 productRepo .list() .retry(3) .subscribe { response -> if (response.isSuccessful) { response.body().apply { cache.saveProducts(data) } } } 

这样做的目的是将结果保存到本地数据库cache 。 这加上另一个非常相似的调用填充本地数据库从API的远程数据。

两个调用完成后,我想从cache加载数据。

我不想以任何方式将这两种观察结合起来。 之后只想运行一些任务。

我希望这个处理作为Rx调用图中的一个单元,以便它同时执行Call1和Call2,一旦Call1和Call2完成运行Task3。 这种情况下最好的方法是什么? 我真的更喜欢每个呼叫的用户是分开的。

flatMap是最好的选择吗?

正如你所提到的,

我真的更喜欢每个呼叫的用户是分开的。

假设我们有两个可观察的事物

 val call1 = Observable.from(arrayOf(1,2,3,4,5,6,7,8)) val call2 = Observable.from(arrayOf(2,4,6,8)) 

如果我们纯粹使用Observable.zip如下,那么Call1和Call2都只能有一个用户。

 Observable.zip(call1,call2) {c1, c2 -> Pair(c1,c2) }.subscribe(task3Subscriber) 

如果我们使用三个不同的用户,Call1和Call2流将被触发两次

 call1.subscribe(call1Subscriber) call2.subscribe(call2Subscriber) Observable.zip(call1,call2) {c1, c2 -> Pair(c1,c2) }.subscribe(task3Subscriber) 

因此,我们需要使用.share().cacheWithInitialCapacity(1)来做窍门

 val call1 = Observable.from(arrayOf(1,2,3,4,5,6,7,8)) .share() .cacheWithInitialCapacity(1) val call2 = Observable.from(arrayOf(2,4,6,8)) .share() .cacheWithInitialCapacity(1) val task3Signal = Observable.zip(call1,call2){ c1, c2 -> c1 + c2 } call1.subscribe(call1Subscriber) call2.subscribe(call2Subscriber) task3Signal.subscribe(task3Subscriber) 

你也可以从一个简单的测试用例中证明/测试你的Rx图的概念。

 class SimpleJUnitTest { @Test fun test(){ val call1 = Observable.from(arrayOf(1,2,3,4,5,6,7,8)) .doOnNext { println("call1 doOnNext $it") } .share() .cacheWithInitialCapacity(1) val call2 = Observable.from(arrayOf(2,4,6,8)) .doOnNext { println("call2 doOnNext $it") } .share() .cacheWithInitialCapacity(1) val task3Signal = Observable.zip(call1,call2){ c1, c2 -> println("task3Signal c1:$c1, c2: $c2") c1 + c2 } val testSubscriber1 = TestSubscriber<Int>() val testSubscriber2 = TestSubscriber<Int>() val testSubscriber3 = TestSubscriber<Int>() call1.subscribe(testSubscriber1) call2.subscribe(testSubscriber2) task3Signal.subscribe(testSubscriber3) testSubscriber1.assertReceivedOnNext(listOf(1,2,3,4,5,6,7,8)) testSubscriber2.assertReceivedOnNext(listOf(2,4,6,8)) testSubscriber3.assertReceivedOnNext(listOf(3,6,9,12)) testSubscriber1.assertValueCount(8) testSubscriber2.assertValueCount(4) testSubscriber3.assertValueCount(4) } } 

输出:

 call1 doOnNext 1 call1 doOnNext 2 call1 doOnNext 3 call1 doOnNext 4 call1 doOnNext 5 call1 doOnNext 6 call1 doOnNext 7 call1 doOnNext 8 call2 doOnNext 2 call2 doOnNext 4 call2 doOnNext 6 call2 doOnNext 8 task3Signal c1:1, c2: 2 task3Signal c1:2, c2: 4 task3Signal c1:3, c2: 6 task3Signal c1:4, c2: 8 
 .doOnNext() 

是你的答案,因为会返回你的最终答复或每个响应如果是多个。 试试。

看看Zip 。 做类似Observable.zip(firstObservable,secondObservable,….. {Task 3}