rxjava2 – 在线程池上执行任务的简单示例,在单个线程上订阅
我正在试验下面的任务,让我的头绕过RxJava:
- 给定一个URL列表
- 对线程池上的每个URL执行HTTP请求
- 为每个结果插入一些数据到SQLite数据库(这里没有多线程)
- 阻止方法直到完成
所以我在Kotlin试了一下:
val ex = Executors.newFixedThreadPool(10) Observable.fromIterable((1..100).toList()) .observeOn(Schedulers.from(ex)) .map { Thread.currentThread().name } .subscribe { println(it + " " + Thread.currentThread().name }
我期望它打印
pool-1-thread-1 main pool-1-thread-2 main pool-1-thread-3 main pool-1-thread-4 main ....
但是它打印:
pool-1-thread-1 pool-1-thread-1 pool-1-thread-1 pool-1-thread-1 pool-1-thread-1 pool-1-thread-1
任何人都可以纠正我对这是如何工作的误解? 为什么不使用线程池的所有线程? 我如何让我的订阅者在主线程上运行或阻塞直到完成?
Rx并不是一个平行的执行服务,为此使用Java的流api。 Rx事件是同步的,随后将流过流。 observeOn在建立流时会请求一个线程并在该线程上逐个处理排放。
您也希望subscribe
在主线程上执行。 observeOn
切换线程,并在该线程上发生所有下游事件。 如果你想切换到主线程,你必须在subscribe
之前插入另一个observeOn
。
要使地图块内的代码并行工作,您应该用自己的调度程序将其包装为可观察的:
val ex = Executors.newFixedThreadPool(10) val scheduler = Schedulers.from(ex) Observable.fromIterable((1..100).toList()) .flatMap { Observable .fromCallable { Thread.currentThread().name } .subscribeOn(scheduler) } .subscribe { println(it + " " + Thread.currentThread().name) }
在这种情况下,你会看到结果:
pool-1-thread-1 pool-1-thread-1 pool-1-thread-2 pool-1-thread-1 pool-1-thread-3 pool-1-thread-1 pool-1-thread-4 pool-1-thread-1 ...
您可以查看文章RxJava – 实现并行化 ,解释这种行为。
另外,RxJava 2.0.5引入了ParallelFlowable API