在条件符合的情况下更改可观察 – RxJava2

使用RxJava2 RxKotlinRoom ,我需要查询数据库中的一个开放的狩猎。 这意味着我搜索一个包含一个名为closed的值为false的属性。 一旦找到猎物,就需要将查询切换到特定的猎物。

对于这些查询我有2个方法:

 getOpenHunt(teamId:String): Flowable<List<Hunt>> getHunt(huntId:String): Flowable<List<Hunt>> 

他们都返回一个List因为否则查询卡住没有找到狩猎时。

我的想法是类似的

 fun queryHunt(teamId:String):Flowable<Optional<Hunt>>{ getOpenHunt(teamId) .map<Optional<Hunt>> { Optional.create(it.firstOrNull()) } .switchToFlowableIf ( it is Optional.Some, getHunt(it.element().id) } //With switchToFlowableIf's being fun <E:Any> switchToFlowableIf(condition: (E)->Boolean, newFlowable: Flowable<E>): Flowable<E> //It should unsubscribe from getOpenHunt and subscribe to newFlowable 

作为参考,这里是我的Optional

 sealed class Optional<out T> { class Some<out T>(val element: T) : Optional<T>() object None : Optional<Nothing>() fun element(): T? { return when (this) { is Optional.None -> null is Optional.Some -> element } } companion object { fun <T> create(element: T?): Optional<T> { return if (element != null) { Optional.Some(element) } else { Optional.None } } } } 

在RxJava2中是否有类似的方法? 如果不是的话,你将如何执行它?

我通过查看onErrorResumeNext来解决这个问题。 复制粘贴一些代码,并修改为我的需要。 我不认为这是完美的,但是这是他的工作。 评论,如果你发现一些可能的错误。

 public final class FlowableOnPredicateNext<T> extends AbstractFlowableWithUpstream<T, T> { private final Predicate<? super T> predicate; private final Function<? super T, ? extends Publisher<? extends T>> next; public FlowableOnPredicateNext(Flowable<T> source, Predicate<? super T> predicate, Function<? super T, ? extends Publisher<? extends T>> next) { super(source); this.predicate = predicate; this.next = next; } @Override protected void subscribeActual(Subscriber<? super T> s) { OnPredicateNextSubscriber<T> parent = new OnPredicateNextSubscriber<>(s, next, predicate); s.onSubscribe(parent.arbiter); source.subscribe(parent); } static final class OnPredicateNextSubscriber<T> implements FlowableSubscriber<T> { private final Subscriber<? super T> actual; private final Predicate<? super T> predicate; private final SubscriptionArbiter arbiter; private final Function<? super T, ? extends Publisher<? extends T>> nextSupplier; private boolean switched = false; OnPredicateNextSubscriber(Subscriber<? super T> actual, Function<? super T, ? extends Publisher<? extends T>> nextSupplier, Predicate<? super T> predicate) { this.actual = actual; this.predicate = predicate; this.nextSupplier = nextSupplier; this.arbiter = new SubscriptionArbiter(); } @Override public void onSubscribe(Subscription s) { arbiter.setSubscription(s); } @Override public void onNext(T t) { try { if (!switched && predicate.test(t)) { Publisher<? extends T> p; p = nextSupplier.apply(t); p.subscribe(this); switched = true; } else { actual.onNext(t); } } catch (Exception e) { actual.onError(e); } } @Override public void onError(Throwable t) { actual.onError(t); } @Override public void onComplete() { actual.onComplete(); } } } 

使用Kotlin我写了一个扩展函数:

 @CheckReturnValue @BackpressureSupport(BackpressureKind.FULL) @SchedulerSupport(SchedulerSupport.NONE) fun <E, T : Flowable<E>> T.onPredicateResumeNext(predicate: Predicate<E>, resumeFunction: io.reactivex.functions.Function<E, Publisher<E>>): Flowable<E> { return RxJavaPlugins.onAssembly<E>(FlowableOnPredicateNext<E>(this, predicate, resumeFunction )) } 

我现在正在使用它:

 override fun getOpenHunt(teamId: String): Flowable<Optional<Hunt>> { return created().getOpenHunt(teamId) .map { Optional.create(it.firstOrNull()) } .onPredicateResumeNext(predicate = Predicate { it.element() != null }, resumeFunction = Function { created() .getHunt(it.element()!!.id) .map<Optional<Hunt>> { Optional.create(it.firstOrNull()) } }) } 

我会这样做,通过重用你的方法:

 getOpenHunt(teamId:String): Flowable<List<Hunt>> getHunt(huntId:String): Flowable<List<Hunt>> getOpenHunt(yourId) // return your Flowable<List<Hunt>> .flatMapIterable { it } // get each result of open hunts .flatMapMaybe { getHunt(it.id) } // querie each result .toList() // back to a Maybe/Single<List<Hunt>> .toFlowable() // back to a Flowable else it wont emit more data .subscribeBy(onNext = { /** process your results **/ }, onError = { /** process if no results found **/ } ) 

这意味着你查询你的数据库,如果有条目符合你的条件。 然后得到结果,将其分成单个项目,使用结果运行多个查询。 如果找到结果,则将其转换回Single<List<Hunt>> 。 既然你想保持订阅,你需要使用.toFlowable()将它转换回到.toFlowable()

顺便说一下,变化意味着返回值的变化,但在你的情况下,它是相同的List<Hunt>

整个流程对我来说没有任何意义,因为你也可以通过getOpenHunt得到结果并遍历它。