Cómo hacer que la function regrese Observable

Estoy intentando hacer que mi código sea más Rx, así que aquí hay un problema con el que me encuentro durante la refactorización:

Anteriormente había funcionado así:

private fun startLocationUpdates() { compositeDisposable.add(rxLocationRepository.onLocationUpdate() .subscribe(Consumer { location -> appendGeoEvent(location) })) rxScheduleStopLocationUpdates() } 

Lo que convierto en algo como esto:

 val locationUpdates = rxLocationRepository.onLocationUpdate() .concatMap(appendGeoEvent()) .doOnSubscribe(rxScheduleStopLocationUpdates()) .publish() private fun startLocationUpdates() {compositeDisposable.add(locationUpdates.connect()) } 

El problema ahora es con el appendGeoEvent para hacer que funcione con mi location creadaActualizar ConnectableObservable.

El appendGeoEvent se veía así (eran dos funciones conectadas)

  fun appendGeoEvent(location: Location) { val locationGeoEvent = LocationGeoEvent( accuracy = location.accuracy.toDouble(), latitude = location.latitude, longitude = location.longitude, timestampGeoEvent = location.time ) appendGeoEvent(locationGeoEvent) } private fun appendGeoEvent(event: LocationGeoEvent) { synchronized(mEventsMonitor) { if (!normalSaveScheduled.get()) { configureNormalSaveCompletable() } locationEvents.add(event) } } 

Así que ahora necesito hacer una function de estos dos que devolverá algún Observable o Copmletable. Una vez más, cómo hacer que tenga sentido y trabajar con esta location Actualizaciones ConectableObservable:

 val locationUpdates = rxLocationRepository.onLocationUpdate() .concatMap(appendGeoEvent()) .doOnSubscribe(rxScheduleStopLocationUpdates()) .publish() 

La otra parte del código es esto:

1) El método configureNormalSaveCompletable:

  fun configureNormalSaveCompletable() { scheduleNormalSave.subscribe(Action { rxSaveEvents(locationEvents, false) }) } 

2) ScheduleNormalSave Completable (progtwig la acción de hacer algo en el futuro y establece el boolean necesario en el método rxSaveEvents)

  val scheduleNormalSave = Completable.timer(locationEventsSaveThrottlingSeconds, TimeUnit.SECONDS).doOnSubscribe(Consumer { normalSaveScheduled.set(true) }).doOnComplete(Action { normalSaveScheduled.set(false) }) 

3) Y aquí está el método rxSaveEvents:

  private fun rxSaveEvents( events: List<LocationEvent>, locationUpdatesAboutToStop: Boolean): Boolean { synchronized(mEventsMonitor) { if (events.isNotEmpty()) { val locationEvents = LocationEvents() segregateLocationEvents(events, locationEvents) scoreReadings(locationEvents) sendLocationEventsInternal(locationEvents, locationUpdatesAboutToStop) Completable.fromCallable { updateLocationEventsToDb(locationEvents) }.subscribeOn(Schedulers.io()).subscribe() } if (locationUpdatesAboutToStop) { stopLocationUpdates() Completable.fromCallable { cleanAllReadingsData() }.subscribeOn(Schedulers.io()).subscribe() Completable.fromCallable { removeExpinetworkingData() }.subscribeOn(Schedulers.io()).subscribe() } locationEvents.clear() return true } } 

ACTUALIZAR

Lo que he intentado hasta ahora es esto:

 //what should be in the method parameter and how to exctract this fun apendGeoEvent(location: Location): Observable<LocationGeoEvent>? { return Observable.just(location) .flatMap { t: Location -> Observable.just(LocationGeoEvent( accuracy = location.accuracy.toDouble(), latitude= location.latitude, longitude = location.longitude, timestampGeoEvent = location.time ) ) } .doOnNext{ if (!normalSaveScheduled.get()) { configureNormalSaveCompletable() } locationEvents.add(it) } } 

Sin embargo, no sé cómo llevar ese Observable desde la stream ascendente a mi método appendGeoEvent:

  val locationUpdates = rxLocationRepository.onLocationUpdate() .concatMap(appendGeoEvent()) .doOnSubscribe(rxScheduleStopLocationUpdates()) .publish()