Tag: reactivex

通过订阅/取消订阅从侦听器发出项目的正确方法

在MVP架构之后,视图是一个Android活动,每个人都有自己的主持人。 作为数据存储库,我有一个图层,每次在后端发生变化时都会通知我新的数据。 存储库可以由不同的演示者使用,演示者必须在视图被销毁时销毁。 如何实现一个响应式流(每个存储库一个),演示者可以订阅和取消订阅以获取数据更新? 其他要求是流应该能够结合并应用它们的反应操作符。 我通过Firebase Firestore后端尝试了以下内容,但发现内存泄漏(活动和演示者不会被破坏)。 注意:当后端发生变化时,addSnapshotListener()回调会通知新数据。 正如您所看到的,observable是公开的(默认情况下是Kotlin),供演示者使用,用于订阅和取消订阅。 class DocumentRepository( path: List, private val model: Class) { private var documentReference: DocumentReference val observable: Observable private var emitter: ObservableEmitter? = null private lateinit var item: T init { documentReference = FirebaseFirestore.getInstance().collection(path[0]).document(path[1]) for (i in 2..path.lastIndex step 2) documentReference = documentReference.collection(path[i]).document(path[i + 1]) observable = Observable.create(this::listenChanges) […]

Rx:即使调用了onError,如何得到最后一个元素?

我正在使用RxJava,我需要做两件事情: 获取Observable发出的最后一个元素 确定是否调用了onError ,而onCompleted 我看过使用last和lastOrDefault (这实际上是我需要的行为),但我一直没有能够解决onError隐藏最后一个元素。 我可以使用Observable两次,一次获得last值,一次获得完成状态,但到目前为止,我只能通过创建我自己的Observer来完成这个任务: public class CacheLastObserver implements Observer { private final AtomicReference lastMessageReceived = new AtomicReference(); private final AtomicReference error = new AtomicReference(); @Override public void onCompleted() { // Do nothing } @Override public void onError(Throwable e) { error.set(e); } @Override public void onNext(T message) { lastMessageReceived.set(message); } public Optional getLastMessageReceived() […]

将无限流的无限流转化为无限流 – 反应X.

如何在反应x(理想的RxJava或RxJs例子)可以实现这一点? a |-a——————-a———–a———–a—- s1 |-xxxxxx -| (subscribe) s2 |-xxxxx-| (subscribe) s2 |-xxxxx-| (subscribe) … sn S |-xxxxxxx——-xxxxxxx————-xxxxxx- (subsribe) a是触发事件的有限流sn的无限事件流,每个事件应该是无限流S一部分,同时能够订阅每个sn流(为了进行求和操作),但同时保持流S为无穷。 编辑:更具体地说,我提供了我在Kotlin寻找的实现。 每10秒发射一个事件映射到共享有限的4个事件流。 元流是flatMap -ed到正常的无限流。 我使用doAfterNext额外订阅每个有限的流并打印结果。 /** Creates a finite stream with events * $ch-1 – $ch-4 */ fun createFinite(ch: Char): Observable<String> = Observable.interval(1, TimeUnit.SECONDS) .take(4) .map({ "$ch-$it" }).share() fun main(args: Array<String>) { var ch = 'A' […]