RxJava Observable to Completable,如何避免toBlocking()

我目前正在使用Kotlin Android上的RxJava,但我有一个问题,我不能解决不使用toBlocking()。

我有雇员服务的方法返回一个Observable>:

fun all(): Observable<List<Employee>> 

这是完全正确的,因为每当员工发生变化时,Observable就会发布新的员工列表。 但我想从员工生成一个PDF文件,显然不需要每次员工变动时都运行。 另外,我想从我的PDF生成器方法返回一个Completable对象。 我想为我的PDF添加一个标题,然后遍历员工并计算每位员工的工资,这也会返回一个Observable,这是我现在正在使用toBlocking的地方。 我目前的做法是这样的:

 private fun generatePdf(outputStream: OutputStream): Completable { return employeeService.all().map { employees -> try { addHeaderToPDF() for (i in employees) { val calculated = employeeService.calculateWage(i.id).toBlocking().first() // Print calculated to PDF.... } addFooterToPDF() return @map Completable.complete() } catch (e: Exception) { return @map Completable.error(e) } }.first().toCompletable() 

有什么办法可以使这个代码更清洁使用RxJava?

提前致谢!

免责声明:这个答案是一个正在进行的工作。


基本前提:如果你在流中blocking ,你做错了。

注意:没有状态必须离开可观测的lambda。

步骤1:流式传输整个数据集

输入是员工流。 对于每个员工,你需要获得一份工资。 让我们成为一个流。

 /** * @param employeesObservable * Stream of employees we're interested in. * @param wageProvider * Transformation function which takes an employee and returns a [Single] of their wage. * @return * Observable stream spitting individual [Pair]s of employees and their wages. */ fun getEmployeesAndWagesObservable( employeesObservable: Observable<Employee>, wageProvider: Function<Employee, Single<Int>> ): Observable<Pair<Employee, Int>>? { val employeesAndWagesObservable: Observable<Pair<Employee, Int>> // Each Employee from the original stream will be converted // to a Single<Pair<Employee, Int>> via flatMapSingle operator. // Remember, we need a stream and Single is a stream. employeesAndWagesObservable = employeesObservable.flatMapSingle { employee -> // We need to get a source of wage value for current employee. // That source emits a single Int or errors. val wageForEmployeeSingle: Single<Int> = wageProvider.apply(employee) // Once the wage from said source is loaded... val employeeAndWageSingle: Single<Pair<Employee, Int> = wageForEmployeeSingle.map { wage -> // ... construct a Pair<Employee, Int> employee to wage } // This code is not executed now. It will be executed for each Employee // after the original Observable<Employee> starts spitting out items. // After subscribing to the resulting observable. return@flatMapSingle employeeAndWageSingle } return employeesAndWagesObservable } 

订阅时会发生什么情况:

  1. 从来源采取一名员工。
  2. 取雇员的工资。
  3. 吐出一对雇员和他们的工资。

这重复,直到employeesObservable信号onComplete或某些失败onError

使用操作符:

  • flatMapSingle :将实际值转换为某个转换值的新Single流。
  • map :将实际值转换为其他实际值(无嵌套流)。

嘿,你怎么把它连接到你的代码:

 fun doStuff() { val employeesObservable = employeeService.all() val wageProvider = Function<Employee, Single<Int>> { employee -> // Don't listen to changes. Take first wage and use that. employeeService.calculateWage(employee.id).firstOrError() } val employeesAndWagesObservable = getEmployeesAndWagesObservable(employeesObservable, wageProvider) // Subscribe... } 

使用操作符:

  • 第一 :从可观察到的第一个项目,把它变成一个单一的流。
  • 超时 :如果你通过网络获得工资,一个好主意就是超时工资。

下一步

选项1:在这里结束

不要订阅,打电话

 val blockingIterable = employeesAndWagesObservable.blockingIterable() blockingIterable.forEach { ... } 

并以同步的方式处理每个项目。 坐下来,找出下一步,观看演示,阅读例子。

选项2:添加图层

  1. 将这些Pair<Employee, Int>每一个.map到一些抽象的PDF构建块。
  2. 通过Observable.fromCallable { ... }将您的页眉和页脚打印机转到Observables,让他们也返回PDF构建块。
  3. 通过Observable.concat(headerObs, employeeDataObs, footerObs)以顺序的方式合并所有这些。
  4. .subscribe此结果并开始将PDF构建块写入PDF作家。
  5. 去做:
    • 找出一种方法来初步化PDF编写器(不在构建流之前)的订阅,
    • 删除错误输出,
    • 在完成或错误时关闭输出流。

我想出了这个:

  return employeeService.all().first() .doOnSubscribe { addHeaderToPDF() } .flatMapIterable { it } .flatMap { employeeService.calculateWage(it.id).first() } .doOnNext { printEmployeeWage(it) } .doOnCompleted { addFooterToPDF } .toCompletable() 

这是应该怎么做? 🙂