Tag: rxjs

在RxKotlin / RxJava中用BehaviorSubject自动创建热可观察对象

目前我正在使用RxKotlin在Kotlin上建立一个项目。 我与Rx的背景主要是基于RxJS。 我经常用于在Typescript中创建hot observables模式的模式看起来是这样的: private dataStore: IFoo; private dataStoreSubject: BehaviorSubject = new BehaviorSubject(this.dataStore); public dataStoreObservable: Observable = Observable.from(this.dataStoreSubject); public getNetworkData(): Observable { return this.http.get() .map((response: IResponse) => { this.dataStore = response; this.dataStoreSubject.next(this.dataStore); return this.dataStore; }); } 这将允许我暴露一个Observable ,而不暴露Subject和后续的subject.next(); 方法。 我的问题是:在RxKotlin或RxJava中建立类似逻辑最常用的方法是什么?

将无限流的无限流转化为无限流 – 反应X.

如何在反应x(理想的RxJava或RxJs例子)可以实现这一点? a |-a——————-a———–a———–a—- s1 |-xxxxxx -| (subscribe) s2 |-xxxxx-| (subscribe) s2 |-xxxxx-| (subscribe) … sn S |-xxxxxxx——-xxxxxxx————-xxxxxx- (subsribe) a是触发事件的有限流sn的无限事件流,每个事件应该是无限流S一部分,同时能够订阅每个sn流(为了进行求和操作),但同时保持流S为无穷。 编辑:更具体地说,我提供了我在Kotlin寻找的实现。 每10秒发射一个事件映射到共享有限的4个事件流。 元流是flatMap -ed到正常的无限流。 我使用doAfterNext额外订阅每个有限的流并打印结果。 /** Creates a finite stream with events * $ch-1 – $ch-4 */ fun createFinite(ch: Char): Observable<String> = Observable.interval(1, TimeUnit.SECONDS) .take(4) .map({ "$ch-$it" }).share() fun main(args: Array<String>) { var ch = 'A' […]