异步调用集合中的每个项目

我有一个问题,迄今为止我无法解决,我是RxKotlin的新手,所以可能很简单。 请看代码:

override fun infos(): Stream<Info> = client.infoAboutItem(identifier) .map { val itemId = it.itemId ?: "" val item = client.itemForId(itemId) ClientInfo(client, it, source, item) as Info } .let { AccessStream(it) } 

流哪里是我们自制的收藏。 地图是一种方法,允许您迭代该集合中的每个项目。

这里的问题是

  client.itemForId(itemId) 

是一个http调用,返回一个不理想的Single。

我想创建一个异步调用内部映射,将返回项目而不是单一,然后将其传递给ClientInfo。 到目前为止我尝试过的事情是使用在地图内部的订阅和使用blockingGet()方法,但是这阻止了主线程,即使我观察和订阅不同的线程

所以它涉及到对集合中的每一件事进行异步调用。

感谢帮助

您可以尝试返回Observable<Stream<Info>> ,然后它看起来像:

  override fun infos(): Observable<Stream<Info>> = Observable.from(client.infoAboutItem(identifier)) .flatMapSingle { val itemId = it.itemId ?: "" client.itemForId(itemId) } .map { ClientInfo(client, it, source, item) as Info } .toList() .flatMap { AccessStream(it) } 

您应该将这些昂贵的操作包装到可观察的地方,并使用平面地图将这些数据压缩到客户端信息中。

我写了一个小样本来展示它。

 class SimpleTest { val testScheduler = TestScheduler() @Test fun test() { infos().observeOn(Schedulers.immediate()) .subscribe { logger("Output", it.toString()) } testScheduler.advanceTimeBy(10, TimeUnit.MINUTES) } fun infos(): Single<List<ClientInfo>> { return Observable.from(infoAboutItem("some_identifier")) .doOnNext { logger("Next", it.toString()) } .flatMap { aboutItem -> Observable.fromCallable { itemForId(aboutItem.itemId) } .subscribeOn(testScheduler) .map { ClientInfo(aboutItem = aboutItem, item = it) } } .doOnNext { logger("Next", it.toString()) } .toList() .toSingle() } data class ClientInfo( val id: String = UUID.randomUUID().toString(), val aboutItem: AboutItem, val item: Item ) data class AboutItem(val itemId: String = UUID.randomUUID().toString()) data class Item(val id: String = UUID.randomUUID().toString()) fun infoAboutItem(identifier: String): List<AboutItem> { return (1..10).map { AboutItem() } } fun itemForId(itemId: String): Item { val sleepTime = Random().nextInt(1000).toLong() Thread.sleep(sleepTime) return Item() } fun logger(tag: String, message: String): Unit { val formattedDate = Date(Schedulers.immediate().now()).format() System.out.println("$tag @ $formattedDate: $message") } fun Date.format(): String { return SimpleDateFormat("HH:mm:ss.SSS", Locale.US).format(this) } }