从正确的线程调用RxJava2可取消/一次性
我正在实现一个可观察的Resource
从一个Resource
发出线。
问题是,这个资源真的不喜欢被关闭从一个不同的线程,它创建(它杀死一只小狗,抛出异常,当这种情况发生)。
当我处理订阅时,从main
线程调用资源Cancellable
/ Disposable
,而在Schedulers.io()
上订阅observable。
这是Kotlin代码:
fun lines(): Observable<String> = Observable.create { emitter -> val resource = NetworkResource() emitter.setCancellable { resource.close() // <-- main thread :( } try { while (!emitter.isDisposed) emitter.onNext(resource.readLine()) // <-- blocked here! } catch (ioe: IOException) { emitter.tryOnError(ioe) // <-- this also triggers the cancellable } } val disposable = lines() .subscribeOn(Schedulers.io()) .observeOn(AndroidSchedulers.mainThread()) .subscribe { Log.i(TAG, "Line: $it" } disposable.dispose() // <-- main thread :)
问题 :是否可以从正确的*线程调用Cancellable
,考虑到订阅线程在resource.readLine()
上被阻塞?
*正确的线程意味着来自subscribeOn(Schedures.io())
线程。
编辑 :恐怕这个问题没有一个正确的答案,除非resource.close()
是线程安全的或某种投票resource.dataReady
执行,以便线程不被阻止。
Schedulers.io()
管理一个线程池,所以它可能会也可能不会使用同一个线程来处理你的资源。 您将不得不使用自定义调度程序和unsubscribeOn()
运算符来确保您的Observable
在同一个线程上被订阅和取消订阅。 就像是:
Scheduler customScheduler = Schedulers.from(Executors.newSingleThreadExecutor()); val disposable = lines() .unsubscribeOn(customScheduler) .subscribeOn(customScheduler) .observeOn(AndroidSchedulers.mainThread()) .subscribe { Log.i(TAG, "Line: $it" }
如果你不介意延迟对NetworkResource#close
的调用NetworkResource#close
一点,为什么不呢
fun lines(): Observable<String> = Observable.create { emitter -> val resource = NetworkResource() try { while (!emitter.isDisposed) { emitter.onNext(resource.readLine()) } resource.close() } catch (ioe: IOException) { emitter.tryOnError(ioe) } }
但是这仍然有一个问题:如果发生IOException
没有人会调用NetworkResource#close
(在你的例子中,我想也是这样)。
试图解决这个问题:
fun lines(): Observable<String> = Observable.create { emitter -> val resource = NetworkResource() try { while (!emitter.isDisposed) { emitter.onNext(resource.readLine()) } } catch (ioe: IOException) { emitter.tryOnError(ioe) } finally { resource.close() // try-catch here, too? } }
或者使用“Kotlin-Try-With-Resources”功能use
fun lines(): Observable<String> = Observable.create { emitter -> NetworkResource().use { resource -> try { while (!emitter.isDisposed) { emitter.onNext(resource.readLine()) } } catch (ioe: IOException) { emitter.tryOnError(ioe) } } }
我希望这有帮助。 希望你过个愉快的周末。
那么采取一条替代路径呢?
a)通过使NetworkResource
成为线程安全的 (如果你在控制源代码的话)
要么
b)通过包装NetworkResource
与“代理” ? 对于“代理”,我的意思是一个内部使用专用线程的代理,它执行与NetworkResource
(构造,readLine,close,…)的所有交互。
- 视图不会显示在ViewPager中
- 为什么在从Kotlin调用方法时,在logcat中看不到任何有用的东西?
- 随播广告对象 – Android是否希望将其注册为活动(?)
- 在Android的Kotlin的帮助下开始一个项目
- 在Android Studio 3.0布局编辑器中渲染错误
- 即时应用基本功能的Application.onCreate()永远不会被调用
- 如何在Anko中调试“无法解析com.google.android:android”错误
- 代码@TypeConverterAnnotation是什么意思?
- RecyclerView – 在.findViewById上的NullPointerException