如何通过键加入两个RxJava2 Obvervables?
我有两个不同类型的未分类observables。 这两种类型共享一个共同的密钥。 我想加入他们到一个新的可观察到的对应的元素发射对,我不知道该怎么做。
请注意,某些键可能会丢失。 如果不是完整的对子被丢弃,那就没问题,但是如果没有丢失的子块,它会更好。
输入1:
Entity(id = 2), Entity(id = 1), Entity(id = 4)
输入2:
Dto(id = 3), Dto(id = 2), Dto(id = 1)
预期输出(以任何顺序):
Pair(Entity(id = 1), Dto(id = 1)), Pair(Entity(id = 2), Dto(id = 2)), Pair(null, Dto(id = 3)), Pair(Entity(id = 4), null)
首先, Observable.merge
流在一起:这给你一个所有的项目流。 (在下面的代码中,我使用了一个自定义的Either
类来标记每个流。)
然后,对于流中的每个项目,尝试将其与先前观察到的其他类型的项目进行匹配,然后输出该对。 如果没有,保存它以后再匹配。
最后,一旦流完成,其余的不匹配的元素将不会与任何东西匹配,所以它们可以不成对地发射。
import io.reactivex.Observable data class Entity(val id: Int) data class Dto(val id: Int) sealed class Either<out A, out B> data class Left<A>(val value: A) : Either<A, Nothing>() data class Right<B>(val value: B) : Either<Nothing, B>() fun <A : Any, B : Any, C> joinById(a: Observable<A>, idA: (A) -> C, b: Observable<B>, idB : (B) -> C): Observable<Pair<A?, B?>> { val unmatchedA = mutableMapOf<C, A>() val unmatchedB = mutableMapOf<C, B>() val merged = Observable.mergeDelayError(a.map(::Left), b.map(::Right)).flatMap { latest -> when (latest) { is Left -> { val id = idA(latest.value) unmatchedB.remove(id)?.let { return@flatMap Observable.just(latest.value to it) } unmatchedA.put(id, latest.value) } is Right -> { val id = idB(latest.value) unmatchedA.remove(id)?.let { return@flatMap Observable.just(it to latest.value) } unmatchedB.put(id, latest.value) } } Observable.empty<Nothing>() } return Observable.concat(merged, Observable.create { emitter -> unmatchedA.values.forEach { emitter.onNext(it to null) } unmatchedB.values.forEach { emitter.onNext(null to it) } emitter.onComplete() }) } fun main(args: Array<String>) { val entities = Observable.just(Entity(2), Entity(1), Entity(4)) val dtos = Observable.just(Dto(3), Dto(2), Dto(1)) joinById(entities, Entity::id, dtos, Dto::id).blockingForEach(::println) }
(Entity(id=2), Dto(id=2)) (Entity(id=1), Dto(id=1)) (Entity(id=4), null) (null, Dto(id=3))
请注意,这可能会有一些奇怪的行为,如果ID在流中重复,并根据流的结构,这可能会最终缓冲内存中的很多元素。