RxJava如何从订阅创建Observable
我正在寻找一种方法来处理结果subscribe
后创建Observable。
鉴于我从productRepo.list()
这是改造返回Observable<Response<ProductResponse>>
这个Observable。
productRepo .list() .retry(3) .subscribe { response -> if (response.isSuccessful) { response.body().apply { cache.saveProducts(data) } } }
这样做的目的是将结果保存到本地数据库cache
。 这加上另一个非常相似的调用填充本地数据库从API的远程数据。
两个调用完成后,我想从cache
加载数据。
我不想以任何方式将这两种观察结合起来。 之后只想运行一些任务。
我希望这个处理作为Rx调用图中的一个单元,以便它同时执行Call1和Call2,一旦Call1和Call2完成运行Task3。 这种情况下最好的方法是什么? 我真的更喜欢每个呼叫的用户是分开的。
flatMap
是最好的选择吗?
正如你所提到的,
我真的更喜欢每个呼叫的用户是分开的。
假设我们有两个可观察的事物
val call1 = Observable.from(arrayOf(1,2,3,4,5,6,7,8)) val call2 = Observable.from(arrayOf(2,4,6,8))
如果我们纯粹使用Observable.zip
如下,那么Call1和Call2都只能有一个用户。
Observable.zip(call1,call2) {c1, c2 -> Pair(c1,c2) }.subscribe(task3Subscriber)
如果我们使用三个不同的用户,Call1和Call2流将被触发两次 。
call1.subscribe(call1Subscriber) call2.subscribe(call2Subscriber) Observable.zip(call1,call2) {c1, c2 -> Pair(c1,c2) }.subscribe(task3Subscriber)
因此,我们需要使用.share().cacheWithInitialCapacity(1)
来做窍门
val call1 = Observable.from(arrayOf(1,2,3,4,5,6,7,8)) .share() .cacheWithInitialCapacity(1) val call2 = Observable.from(arrayOf(2,4,6,8)) .share() .cacheWithInitialCapacity(1) val task3Signal = Observable.zip(call1,call2){ c1, c2 -> c1 + c2 } call1.subscribe(call1Subscriber) call2.subscribe(call2Subscriber) task3Signal.subscribe(task3Subscriber)
你也可以从一个简单的测试用例中证明/测试你的Rx图的概念。
class SimpleJUnitTest { @Test fun test(){ val call1 = Observable.from(arrayOf(1,2,3,4,5,6,7,8)) .doOnNext { println("call1 doOnNext $it") } .share() .cacheWithInitialCapacity(1) val call2 = Observable.from(arrayOf(2,4,6,8)) .doOnNext { println("call2 doOnNext $it") } .share() .cacheWithInitialCapacity(1) val task3Signal = Observable.zip(call1,call2){ c1, c2 -> println("task3Signal c1:$c1, c2: $c2") c1 + c2 } val testSubscriber1 = TestSubscriber<Int>() val testSubscriber2 = TestSubscriber<Int>() val testSubscriber3 = TestSubscriber<Int>() call1.subscribe(testSubscriber1) call2.subscribe(testSubscriber2) task3Signal.subscribe(testSubscriber3) testSubscriber1.assertReceivedOnNext(listOf(1,2,3,4,5,6,7,8)) testSubscriber2.assertReceivedOnNext(listOf(2,4,6,8)) testSubscriber3.assertReceivedOnNext(listOf(3,6,9,12)) testSubscriber1.assertValueCount(8) testSubscriber2.assertValueCount(4) testSubscriber3.assertValueCount(4) } }
输出:
call1 doOnNext 1 call1 doOnNext 2 call1 doOnNext 3 call1 doOnNext 4 call1 doOnNext 5 call1 doOnNext 6 call1 doOnNext 7 call1 doOnNext 8 call2 doOnNext 2 call2 doOnNext 4 call2 doOnNext 6 call2 doOnNext 8 task3Signal c1:1, c2: 2 task3Signal c1:2, c2: 4 task3Signal c1:3, c2: 6 task3Signal c1:4, c2: 8
.doOnNext()
是你的答案,因为会返回你的最终答复或每个响应如果是多个。 试试。
看看Zip 。 做类似Observable.zip(firstObservable,secondObservable,….. {Task 3}