如何通过键加入两个RxJava2 Obvervables?

我有两个不同类型的未分类observables。 这两种类型共享一个共同的密钥。 我想加入他们到一个新的可观察到的对应的元素发射对,我不知道该怎么做。

请注意,某些键可能会丢失。 如果不是完整的对子被丢弃,那就没问题,但是如果没有丢失的子块,它会更好。

输入1:

 Entity(id = 2), Entity(id = 1), Entity(id = 4) 

输入2:

 Dto(id = 3), Dto(id = 2), Dto(id = 1) 

预期输出(以任何顺序):

 Pair(Entity(id = 1), Dto(id = 1)), Pair(Entity(id = 2), Dto(id = 2)), Pair(null, Dto(id = 3)), Pair(Entity(id = 4), null) 

One Solution collect form web for “如何通过键加入两个RxJava2 Obvervables?”

首先, Observable.merge流在一起:这给你一个所有的项目流。 (在下面的代码中,我使用了一个自定义的Either类来标记每个流。)

然后,对于流中的每个项目,尝试将其与先前观察到的其他类型的项目进行匹配,然后输出该对。 如果没有,保存它以后再匹配。

最后,一旦流完成,其余的不匹配的元素将不会与任何东西匹配,所以它们可以不成对地发射。

 import io.reactivex.Observable data class Entity(val id: Int) data class Dto(val id: Int) sealed class Either<out A, out B> data class Left<A>(val value: A) : Either<A, Nothing>() data class Right<B>(val value: B) : Either<Nothing, B>() fun <A : Any, B : Any, C> joinById(a: Observable<A>, idA: (A) -> C, b: Observable<B>, idB : (B) -> C): Observable<Pair<A?, B?>> { val unmatchedA = mutableMapOf<C, A>() val unmatchedB = mutableMapOf<C, B>() val merged = Observable.mergeDelayError(a.map(::Left), b.map(::Right)).flatMap { latest -> when (latest) { is Left -> { val id = idA(latest.value) unmatchedB.remove(id)?.let { return@flatMap Observable.just(latest.value to it) } unmatchedA.put(id, latest.value) } is Right -> { val id = idB(latest.value) unmatchedA.remove(id)?.let { return@flatMap Observable.just(it to latest.value) } unmatchedB.put(id, latest.value) } } Observable.empty<Nothing>() } return Observable.concat(merged, Observable.create { emitter -> unmatchedA.values.forEach { emitter.onNext(it to null) } unmatchedB.values.forEach { emitter.onNext(null to it) } emitter.onComplete() }) } fun main(args: Array<String>) { val entities = Observable.just(Entity(2), Entity(1), Entity(4)) val dtos = Observable.just(Dto(3), Dto(2), Dto(1)) joinById(entities, Entity::id, dtos, Dto::id).blockingForEach(::println) } 
 (Entity(id=2), Dto(id=2)) (Entity(id=1), Dto(id=1)) (Entity(id=4), null) (null, Dto(id=3)) 

请注意,这可能会有一些奇怪的行为,如果ID在流中重复,并根据流的结构,这可能会最终缓冲内存中的很多元素。

  • 尝试使用Observable时获取Fata信号11
  • 使用函数引用重写Kotlin中的Java代码会发生SAM类型冲突
  • 经常调用回调方法来将事件转发到Observable?
  • 当有变化时,RxJava2可观察的不在onNext上进行处理
  • 如何使用RxJava2 combineLatest与Kotlin中的可观察列表
  • 即使数据没有更新,数值也会发出
  • Rx concatWith()只返回第一个Flowable结果
  • Retrofit-Vertx在Kotlin中使用RxJava2 IllegalStateException消息== null
  • 将RxJava代码正确转换为Kotlin
  • 在使用Flowable中的方法引用时,Kotlin无法推断types
  • 为什么我的RxJava设置阻止我的UI线程? 使用BluetoothAdapter.startLeScan回调
  • Kotlin language will be the best programming language for Android.