如何在Android中将许多AsyncTasks转换为Rx Observables?

我正在使用 Facebook Graph API 查找给我的帖子点赞的用户。整个逻辑分以下几个步骤实现:

  1. 通过向AsyncTask中的API发送请求来查找所有帖子
  2. 将此AT转换为Rx.Observable
  3. 将 Observable<GraphResponse> 映射为 List<Post>(Post 是 POJO)
  4. 调用 flatMap 运算符,在其中遍历每个帖子,为每个帖子发起一个 AsyncTask 请求,再将其转换为 Observable 并放入数组中。
  5. 将这个 Observable 数组合并(merge)为单个 Observable。
  6. 将其中的 GraphResponse 映射为点赞用户的 Profile 列表
  7. 订阅并呈现“客户列表”
  8. 利润!

我在第 4–5 步遇到了一些问题。请看 Repository 中的 `likes` 方法,我把问题写在了代码注释里。

提示:我使用的是 MVP + Clean Architecture(整洁架构),其中包含 Repository(数据层)和 Interactor(业务层)。

/**
 * Data-layer gateway to the Facebook Graph API that exposes requests as Rx observables.
 */
class FacebookRepository {

    private val facebook = Facebook.instance()
    private val gson = GsonBuilder().create()

    /**
     * Builds an observable that requests `/me/posts` with the stored token.
     *
     * The request is created and run with `executeAndWait()` inside [Observable.defer], so the
     * call happens per subscription and the raw [GraphResponse] is emitted as a single item.
     * NOTE(review): scheduling is whatever `RxDecorator.decorate` applies - not visible here.
     */
    fun posts(): Observable<GraphResponse>? {
        return RxDecorator<GraphResponse>().decorate(Observable.defer({
            val request = GraphRequest(
                facebook.token,
                "/me/posts",
                null,
                HttpMethod.GET,
                GraphRequest.Callback { /* handle the result */ }
            )
            Observable.just(request.executeAndWait())
        }))
    }

    /** Stores [currentAccessToken] (possibly `null`) as the token used by [posts]. */
    fun setFaceBookAccessToken(currentAccessToken: AccessToken?) {
        facebook.token = currentAccessToken
    }

    /** Delegates to the `Facebook` holder's `logout()`. */
    fun logout() {
        facebook.logout()
    }

    /** Returns the string value of the stored access token, or `null` when no token is stored. */
    fun token(): String? {
        return facebook.token?.token
    }

    /**
     * Emits, for every post in [posts], the profiles parsed from that post's `/likes` response.
     *
     * One request observable is created per post and all of them are merged, so the resulting
     * stream emits one `List<Profile>` per post (in completion order) and completes once every
     * request has finished. A `null` or empty [posts] yields an observable that just completes.
     *
     * @param posts posts whose likes should be loaded; `null` is treated as empty.
     * @return merged stream of profile lists, one emission per post.
     */
    fun likes(posts: List<Post>?): Observable<List<Profile>> {
        Log.d("observables:posts", posts.toString())

        return Observable.merge(iterateObservables(posts)).map { response ->
            Log.d("merge:posts", response.toString())
            // The "data" member of the response JSON is expected to hold the array of profiles.
            gson.fromJson<List<Profile>>(
                response.jsonObject["data"].toString(),
                object : TypeToken<List<Profile>>() {}.type
            )
        }
    }

    /**
     * Creates one lazily executed `/{post-id}/likes` request observable per post.
     *
     * Each request is wrapped the same way as in [posts]: it is built and executed with
     * `executeAndWait()` inside [Observable.defer], so nothing is sent until the merged stream
     * is subscribed, and the response travels through the Rx chain instead of being lost in a
     * fire-and-forget callback.
     *
     * @return a (possibly empty) list of request observables; never `null`.
     */
    private fun iterateObservables(posts: List<Post>?): List<Observable<GraphResponse>> {
        Log.d("iterateObs:posts", posts.toString())

        return posts.orEmpty().map { post ->
            Log.d("iterateObs:post", post.toString())
            RxDecorator<GraphResponse>().decorate(Observable.defer({
                val request = GraphRequest(
                    AccessToken.getCurrentAccessToken(),
                    "/${post.id}/likes",
                    null,
                    HttpMethod.GET,
                    GraphRequest.Callback { Log.d("fb:post:id", "${post.id}: ${it.jsonObject}") }
                )
                Observable.just(request.executeAndWait())
            }))
        }
    }
}

