Tag: RX java

订阅多个视图到一个视图onclick和可见性改变RxAndroid

我新使用RxJava,并且仍然熟悉很多概念。 试图在一个项目中使用它,我想要做的是订阅一个视图的点击事件来改变另一个视图的可见性。 这是我的审判工作。 RxView.clicks(info_overlay).map { _ -> View.GONE }.subscribe { AppCache().hasSeenInfoScreen = true info_overlay_child_take_a_helfie.visibility = it info_overlay_child_subscription.visibility = it info_overlay_child_description.visibility = it info_overlay_child_header.visibility = it } 不过,我也想订阅info_overlay可见性。 这样info_overlay的可见性info_overlay影响其他视图。 我如何实现这一点和点击观察在一起。

在哪里绘制反应式编程线

在我的项目中,我一直在使用RxJava大约一年。 随着时间的推移,我开始非常喜欢它 – 现在我想也许太多了… 我写的大多数方法现在都有某种forms的Rx,这太棒了! (直到不是)。 我现在注意到有些方法需要大量的工作来组合不同的可观察生产方法。 我感觉到虽然我明白我写的东西,但下一位程序员很难理解我的代码。 在底线之前,让我直接从我在Kotlin的代码给出一个例子(不要太深入): private fun getCachedEntities( getManyFunc: () -> Observable<Timestamped<List>>, getFromNetwork: () -> Observable<ListResult>, getFunc: (String) -> Observable<Timestamped>, insertFunc: (T) -> Unit, updateFunc: (T) -> Unit, deleteFunc: (String) -> Unit) = concat( getManyFunc().filter { isNew(it.timestampMillis) } .map { ListResult(it.value, “”) }, getFromNetwork().doOnNext { syncWithStorage(it.entities, getFunc, insertFunc, updateFunc, deleteFunc) }).first() […]

合并多个单打组成一个Observable

我正在编写一个Android应用程序,需要按以下顺序执行2个查询: 向一个返回Single<List> urls的库发出一个请求(让我们称之为RequestA)。 根据我从RequestA收到的内容,我必须使用每个这些URL向另一个库发出请求(RequestB)。 每个RequestB现在都返回一个Single。 现在我已经把所有的RequestB中的所有Single结合起来形成了一个observable。 就像Observable.mergedelayerror(List) 。 我不能这样做,因为mergedelayerror期望ObservableSource iterable 。 我知道我可以通过实现回调和使用一些丑陋的逻辑来实现这一目标但是我真的只需要使用由RX提供的运算符

RxAndroid / RxLifeCycle – 处理处理onError而不是onComplete

我正在尝试将RxLifeCycle实现到与RxJava联网中。 我一直在使用Consumer一个子Consumer ,但是对于RxLifeCycle ,你需要处理onError 。 所以我已经转移到Observer 。 这个问题是,当调用被处置时,它调用onComplete而不是onError ,我更喜欢。 buildle.gradle: // RxJava compile ‘io.reactivex.rxjava2:rxandroid:2.0.1’ compile ‘io.reactivex.rxjava2:rxjava:2.0.3’ compile ‘com.trello.rxlifecycle2:rxlifecycle-kotlin:2.2.1’ compile ‘com.trello.rxlifecycle2:rxlifecycle-android-lifecycle-kotlin:2.2.1’ 我以前的NetworkConsumer是这样构建的,我会处理accept所有结果。 NetworkConsumer: abstract class NetworkConsumer : Consumer<NetworkResponse> { @Throws(Exception::class) override fun accept(response: NetworkResponse) { … } // to override open fun onSuccess(response: T) {} open fun onComplete() {} } 我的网络电话都使用Single结构化。 fun getFavorites(): Single<NetworkResponse<Array>> 我正在使用它。 service.getFavorites(…) […]

使用flatMap和filter过滤observables是否正确?

