rxjava2 – 在线程池上执行任务的简单示例,在单个线程上订阅

我正在试验下面的任务,让我的头绕过RxJava:

  • 给定一个URL列表
  • 对线程池上的每个URL执行HTTP请求
  • 为每个结果插入一些数据到SQLite数据库(这里没有多线程)
  • 阻止方法直到完成

所以我在Kotlin试了一下:

val ex = Executors.newFixedThreadPool(10) Observable.fromIterable((1..100).toList()) .observeOn(Schedulers.from(ex)) .map { Thread.currentThread().name } .subscribe { println(it + " " + Thread.currentThread().name } 

我期望它打印

 pool-1-thread-1 main pool-1-thread-2 main pool-1-thread-3 main pool-1-thread-4 main .... 

但是它打印:

 pool-1-thread-1 pool-1-thread-1 pool-1-thread-1 pool-1-thread-1 pool-1-thread-1 pool-1-thread-1 

任何人都可以纠正我对这是如何工作的误解? 为什么不使用线程池的所有线程? 我如何让我的订阅者在主线程上运行或阻塞直到完成?

Rx并不是一个平行的执行服务,为此使用Java的流api。 Rx事件是同步的,随后将流过流。 observeOn在建立流时会请求一个线程并在该线程上逐个处理排放。

您也希望subscribe在主线程上执行。 observeOn切换线程,并在该线程上发生所有下游事件。 如果你想切换到主线程,你必须在subscribe之前插入另一个observeOn

要使地图块内的代码并行工作,您应该用自己的调度程序将其包装为可观察的:

 val ex = Executors.newFixedThreadPool(10) val scheduler = Schedulers.from(ex) Observable.fromIterable((1..100).toList()) .flatMap { Observable .fromCallable { Thread.currentThread().name } .subscribeOn(scheduler) } .subscribe { println(it + " " + Thread.currentThread().name) } 

在这种情况下,你会看到结果:

 pool-1-thread-1 pool-1-thread-1 pool-1-thread-2 pool-1-thread-1 pool-1-thread-3 pool-1-thread-1 pool-1-thread-4 pool-1-thread-1 ... 

您可以查看文章RxJava – 实现并行化 ,解释这种行为。

另外,RxJava 2.0.5引入了ParallelFlowable API