这是Interactor

 /**
  * Business-layer facade over [FacebookRepository]: login flow, token updates and loading the
  * profiles of users who liked the current user's posts.
  */
 class FacebookInteractor {

     private val callbackManager = com.facebook.CallbackManager.Factory.create()
     private val repository = FacebookRepository()

     /**
      * Starts the Facebook login flow with read permissions.
      *
      * The context returned by [view] is cast to `MainActivity`; any other context type makes
      * the cast fail.
      */
     fun facebookAuth(view: IMainView) {
         val loginManager = LoginManager.getInstance()
         val activity = view.getContext() as MainActivity
         val permissions = listOf("user_friends", "user_likes", "user_posts", "public_profile", "email")
         loginManager.logInWithReadPermissions(activity, permissions)
     }

     /** Forwards an activity result to the Facebook callback manager. */
     fun onFacebookLoginResult(requestCode: Int, resultCode: Int, data: Intent) {
         callbackManager.onActivityResult(requestCode, resultCode, data)
     }

     /** Pushes [currentAccessToken] to the repository when its token string differs from the old one. */
     fun facebookAccessTokenChanged(oldAccessToken: AccessToken?, currentAccessToken: AccessToken?) {
         val unchanged = oldAccessToken?.token == currentAccessToken?.token
         if (unchanged) return
         repository.setFaceBookAccessToken(currentAccessToken)
     }

     /**
      * Loads the posts, parses them into `Post` objects and flat-maps them into the repository's
      * likes stream.
      *
      * @return the likes stream, or `null` when the repository returns no posts observable.
      */
     fun likes(): Observable<List<Profile>>? {
         val postsResponse = repository.posts() ?: return null

         return postsResponse
             .map { response ->
                 val parser = GsonBuilder().create()
                 val payload = response.jsonObject["data"].toString()
                 val postListType = object : TypeToken<List<Post>>() {}.type
                 parser.fromJson<List<Post>>(payload, postListType)
             }
             .flatMap { posts -> repository.likes(posts) }
     }

     /** Delegates to the repository's `logout()`. */
     fun logout() = repository.logout()

     /** `true` when the repository currently holds a token string. */
     fun isLogined(): Boolean = repository.token() != null
 }

我使用Kotlin作为开发语言。

谢谢大家!不过答案其实在别处:问题出在逻辑上,而不是执行方式上。Facebook SDK 提供了 GraphRequestBatch(批量请求),既不需要使用 Future,也不需要阻塞线程。

 package com.github.scrobot.likes_listener.data.facebook

 import android.util.Log
 import com.facebook.*
 import com.github.scrobot.likes_listener.data.facebook.models.Facebook
 import com.github.scrobot.likes_listener.data.facebook.models.Post
 import com.github.scrobot.likes_listener.data.facebook.models.Profile
 import com.google.gson.GsonBuilder
 import com.google.gson.reflect.TypeToken
 import rus.pifpaf.client.util.rx.RxDecorator
 import rx.Observable
 import java.util.*

 /**
  * Created by aleksejskrobot on 07.12.16.
  *
  * Data-layer gateway to the Facebook Graph API that exposes requests as Rx observables.
  */
 class FacebookRepository {

     private val facebook = Facebook.instance()
     private val gson = GsonBuilder().create()

     /**
      * Builds an observable that requests `/me/posts` with the stored token.
      *
      * The request is created and run with `executeAndWait()` inside [Observable.defer], so the
      * call happens per subscription and the raw [GraphResponse] is emitted as a single item.
      * NOTE(review): scheduling is whatever `RxDecorator.decorate` applies - not visible here.
      */
     fun posts(): Observable<GraphResponse>? {
         return RxDecorator<GraphResponse>().decorate(Observable.defer({
             val request = GraphRequest(
                 facebook.token,
                 "/me/posts",
                 null,
                 HttpMethod.GET,
                 GraphRequest.Callback { /* handle the result */ }
             )
             Observable.just(request.executeAndWait())
         }))
     }

     /** Stores [currentAccessToken] (possibly `null`) as the token used for requests. */
     fun setFaceBookAccessToken(currentAccessToken: AccessToken?) {
         facebook.token = currentAccessToken
     }

     /** Delegates to the `Facebook` holder's `logout()`. */
     fun logout() {
         facebook.logout()
     }

     /** Returns the string value of the stored access token, or `null` when no token is stored. */
     fun token(): String? {
         return facebook.token?.token
     }

     /**
      * Loads the likes of all [posts] in a single batch and emits the combined profile list.
      *
      * One `/{post-id}/likes` request per post is added to a [GraphRequestBatch]; the batch is
      * executed with `executeAndWait()` inside [Observable.defer], and the "data" array of every
      * response is parsed and concatenated in response order.
      *
      * NOTE(review): profiles are not de-duplicated, so a user who liked several posts appears
      * once per post. Apply `distinct()` at the call site if unique users are wanted (this
      * presumes `Profile` implements `equals`).
      *
      * @param posts posts whose likes should be loaded; `null` or empty yields an empty list.
      * @return an observable emitting one list with the profiles from all responses.
      */
     fun likes(posts: List<Post>?): Observable<List<Profile>> {
         val batch = GraphRequestBatch()
         posts?.mapTo(batch) { post ->
             GraphRequest(
                 facebook.token,
                 "/${post.id}/likes",
                 null,
                 HttpMethod.GET,
                 GraphRequest.Callback { Log.d("fb:post:id", "${post.id}: ${it.jsonObject}") }
             )
         }

         return RxDecorator<List<GraphResponse>>().decorate(Observable.defer({
             // With no posts there is nothing to send; skip the network call instead of
             // executing an empty batch.
             val responses: List<GraphResponse> =
                 if (batch.isEmpty()) emptyList() else batch.executeAndWait()
             Observable.just(responses)
         })).map { responses ->
             Log.d("batchResponse:it", responses.toString())
             responses.flatMap { response ->
                 gson.fromJson<List<Profile>>(
                     response.jsonObject["data"].toString(),
                     object : TypeToken<List<Profile>>() {}.type
                 )
             }
         }
     }
 }