Tag: RX java

合并多个单打组成一个Observable

我正在编写一个Android应用程序,需要按以下顺序执行2个查询: 向一个返回Single<List<String>> urls的库发出一个请求(让我们称之为RequestA)。 根据我从RequestA收到的内容,我必须使用每个这些URL向另一个库发出请求(RequestB)。 每个RequestB现在都返回一个Single。 现在我已经把所有的RequestB中的所有Single结合起来形成了一个observable。 就像Observable.mergedelayerror(List<Single>) 。 我不能这样做,因为mergedelayerror期望ObservableSource iterable 。 我知道我可以通过实现回调和使用一些丑陋的逻辑来实现这一目标但是我真的只需要使用由RX提供的运算符

RxJava2如何分离不同的可观察发射器的实现

这是我的情况。 我想从Android的本地LocationManager或Google服务中公开2个不同的Observable <'Location'>实现。 我想检查是否使用本地方法或gms。 所以最后我想把Observable公开给我的客户 – 他不需要知道从什么方法我收集的位置。 请注意,我使用这个库: https : //github.com/mcharmas/Android-ReactiveLocation 从谷歌服务暴露Observable。 它已经揭露了我正在寻找的Observable。 但是另一个地点经理呢。 它使用回调。 这是我的实现: var locationEmitter : Observable<Location> = Observable.empty() init { configureEmitter() } @SuppressLint("MissingPermission") private fun configureEmitter(){ if (!isUsingLocationNativeApi) locationEmitter = reactiveLocationProvider.getUpdatedLocation(reactiveLocationRequest) else{ configureNativeLocationEmitter() } } @SuppressLint("MissingPermission") private fun configureNativeLocationEmitter() { val mLocationCallbackNativeApi: LocationListener = object : LocationListener { override fun onLocationChanged(location: […]

用RxJava重复对状态的操作

这是在Kotlin,但我认为任何编写Java的人都能理解。 我正在尝试用Rx制作一个秒表,而且我在实际停止和启动时遇到了一些麻烦。 最大的问题是,我不知道如何保持现在的时间,而修改它作为不同的行动(开始和停止)进来。这是我现在所拥有的。 fullTime.switchMap { startTime -> controlCommands.switchMap { command -> when (command) { ControlState.PLAY -> Observable.interval(1L, TimeUnit.SECONDS).map { ControlState.PLAY } ControlState.PAUSE -> Observable.just(ControlState.PAUSE) else -> Observable.just(ControlState.STOP) } } } fullTime和controlCommands是Observable ,它们分别发出有关当前开始时间的事件,从中倒数,然后说下一步该做什么。 我想链接controlCommands并能够保持状态在startTime开始,当PLAY事件出现时将倒计时, PAUSE出现时PAUSE ,并在出现STOP时在startTime复位。 scan几乎工作,但我不知道如何停止计时器命中0和PLAY仍然每秒钟发送,因为它会发送重复的信息。 也不允许状态和观察值之间的分离。 所以我scan积累的值必须是Observable内部的值(如果有意义的话)。 任何想法我应该做什么?

如何从OnClick事件Android创建一个Observable?

我是反应式编程的新手。 所以当我从事件创建一个流,如onClick,ontouch … 任何人都可以帮我解决这个问题。 谢谢。

RxJava-将Observable转变为Iterator,Stream或Sequence

