RxJava manejando múltiples devoluciones de llamada dentro de un observable

Tengo un Observable que tiene varias devoluciones de llamada dentro de él. El primero obtiene una list del server y el segundo es la callback de cambio de elemento que indica si un elemento de la list ha cambiado. Esta es mi implementación actual:

 Flowable.create({ emitter -> val offlineManager = OfflineManager.getInstance(context) offlineManager.listOfflineRegions(object : OfflineManager.ListOfflineRegionsCallback { override fun onList(offlineRegions: Array<OfflineRegion>) { offlineRegionsFetch(offlineRegions, emitter) } override fun onError(error: String) { emitter.onError(IllegalStateException("Saved offline regions read error")) } }) }, BackpressureStrategy.BUFFER) private fun offlineRegionsFetch(offlineRegionList: Array<OfflineRegion>, emitter: FlowableEmitter<List<OfflineMap>>) { if (offlineRegionList.isNotEmpty()) { val count = 0 val offlineMaps = offlineRegionList.map { OfflineMap(it.id.toString(), OfflineMapManager.getRegionName(it), 0L, OfflineMapStatus(FETCH)) } as ArrayList<OfflineMap> offlineRegionList.forEachIndexed { index, offlineRegion -> offlineRegion.getStatus(object : com.mapbox.mapboxsdk.offline.OfflineRegion.OfflineRegionStatusCallback { override fun onStatus(status: OfflineRegionStatus) { if (status.isComplete) { offlineMaps[index] = OfflineMap(offlineRegion.id.toString(), OfflineMapManager.getRegionName(offlineRegion), status.completedResourceSize, OfflineMapStatus(SUCCESS)) count + 1 } else if (status.downloadState == OfflineRegion.STATE_INACTIVE) { offlineMaps[index] = OfflineMap(offlineRegion.id.toString(), OfflineMapManager.getRegionName(offlineRegion), status.completedResourceSize, OfflineMapStatus(PAUSED)) count + 1 } else if (!status.isRequinetworkingResourceCountPrecise) { offlineMaps[index] = OfflineMap(offlineRegion.id.toString(), OfflineMapManager.getRegionName(offlineRegion), 0L, OfflineMapStatus(DOWNLOAD_STARTED)) count + 1 } else if (!status.isComplete && status.isRequinetworkingResourceCountPrecise) { val percentage = 100.0 * status.completedResourceCount / status.requinetworkingResourceCount offlineMaps[index] = OfflineMap(offlineRegion.id.toString(), OfflineMapManager.getRegionName(offlineRegion), status.completedResourceSize, OfflineMapStatus(PROGRESS, offlineRegion.id.toString(), Math.round(percentage))) count + 1 } } override fun onError(error: String) { offlineMaps[index] = OfflineMap(offlineRegion.id.toString(), OfflineMapManager.getRegionName(offlineRegion), -1L, OfflineMapStatus(ERROR, offlineRegion.id.toString(), errorCode = errorMap(error), errorMsg = error)) count + 1 } }) } if (count == offlineRegionList.size) { emitter.onNext(offlineMaps) } } else { emitter.onNext(listOf()) } } 

El onNext aquí ni siquiera ocurre ya que la callback de estado es asíncrona. ¿Cómo puedo hacer que el onNext espere la callback y luego se dispare ? También su n número de devoluciones de llamada de estado donde n es el número de elementos en la list.