Tag: RX java

RuntimeException处理最佳实践

RuntimeExceptions应该表示编程错误,并且当我的observables里面的东西抛出RuntimeException时,我希望我的应用程序崩溃。 做这个的最好方式是什么? 现在我正在考虑这个解决方案(这是Kotlin,但我希望这是可以理解的) fun Observable.subscribeCrashOnRuntimeException(onNext: (T) -> Unit, onError: (Throwable) -> Unit) { this.subscribe({ onNext(it) }, { e -> if (e is RuntimeException) { throw e } else { onError(e) } }) } fun usageExample() { val observable = Observable.just(1) observable.subscribeCrashOnRuntimeExceptions( { next -> Log.d(“TAG”, “next: $next”) }, { e -> Log.d(“TAG”, “error: $e”) } […]

在RxKotlin / RxJava中用BehaviorSubject自动创建热可观察对象

目前我正在使用RxKotlin在Kotlin上建立一个项目。 我与Rx的背景主要是基于RxJS。 我经常用于在Typescript中创建hot observables模式的模式看起来是这样的: private dataStore: IFoo; private dataStoreSubject: BehaviorSubject = new BehaviorSubject(this.dataStore); public dataStoreObservable: Observable = Observable.from(this.dataStoreSubject); public getNetworkData(): Observable { return this.http.get() .map((response: IResponse) => { this.dataStore = response; this.dataStoreSubject.next(this.dataStore); return this.dataStore; }); } 这将允许我暴露一个Observable ,而不暴露Subject和后续的subject.next(); 方法。 我的问题是:在RxKotlin或RxJava中建立类似逻辑最常用的方法是什么?

可以用最好的值来观察

我实现了一个名为“FilterByLatestFrom”的伪操作符作为kotlin的扩展函数。 我用这个运算符写下了下面的代码: fun testFilterByLatestFromOperator(){ val observableA : Observable = Observable.fromArray(1,2,3,4,5,6,7,8,9,10) val observableC : PublishSubject = PublishSubject.create() val observableB : Observable = Observable.just(2).mergeWith(observableC) observableB.subscribe { println(“observableB onNext: $it”) } observableA .subscribe({ println(“Original : $it”)}) observableA.filterByLatestFrom(observableB, BiFunction { aVal, bVal -> aVal%bVal==0 }) .subscribe({ println(“Result A : $it”) }) observableC.onNext(3) observableA.filterByLatestFrom(observableB, BiFunction { aVal, bVal -> aVal%bVal==0 […]

等待API调用以完成RxJava和Retrofit

我正在使用RxAndroid和RxJava和Kotlin向API发出各种请求,并以异步方式将其收回: getClient().sendSomeData(data) .subscribeOn(Schedulers.io()) .observeOn(AndroidSchedulers.mainThread()) .subscribe( { Log.i(“upload”, “data ok”) }, { t -> Log.e(“upload”, “data error: ” + t.stackTrace) } ); getClient().sendSomeOtherData(otherData) .subscribeOn(Schedulers.io()) .observeOn(AndroidSchedulers.mainThread()) .subscribe( { Log.i(“upload”, “otherData ok”) }, { t -> Log.e(“upload”, “data error: ” + t.stackTrace) } ); 等等。 现在我想要做的是,等待所有api请求完成并显示进度。 我如何等待所有可观察的事情完成并获得进展回调?

使用RxJava和Scheduler来更新Realm

我试图“rxify”我的数据库层是基于Realm .. 我有这个方法: public fun insertOrUpdate(user: user): User { return Observable.create { try { realm.beginTransaction() val result = realm.copyToRealmOrUpdate(user) realm.commitTransaction() it.onNext(result) it.onCompleted() } catch(e: Exception) { it.onError(IllegalDataAccess()) } } } 不过,由于我使用了不同的调度器,因此在不同的线程上使用领域对象的错误。 有什么要克服这个? (我能够做到这一点在对象检索,但不是在对象插入) (我正在使用kotlin btw。) 使用示例: userRepo.insertOrUpdateUser(user) .subscribeOn(Schedulers.io()) .observeOn(AndroidSchedulers.mainThread()) .subscribe(…)

Kotlin – 如何创建RxJava flatmap()的别名函数?

我试图创建一个别名函数Flowable.flatmap()如下,但编译错误。 fun Flowable.then(mapper: Function<T, Publisher>): Flowable { return flatMap(mapper) } 错误是: 在kotlin中定义的接口Function有一个types参数 有什么想法? 谢谢!

RxKotlin(RxJava2)timeout()不会抛出TimeoutException

我试图得到一个使用两个不同超时值的示例工作。 第一个排放量的初始值较大,随后的所有排放量则为较短的值。 这个例子是从RxJava v1x的Java转换为Kotlin,虽然我试图这是v2x(不知道这是否有任何区别)。 问题是第一个事件的超时不会引发TimeoutException 。 设置的值低于500毫秒,我期待一个堆栈跟踪打印,但我得到的输出,如果没有发生超时(后续排放超时设置为40毫秒导致堆栈跟踪如预期)。 以下示例阻止成功初始超时有什么问题? fun nextSolarEclipse(after: LocalDate): Observable { return Observable .just( LocalDate.of(2016, Month.MARCH, 9), LocalDate.of(2016, Month.SEPTEMBER, 1), LocalDate.of(2017, Month.FEBRUARY, 26), LocalDate.of(2017, Month.AUGUST, 21), LocalDate.of(2018, Month.FEBRUARY, 15), LocalDate.of(2018, Month.JULY, 13), LocalDate.of(2018, Month.AUGUST, 11), LocalDate.of(2019, Month.JANUARY, 6), LocalDate.of(2019, Month.JULY, 2), LocalDate.of(2019, Month.DECEMBER, 26) ) .skipWhile { date -> !date.isAfter(after) } .zipWith( Observable.interval(500, […]

如何从静态函数返回Template 时使用null来获取正确的types

我正在尝试一些Rx和Kotlin。 假设我有一个返回Observable的函数: public fun get():Observable { return if (someCondition()) { Observable.just(someLocalVarOfTypeT) } else { Observable.just(null) } } 编译器无法推断Observable.just(null)因为它在此expression式kotlin.Nothing替换为T : Type mismatch: required rx.Observable found rx.Observable 试图施放: Observable.just(null as T) 导致: The cast will never succeed Observable.just(null)的签名是(为了方便起见): public final static Observable just(final T value) { return … } 将这样一个Observable转换成正确types的正确方法是什么? 中间variables: val a: Observable = Observable.just(null) 作品,但看起来很奇怪…

如何做一个组,并使用RxJava和Kotlin收集?

我有Observable和Rate是一个简单的对象: Rate(val value:String){} Rates(val rates: List) 我想将Observable改为Observable<HashMap 。 Rates(arrayOf(Rate(“1”),Rate(“2”), Rate(“3”),Rate(“3”), Rate(“2”),Rate(“2”)))我期望的结果: (1 -> 1) (2 -> 3) (3 -> 2) (4 -> 0) (5 -> 0) 我开始创建这样的东西: service.getRates() .flatMap {it-> Observable.from(it.rates) } .filter { !it.value.isNullOrEmpty() } .groupBy {it -> it.value} .collect({ HashMap()}, { b, t -> b.put(t.key, t.count???)} 但我卡在这里,我不知道所有的价值? 我不知道如何添加空值(0),如果没有5个4.有没有办法使用rx做到这一点?

我无法从rxkotlin的groupby得到地图

你能帮我这个rxkotlin groupby不工作吗? 在rxjava而不是kotlin工作。 高度赞赏你的帮助