Tag: rx java2

Proguard:我可以添加什么规则来避免找不到引用类?

我有这样的功能: private fun enableSearch() { parentActivity?.let { parentActivity -> parentActivity.searchParamsObs //This is for the first time, to avoid having an empty trolley .flatMap { searchParams -> Observable.combineLatest(searchService.search(searchParams).toObservable(), orderItemsService.getCachedTrolleyOrRequest(), BiFunction<SearchResults, TrolleyItemsResponse, SearchListResultsAndTrolley> { searchResults, trolleyItems -> SearchListResultsAndTrolley(searchResults, trolleyItems) }) } .subscribeIO() .observeMain() .subscribe( { updateScreenWithSearch(it) }, { e -> onSearchError(e) }) .addTo(disposables) parentActivity.focusSearchView() } } 每次尝试使用proguard进行构建时, […]

使用RxJava2创建一个带有生成函数的流程图

我需要创建一个自定义Flowable背压执行。 我试图实现某种分页。 这意味着当下游请求5个项目时,我将“询问数据源”中的项目0-5。然后,当下游需要另外5个项目时,我将得到项目5-10并且返回。 到目前为止我发现的最好的事情是使用Flowable.generate方法,但我真的不明白为什么没有办法(据我所知)如何获得下游请求的项目数量。 我可以使用生成器的state属性来保存最后一个项目的索引,所以我只需要新请求的项目的数量。 我在BiFunction中apply的发件人实例是从AtomicLong扩展的GeneratorSubscription 。 所以将emmiter投射到AtomicLong可以得到我要求的号码。 但是我知道这不能成为“推荐”的方式。 另一方面,当你使用Flowable.create你会得到FlowableEmitter,它有long requested()方法。 使用generate更适合我的用例,但现在我也好奇什么是使用Flowable.generate的“正确”方法。 也许我正在想办法让所有的东西都指向正确的方向。 :) 谢谢。 这就是实际代码的样子(在Kotlin中): Flowable.generate(Callable { 0 }, BiFunction { start /*state*/, emitter -> val requested = (emitter as AtomicLong).get().toInt() //this is bull*hit val end = start + requested //get items [start to end] -> items emmiter.onNext(items) end /*return the new state*/ })

我怎样才能明确表示RxJava中Flowable的完成?

我正在尝试创建一个包装Iterable 。 我周期性地将元素推到我的Iterable但是看起来完成事件是隐含的。 我不知道如何表示处理完成。 例如在我的代码中: // note that this code is written in Kotlin val iterable = LinkedBlockingQueue<Int>() iterable.addAll(listOf(1, 2, 3)) val flowable = Flowable.fromIterable(iterable) .subscribeOn(Schedulers.computation()) .observeOn(Schedulers.computation()) flowable.subscribe(::println, {it.printStackTrace()}, {println("completed")}) iterable.add(4) Thread.sleep(1000) iterable.add(5) Thread.sleep(1000) 这打印: 1 2 3 4完成 我检查了Flowable接口的来源,但是似乎我不能明确指出Flowable是完整的。 我怎么能这样做? 在我的程序中,我发布了一些在它们之间有一些延迟的事件,我想明确何时complete事件流程。 澄清 :我有一个漫长的运行过程,发出事件。 我收集他们在一个队列中,我公开一个方法返回一个Flowable包裹我的队列。 问题是创建Flowable时队列中可能已经有元素了。 我只会处理事件一次,并且知道事件流何时停止,所以我知道何时需要完成Flowable。

RxJava2如何分离不同的可观察发射器的实现

这是我的情况。 我想从Android的本地LocationManager或Google服务中公开2个不同的Observable <'Location'>实现。 我想检查是否使用本地方法或gms。 所以最后我想把Observable公开给我的客户 – 他不需要知道从什么方法我收集的位置。 请注意,我使用这个库: https : //github.com/mcharmas/Android-ReactiveLocation 从谷歌服务暴露Observable。 它已经揭露了我正在寻找的Observable。 但是另一个地点经理呢。 它使用回调。 这是我的实现: var locationEmitter : Observable<Location> = Observable.empty() init { configureEmitter() } @SuppressLint("MissingPermission") private fun configureEmitter(){ if (!isUsingLocationNativeApi) locationEmitter = reactiveLocationProvider.getUpdatedLocation(reactiveLocationRequest) else{ configureNativeLocationEmitter() } } @SuppressLint("MissingPermission") private fun configureNativeLocationEmitter() { val mLocationCallbackNativeApi: LocationListener = object : LocationListener { override fun onLocationChanged(location: […]

RxJava和Kotlin的花括号和正常括号有什么区别?

我不明白使用RxJava时,大括号和Kotlin中的正常括号之间的真正区别。 例如,我有以下代码,它按预期工作: someMethodThatReturnsCompletable() .andThen(anotherMethodThatReturnsACompletable()) .subscribe(…) 但以下不起作用: someMethodThatReturnsCompletable() .andThen { anotherMethodThatReturnsACompletable() } .subscribe(…) 用大括号注意链中的andThen()部分的区别。 我不明白这两者之间的区别是什么。 我已经看了一些文章,但不幸的是,我仍然很难理解这种微妙的差异。

RxJava Debounce onNext()

我试图为我的SwitchCompat反弹点击/ swipe没有成功。 下面的代码看起来不错,虽然onNext没有经过debounce()ie:当用户垃圾开关,onNext被称为每次点击,不会因为debounce而被省略。 它应该工作,这与RxJava的错误? timer = Observable.create { subscriber: Subscriber<in Void>? -> super.setOnCheckedChangeListener { buttonView, isChecked -> subscriber?.onNext(null) } } timer.debounce(3,TimeUnit.SECONDS)

如何在RxJava2中默默跳过异常?

我有这样的数据流: Observable .fromFuture( CompletableFuture.supplyAsync { // First remote call returns Future<List<Type>> listOf(1, 2, 3, 57005, 5) }, Schedulers.computation() ) .flatMap { it.toObservable() } // I turn that list into a stream of single values to process them one by one .map { CompletableFuture.supplyAsync { // This remote call may fail if it does not like […]

相当于RxJava 2 isUnsubscribed

我一直在通过RxJava的“ Reactive Programming ”一书中的示例进行研究,该示例针对版本1而不是2.对无限流的介绍有以下示例(并注意有更好的方法来处理并发): Observable<BigInteger> naturalNumbers = Observable.create(subscriber -> { Runnabler = () -> { BigInteger i = ZERO; while (!subscriber.isUnsubscribed()) { subscriber.onNext(i); i = i.add(ONE); } }; new Thread(r).start(); }); … Subscription subscription = naturalNumbers.subscribe(x -> log(x)); /* after some time… */ subscription.unsubscribe(); 但是,在RxJava 2中,传递给create()方法的lambda表达式的类型为ObservableEmitter ,并且没有isUnsubscribed()方法。 我在2.0中看到了什么是不同的,并且执行了对存储库的搜索,但是找不到任何这样的方法。 在2.0中如何实现这个相同的功能? 编辑包括下面给出的解决方案(使用kotlin的nb): val naturalNumbers = Observable.create<BigInteger> { […]

从正确的线程调用RxJava2可取消/一次性

我正在实现一个可观察的Resource从一个Resource发出线。 问题是,这个资源真的不喜欢被关闭从一个不同的线程,它创建(它杀死一只小狗,抛出异常,当这种情况发生)。 当我处理订阅时,从main线程调用资源Cancellable / Disposable ,而在Schedulers.io()上订阅observable。 这是Kotlin代码: fun lines(): Observable<String> = Observable.create { emitter -> val resource = NetworkResource() emitter.setCancellable { resource.close() // <– main thread 🙁 } try { while (!emitter.isDisposed) emitter.onNext(resource.readLine()) // <– blocked here! } catch (ioe: IOException) { emitter.tryOnError(ioe) // <– this also triggers the cancellable } } val disposable = […]

不能用RxKotlin“观察”主线程

我试图通过使用主线程观察可观察的: // Kotlin Code Observable .observeOn(AndroidSchedulers.mainThread()) 但我得到以下错误: Type Mismatch: Required: rx.Scheduler! Found: io.reactivex.Scheduler! 我订阅的Observable来自一个用Java编写的库,因此使用RxJava。 我是愚蠢的,错过了什么? 我puzzeled:$ 提前致谢 :)