我知道这打破了很多Rx规则,但我真的很喜欢RxJava-JDBC ,所以我的队友也是这样。 关系数据库对于我们的工作非常重要,Rx也是如此。 但是有些情况下,我们不希望作为Observable<ResultSet>发出,而只是有一个基于拉的Java 8 Stream<ResultSet>或Kotlin Sequence<ResultSet> 。 但是我们非常习惯于只返回一个Observable<ResultSet>的RxJava-JDBC库。 因此,我想知道是否有一种方法可以使用扩展函数将Observable<ResultSet>转换为Sequence<ResultSet> ,而不执行任何中间集合或toBlocking()调用。 下面是我迄今所有的,但我的头正在旋转,试图连接推拉系统,我不能缓冲,因为每个onNext()调用ResultSet是有状态的。 这是不可能的任务吗? import rx.Observable import rx.Subscriber import java.sql.ResultSet fun Observable<ResultSet>.asSequence() = object: Iterator<ResultSet>, Subscriber<ResultSet>() { private var isComplete = false override fun onCompleted() { isComplete = true } override fun onError(e: Throwable?) { throw UnsupportedOperationException() } override fun onNext(rs: ResultSet?) { throw UnsupportedOperationException() } […]

RxJava Debounce onNext()

我试图为我的SwitchCompat反弹点击/ swipe没有成功。 下面的代码看起来不错,虽然onNext没有经过debounce()ie:当用户垃圾开关,onNext被称为每次点击,不会因为debounce而被省略。 它应该工作,这与RxJava的错误? timer = Observable.create { subscriber: Subscriber<in Void>? -> super.setOnCheckedChangeListener { buttonView, isChecked -> subscriber?.onNext(null) } } timer.debounce(3,TimeUnit.SECONDS)

RxJava:是否可以避免toBlocking()。single()?

我有时使用RxJava来为一些复杂的过滤器,映射等写一个更实用的风格(我知道这不是它所做的,我会很乐意使用Kotlin或Java 8,但是我可以“吨(我在Android上,所以坚持到6)。 但是,当你试图从Observable中“抽取”实际的对象时,你总是需要写.toList().toBlocking().singe() 例如(不那么复杂,但你明白了): final List<CashRegister> cashRegisters = cashRegisterData.getCashRegisters(); return Observable.from(cashRegisters) .map(CashRegister::getName) .toList() .toBlocking() .single(); 正如你所看到的,将Observable“转换”成一个列表需要比我想要执行的实际映射更多的行。 有没有toList().toBlocking().single()链做到这一点?

如何在RxJava2中默默跳过异常?

我有这样的数据流: Observable .fromFuture( CompletableFuture.supplyAsync { // First remote call returns Future<List<Type>> listOf(1, 2, 3, 57005, 5) }, Schedulers.computation() ) .flatMap { it.toObservable() } // I turn that list into a stream of single values to process them one by one .map { CompletableFuture.supplyAsync { // This remote call may fail if it does not like […]

相当于RxJava 2 isUnsubscribed

我一直在通过RxJava的“ Reactive Programming ”一书中的示例进行研究,该示例针对版本1而不是2.对无限流的介绍有以下示例(并注意有更好的方法来处理并发): Observable<BigInteger> naturalNumbers = Observable.create(subscriber -> { Runnabler = () -> { BigInteger i = ZERO; while (!subscriber.isUnsubscribed()) { subscriber.onNext(i); i = i.add(ONE); } }; new Thread(r).start(); }); … Subscription subscription = naturalNumbers.subscribe(x -> log(x)); /* after some time… */ subscription.unsubscribe(); 但是,在RxJava 2中,传递给create()方法的lambda表达式的类型为ObservableEmitter ,并且没有isUnsubscribed()方法。 我在2.0中看到了什么是不同的,并且执行了对存储库的搜索,但是找不到任何这样的方法。 在2.0中如何实现这个相同的功能? 编辑包括下面给出的解决方案(使用kotlin的nb): val naturalNumbers = Observable.create<BigInteger> { […]

如何使用ContextWrapper类与subscribeWith

您好我已经使ContextWrapper在单个地方进行翻新错误处理,并希望在subscribeWith使用它。 abstract class CallbackWrapper<T : BaseResponse> : DisposableObserver<T>() { protected abstract fun onSuccess(t: T) override fun onNext(t: T) { //You can return StatusCodes of different cases from your API and handle it here. I usually include these cases on BaseResponse and inherit it from every Response onSuccess(t) } override fun onError(e: Throwable) { when (e) […]