Usando RxJava para unir datos locales con datos remotos (o en caching)

Esto es código de trabajo, pero tengo algunas preguntas, así como una request de asesoramiento para mejorarlo. Soy nuevo en RxJava y no me he acostumbrado a cómo encadenar este tipo de observables.

Tengo dos objects model, ListItem y UserInfo . ListItem s existe en una database local y UserInfo se UserInfo del server utilizando una ID proporcionada por ListItem .

El service web UserInfo acepta una matriz de ID para la que devolverá una list de objects UserInfo .

El flujo de este código es el siguiente:

  1. Cargar ListItem s de la database
  2. Utilizando el ListItem recuperado de la database, marque un caching en la memory para ver si ya he buscado el UserInfo para un ListItem particular.
  3. Para cualquier elemento cuya información de UserInfo no se almacena en la UserInfo caching, UserInfo de la networking
  4. Coloque los objects de UserInfo en el caching
  5. Vuelva a ejecutar el paso 2 (el método es loadCachedUserInfo )
  6. Devuelve los resultados al suscriptor

NOTA: Los objects UserInfo solo se deben search para un ListItem si la list se ha considerado como isUserList .

Aquí está el código:

 fun itemsInList(list : ATList, parentValue : String? = null, searchString : String? = null, limit : Int = defaultFetchLimit, sortOrder: SortDescriptor? = null) : Observable<List<ATListItem>> { return Observable.create<List<ATListItem>> { subscriber -> val listItems = listItemsInList(list, parentValue = parentValue, searchString = searchString, limit = limit, sortOrder = sortOrder) subscriber.onNext(listItems) subscriber.onCompleted() }.flatMap { listItems -> if ( list.isUserList ) { return@flatMap loadCachedUserInfo(listItems, userIDIndex = list.userIDIndex!!) } return@flatMap Observable.just(listItems) }.flatMap { listItems -> if ( list.isUserList ) { return@flatMap fetchUserInfoForListItems(listItems, list.userIDIndex!!, force = false) } return@flatMap Observable.just(listItems) } } fun loadCachedUserInfo(listItems : List<ATListItem>, userIDIndex : Int) : Observable<List<ATListItem>> { return Observable.create<List<ATListItem>> { subscriber -> for ( listItem in listItems ) { listItem.coreUserInfo = coreUserMap[listItem.valueForAttributeIndex(userIDIndex)?.toLowerCase()] } subscriber.onNext(listItems) subscriber.onCompleted() } } fun fetchUserInfoForListItems(listItems : List<ATListItem>, userIDIndex: Int, force: Boolean) : Observable<List<ATListItem>> { val itemsToFetch = if ( force ) listItems else listItems.filter { it.coreUserInfo == null } val ids = itemsToFetch.map { it.valueForAttributeIndex(userIDIndex) ?: "" }.filter { !it.isEmpty() } val records = hashMapOf("records" to ids) if ( itemsToFetch.count() == 0 ) { return Observable.just(listItems) } return RuntimeDataController.dataService.fetchCoreUserInfo(recordsMap = records) .map { json -> val recordsArray = json.arrayValue("records") for ( i in 0..recordsArray.length() - 1) { val coreUserInfo = CoreUserInfo(recordsArray.getJSONObject(i)) coreUserMap[coreUserInfo.username.toLowerCase()] = coreUserInfo coreUserMap[coreUserInfo.userID] = coreUserInfo coreUserInfo.externalUserID?.let { coreUserMap[it] = coreUserInfo } } return@map listItems }.flatMap { loadCachedUserInfo(listItems, userIDIndex = userIDIndex) } } 

El usuario iniciaría la secuencia de events llamando:

ListController.itemsInList(list)

Mis preguntas sobre este código son:

  1. Actualmente, loadCachedUserInfo toma una matriz de ListItem y devuelve esa misma matriz como un observable después de que los elementos almacenados en caching se hayan asociado a ella. Esto me parece mal. En su lugar, creo que esta llamada solo debe devolver los elementos que tienen un UserInfo caching asociado. Sin embargo, tengo que seguir pasando la matriz completa de ListItem al siguiente método

2.) ¿Debo hacer un trabajo adicional para apoyar la cancelación de la suscripción?

3.) Esta es una pregunta similar 1. My fetchUserInfoForListItems toma una matriz de elementos de la list y devuelve un observable con la misma matriz de elementos de la list una vez que se han recuperado y vuelto a ejecutar a través del método de caching. Esto también me parece incorrecto. Prefiero que este método devuelva un Observable<List<UserInfo>> para los objects que se obtuvieron. No entiendo cómo en itemsInList asociar los ListItem con el UserInfo recién recuperado y devolver un Observable de esos ListItem s.

