RxAndroid, cómo detectar si observable ha finalizado la emisión

Estoy escribiendo el siguiente fragment de código para recuperar la list de alimentos guardados de la database de firebase y luego, al usar esa list, vuelvo a search detalles de alimentos individuales de la database de firebase.

El siguiente código funciona bien, excepto que no puedo averiguar cómo hacer que el segundo flatMap sepa que la emisión del primer flatMap ha finalizado (toda la list de alimentos ha sido procesada). Por lo tanto, no puedo llamar al onCompleted() , por lo tanto, no onCompleted() detectar cuándo finaliza todo el process.

Echa un vistazo a los comentarios en el siguiente fragment:

 Observable.create<List<PersonalizedFood>> { FirebaseDTDatabase.getSavedDietFoodQuery(user.uid).addListenerForSingleValueEvent(object : ValueEventListener { override fun onCancelled(p0: DatabaseError?) { } override fun onDataChange(p0: DataSnapshot?) { val list = ArrayList<PersonalizedFood>() p0?.let { for (dateObject in p0.children) { for (foodItem in dateObject.children) { val food = foodItem.getValue(FBPersonalizedFood::class.java) as FBPersonalizedFood list.add(PersonalizedFood(food)) } } } it.onNext(list) it.onCompleted() } }) }.subscribeOn(Schedulers.io()).flatMap { Observable.from(it) // returning a Observable that emits items of list ("it" is the list here) }.observeOn(Schedulers.io()).flatMap { // How does this flatMap know that emission of all item has been finished so that onCompleted() method could be called. personalizedFood -> Observable.create<Boolean>{ FirebaseDTDatabase.getFoodListReference(personalizedFood.foodId).addListenerForSingleValueEvent(object :ValueEventListener{ override fun onCancelled(p0: DatabaseError?) { it.onError(p0?.toException()) } override fun onDataChange(p0: DataSnapshot?) { if(p0 != null) { val food = p0.getValue(FBFood::class.java)!! val repo = LocalFoodRepository() doAsync { repo.insertFood(this@LoginActivity, Food(food.foodId, food.foodName, food.foodDesc)) repo.insertServingDetails(this@LoginActivity, food.servingList.map { it.component2() }) repo.saveFood(this@LoginActivity, personalizedFood) it.onNext(true) } }else { it.onNext(false) } } }) } }.observeOn(Schedulers.io()).doOnCompleted{ dismissProgressDialog() finish() }.doOnError{ it.printStackTrace() dismissProgressDialog() finish() }.subscribe() 

Gracias.

El Observable de flatMap sabe "cuándo se han terminado todos los elementos" cuando todos los observables emitidos por él han llamado a onCompleted() . El segundo flatMap en su código nunca llama onCompleted() porque ninguno de los observables crea call onCompleted() .

Debe llamar a onCompleted() en su método onDataChange() . Como cada uno de los observables creados en flatMap solo emite un elemento, se puede onNext() directamente después del método onNext() :

 override fun onDataChange(p0: DataSnapshot?) { if(p0 != null) { val food = p0.getValue(FBFood::class.java)!! val repo = LocalFoodRepository() doAsync { repo.insertFood(this@LoginActivity, Food(food.foodId, food.foodName, food.foodDesc)) repo.insertServingDetails(this@LoginActivity, food.servingList.map { it.component2() }) repo.saveFood(this@LoginActivity, personalizedFood) it.onNext(true) it.onCompleted() } } else { it.onNext(false) it.onCompleted() } } 
  • RxJava- Gire Observable en Iterator, Stream o Sequence
  • Kotlin con stack RxKotlinFX da un error de class de acceso
  • RxKotlin - Single.just () no se emite al suscribirse TestSubscriber
  • Obligatorio <Objeto> y encontrado <Objeto>?
  • Rx-Kotlin awaitTerminalEvent nunca se sube a Completo
  • Spring 5 and Kotlin 1.1 Coroutines: Type rx.Scheduler no presente
  • RxKotlin flattenAsObservable (): no coincide con el método de reference
  • OnComplete nunca se llamó con toSortedList () y groupBy ()
  • La biblioteca de Kotlin 'rxkotlin-0.21.0.jar' tiene un formatting no compatible. Actualice la biblioteca o el complemento
  • No se puede 'observar en' hilo principal con RxKotlin
  • RxJava2 Tal vez devuelva Observable vacío si no hay elemento