RxJava-将Observable转变为Iterator,Stream或Sequence

我知道这打破了很多Rx规则,但我真的很喜欢RxJava-JDBC ,所以我的队友也是这样。 关系数据库对于我们的工作非常重要,Rx也是如此。

但是有些情况下,我们不希望作为Observable<ResultSet>发出,而只是有一个基于拉的Java 8 Stream<ResultSet>或Kotlin Sequence<ResultSet> 。 但是我们非常习惯于只返回一个Observable<ResultSet>的RxJava-JDBC库。

因此,我想知道是否有一种方法可以使用扩展函数将Observable<ResultSet>转换为Sequence<ResultSet> ,而不执行任何中间集合或toBlocking()调用。 下面是我迄今所有的,但我的头正在旋转,试图连接推拉系统,我不能缓冲,因为每个onNext()调用ResultSet是有状态的。 这是不可能的任务吗?

 import rx.Observable import rx.Subscriber import java.sql.ResultSet fun Observable<ResultSet>.asSequence() = object: Iterator<ResultSet>, Subscriber<ResultSet>() { private var isComplete = false override fun onCompleted() { isComplete = true } override fun onError(e: Throwable?) { throw UnsupportedOperationException() } override fun onNext(rs: ResultSet?) { throw UnsupportedOperationException() } override fun hasNext(): Boolean { throw UnsupportedOperationException() } override fun next(): ResultSet { throw UnsupportedOperationException() } }.asSequence() 

我不知道这是实现你想要的最简单的方法,但你可以试试这个代码。 它通过创建阻塞队列并将Observable所有事件发布到此队列来将Observable转换为IteratorIterable从队列中取出事件,如果没有则取消。 然后根据收到的当前事件修改自己的状态。

 class ObservableIterator<T>( observable: Observable<T>, scheduler: Scheduler ) : Iterator<T>, Closeable { private val queue = LinkedBlockingQueue<Notification<T>>() private var cached: Notification<T>? = null private var completed: Boolean = false private val subscription = observable .materialize() .subscribeOn(scheduler) .subscribe({ queue.put(it) }) override fun hasNext(): Boolean { cacheNext() return !completed } override fun next(): T { cacheNext() val notification = cached ?: throw NoSuchElementException() check(notification.isOnNext) cached = null return notification.value } private fun cacheNext() { if (completed) { return } if (cached == null) { queue.take().let { notification -> if (notification.isOnError) { completed = true throw RuntimeException(notification.throwable) } else if (notification.isOnCompleted) { completed = true } else { cached = notification } } } } override fun close() { subscription.unsubscribe() completed = true cached = null } } 

您可以使用以下帮助程序功能:

 fun <T> Observable<T>.asSequence() = Sequence { toBlocking().getIterator() } 

当为迭代器调用返回的序列时,observable将被订阅。

如果一个observable在它所订阅的同一个线程上发射元素(比如Observable.just ),它将会在迭代器得到返回机会之前填充缓冲区。 在这种情况下,您可能需要通过调用subscribeOn来直接订阅不同的线程:

 observable.subscribeOn(scheduler).asSequence() 

但是,虽然toBlocking().getIterator()不会缓存所有结果,但如果它们没有被迭代器及时地使用,它可能会缓冲其中的一部分。 如果ResultSet在下一个ResultSet到达时以某种方式过期,那么这可能是一个问题。