RxJava:在另一个异步生产中取消订阅异步观察

什么是最好的(干净)的方式来取消订阅一个异步Observable这是在另一个异步之一的flatMap产生? 说,我有一个演示文稿,显示用户的聊天列表。 每个聊天项目应该呈现自己的最新消息。 这里是一个原始的代码来说明:

 fun AsyncInsideAsync() { /** * A chat that 'transmits' latest message */ class Chat(val name: Int) { val lastMessage: PublishSubject<String> = PublishSubject.create() fun postMessage(message: Int) { lastMessage.onNext("Chat: $name, Message: $message") } } // List of chats for user. val chats: PublishSubject<Chat> = PublishSubject.create() /////////////////////////////////// // ORIGINAL SEQUENCE // /////////////////////////////////// val sequence = chats.flatMap { it.lastMessage } val subscriber = TestSubscriber<String>() sequence.subscribe(subscriber) // User has single chat in a chat-list val chat1 = Chat(1) chats.onNext(chat1) // Someone posts a message in Chat 1 chat1.postMessage(1) subscriber.assertValues( "Chat: 1, Message: 1" ) // Someone posts another message chat1.postMessage(2) subscriber.assertValues( "Chat: 1, Message: 1", "Chat: 1, Message: 2" ) // Chat 1 disappears FROM USER CHAT LIST, Chat 2 is created val chat2 = Chat(2) chats.onNext(chat2) // Someone posts a message to Chat 2 chat2.postMessage(1) subscriber.assertValues( "Chat: 1, Message: 1", "Chat: 1, Message: 2", "Chat: 2, Message: 1" ) // Someone out there posts a message to Chat 1 that is not visible to user anymore chat1.postMessage(3) // The answer looks like this // "Chat: 1, Message: 1", // "Chat: 1, Message: 2", // "Chat: 2, Message: 1", // "Chat: 1, Message: 3" // Chat 1 is still subscribed and test fails subscriber.assertValues( "Chat: 1, Message: 1", "Chat: 1, Message: 2", "Chat: 2, Message: 1", ) } 

我想到的是使用一个主题(或共享的观察者)来制止一连串的内部订阅。 但看起来很奇怪:

  /////////////////////////////////// // MODIFIED SEQUENCE // /////////////////////////////////// val unsubscribe: PublishSubject<Boolean> = PublishSubject.create() val sequence = chats .doOnNext({ unsubscribe.onNext(true) }) .doAfterTerminate({ unsubscribe.onNext(true) }) .flatMap { it.lastMessage.takeUntil(unsubscribe) } 

这种方法的工作,但看起来可怕。 非常感谢!

假设评论

 // Chat 1 disappears FROM USER CHAT LIST, Chat 2 is created 

表示在Chat中发送新Chat时,您希望当前的Chat (本例中为chat1 )停止发送,您可以使用switchMap操作符完成此操作。

 val sequence = chats.switchMap { it.lastMessage } 

从ReactiveX文档:

RxJava也实现了switchMap操作符。 它的行为与flatMap非常相似,不同的是每当源Observable发出一个新的项目时,它将取消订阅并停止对先前发出的项目生成的Observable进行镜像,只开始镜像当前的项目。