¿Hay alguna manera de cambiar mi método a la stream Observable que será una cadena de modificadores?

Originalmente tengo una function como esta:

private fun rxSaveEvents( events: List<LocationEvent>, locationUpdatesAboutToStop: Boolean): Boolean { synchronized(mEventsMonitor) { if (events.isNotEmpty()) { val locationEvents = LocationEvents() segregateLocationEvents(events, locationEvents) scoreReadings(locationEvents) sendLocationEventsInternal(locationEvents, locationUpdatesAboutToStop) Completable.fromCallable { updateLocationEventsToDb(locationEvents) }.subscribeOn(Schedulers.io()).subscribe() } if (locationUpdatesAboutToStop) { stopLocationUpdates() Completable.fromCallable { cleanAllReadingsData() }.subscribeOn(Schedulers.io()).subscribe() Completable.fromCallable { removeExpinetworkingData() }.subscribeOn(Schedulers.io()).subscribe() } locationEvents.clear() return true } } 

Esta function se activa después de un período de time. Se puede activar de 2 forms diferentes: progtwigdo con boolean como verdadero o falso. Aquí están los progtwigdores, que progtwign la invocación de este método:

  val scheduleNormalSave = Completable.timer(locationEventsSaveThrottlingSeconds, TimeUnit.SECONDS).doOnSubscribe(Consumer { normalSaveScheduled.set(true) }).doOnComplete(Action { normalSaveScheduled.set(false) }) val scheduleStopAndSave = Completable.timer(locationStopUpdatesSeconds, TimeUnit.SECONDS).doOnSubscribe(Consumer { stopAndSaveScheduled.set(true) }).doOnComplete(Action { stopAndSaveScheduled.set(false) }) 

Los booleans:

  private var normalSaveScheduled: AtomicBoolean = AtomicBoolean(false) private var stopAndSaveScheduled: AtomicBoolean = AtomicBoolean(false) 

Si se desencadena algún evento, se configurará el planificador y se iniciará la invocación de progtwigción: Ej.

  if (!normalSaveScheduled.get()) { configureNormalSaveCompletable() } 

Y el método configure normalSaveCompletable:

  fun configureNormalSaveCompletable() { scheduleNormalSave.subscribe(Action { rxSaveEvents(locationEvents, false) }) } 

LocationEvents es solo un arrayList, que almacena algunos events de forma asincrónica.

 private val locationEvents: ArrayList<LocationEvent> = ArrayList<LocationEvent>() 

Entonces, cuando ocurre algún evento (por ejemplo, actualización de location) inserta el evento LocationEvent en este ArrayList.

¿Entonces mi pregunta es cómo hacer que mi function, esta rxSaveEvents sea más reactiva? ¿Necesita ver sus otras funciones en esta implementación de método?