¿Cómo convertir muchas AsyncTasks a Rx Observables en Android?

Estoy usando Facebook Graph API para encontrar usuarios a los que les hayan gustado mis publicaciones. Toda la lógica se implementa en pocos pasos:

  1. Encuentra todas las publicaciones haciendo request a API en AsyncTask
  2. Convierte este AT a Rx.Observable
  3. Map Observables GraphResponse to List (El comentario es POJO)
  4. Llame al operador de FlatMap y en su método de llamada al cuerpo que itera cada publicación y crea asyncTask, luego conviértalo en Observable y colóquelo en Array.
  5. Merge Array of Observables en One Observable.
  6. Asigna su GraphResponse al perfil de Me gusta
  7. Suscríbete y render Likers List
  8. ¡LUCRO!

Y tengo un pequeño problema en el paso # 4-5. Por favor, mira el método 'me gusta' en el repository. En comentarios escribí problemas

Sugerencia : estoy usando MVP + Clean Architecture con Repository (capa de datos) e Interactor (capa de negocios)

class FacebookRepository { private val facebook = Facebook.instance() private val gson = GsonBuilder().create() fun posts(): Observable<GraphResponse>? { return RxDecorator<GraphResponse>().decorate(Observable.defer({ val request = GraphRequest( facebook.token, "/me/posts", null, HttpMethod.GET, GraphRequest.Callback { /* handle the result */ } ) Observable.just(request.executeAndWait()) })) } fun setFaceBookAccessToken(currentAccessToken: AccessToken?) { facebook.token = currentAccessToken } fun logout() { facebook.logout() } fun token(): String? { return facebook.token?.token } fun likes(posts: List<Post>?): Observable<List<Profile>> { Log.d("observables:posts", posts.toString()) val p = iterateObservables(posts) // STOP HERE and WAIT to complete this method. // Then p is composite - merge and return return Observable.merge(p).map { Log.d("merge:posts", it.toString()) val profiles = gson.fromJson<List<Profile>>( it.jsonObject["data"].toString(), object : TypeToken<List<Profile>>() {}.type ) return@map profiles } } private fun iterateObservables(posts: List<Post>?): MutableList<Observable<GraphResponse>>? { val observables: MutableList<Observable<GraphResponse>>? = null Log.d("iterateObs:posts", posts.toString()) Log.d("posts_not_null", (posts != null).toString()) Log.d("posts.size", posts?.size.toString()) if (posts != null) { for (post in posts) { Log.d("iterateObs:post", post.toString()) val request = GraphRequest( AccessToken.getCurrentAccessToken(), "/${post.id}/likes", null, HttpMethod.GET, GraphRequest.Callback { Log.d("fb:post:id", "${post.id}: ${it.jsonObject}") }).executeAsync() Log.d("obs:after:post", observables.toString()) } } return observables } } 

Y esto es Interactor

 class FacebookInteractor { private val callbackManager = com.facebook.CallbackManager.Factory.create() private val repository = FacebookRepository() fun facebookAuth(view: IMainView) { LoginManager .getInstance() .logInWithReadPermissions( view.getContext() as MainActivity, Arrays.asList("user_friends", "user_likes", "user_posts", "public_profile", "email") ) } fun onFacebookLoginResult(requestCode: Int, resultCode: Int, data: Intent) { callbackManager.onActivityResult(requestCode, resultCode, data) } fun facebookAccessTokenChanged(oldAccessToken: AccessToken?, currentAccessToken: AccessToken?) { if(oldAccessToken?.token != currentAccessToken?.token) { repository.setFaceBookAccessToken(currentAccessToken) } } fun likes(): Observable<List<Profile>>? { return repository.posts()?.map { val gson = GsonBuilder().create() val posts = gson.fromJson<List<Post>>( it.jsonObject["data"].toString(), object : TypeToken<List<Post>>() {}.getType() ) return@map posts }?.flatMap { return@flatMap repository.likes(it) } } fun logout() { repository.logout() } fun isLogined(): Boolean { return repository.token() != null } } 

Estoy usando Kotlin como lenguaje de desarrollo.

Gracias EveryOne!) Pero la respuesta fue en otro caso) El problema estaba en la lógica, no en la implementación) Facebook SDK tiene RequestBatch y no hay necesidad en Future o stoping Threads)

 package com.github.scrobot.likes_listener.data.facebook import android.util.Log import com.facebook.* import com.github.scrobot.likes_listener.data.facebook.models.Facebook import com.github.scrobot.likes_listener.data.facebook.models.Post import com.github.scrobot.likes_listener.data.facebook.models.Profile import com.google.gson.GsonBuilder import com.google.gson.reflect.TypeToken import rus.pifpaf.client.util.rx.RxDecorator import rx.Observable import java.util.* /** * Created by aleksejskrobot on 07.12.16. */ class FacebookRepository { private val facebook = Facebook.instance() private val gson = GsonBuilder().create() fun posts(): Observable<GraphResponse>? { return RxDecorator<GraphResponse>().decorate(Observable.defer({ val request = GraphRequest( facebook.token, "/me/posts", null, HttpMethod.GET, GraphRequest.Callback { /* handle the result */ } ) Observable.just(request.executeAndWait()) })) } fun setFaceBookAccessToken(currentAccessToken: AccessToken?) { facebook.token = currentAccessToken } fun logout() { facebook.logout() } fun token(): String? { return facebook.token?.token } fun likes(posts: List<Post>?): Observable<List<Profile>> { val batch = GraphRequestBatch() posts?.mapTo(batch) { post -> GraphRequest( facebook.token, "/${post.id}/likes", null, HttpMethod.GET, GraphRequest.Callback { Log.d("fb:post:id", "${post.id}: ${it.jsonObject}") } ) } return RxDecorator<List<GraphResponse>>().decorate(rx.Observable.defer({ rx.Observable.just(batch.executeAndWait()) })).map { Log.d("batchResponse:it", it.toString()) val list = ArrayList<Profile>() for (i in it) { list.addAll(gson.fromJson<List<Profile>>( i.jsonObject["data"].toString(), object : TypeToken<List<Profile>>() {}.type )) } list.filter { list.contains(it) } return@map list } } } 
  • rx-java2 Schedulers.io () steel invoca el método de mainThread
  • Cómo get el valor del ObservableField en android
  • RxAndroid, cómo detectar si observable ha finalizado la emisión
  • Modelo lleno con respuesta extra usando Rx
  • Cómo comprimir algunos observables en lenguaje Kotlin con RxAndroid
  • Lista mutable de Obersable <Object> espere hasta que todos terminen y combínelos en una list <Object>
  • ¿Cómo get respuesta en la retroadaptación al usar rxAndroid?
  • ¿Cómo iterar sobre Single <List> y asignar a otra list?
  • Implementando la búsqueda que empuja los resultados a la list tan pronto como estén disponibles usando rxJava
  • RxAndroid - Manejar errores con el operador Zip
  • Cómo get Flowable <List <Foo >> del object 'Bar' con relaciones de uno a muchos 'Foo' usando el reino y Rxjava