Tag: rx java2

Android RxJava / Kotlin – 将大量数据映射到网络呼叫

我试图实现一个Android应用程序,需要从后端服务获取大量的数据,并将其保存到数据库,以便以后工作。 下面的代码描述了这个过程: itemsService .getAllItemIds() //This returns Single<List<Int>> from backend .subscribeOn(Schedulers.io()) .subscribe({ Observable.fromIterable(it) .map({ itemsService .getItemById(it) //This gets one item details from backend .subscribeOn(Schedulers.io()) .observeOn(AndroidSchedulers.mainThread()) .subscribe({ //Add item details to db }, { //Some error }) }) }, { //Some error }) 我获得一个ID列表,然后映射每个这些ID到一个网络调用来获得完整的对象。 这适用于10个项目的测试集,但是生产集包含超过5万个ID。 它起初工作,节省的项目,但约5-10%的研磨停下来,应用程序死亡。 我假设这里的原因是Rx保持源和映射值之间的引用。 我的问题是:是否有办法将源排放“汇集”到一起,比方说,10? 或者也许还有一些我不知道的其他机制?

RxJava 1.x .zip()不适用于RxJava 2.0

我正在尝试使用RxJava + Retrofit + Kotlin来学习链接请求。 我正在使用的教程是使用RxJava 1.x,所以当我尝试用RxJava 2.x重新实现时,我无法使用.zip()来工作。 这是一个简单的应用程序使用星球大战API,返回电影列表,并为每个电影和,从他们的角色。 fun loadMoviesFull(): Observable<Movie> { return service.listMovies() .flatMap { filmResults -> Observable.from(filmResults.results) } .flatMap { film -> Observable.zip( Observable.just(Movie(film.title, film.episodeId, ArrayList<Character>())), Observable.from(film.personUrls) .flatMap { personUrl -> service.loadPerson(Uri.parse(personUrl).lastPathSegment) } .map { person -> Character(person!!.name, person.gender) } .toList(), { movie, characters -> movie.characters.addAll(characters) movie }) } } 如果你想看看教程的整个实施,这是链接(评论是葡萄牙语): http […]

如何在RxJava 2和Kotlin中将null传递给可空类型的Observable

我像这样初始化我的变量: val user: BehaviorSubject<User?> user = BehaviorSubject.create() 但我不能这样做。 IDE会引发错误: – user.onNext(null) 这样做,IDE说你永远不会是空的: – user.filter( u -> u!=null)

Kotlin:如何将使用Thread.sleep的测试转换为RxJava TestScheduler

