RxJava2 cómo separar diferentes implementaciones de emisor observable

Esta es mi situación. Quiero exponer 2 implementaciones diferentes de Observable <'Location'> desde mi LocationManager nativo de Android o desde los Servicios de Google.

Quiero comprobar si uso el enfoque nativo o gms.

Así que, al final, quiero exponer a Observable a mi cliente; no necesita saber de qué enfoque reuní la location. TEN EN CUENTA que estoy usando esta biblioteca: https://github.com/mcharmas/Android-ReactiveLocation

para exponer Observable desde los services de google. Ya expone el Observable que estoy buscando. Pero qué pasa con el otro – Administrador de location. Utiliza devoluciones de llamada.

Aquí está mi implementación:

var locationEmitter : Observable<Location> = Observable.empty() init { configureEmitter() } @SuppressLint("MissingPermission") private fun configureEmitter(){ if (!isUsingLocationNativeApi) locationEmitter = reactiveLocationProvider.getUpdatedLocation(reactiveLocationRequest) else{ configureNativeLocationEmitter() } } @SuppressLint("MissingPermission") private fun configureNativeLocationEmitter() { val mLocationCallbackNativeApi: LocationListener = object : LocationListener { override fun onLocationChanged(location: Location) { locationEmitter = Observable.create<Location> { emitter -> emitter.onNext(location) } } override fun onStatusChanged(provider: String, status: Int, extras: Bundle) {} override fun onProviderEnabled(provider: String) {} override fun onProviderDisabled(provider: String) {} } try { locationManager.requestLocationUpdates(LocationManager.GPS_PROVIDER, (geoEventsIntervalSeconds * 1000).toLong(), geoEventsDistanceMeters.toFloat(), mLocationCallbackNativeApi, Looper.getMainLooper()) } catch (ignonetworking: IllegalArgumentException) { ignonetworking.printStackTrace() } try { locationManager.requestLocationUpdates(LocationManager.NETWORK_PROVIDER, (geoEventsIntervalSeconds * 1000).toLong(), geoEventsDistanceMeters.toFloat(), mLocationCallbackNativeApi, Looper.getMainLooper()) } catch (ignonetworking: IllegalArgumentException) { ignonetworking.printStackTrace() } } @SuppressLint("MissingPermission") override fun onLocationUpdate(): Observable<Location> { return locationEmitter } } 

Sin embargo, no funciona para la implementación de LocationManager,

En el lado del cliente, quiero poder hacer algo como esto:

  rxLocationRepository.onLocationUpdate() .subscribe(Consumer { location -> //do something with location, no matter it is from LocationManager or google play services }) 

¿Cómo hacer eso en Kotlin?

ACTUALIZAR

¿Cómo deshacerse del seguimiento del tema (en realidad, deshacerse del tema) y del otro proveedor de location? No quiero perder el uso si el cliente no quiere usar esto y, por ejemplo, invocar dicho método en el lado del cliente

  override fun stopLocationUpdates() { //get rid of the location updates } 

Por ejemplo, en el lado del cliente:

  rxLocationRepository.stopLocationUpdates()