RxAndroid, cómo detectar si observable ha finalizado la emisión
Estoy escribiendo el siguiente fragment de código para recuperar la list de alimentos guardados de la database de firebase y luego, al usar esa list, vuelvo a search detalles de alimentos individuales de la database de firebase.
El siguiente código funciona bien, excepto que no puedo averiguar cómo hacer que el segundo flatMap sepa que la emisión del primer flatMap ha finalizado (toda la list de alimentos ha sido procesada). Por lo tanto, no puedo llamar al onCompleted()
, por lo tanto, no onCompleted()
detectar cuándo finaliza todo el process.
- Confusión de la syntax de Kotlin lambda
- ¿Expresar "súper" generics en los types funcionales de Kotlin?
- Obtiene N últimos objects emitidos por observables en RxJava2
- RxJava - ¿Entradas de keyboard de contrapresión?
- Usando RxJava para unir datos locales con datos remotos (o en caching)
Echa un vistazo a los comentarios en el siguiente fragment:
Observable.create<List<PersonalizedFood>> { FirebaseDTDatabase.getSavedDietFoodQuery(user.uid).addListenerForSingleValueEvent(object : ValueEventListener { override fun onCancelled(p0: DatabaseError?) { } override fun onDataChange(p0: DataSnapshot?) { val list = ArrayList<PersonalizedFood>() p0?.let { for (dateObject in p0.children) { for (foodItem in dateObject.children) { val food = foodItem.getValue(FBPersonalizedFood::class.java) as FBPersonalizedFood list.add(PersonalizedFood(food)) } } } it.onNext(list) it.onCompleted() } }) }.subscribeOn(Schedulers.io()).flatMap { Observable.from(it) // returning a Observable that emits items of list ("it" is the list here) }.observeOn(Schedulers.io()).flatMap { // How does this flatMap know that emission of all item has been finished so that onCompleted() method could be called. personalizedFood -> Observable.create<Boolean>{ FirebaseDTDatabase.getFoodListReference(personalizedFood.foodId).addListenerForSingleValueEvent(object :ValueEventListener{ override fun onCancelled(p0: DatabaseError?) { it.onError(p0?.toException()) } override fun onDataChange(p0: DataSnapshot?) { if(p0 != null) { val food = p0.getValue(FBFood::class.java)!! val repo = LocalFoodRepository() doAsync { repo.insertFood(this@LoginActivity, Food(food.foodId, food.foodName, food.foodDesc)) repo.insertServingDetails(this@LoginActivity, food.servingList.map { it.component2() }) repo.saveFood(this@LoginActivity, personalizedFood) it.onNext(true) } }else { it.onNext(false) } } }) } }.observeOn(Schedulers.io()).doOnCompleted{ dismissProgressDialog() finish() }.doOnError{ it.printStackTrace() dismissProgressDialog() finish() }.subscribe()
Gracias.
- Spring 5 and Kotlin 1.1 Coroutines: Type rx.Scheduler no presente
- Cómo notificar a Observable cuando finalice CountdownTimer
- RxKotlin - matriz de observadores dynamics
- RxKotlin flattenAsObservable (): no coincide con el método de reference
- Agregar subscribeOn () está cambiando el tipo de retorno de observable
- ¿Cómo hacer un grupo? ¿Por qué coleccionar usando RxJava y Kotlin?
- No se puede cambiar el text de ActionMenuItemView con RxKotlin
- Repetir acciones en estado con RxJava
El Observable
de flatMap
sabe "cuándo se han terminado todos los elementos" cuando todos los observables emitidos por él han llamado a onCompleted()
. El segundo flatMap
en su código nunca llama onCompleted()
porque ninguno de los observables crea call onCompleted()
.
Debe llamar a onCompleted()
en su método onDataChange()
. Como cada uno de los observables creados en flatMap
solo emite un elemento, se puede onNext()
directamente después del método onNext()
:
override fun onDataChange(p0: DataSnapshot?) { if(p0 != null) { val food = p0.getValue(FBFood::class.java)!! val repo = LocalFoodRepository() doAsync { repo.insertFood(this@LoginActivity, Food(food.foodId, food.foodName, food.foodDesc)) repo.insertServingDetails(this@LoginActivity, food.servingList.map { it.component2() }) repo.saveFood(this@LoginActivity, personalizedFood) it.onNext(true) it.onCompleted() } } else { it.onNext(false) it.onCompleted() } }