RxJava-将Observable转变为Iterator,Stream或Sequence
我知道这打破了很多Rx规则,但我真的很喜欢RxJava-JDBC ,所以我的队友也是这样。 关系数据库对于我们的工作非常重要,Rx也是如此。
但是有些情况下,我们不希望作为Observable<ResultSet>
发出,而只是有一个基于拉的Java 8 Stream<ResultSet>
或Kotlin Sequence<ResultSet>
。 但是我们非常习惯于只返回一个Observable<ResultSet>
的RxJava-JDBC库。
因此,我想知道是否有一种方法可以使用扩展函数将Observable<ResultSet>
转换为Sequence<ResultSet>
,而不执行任何中间集合或toBlocking()
调用。 下面是我迄今所有的,但我的头正在旋转,试图连接推拉系统,我不能缓冲,因为每个onNext()
调用ResultSet
是有状态的。 这是不可能的任务吗?
import rx.Observable import rx.Subscriber import java.sql.ResultSet fun Observable<ResultSet>.asSequence() = object: Iterator<ResultSet>, Subscriber<ResultSet>() { private var isComplete = false override fun onCompleted() { isComplete = true } override fun onError(e: Throwable?) { throw UnsupportedOperationException() } override fun onNext(rs: ResultSet?) { throw UnsupportedOperationException() } override fun hasNext(): Boolean { throw UnsupportedOperationException() } override fun next(): ResultSet { throw UnsupportedOperationException() } }.asSequence()
我不知道这是实现你想要的最简单的方法,但你可以试试这个代码。 它通过创建阻塞队列并将Observable
所有事件发布到此队列来将Observable
转换为Iterator
。 Iterable
从队列中取出事件,如果没有则取消。 然后根据收到的当前事件修改自己的状态。
class ObservableIterator<T>( observable: Observable<T>, scheduler: Scheduler ) : Iterator<T>, Closeable { private val queue = LinkedBlockingQueue<Notification<T>>() private var cached: Notification<T>? = null private var completed: Boolean = false private val subscription = observable .materialize() .subscribeOn(scheduler) .subscribe({ queue.put(it) }) override fun hasNext(): Boolean { cacheNext() return !completed } override fun next(): T { cacheNext() val notification = cached ?: throw NoSuchElementException() check(notification.isOnNext) cached = null return notification.value } private fun cacheNext() { if (completed) { return } if (cached == null) { queue.take().let { notification -> if (notification.isOnError) { completed = true throw RuntimeException(notification.throwable) } else if (notification.isOnCompleted) { completed = true } else { cached = notification } } } } override fun close() { subscription.unsubscribe() completed = true cached = null } }
您可以使用以下帮助程序功能:
fun <T> Observable<T>.asSequence() = Sequence { toBlocking().getIterator() }
当为迭代器调用返回的序列时,observable将被订阅。
如果一个observable在它所订阅的同一个线程上发射元素(比如Observable.just
),它将会在迭代器得到返回机会之前填充缓冲区。 在这种情况下,您可能需要通过调用subscribeOn
来直接订阅不同的线程:
observable.subscribeOn(scheduler).asSequence()
但是,虽然toBlocking().getIterator()
不会缓存所有结果,但如果它们没有被迭代器及时地使用,它可能会缓冲其中的一部分。 如果ResultSet
在下一个ResultSet
到达时以某种方式过期,那么这可能是一个问题。