使用RxJava加入来自Firebase实时数据库的扁平化数据

加入扁平的数据是文档中描述的常见用例。 但是文档显示了一个不是实时的简单例子,它不会对变化做出反应。 我正在寻找更强大的实施。 我认为RxJava是理想的。

考虑以下Firebase结构:

{ "messages": { "group_id_1": { "message_id_1": { "text": "Hello", "author": "uid_1" } } }, "users": { "uid_1": { "name": "David" } }, "rooms": { "room_id_1": { "name": "General", "members": { "uid_1": true } } } } 

我在这里看到两个用例:

  1. 获取具有作者姓名的组中的消息列表
    • 我想我会得到Observable<Message> ,当我订阅它时,依赖关系(这些消息的用户)也将在一些缓存中订阅。 当我显示消息时,我可以从缓存中获取作者的名字。
    • 这也是实时的 – 如果作者名称改变,可观察者发出改变的消息。
    • 当我取消订阅observable时,也会依赖取消订阅。
  2. 获取他们名字的房间成员名单
    • 我想我会得到Observable<User> ,当我订阅它,它将首先订阅房间的成员,然后个人用户。
    • 这是实时的 – 如果房间成员改变,我会得到通知。
    • 当我取消订阅observable时,也会依赖取消订阅。

你知道图书馆/解决方案可以做到这一点吗?

或者,如果我创建一个,你会使用它吗?

我最终用这两种方法解决了这个问题(在Kotlin中,Java是相似的,只不过更详细):

 fun <A, B> Observable<List<A>>.mapSubQueries(subQuery: (A) -> Observable<B>): Observable<List<Observable<B>>> { return this.flatMap { if (it.isNotEmpty()) { return@flatMap Observable.from(it).map { subQuery(it) }.toList() } else { return@flatMap Observable.just(listOf<Observable<B>>()) } } } @Suppress("UNCHECKED_CAST") fun <T> Observable<List<Observable<T>>>.joinSubQueries(): Observable<List<T>> { return this.flatMap { if (it.isNotEmpty()) { return@flatMap Observable.combineLatest(it, { val list = mutableListOf<T>() it.forEach { list.add(it as T) } list }) } else { return@flatMap Observable.just(listOf<T>()) } } } 

为了让用户看到所有消息,我可以像这样使用它:

 fun usersInMessages(roomId): Observable<List<User>> { return DatabaseRead.messages(roomId) .mapSubQueries { DatabaseRead.user(it.getAuthor()) } .joinSubQueries() } 

我决定最好把这段代码保存在我的代码库中,并在各种使用情况下稍作修改。 使它成为一个图书馆将使它不那么灵活。 主要观点总是使用Observable.combineLatest() 。 许多其他的Rx参数是无用的,因为他们需要onComplete()调用,在这里我处理无限的Observables。

我将要提出这个问题的一个变种,但似乎可能会更好地建立在这一个之上…我会描述什么是希望至少部分是上述问题的答案,但也是一个缺点我看到。

使用上面的数据模型,我们可能会有如下的东西,以创建RxJava包装firebase查询获取特定房间的成员键的列表,并获取特定成员的详细信息(注意在subscriber.getMemberInfo使用onCompleted() …更多后来!)。

 public Observable<String> getRoomMembers(String roomId) { return Observable.create(subscriber -> { databaseReference.child(roomId + "/members").addValueEventListener(new ValueEventListener() { @Override public void onDataChange(DataSnapshot dataSnapshot) { for (DataSnapshot childSnapshot: dataSnapshot.getChildren()) { String userId = childSnapshot.getKey() subscriber.onNext(userId); } subscriber.onCompleted(); } @Override public void onCancelled(DatabaseError databaseError) { } }); }); } public Observable<Member> getMemberInfo(String memberId) { return Observable.create(subscriber -> { databaseReference.child(memberId).addValueEventListener(new ValueEventListener() { @Override public void onDataChange(DataSnapshot dataSnapshot) { Member member = dataSnapshot.getValue(Member.class); subscriber.onNext(member); subscriber.onCompleted(); } @Override public void onCancelled(DatabaseError databaseError) { } }); }); } 

然后,我们可以使用像下面这样的东西获得特定房间的Member名单(已添加isActive属性给Member以显示我们如何筛选我们得到的结果)。

  getRoomMembers(roomId) .flatMap(memberId -> getMemberInfo(memberId)) .filter(Member::isActive) .toList() .subscribe(members -> { }); 

所以,上面的工作到一定程度。 问题是,我不得不调用getMemberInfo中的subscriber.onCompleted()上面的调用flatMap工作….这意味着Member数据的任何后续更改不会触发上述订阅更新。 RxJavaFirebase相对较新,所以可能会漏掉一些明显的东西。