RxAndroid, cómo detectar si observable ha finalizado la emisión

Estoy escribiendo el siguiente fragment de código para recuperar la list de alimentos guardados de la database de firebase y luego, al usar esa list, vuelvo a search detalles de alimentos individuales de la database de firebase.

El siguiente código funciona bien, excepto que no puedo averiguar cómo hacer que el segundo flatMap sepa que la emisión del primer flatMap ha finalizado (toda la list de alimentos ha sido procesada). Por lo tanto, no puedo llamar al onCompleted() , por lo tanto, no onCompleted() detectar cuándo finaliza todo el process.

Echa un vistazo a los comentarios en el siguiente fragment:

 Observable.create<List<PersonalizedFood>> { FirebaseDTDatabase.getSavedDietFoodQuery(user.uid).addListenerForSingleValueEvent(object : ValueEventListener { override fun onCancelled(p0: DatabaseError?) { } override fun onDataChange(p0: DataSnapshot?) { val list = ArrayList<PersonalizedFood>() p0?.let { for (dateObject in p0.children) { for (foodItem in dateObject.children) { val food = foodItem.getValue(FBPersonalizedFood::class.java) as FBPersonalizedFood list.add(PersonalizedFood(food)) } } } it.onNext(list) it.onCompleted() } }) }.subscribeOn(Schedulers.io()).flatMap { Observable.from(it) // returning a Observable that emits items of list ("it" is the list here) }.observeOn(Schedulers.io()).flatMap { // How does this flatMap know that emission of all item has been finished so that onCompleted() method could be called. personalizedFood -> Observable.create<Boolean>{ FirebaseDTDatabase.getFoodListReference(personalizedFood.foodId).addListenerForSingleValueEvent(object :ValueEventListener{ override fun onCancelled(p0: DatabaseError?) { it.onError(p0?.toException()) } override fun onDataChange(p0: DataSnapshot?) { if(p0 != null) { val food = p0.getValue(FBFood::class.java)!! val repo = LocalFoodRepository() doAsync { repo.insertFood(this@LoginActivity, Food(food.foodId, food.foodName, food.foodDesc)) repo.insertServingDetails(this@LoginActivity, food.servingList.map { it.component2() }) repo.saveFood(this@LoginActivity, personalizedFood) it.onNext(true) } }else { it.onNext(false) } } }) } }.observeOn(Schedulers.io()).doOnCompleted{ dismissProgressDialog() finish() }.doOnError{ it.printStackTrace() dismissProgressDialog() finish() }.subscribe() 

Gracias.

El Observable de flatMap sabe "cuándo se han terminado todos los elementos" cuando todos los observables emitidos por él han llamado a onCompleted() . El segundo flatMap en su código nunca llama onCompleted() porque ninguno de los observables crea call onCompleted() .

Debe llamar a onCompleted() en su método onDataChange() . Como cada uno de los observables creados en flatMap solo emite un elemento, se puede onNext() directamente después del método onNext() :

 override fun onDataChange(p0: DataSnapshot?) { if(p0 != null) { val food = p0.getValue(FBFood::class.java)!! val repo = LocalFoodRepository() doAsync { repo.insertFood(this@LoginActivity, Food(food.foodId, food.foodName, food.foodDesc)) repo.insertServingDetails(this@LoginActivity, food.servingList.map { it.component2() }) repo.saveFood(this@LoginActivity, personalizedFood) it.onNext(true) it.onCompleted() } } else { it.onNext(false) it.onCompleted() } } 
  • cómo implementar Switch usando Data binding en android
  • Inyectar constructor y object complementario
  • Múltiples requestes de modificación2 usando Flowable en Kotlin
  • La suscripción de rx kotlin no funciona, no recibe artículos
  • Comportamiento con Kotlin Higher-Order Functions e interfaces de método único?
  • Usando RxJava para unir datos locales con datos remotos (o en caching)
  • No se puede 'observar en' hilo principal con RxKotlin
  • Cómo pasar nulo a un Observable con tipo anulable en RxJava 2 y Kotlin
  • ¿Cómo corotines de Kotlin son mejores que RxKotlin?
  • Spring 5 and Kotlin 1.1 Coroutines: Type rx.Scheduler no presente
  • La biblioteca de Kotlin 'rxkotlin-0.21.0.jar' tiene un formatting no compatible. Actualice la biblioteca o el complemento