可以用最好的值来观察

我实现了一个名为“FilterByLatestFrom”的伪操作符作为kotlin的扩展函数。

我用这个运算符写下了下面的代码:

fun testFilterByLatestFromOperator(){ val observableA : Observable = Observable.fromArray(1,2,3,4,5,6,7,8,9,10) val observableC : PublishSubject = PublishSubject.create() val observableB : Observable = Observable.just(2).mergeWith(observableC) observableB.subscribe { println("observableB onNext: $it") } observableA .subscribe({ println("Original : $it")}) observableA.filterByLatestFrom(observableB, BiFunction { aVal, bVal -> aVal%bVal==0 }) .subscribe({ println("Result A : $it") }) observableC.onNext(3) observableA.filterByLatestFrom(observableB, BiFunction { aVal, bVal -> aVal%bVal==0 }) .subscribe({ println("Result AC : $it") }) } 

输出是:

 observableB onNext: 2 Original : 1 Original : 2 Original : 3 Original : 4 Original : 5 Original : 6 Original : 7 Original : 8 Original : 9 Original : 10 Result A : 2 Result A : 4 Result A : 6 Result A : 8 Result A : 10 observableB onNext: 3 Result AC : 2 Result AC : 4 Result AC : 6 Result AC : 8 Result AC : 10 

我想筛选器运算符将过滤ObsA根据可观察B的最后一个值。它适用于第一个块,但是当我添加On-next与新值它不会更改结果(使用相同的最后一个值从原始可观察)。

这是FilterByLatestFrom impl(它也是从Java使用的设计(与撰写):

 class FilterByLatestFrom(private val observable: Observable, private val biFunction: BiFunction){ fun filter() : ObservableTransformer = ObservableTransformer { it .withLatestFrom( observable, BiFunction<U,T,Pair> { u, t -> Pair(u,biFunction.apply(u,t)) }) .filter { it.second } .map { it.first } } } fun  Observable.filterByLatestFrom(observable: Observable, biFunction: BiFunction) : Observable = this.compose(FilterByLatestFrom(observable,biFunction).filter()) 

我错过了什么?

编辑:我想我发现这个问题:发布主题应该是BehaviorSubject而不是。 并且合并函数应该是concon to obsC会在obsB之后发射。

你的伪运算符filterByLatestFrom就好了,问题在于测试, PublishSubject只会发射后续的项目,所以当你在最后一个订阅(’结果AC’), observableB将只发出2,因为observableC已经发射3,将不重播它observableB (使用merge )。

只要将observableC.onNext(3)移到最后一个订阅(最后一行)后面,你就会看到预期的行为。

编辑:也像你一样更改为PublishSubject解决了同样的问题(主题将重播新订阅的最后一个值)