如何在RxJava2中用重试运算符记住状态

我有一个网络客户端,可以从中断恢复,但需要最后一条消息时,这样做是在重试。

Kotlin示例:

fun requestOrResume(last: Message? = null): Flowable = Flowable.create({ emitter -> val connection = if (last != null) client.start() else client.resumeFrom(last.id) while (!emitter.isDisposed) { val msg = connection.nextMessage() emitter.onNext(msg) } }, BackpressureStrategy.MISSING) requestOrResume() .retryWhen { it.flatMap { Flowable.timer(5, SECONDS) } } // how to pass the resume data when there is a retry? 

问题 :正如你所看到的,我需要最后收到的消息,以准备简历电话。 我如何跟踪它,以便在重试时可以提出恢复请求?

一个可能的解决方案可能是创建持有者类,只持有对最后一条消息的引用,并在收到新消息时进行更新。 这样当重试时,可以从持有者那里获得最后的消息。 例:

 class MsgHolder(var last: Message? = null) fun request(): Flowable { val holder = MsgHolder() return Flowable.create({ emitter -> val connection = if (holder.last != null) client.start() else client.resumeFrom(holder.last.id) while (!emitter.isDisposed) { val msg = connection.nextMessage() holder.last = msg // <-- update holder reference emitter.onNext(msg) } }, BackpressureStrategy.MISSING) } 

我认为这可能工作,但感觉像一个黑客(线程同步问题?)。

有没有更好的方法来跟踪状态,以便重试?

请注意,除非您在最后一个元素(不是function上与现有的“破解”解决方案function上不同,而是方式更为复杂)之间重新引入一个包装,否则没有任何error handling操作员可以在没有外部帮助的情况下恢复最后一个元素,因为他们只能访问到Throwable流。 相反,看下面的递归方法是否适合你的需求:

 fun retryWithLast(seed: Flowable): Flowable { val last$ = seed.last().cache(); return seed.onErrorResumeNext { it.flatMap { retryWithLast(last$.flatMap { requestOrResume(it) }) } }; } retryWithLast(requestOrResume()); 

最大的区别是缓存来自缓存中的最后一次尝试的尾随值,而不是在值中手动执行。 还要注意,error handling程序中的递归意味着如果后续的尝试继续失败, retryWithLast将继续扩展流。

仔细看看buffer()运算符: 链接你可以像这样使用它:

 requestOrResume() .buffer(2) 

从现在起,您的Flowable将返回两个最新对象的List