RxJava BehaviorSubject不发射最后一项?
我有一个简单的RxJava,使用ReplaySubject,我可以得到结果,所有3个数字打印。
val observable : Observable<Int> = Observable.just(1, 2, 3) val subject = ReplaySubject.create<Int>() observable.subscribe(subject) subject.subscribe{ result -> System.out.println("Start $result in Subscription Result") }
当我改变行为,我期望第三个数字,即3打印,因为我一直认为行为是重播最后发射的项目。
val observable : Observable<Int> = Observable.just(1, 2, 3) val subject = BehaviorSubject.create<Int>() observable.subscribe(subject) subject.subscribe{ result -> System.out.println("Start $result in Subscription Result") }
但它不打印任何东西。 为什么?
我错过了这里重要的事情吗 如果是的话,让我知道如何得到假设发出的最后一个项目(即3)打印。
它不打印任何东西,因为subsription已经终止。 如果订阅仍然有效,则会打印3,例如:
val o1: Observable<Int> = Observable.just(1, 2, 3) val o2: Observable<Int> = Observable.just(4).delay(100,TimeUnit.MILLISECONDS) val observable: Observable<Int> = Observable.concat(o1, o2) val subject = BehaviorSubject.create<Int>() observable.subscribe(subject) subject.subscribe{ result -> System.out.println("Start $result in Subscription Result") } Thread.sleep(1000)
3和4(延迟后)将被打印,其中3作为最近的事件发射之前订阅和4发射后的订阅。
另外,正如在注释部分用@akarnokd所解释的,ReplaySubject.createWithSize ReplaySubject.createWithSize(1)
可以用来始终重放最后一个项目,即使在可观察完成之后,如果需要单个项目而不管流完成状态如何,那么observable.takeLast(1).subscribe(subject)
可以用来保证:
val observable : Observable<Int> = Observable.just(1, 2, 3) val subject = ReplaySubject.createWithSize<Int>(1) observable.takeLast(1).subscribe(subject) //can be moved after subject.subscribe as well subject.subscribe{ result -> System.out.println("Start $result in Subscription Result") }