Editar : Después de escribir esta publicación, me ayudó a darme count de algunas cosas. Puedo flatMap envolver mis llamadas en un Observable.create que puede contener la inteligencia que quería sacar de mis fetchUserInfoForListItems , permitiéndome abordar la pregunta n. ° 3. Aquí está el código actualizado:

  fun itemsInList(list : ATList, parentValue : String? = null, searchString : String? = null, limit : Int = defaultFetchLimit, sortOrder: SortDescriptor? = null) : Observable<List<ATListItem>> { return Observable.create<List<ATListItem>> { subscriber -> val listItems = listItemsInList(list, parentValue = parentValue, searchString = searchString, limit = limit, sortOrder = sortOrder) subscriber.onNext(listItems) subscriber.onCompleted() }.flatMap { listItems -> if ( list.isUserList ) { return@flatMap loadCachedUserInfo(listItems, userIDIndex = list.userIDIndex!!) } return@flatMap Observable.just(listItems) }.flatMap { listItems -> if ( list.isUserList ) { return@flatMap Observable.create<List<ATListItem>> { subscriber -> fetchUserInfoForListItems(listItems, list.userIDIndex!!, force = false).map { userInfoList -> for (coreUserInfo in userInfoList) { coreUserMap[coreUserInfo.username.toLowerCase()] = coreUserInfo coreUserMap[coreUserInfo.userID] = coreUserInfo coreUserInfo.externalUserID?.let { coreUserMap[it] = coreUserInfo } } }.flatMap { loadCachedUserInfo(listItems, userIDIndex = list.userIDIndex!!) }.subscribe { subscriber.onNext(listItems) subscriber.onCompleted() } } } return@flatMap Observable.just(listItems) } } fun loadCachedUserInfo(listItems : List<ATListItem>, userIDIndex : Int) : Observable<List<ATListItem>> { return Observable.create<List<ATListItem>> { subscriber -> listItems.forEach { listItem -> listItem.coreUserInfo = coreUserMap[listItem.valueForAttributeIndex(userIDIndex)?.toLowerCase()] } subscriber.onNext(listItems) subscriber.onCompleted() } } fun fetchUserInfoForListItems(listItems : List<ATListItem>, userIDIndex: Int, force: Boolean) : Observable<List<CoreUserInfo>> { val itemsToFetch = if ( force ) listItems else listItems.filter { it.coreUserInfo == null } val ids = itemsToFetch.map { it.valueForAttributeIndex(userIDIndex) ?: "" }.filter { !it.isEmpty() } val records = hashMapOf("records" to ids) if ( itemsToFetch.count() == 0 ) { return Observable.just(ArrayList<CoreUserInfo>()) } return RuntimeDataController.dataService.fetchCoreUserInfo(recordsMap = records) .map { json -> val userInfo = ArrayList<CoreUserInfo>() json.arrayValue("records").eachObject { userInfo.add(CoreUserInfo(it)) } return@map userInfo } } 

  1. Actualmente, loadCachedUserInfo toma una matriz de ListItem y devuelve esa misma matriz como un observable después de que los elementos almacenados en caching se hayan asociado a ella. Esto me parece mal. En su lugar, creo que esta llamada solo debe devolver los elementos que tienen un UserInfo en caching asociado. Sin embargo, tengo que seguir pasando la matriz completa de ListItem al siguiente método

No estoy seguro de entenderlo correctamente, pero si solo necesita efectos secundarios (almacenamiento en caching), puede usar doOnNext . Por ejemplo,

 .doOnNext { listItems -> if ( list.isUserList ) { cache(listItems, userIDIndex = list.userIDIndex!!) } } fun cache(listItems : List<ATListItem>, userIDIndex : Int) { // caching } 
  1. ¿Debo hacer un trabajo adicional para apoyar la cancelación de la suscripción?

No, AFAIK.

Nota:

Se puede encontrar más información sobre doOnNext en ¿Cuál es el propósito de doOnNext (…) en RxJava y aquí

Normalmente no necesitas return@... si la última instrucción en lambda es una expresión. p.ej:

 .flatMap { listItems -> if ( list.isUserList ) { return@flatMap loadCachedUserInfo(listItems, userIDIndex = list.userIDIndex!!) } return@flatMap Observable.just(listItems) } 

se puede escribir como:

 .flatMap { listItems -> if ( list.isUserList ) loadCachedUserInfo(listItems, userIDIndex = list.userIDIndex!!) else Observable.just(listItems) } 

No probé el código.

  • Obligatorio <Objeto> y encontrado <Objeto>?
  • RxJava - ¿Entradas de keyboard de contrapresión?
  • Agregar subscribeOn () está cambiando el tipo de retorno de observable
  • No se puede 'observar en' hilo principal con RxKotlin
  • La biblioteca de Kotlin 'rxkotlin-0.21.0.jar' tiene un formatting no compatible. Actualice la biblioteca o el complemento
  • RxKotlin - Single.just () no se emite al suscribirse TestSubscriber
  • ¿Cómo especificar la versión de RxJava al usar RxKotlin?
  • ¿Puedo crear un método de extensión Kotlin para agregar una suscripción rxJava a una suscripción compuesta?
  • cómo implementar Switch usando Data binding en android
  • Kotlin con stack RxKotlinFX da un error de class de acceso
  • RxKotlin collectInto () MutableList usando references de método