从正确的线程调用RxJava2可取消/一次性

我正在实现一个可观察的Resource从一个Resource发出线。

问题是,这个资源真的不喜欢被关闭从一个不同的线程,它创建(它杀死一只小狗,抛出异常,当这种情况发生)。

当我处理订阅时,从main线程调用资源Cancellable / Disposable ,而在Schedulers.io()上订阅observable。

这是Kotlin代码:

 fun lines(): Observable<String> = Observable.create { emitter -> val resource = NetworkResource() emitter.setCancellable { resource.close() // <-- main thread :( } try { while (!emitter.isDisposed) emitter.onNext(resource.readLine()) // <-- blocked here! } catch (ioe: IOException) { emitter.tryOnError(ioe) // <-- this also triggers the cancellable } } val disposable = lines() .subscribeOn(Schedulers.io()) .observeOn(AndroidSchedulers.mainThread()) .subscribe { Log.i(TAG, "Line: $it" } disposable.dispose() // <-- main thread :) 

问题 :是否可以从正确的*线程调用Cancellable ,考虑到订阅线程在resource.readLine()上被阻塞?

*正确的线程意味着来自subscribeOn(Schedures.io())线程。

编辑 :恐怕这个问题没有一个正确的答案,除非resource.close()是线程安全的或某种投票resource.dataReady执行,以便线程不被阻止。

Schedulers.io()管理一个线程池,所以它可能会也可能不会使用同一个线程来处理你的资源。 您将不得不使用自定义调度程序和unsubscribeOn()运算符来确保您的Observable在同一个线程上被订阅和取消订阅。 就像是:

 Scheduler customScheduler = Schedulers.from(Executors.newSingleThreadExecutor()); val disposable = lines() .unsubscribeOn(customScheduler) .subscribeOn(customScheduler) .observeOn(AndroidSchedulers.mainThread()) .subscribe { Log.i(TAG, "Line: $it" } 

如果你不介意延迟对NetworkResource#close的调用NetworkResource#close一点,为什么不呢

  fun lines(): Observable<String> = Observable.create { emitter -> val resource = NetworkResource() try { while (!emitter.isDisposed) { emitter.onNext(resource.readLine()) } resource.close() } catch (ioe: IOException) { emitter.tryOnError(ioe) } } 

但是这仍然有一个问题:如果发生IOException没有人会调用NetworkResource#close (在你的例子中,我想也是这样)。

试图解决这个问题:

  fun lines(): Observable<String> = Observable.create { emitter -> val resource = NetworkResource() try { while (!emitter.isDisposed) { emitter.onNext(resource.readLine()) } } catch (ioe: IOException) { emitter.tryOnError(ioe) } finally { resource.close() // try-catch here, too? } } 

或者使用“Kotlin-Try-With-Resources”功能use

  fun lines(): Observable<String> = Observable.create { emitter -> NetworkResource().use { resource -> try { while (!emitter.isDisposed) { emitter.onNext(resource.readLine()) } } catch (ioe: IOException) { emitter.tryOnError(ioe) } } } 

我希望这有帮助。 希望你过个愉快的周末。

那么采取一条替代路径呢?

a)通过使NetworkResource成为线程安全的 (如果你在控制源代码的话)

要么

b)通过包装NetworkResource与“代理” ? 对于“代理”,我的意思是一个内部使用专用线程的代理,它执行与NetworkResource (构造,readLine,close,…)的所有交互。