用适当的处理扩展Kotlin中的RxJava Observable

我正在尝试为RxJava创建一个类似于scan的新操作符,但可以采用两种不同的类型。 这与foldreduce的传统实现类似,如果给定了默认值,那么只要返回类型与累加器匹配,折叠函数的参数就可以是不同的类型。

这是我到目前为止。

 fun <T, R> Observable<T>.fold(acc: R, f: (R, T) -> R): Observable<R> = Observable.create { o -> var accm = acc subscribe({ accm = f(accm, it) o.onNext(accm) }, { o.onError(it) }, { o.onComplete() }) } 

我只是在一个Observable.create块内订阅this ,并通过累加器函数更新观察者的新值。 问题是订阅在处理时不会被清理。 我该怎么办?