我正在写一个工具测试,它检查是否当我缓存东西到一个接收缓冲区,并经过一段时间(10秒)这个主题插入缓冲值到我的房间数据库。 当我使用Thread.sleep(syncTimeInterval)时,测试是正确的。 我想用TestScheduler编写这个相同的测试。 这里是Thread.sleep版本(通过测试): @Test fun testMultipleLogs() { val loadAllCloudCallBefore = appDatabase.logCloudCallDao().loadAll() val loadAllLogNewSessionBefore = appDatabase.logNewSessionDao().loadAll() assertEquals(0, loadAllCloudCallBefore.size) assertEquals(0, loadAllLogNewSessionBefore.size) Observable.interval(1, TimeUnit.SECONDS) .take(20) .subscribe { logManager.logNewSession() } Observable.interval(1, TimeUnit.SECONDS) .take(20) .subscribe { logManager.logCloudCall("url", "callgoup") } Observable.interval(1, TimeUnit.SECONDS) .take(20) .subscribe { logManager.logNewSession() } Observable.interval(1, TimeUnit.SECONDS) .take(20) .subscribe { logManager.logCloudCall("url", "callgoup") } Observable.interval(1, TimeUnit.SECONDS) .take(20) .subscribe { […]

Rxjava2流动不开火onComplete

这是代码 private fun setUpDistrictSpinner() { commonRepo.getAllDistricts() .observeOn(AndroidSchedulers.mainThread()) .flatMap { list -> Flowable.fromIterable(list) } .map { district -> districtNameList.add(district.district_name) district }.subscribe(object:Subscriber<District>{ override fun onComplete() { labSelectionInterface.loadDistricts(districtNameList) Timber.d("district List loaded total " + districtList.size) } override fun onError(t: Throwable?) { t!!.printStackTrace() } override fun onNext(t: District) { districtList.add(t) } override fun onSubscribe(s: Subscription) { s.request(Long.MAX_VALUE) } }) […]

Kotlin和RxJava2 zip运算符 – 以下函数都不能用提供的参数调用

我在使用Android Studio 3.0 IDE编写Kotlin中的.zip Observable时遇到了问题。 这是我的代码: internal var updateStringEventsSubject = PublishSubject.create<String>() internal var updateIntEventsSubject = PublishSubject.create<Int>() internal var triggerSave = PublishSubject.create<Boolean>() internal var triggerStopAndSave = PublishSubject.create<Boolean>() internal var normalSaveTrigger = triggerSave.debounce(30, TimeUnit.SECONDS) internal var trigger = Observable.merge(normalSaveTrigger, triggerStopAndSave) private fun saveEvents( strings: List<String>, integers: List<Int>, someBoolean: Boolean): Boolean { return true //doesn't matter for now […]

在Kotlin Lambda中调用一个RxJava单身人士

我正在尝试修改新的房间库与RxJava配对。 我已经找到了一种方法来使用Single来插入后台线程中的项目,就像这样,在一个activity中: Single.fromCallable { AppDatabase.getInMemoryDatabase(this).taskDao().insertAll(task) } .subscribeOn(Schedulers.newThread()) .subscribe() 现在,我有一个RecyclerView任务,有一个复选框,您可以用来标记项目是否完成。 我想要做的是每次检查/取消选中时更新项目。 我将粘贴整个ViewHolder来完成,但是请特别注意bindTask()的lambda: inner class TaskViewHolder(view: View?) : RecyclerView.ViewHolder(view) { val descriptionTextView = view?.findViewById(R.id.task_description) as? TextView val completedCheckBox = view?.findViewById(R.id.task_completed) as? CheckBox fun bindTask(task: Task) { descriptionTextView?.text = task.description completedCheckBox?.isChecked = task.completed completedCheckBox?.setOnCheckedChangeListener { _, isChecked -> tasks[adapterPosition].completed = isChecked Single.fromCallable { itemView.context.taskDao().update(tasks[adapterPosition]) } .subscribeOn(Schedulers.newThread()) .subscribe() } […]

在kotlin android中使用flux结构实现Rx Java的问题

我正在kotlin中实现flux架构,并试图使用RX java中我做了一个调度器,用于调度事件和听事件 Dispatcher.kt class Dispatcher @Inject constructor() { private val mBus:Subject<Any> init{ mBus = PublishSubject.create() } fun <A : IEvent> dispatchUserEvent(@NonNull e:A) { mBus.onNext(e) } fun <E : IEvent> observeUserEvents(clazz:Class<E>):Observable<E> { return mBus.ofType(clazz) } } IEvent是一个通用接口的接口,我正在创建一个扩展IEvent的用户事件 IPnrUserEvent.kt interface IPnrUserEvent :IEvent { fun getPnrNo():String } PnrUserEvent.kt class PnrUserEvent(var pnr:String) : IPnrUserEvent { override fun getPnrNo(): String […]

RxJava 2需要与RxJava 1(Kotlin)不同的Observable返回类型

我正在更新我的项目从RxJava 1到RxJava 2.我有这个扩展方法(我使用Kotlin): fun <T : DatabaseModel> Observable<DataSnapshot?>.toObjectObservable(type: Class<T>): Observable<T> { return this.map { if (it == null) { return@map null } val data = it.getValue(type) data?.setId(it.key) data } } 当我使用RxJava 1时,一切都很好。 但是,当我用RxJava 2(io.reactivex.Observable)中的新Observable替换了observable时,发生了错误。 现在IDE说它需要下面的返回类型: ((Datasnashot) -> T)! 而不是提供 (Datasnapshot) -> T? 你有什么想法可能会导致这种不同的行为? 我应该如何改变代码来修复它?

使用Kotlin的RxJava – 如何同步2个异步方法,从Java重构

我有2个集合,哪个缓冲区位置更新事件: private List<LocationGeoEvent> mUpdateGeoEvents = new ArrayList<>(); private List<LocationRSSIEvent> mUpdateRSSIEvents = new ArrayList<>(); 在我的代码中也有: private final ScheduledExecutorService mSaveDataExecutor = Executors.newSingleThreadScheduledExecutor(); private boolean mSaveDataScheduled; private final Object mEventsMonitor = new Object(); private ScheduledFuture<?> mScheduledStopLocationUpdatesFuture; private final ScheduledExecutorService mStopLocationUpdatesExecutor = Executors.newSingleThreadScheduledExecutor(); 我添加这样的事件: public void appendGeoEvent(LocationGeoEvent event) { synchronized (mEventsMonitor) { mUpdateGeoEvents.add(event); scheduleSaveEvents(); } } RSSI事件也是如此 现在,scheduleSaveEvents方法如下所示: private […]