Tag: rx java2

如何通过键加入两个RxJava2 Obvervables?

我有两个不同类型的未分类observables。 这两种类型共享一个共同的密钥。 我想加入他们到一个新的可观察到的对应的元素发射对,我不知道该怎么做。 请注意,某些键可能会丢失。 如果不是完整的对子被丢弃,那就没问题,但是如果没有丢失的子块,它会更好。 输入1: Entity(id = 2), Entity(id = 1), Entity(id = 4) 输入2: Dto(id = 3), Dto(id = 2), Dto(id = 1) 预期输出(以任何顺序): Pair(Entity(id = 1), Dto(id = 1)), Pair(Entity(id = 2), Dto(id = 2)), Pair(null, Dto(id = 3)), Pair(Entity(id = 4), null)

RxJava在Android中进行故障测试Presenter

嗨,我想测试一个Android主持人使用mockito,我有很多麻烦。 这是我想测试的代码: public class SignUpPresenter extends BasePresenter { private static final String TAG = SignUpPresenter.class.getSimpleName(); private final AuthRepository mAuthRepository; private final SignUpView mView; public SignUpPresenter(@NonNull SignUpView view, @NonNull AuthRepository authRepository) { this.mAuthRepository = authRepository; this.mView = view; } public void signUp(@NonNull String username, @NonNull String password, @NonNull String email) { mCompositeDisposable.add(mAuthRepository.signUp(username, password, email) .subscribeOn(Schedulers.io()) .observeOn(AndroidSchedulers.mainThread()) […]

Kotlin使用RxJava2来观察观察者

Android Studio 3.0 Beta2 我创建了两个方法,一个创建观察者,另一个创建订阅者。 不过,我有一个问题试图让订阅者订阅观察。 在Java中这将工作,我正在努力让它在Kotlin工作。 在我的onCreate(..)方法,我试图设置这个。 这是正确的方法来做到这一点? class MainActivity : AppCompatActivity() { override fun onCreate(savedInstanceState: Bundle?) { super.onCreate(savedInstanceState) setContentView(R.layout.activity_main) /* CANNOT SET SUBSCRIBER TO SUBCRIBE TO THE OBSERVABLE */ createStringObservable().subscribe(createStringSubscriber()) } fun createStringObservable(): Observable<String> { val myObservable: Observable<String> = Observable.create { subscriber -> subscriber.onNext("Hello, World!") subscriber.onComplete() } return myObservable } fun createStringSubscriber(): Subscriber<String> […]

Rx concatWith()只返回第一个Flowable结果

我已经公布了他们正在单独工作的所有方法,但是我遇到了第一个问题,在那里我concatWith()两个流动 return userFavouriteStores() .concatWith(userOtherStores()) .doOnNext(new Consumer<List<StoreModel>>() { @Override public void accept(@io.reactivex.annotations.NonNull List<StoreModel> storeModels) throws Exception { Log.i("storeModels", "" + storeModels); } }) public Flowable<List<StoreModel>> userFavouriteStores() { return userStores() .map(UserStores::favoriteStores) .flatMap(storeId -> storeDao.storesWithIds(storeId)) .map(stores -> { // TODO Konvert to Kotlin map {} List<StoreModel> result = new ArrayList<>(stores.size()); for (se.ica.handla.repositories.local.Store store : stores) { result.add(store.toStoreModel(StoreModel.Source.Favourite)); } […]

如何获得可观察到的最新价值

假设,我有一个计时器,它在一秒钟间隔后发出一个项目。 我想订阅它并执行它10秒钟。 10秒后,我取消订阅,然后我想能够从代码的其他部分访问其最后emmited值。 这里是示例代码: @Test fun testMeasuretime(){ val emitter = Observable.interval(1, TimeUnit.SECONDS) .doOnNext{t: Long? -> Log.v("emitter", t.toString())} val disposable = emitter.subscribe() Thread.sleep(10000) disposable.dispose() Thread.sleep(5000) //get the last emited value Thread.sleep(5000) } 有没有办法从代码的其他部分获取最后的emited值? 我想用这个解决方案来衡量一些任务的时间执行。

将RxJava 1.0代码更新到RxJava 2.0

我正在更新一些旧的Android应用程序代码,并不知道如何将在RxJava / RxAndroid 1.0工作的代码转换为RxJava / RxAndroid 2.0。 我试图转换的代码负责获取信号强度指标,并每秒用它的值更新图表。 我设法在RxJava 2.0中破解的功能根本不起作用,几秒钟后,SIGABORT崩溃。 我无法弄清楚我错过了什么? 我的RxJava 1代码: rx.Observer myObserver = new rx.Observer<Float>() { @Override public void onCompleted() { } @Override public void onError(Throwable e) { } @Override public void onNext(Float aFloat) { addEntry(aFloat); } }; subscription = rx.Observable.interval(1, TimeUnit.SECONDS) .flatMap(new Func1<Long, rx.Observable<Float>>() { @Override public rx.Observable<Float> call(Long aLong) { […]

如何将Observable对象映射为Flowable <?>?

private Observable< SimpleResource > resource; return resource.map(new Function<SimpleResource, Flowable<Data>>() { @Override public Flowable< Data > apply(SimpleResource resource) throws Exception { return resource.data().toFlowable(); } }); Single<Data> data(); 我需要Flowable,但是我的结果是Observable>

在条件符合的情况下更改可观察 – RxJava2

使用RxJava2 RxKotlin和Room ,我需要查询数据库中的一个开放的狩猎。 这意味着我搜索一个包含一个名为closed的值为false的属性。 一旦找到猎物,就需要将查询切换到特定的猎物。 对于这些查询我有2个方法: getOpenHunt(teamId:String): Flowable<List<Hunt>> getHunt(huntId:String): Flowable<List<Hunt>> 他们都返回一个List因为否则查询卡住没有找到狩猎时。 我的想法是类似的 fun queryHunt(teamId:String):Flowable<Optional<Hunt>>{ getOpenHunt(teamId) .map<Optional<Hunt>> { Optional.create(it.firstOrNull()) } .switchToFlowableIf ( it is Optional.Some, getHunt(it.element().id) } //With switchToFlowableIf's being fun <E:Any> switchToFlowableIf(condition: (E)->Boolean, newFlowable: Flowable<E>): Flowable<E> //It should unsubscribe from getOpenHunt and subscribe to newFlowable 作为参考,这里是我的Optional类 sealed class Optional<out T> { class Some<out T>(val element: […]

Android RxJava – GroupBy和合并回来?

我如何按照以下标准排列对象的列表: 1)日期> =今天的所有对象 2)日期<今天的所有对象 例: 今天= 7/2月 List = [{5/2月,8/2月,6/2月,4/2月,9/2月]] 预期成果= [{8/2月9日/ 2月6日/ 2月5日/ 2月4日/ 2月]} 我吃了什么: dummyProvider .getAllObjects() .map(mapper::mapToDateObject) //Convert to new object -> DateObject .groupBy(dateObject -> dateObject.getDateTime().isAfterNow()) //What now? 在这个时候,我确实按照日期(在'now'之后,'now'之前)将它们Single<List<DateObject>> ,现在我需要将它合并回来并返回一个Single<List<DateObject>> 。

RxKotlin(RxJava2)timeout()不会抛出TimeoutException

我试图得到一个使用两个不同超时值的示例工作。 第一个排放量的初始值较大,随后的所有排放量则为较短的值。 这个例子是从RxJava v1x的Java转换为Kotlin,虽然我试图这是v2x(不知道这是否有任何区别)。 问题是第一个事件的超时不会引发TimeoutException 。 设置的值低于500毫秒,我期待一个堆栈跟踪打印,但我得到的输出,如果没有超时已经发生(后续排放超时设置为40毫秒导致堆栈跟踪如预期)。 以下示例阻止成功初始超时有什么问题? fun nextSolarEclipse(after: LocalDate): Observable<LocalDate> { return Observable .just( LocalDate.of(2016, Month.MARCH, 9), LocalDate.of(2016, Month.SEPTEMBER, 1), LocalDate.of(2017, Month.FEBRUARY, 26), LocalDate.of(2017, Month.AUGUST, 21), LocalDate.of(2018, Month.FEBRUARY, 15), LocalDate.of(2018, Month.JULY, 13), LocalDate.of(2018, Month.AUGUST, 11), LocalDate.of(2019, Month.JANUARY, 6), LocalDate.of(2019, Month.JULY, 2), LocalDate.of(2019, Month.DECEMBER, 26) ) .skipWhile { date -> !date.isAfter(after) } .zipWith( Observable.interval(500, […]