相当于RxJava 2 isUnsubscribed
我一直在通过RxJava的“ Reactive Programming ”一书中的示例进行研究,该示例针对版本1而不是2.对无限流的介绍有以下示例(并注意有更好的方法来处理并发):
Observable<BigInteger> naturalNumbers = Observable.create(subscriber -> { Runnabler = () -> { BigInteger i = ZERO; while (!subscriber.isUnsubscribed()) { subscriber.onNext(i); i = i.add(ONE); } }; new Thread(r).start(); }); ... Subscription subscription = naturalNumbers.subscribe(x -> log(x)); /* after some time... */ subscription.unsubscribe();
但是,在RxJava 2中,传递给create()
方法的lambda表达式的类型为ObservableEmitter
,并且没有isUnsubscribed()
方法。 我在2.0中看到了什么是不同的,并且执行了对存储库的搜索,但是找不到任何这样的方法。
在2.0中如何实现这个相同的功能?
编辑包括下面给出的解决方案(使用kotlin的nb):
val naturalNumbers = Observable.create<BigInteger> { emitter -> Thread({ var int: BigInteger = BigInteger.ZERO while (!emitter.isDisposed) { emitter.onNext(int) int = int.add(BigInteger.ONE) } }).start() } val first = naturalNumbers.subscribe { log("First: $it") } val second = naturalNumbers.subscribe { log("Second: $it") } Thread.sleep(5) first.dispose() Thread.sleep(5) second.dispose()
在您订阅Observable后,将返回Disposable
。 你可以把它保存到你的本地变量,并检查disposable.isDisposed()
来查看它是否仍然订阅。
- 当我退出应用程序时,为什么在Android应用程序中使用Kotlin / rxJava编写android.os.TransactionTooLargeException? (的OnExit /的onPause)?
- 在RxKotlin / RxJava中用BehaviorSubject自动创建热可观察对象
- 订阅多个视图到一个视图onclick和可见性改变RxAndroid
- 如何做一个组,并使用RxJava和Kotlin收集?
- RxKotlin(RxJava2)timeout()不会抛出TimeoutException
- Kotlin和RxJavatypes推断失败
- 暂停/恢复RX中的定时器/延时
- 使用RxJava获得与领域和改造的独特结果
- Kotlin高阶函数和单方法接口的行为?