使用一个人为的例子来说明我的问题,我有一个复合对象types的Observable: Observable public class CategoryPayload { public List categories; // other meta data and getters } public class Category { public Integer id; // other meta data and getters } 我需要根据id过滤掉某些类别,所以我最终做了如下的事情: Observable categoryObservable = service.getCategoryPayload(); // use flatMap to transform the Observable into multiple mSubscription.add( categoryObservable.flatMap(new Func1<CategoryPayload, Observable>(){ public Observable call(CategoryPayload categoryPayload){ return Observable.from(categoryPayload.categories); } […]

运行多个测试(Kotlin)时,只有第一个测试通过TestScheduler,

我试图在Kotlin中使用RxJava在MVP体系结构中测试我的演示者。 我已经创建了一个测试规则,用TestScheduler替换通常的调度程序来测试异步请求: class TestSchedulerRule : TestRule { val testScheduler = TestScheduler() override fun apply(base: Statement, d: Description): Statement { return object : Statement() { @Throws(Throwable::class) override fun evaluate() { RxJavaPlugins.setInitIoSchedulerHandler { testScheduler } RxJavaPlugins.setInitComputationSchedulerHandler { testScheduler } RxJavaPlugins.setInitNewThreadSchedulerHandler { testScheduler } RxJavaPlugins.setInitSingleSchedulerHandler { testScheduler } RxAndroidPlugins.setInitMainThreadSchedulerHandler { testScheduler } try { base.evaluate() } finally { […]

以递归方式将Rx选项组合成Obserbles

比方说,我有一个名为s_0的Single ,它可以从Ttypes发出一个元素t_0 ,也可以失败(在某些语言中,这可能是Single )。 那是: s_0: — t_0 // Success OR s_0: — X // Failure Ttypes的实例有一个next()方法,它返回Ttypes的一个可选的Single ( Kotlin中的Single? )。 这种行为导致一个能够发射一个T实例链的Single实例链,其中每个单独的s_i可以发出一个能够返回下一个单独的s_i+1的元素t_i+1 ,这将发出一个元素t_i+1等等,直到最后一个元素t_n-1没有返回单个或任何单数失败: s_0: — t_0 ↓ s_1: — t_1 ↓ s_2: — t_2 … ↓ s_n-1: — t_n-1 ↓ null OR s_0: — t_0 ↓ s_1: — t_1 ↓ s_2: — t_2 … ↓ s_i: […]

在展平之前在RxJava中设置一个variables

Observable 从调用getSomething()返回。 Foo有一个列表,基本上是我想要的数据。 我需要过滤清单,所以我把它弄平了。 不过,我需要从Foo的另一个数据在subscribe.onNext()中设置一个variables。 我想要设置的variables是kotlin类的一个默认setter的成员。 我确实试图从地图返回@地图,但这似乎阻止了排放,使得filter无法使用。 我确实在其他线程中查看了Pair和Observable嵌套,但还没有发现它有帮助。 我也看了flatMapIterable的第二个重载的版本与一个Func2,但没有得到它吐出限制。 在这种情况下,我如何正确应用上面的任何一个,或者是否有另一个更简单的解决方案? service.getSomething() .subscribeOn(Schedulers.io()) —> here I need to set limit = Foo.data.limit .flatMapIterable { t -> t.data.results } .filter({ s -> filterLogic(s) }) .toList() .observeOn(AndroidSchedulers.mainThread()) .subscribe( { c -> //this is the list, not Foo

通过订阅/取消订阅从侦听器发出项目的正确方法

在MVP架构之后,视图是一个Android活动,每个人都有自己的主持人。 作为数据存储库,我有一个图层,每次在后端发生变化时都会通知我新的数据。 存储库可以由不同的演示者使用,演示者必须在视图被销毁时销毁。 如何实现一个响应式流(每个存储库一个),演示者可以订阅和取消订阅以获取数据更新? 其他要求是流应该能够结合并应用它们的反应操作符。 我通过Firebase Firestore后端尝试了以下内容,但发现内存泄漏(活动和演示者不会被破坏)。 注意:当后端发生变化时,addSnapshotListener()回调会通知新数据。 正如您所看到的,observable是公开的(默认情况下是Kotlin),供演示者使用,用于订阅和取消订阅。 class DocumentRepository( path: List, private val model: Class) { private var documentReference: DocumentReference val observable: Observable private var emitter: ObservableEmitter? = null private lateinit var item: T init { documentReference = FirebaseFirestore.getInstance().collection(path[0]).document(path[1]) for (i in 2..path.lastIndex step 2) documentReference = documentReference.collection(path[i]).document(path[i + 1]) observable = Observable.create(this::listenChanges) […]

RxJava推迟了可观察到的火灾

我有一个使用RxJava observables设置的事件序列。 基本上,我使用observable.delay(time, timeUnit, scheduler)函数合并了由Observable.just(Events.*)创建的不同延迟创建的不同事件。 然后将它们发布到PublishSubject (下面的代码中的events )并订阅该PublishSubject以观察下面的代码中的序列( observeEvents()函数)。 它曾经工作得很好,但最近我在我的设备上看到了一个非常奇怪的行为(OnePlus One与android 5.0.2)(并没有在模拟器上看到它)。 基本上事件会混淆起来,延迟较高的事件可能会在延迟较小的事件之前发生,延迟较小的事件可能会发生在队列的末尾,有时候所有的事件都会以正确的顺序出现。 前三个赛事经常混合在一起。 有时候根本没有观察到一些事件。 这里会发生什么? 代码在Kotlin: var computationScheduler = Schedulers.computation() private val events: PublishSubject = PublishSubject.create() private val userActionSubject: PublishSubject = PublishSubject.create() Observable.merge( event0(), event1(), event2(), userActionOrEvent3(), userActionOrEvent4()) .subscribe({ // Weird timings are observed here already events.onNext(it) }, { e -> events.onError(e) })) private […]