RxJava Observable to Completable,如何避免toBlocking()
我目前正在使用Kotlin Android上的RxJava,但我有一个问题,我不能解决不使用toBlocking()。
我有雇员服务的方法返回一个Observable>:
fun all(): Observable<List<Employee>>
这是完全正确的,因为每当员工发生变化时,Observable就会发布新的员工列表。 但我想从员工生成一个PDF文件,显然不需要每次员工变动时都运行。 另外,我想从我的PDF生成器方法返回一个Completable
对象。 我想为我的PDF添加一个标题,然后遍历员工并计算每位员工的工资,这也会返回一个Observable,这是我现在正在使用toBlocking的地方。 我目前的做法是这样的:
private fun generatePdf(outputStream: OutputStream): Completable { return employeeService.all().map { employees -> try { addHeaderToPDF() for (i in employees) { val calculated = employeeService.calculateWage(i.id).toBlocking().first() // Print calculated to PDF.... } addFooterToPDF() return @map Completable.complete() } catch (e: Exception) { return @map Completable.error(e) } }.first().toCompletable()
有什么办法可以使这个代码更清洁使用RxJava?
提前致谢!
免责声明:这个答案是一个正在进行的工作。
基本前提:如果你在流中blocking
,你做错了。
注意:没有状态必须离开可观测的lambda。
步骤1:流式传输整个数据集
输入是员工流。 对于每个员工,你需要获得一份工资。 让我们成为一个流。
/** * @param employeesObservable * Stream of employees we're interested in. * @param wageProvider * Transformation function which takes an employee and returns a [Single] of their wage. * @return * Observable stream spitting individual [Pair]s of employees and their wages. */ fun getEmployeesAndWagesObservable( employeesObservable: Observable<Employee>, wageProvider: Function<Employee, Single<Int>> ): Observable<Pair<Employee, Int>>? { val employeesAndWagesObservable: Observable<Pair<Employee, Int>> // Each Employee from the original stream will be converted // to a Single<Pair<Employee, Int>> via flatMapSingle operator. // Remember, we need a stream and Single is a stream. employeesAndWagesObservable = employeesObservable.flatMapSingle { employee -> // We need to get a source of wage value for current employee. // That source emits a single Int or errors. val wageForEmployeeSingle: Single<Int> = wageProvider.apply(employee) // Once the wage from said source is loaded... val employeeAndWageSingle: Single<Pair<Employee, Int> = wageForEmployeeSingle.map { wage -> // ... construct a Pair<Employee, Int> employee to wage } // This code is not executed now. It will be executed for each Employee // after the original Observable<Employee> starts spitting out items. // After subscribing to the resulting observable. return@flatMapSingle employeeAndWageSingle } return employeesAndWagesObservable }
订阅时会发生什么情况:
- 从来源采取一名员工。
- 取雇员的工资。
- 吐出一对雇员和他们的工资。
这重复,直到employeesObservable
信号onComplete
或某些失败onError
。
使用操作符:
- flatMapSingle :将实际值转换为某个转换值的新Single流。
- map :将实际值转换为其他实际值(无嵌套流)。
嘿,你怎么把它连接到你的代码:
fun doStuff() { val employeesObservable = employeeService.all() val wageProvider = Function<Employee, Single<Int>> { employee -> // Don't listen to changes. Take first wage and use that. employeeService.calculateWage(employee.id).firstOrError() } val employeesAndWagesObservable = getEmployeesAndWagesObservable(employeesObservable, wageProvider) // Subscribe... }
使用操作符:
- 第一 :从可观察到的第一个项目,把它变成一个单一的流。
- 超时 :如果你通过网络获得工资,一个好主意就是超时工资。
下一步
选项1:在这里结束
不要订阅,打电话
val blockingIterable = employeesAndWagesObservable.blockingIterable() blockingIterable.forEach { ... }
并以同步的方式处理每个项目。 坐下来,找出下一步,观看演示,阅读例子。
选项2:添加图层
- 将这些
Pair<Employee, Int>
每一个.map
到一些抽象的PDF构建块。 - 通过
Observable.fromCallable { ... }
将您的页眉和页脚打印机转到Observables,让他们也返回PDF构建块。 - 通过
Observable.concat(headerObs, employeeDataObs, footerObs)
以顺序的方式合并所有这些。 -
.subscribe
此结果并开始将PDF构建块写入PDF作家。 - 去做:
- 找出一种方法来初步化PDF编写器(不在构建流之前)的订阅,
- 删除错误输出,
- 在完成或错误时关闭输出流。
我想出了这个:
return employeeService.all().first() .doOnSubscribe { addHeaderToPDF() } .flatMapIterable { it } .flatMap { employeeService.calculateWage(it.id).first() } .doOnNext { printEmployeeWage(it) } .doOnCompleted { addFooterToPDF } .toCompletable()
这是应该怎么做? 🙂