相当于RxJava 2 isUnsubscribed

我一直在通过RxJava的“ Reactive Programming ”一书中的示例进行研究,该示例针对版本1而不是2.对无限流的介绍有以下示例(并注意有更好的方法来处理并发):

Observable<BigInteger> naturalNumbers = Observable.create(subscriber -> { Runnabler = () -> { BigInteger i = ZERO; while (!subscriber.isUnsubscribed()) { subscriber.onNext(i); i = i.add(ONE); } }; new Thread(r).start(); }); ... Subscription subscription = naturalNumbers.subscribe(x -> log(x)); /* after some time... */ subscription.unsubscribe(); 

但是,在RxJava 2中,传递给create()方法的lambda表达式的类型为ObservableEmitter ,并且没有isUnsubscribed()方法。 我在2.0中看到了什么是不同的,并且执行了对存储库的搜索,但是找不到任何这样的方法。

在2.0中如何实现这个相同的功能?

编辑包括下面给出的解决方案(使用kotlin的nb):

 val naturalNumbers = Observable.create<BigInteger> { emitter -> Thread({ var int: BigInteger = BigInteger.ZERO while (!emitter.isDisposed) { emitter.onNext(int) int = int.add(BigInteger.ONE) } }).start() } val first = naturalNumbers.subscribe { log("First: $it") } val second = naturalNumbers.subscribe { log("Second: $it") } Thread.sleep(5) first.dispose() Thread.sleep(5) second.dispose() 

在您订阅Observable后,将返回Disposable 。 你可以把它保存到你的本地变量,并检查disposable.isDisposed()来查看它是否仍然订阅。