合并多个单打组成一个Observable

我正在编写一个Android应用程序,需要按以下顺序执行2个查询:

  1. 向一个返回Single<List<String>> urls的库发出一个请求(让我们称之为RequestA)。
  2. 根据我从RequestA收到的内容,我必须使用每个这些URL向另一个库发出请求(RequestB)。 每个RequestB现在都返回一个Single。

现在我已经把所有的RequestB中的所有Single结合起来形成了一个observable。

就像Observable.mergedelayerror(List<Single>) 。 我不能这样做,因为mergedelayerror期望ObservableSource iterable

我知道我可以通过实现回调和使用一些丑陋的逻辑来实现这一目标但是我真的只需要使用由RX提供的运算符

你需要Observable.fromIterable() 。 尝试这个:

Java 8:

 public interface Api { Single<List<String>> urls(); Single<String> url(String url); } private Api api; public void request() { api.urls() .flatMapObservable(Observable::fromIterable)//Observable<String> .flatMapSingle(api::url) .subscribeOn(Schedulers.io()) .observeOn(AndroidSchedulers.mainThread()) .subscribe(); } 

非Java 8:

 api.urls() .flatMapObservable(new Function<List<String>, ObservableSource<? extends String>>() { @Override public ObservableSource<? extends String> apply(@NonNull List<String> source) throws Exception { return Observable.fromIterable(source); } }) .flatMapSingle(new Function<String, SingleSource<? extends String>>() { @Override public SingleSource<? extends String> apply(@NonNull String url) throws Exception { return api.url(url); } }) .subscribeOn(Schedulers.io()) .observeOn(AndroidSchedulers.mainThread()) .subscribe(); 

科特林:

 api.urls() .flatMapObservable { Observable.fromIterable(it) } .flatMapSingle { api.url(it) } .subscribeOn(Schedulers.io()) .observeOn(AndroidSchedulers.mainThread()) .subscribe() 

我建议使用Java 8或切换到Kotlin。 这将是更好的使用。

这里是一个kotlin的例子解释。 Java版本将几乎完全相同。 您可能需要查看这些操作员的大理石图,以便更直观地看到发生了什么。

 getData() //<--- original api call that returns a list of urls .flatMapObservable { list -> //<- Since we are on a single we have to convert to an observable to emit each item onto the stream Observable.fromIterable(list) // <-- create that observable that emits each item } .flatMapSingle { url -> // we a single url entering here and have add it's result onto the stream but since it's a single we use flatMapSingle instead secondApi(url) // we call our second api here } .toList() // <-- you may want to group them into a list .subscribeOn(Schedulers.io()) // <-- causes the work to be done on the io scheduler so that you don't hit network on mainthread exception nor slow down the ui .observeOn(AndroidSchedulers.mainThread()) // <-- causes the outcome to be on the ui thread so you can safely update without getting an illegal state exception can't update the ui from a now ui thread .subscribe { next, error -> next?.let { //add to an adapter or something else } error?.let{ // handle error } }