Tag: RX java

从普通的Java事件创建Observable

从古典Java事件模式创建Rx-Java Observable的最佳方式是什么? 那就是给定的 class FooEvent { … } interface FooListener { void fooHappened(FooEvent arg); } class Bar { public void addFooListener(FooListener l); public void removeFooListener(FooListener l); } 我想实现 Observable<FooEvent> fooEvents(Bar bar); 我想到的实现是: Observable<FooEvent> fooEvents(Bar bar) { return Observable.create(new OnSubscribeFunc<FooEvent>() { public Subscription onSubscribe(Observer<? super FooEvent> obs) { FooListener l = new FooListener() { public void […]

如何获得可观察到的最新价值

假设,我有一个计时器,它在一秒钟间隔后发出一个项目。 我想订阅它并执行它10秒钟。 10秒后,我取消订阅,然后我想能够从代码的其他部分访问其最后emmited值。 这里是示例代码: @Test fun testMeasuretime(){ val emitter = Observable.interval(1, TimeUnit.SECONDS) .doOnNext{t: Long? -> Log.v("emitter", t.toString())} val disposable = emitter.subscribe() Thread.sleep(10000) disposable.dispose() Thread.sleep(5000) //get the last emited value Thread.sleep(5000) } 有没有办法从代码的其他部分获取最后的emited值? 我想用这个解决方案来衡量一些任务的时间执行。

使用Retrofit2 + RxJava + Jackson重试202状态码

我有一个API,根据不同的场景返回200,202,4xx。 当我得到一个202,我应该做相同的API,直到我得到一个200或4xx。 我尝试使用doOnErrorNext,onError,onNext。 我无法解决这个问题 Observable<MyPOJO> makeAPI(); Observable<MyPOJO> makeAPIImpl(){ makeAPI().doOnErrorNext(/*how to I access the error code here*/); makeAPI().doOnNext(/*given that 202 is a success code, I expected it to come here, but it goes to Error because of JSON Mapping*/); } doOnErrorNext – >我能够再次进行API调用,但它会发生所有我不想要的错误情况 我已经检查了多个关于这个问题的答案,但是没有人专门解决这个问题,并且无法在我的用例中加入其他答案。

如何在Android中获取ObservableField的值

您好我有我的java代码中的这个ObservableField 。 我想通过调用get方法来获得它的价值。 val email = ObservableField<String>() 这可以使用下面的方法来完成。 我很困惑,不知道我应该在这里做一个getter来获得它的价值吗? 或者有不同的标准方法来获取ObservableField的值我也在我的应用程序中使用RxJava。 fun login(view: View) { val emailVal = email.get() }

Android RxJava – GroupBy和合并回来?

我如何按照以下标准排列对象的列表: 1)日期> =今天的所有对象 2)日期<今天的所有对象 例: 今天= 7/2月 List = [{5/2月,8/2月,6/2月,4/2月,9/2月]] 预期成果= [{8/2月9日/ 2月6日/ 2月5日/ 2月4日/ 2月]} 我吃了什么: dummyProvider .getAllObjects() .map(mapper::mapToDateObject) //Convert to new object -> DateObject .groupBy(dateObject -> dateObject.getDateTime().isAfterNow()) //What now? 在这个时候,我确实按照日期(在'now'之后,'now'之前)将它们Single<List<DateObject>> ,现在我需要将它合并回来并返回一个Single<List<DateObject>> 。

RxKotlin(RxJava2)timeout()不会抛出TimeoutException

我试图得到一个使用两个不同超时值的示例工作。 第一个排放量的初始值较大,随后的所有排放量则为较短的值。 这个例子是从RxJava v1x的Java转换为Kotlin,虽然我试图这是v2x(不知道这是否有任何区别)。 问题是第一个事件的超时不会引发TimeoutException 。 设置的值低于500毫秒,我期待一个堆栈跟踪打印,但我得到的输出,如果没有超时已经发生(后续排放超时设置为40毫秒导致堆栈跟踪如预期)。 以下示例阻止成功初始超时有什么问题? fun nextSolarEclipse(after: LocalDate): Observable<LocalDate> { return Observable .just( LocalDate.of(2016, Month.MARCH, 9), LocalDate.of(2016, Month.SEPTEMBER, 1), LocalDate.of(2017, Month.FEBRUARY, 26), LocalDate.of(2017, Month.AUGUST, 21), LocalDate.of(2018, Month.FEBRUARY, 15), LocalDate.of(2018, Month.JULY, 13), LocalDate.of(2018, Month.AUGUST, 11), LocalDate.of(2019, Month.JANUARY, 6), LocalDate.of(2019, Month.JULY, 2), LocalDate.of(2019, Month.DECEMBER, 26) ) .skipWhile { date -> !date.isAfter(after) } .zipWith( Observable.interval(500, […]

将AutoCompleteTextView中的Observable更改为EditText

我正在尝试使用Kotlin和RxJava编辑一些预先存在的代码,以便为用户提供更好的UI布局。 初始ui使用AutoCompleteTextEdit将用户的Google Places结果显示为用户类型。 我试图改变它,所以用户键入,结果显示在输入字段下的ListView而不是附加到输入字段。 我的问题是,我只能得到数据显示在列表视图中,如果我离开当前的代码,以便它也显示在AutoCompleteTextView由于代码是为observable和自定义RxFilterableArrayAdapter编写的方式。 有了这个代码,搜索工作,但显示在下拉菜单,以及列表视图。 (见下文) private fun setupView(view:View) { val tripPlanningResult = view.findViewById(R.id.trip_planning_api_results) as ListView val starting_location = view.findViewById(R.id.trip_planning_starting_location) as AutoCompleteTextView val destination_address = view.findViewById(R.id.trip_planning_destination_address) as AutoCompleteTextView val plan_route_button = view.findViewById(R.id.trip_planning_plan_route_button) val tripPlanningService = TripPlanningService.sharedTripPlanningService(context) val cityBounds0 = CityCache.getInstance(context, "cities", true) .getCityRegion(cityId!!) //Creates an Observable to watch for text input from the AutoCompleteTextView […]

将无限流的无限流转化为无限流 – 反应X.

如何在反应x(理想的RxJava或RxJs例子)可以实现这一点? a |-a——————-a———–a———–a—- s1 |-xxxxxx -| (subscribe) s2 |-xxxxx-| (subscribe) s2 |-xxxxx-| (subscribe) … sn S |-xxxxxxx——-xxxxxxx————-xxxxxx- (subsribe) a是触发事件的有限流sn的无限事件流,每个事件应该是无限流S一部分,同时能够订阅每个sn流(为了进行求和操作),但同时保持流S为无穷。 编辑:更具体地说,我提供了我在Kotlin寻找的实现。 每10秒发射一个事件映射到共享有限的4个事件流。 元流是flatMap -ed到正常的无限流。 我使用doAfterNext额外订阅每个有限的流并打印结果。 /** Creates a finite stream with events * $ch-1 – $ch-4 */ fun createFinite(ch: Char): Observable<String> = Observable.interval(1, TimeUnit.SECONDS) .take(4) .map({ "$ch-$it" }).share() fun main(args: Array<String>) { var ch = 'A' […]

RxJava Completabe然后测试

我有以下RxJava2 Kotlin代码: val tester = Completable.complete() .andThen(SingleSource<Int> { Single.just(42) }) .test() tester.assertComplete() tester.assertValue(42) 这模拟了一个Completable可观察(想象一个简单的更新操作到一个API),然后单个可观察(图像获取操作的API)。 我想连接两个observables的方式,当Completable完成时,Single运行,最后我在我的观察者(Int 42)上得到onSuccess事件。 但是这个测试代码不起作用。 断言失败,出现以下错误: java.lang.AssertionError: Not completed (latch = 1, values = 0, errors = 0, completions = 0)) 我无法理解我在做什么错误,我期望Completable在订阅时发出onComplete,然后Single订阅,而我的观察者( tester )获得值为42的onSuccess事件,但似乎订阅保持“暂停“不发射任何东西。 这个想法与本博客文章中的想法类似: https : //android.jlelse.eu/making-your-rxjava-intentions-clearer-with-single-and-completable-f064d98d53a8 apiClient.updateMyData(myUpdatedData) // a Completable .andThen(performOtherOperation()) // a Single<OtherResult> .subscribe(otherResult -> { // handle otherResult }, […]

Kotlin高阶函数和单方法接口的行为?

我之前在使用RxJava和Kotlin时遇到了一些问题。 我做了一些有趣的发现,我仍然感到困惑。 RxJava中有简单的Func1接口 public interface Func1<T, R> extends Function { R call(T t); } 我试图添加一个扩展方法到一个Observable ,也是一个RxJava类。 这将收集排放到谷歌Guava ImmutableListMulitmap使用Func1来映射每个项目的关键。 fun <K,T> Observable<T>.toImmutableListMultimap(keyMapper: Func1<T, K>): Observable<ImmutableListMultimap<K,T>> { return this.collect({ ImmutableListMultimap.builder<K,T>()},{ b, t -> b.put(keyMapper.call(t), t)}).map { it.build() } } 当我试图调用这个扩展方法时,我无法编译它,它根本不理解lambda表达式。 ScheduledItem.all.flatMap { it.rebuildSoftTransactions } .toImmutableListMultimap { it.id /*compile error */ } .cache() 然而,当我修改扩展方法使用函数类型时,发生了最奇怪的事情。 fun <K,T> Observable<T>.toImmutableListMultimap(keyMapper: (T) […]