RxJava usando Kotlin – cómo sincronizar 2 methods asíncronos, refactorizar desde Java

Tengo 2 collections, que almacenan los events de actualización de location:

private List<LocationGeoEvent> mUpdateGeoEvents = new ArrayList<>(); private List<LocationRSSIEvent> mUpdateRSSIEvents = new ArrayList<>(); 

También está presente en mi código:

  private final ScheduledExecutorService mSaveDataExecutor = Executors.newSingleThreadScheduledExecutor(); private boolean mSaveDataScheduled; private final Object mEventsMonitor = new Object(); private ScheduledFuture<?> mScheduledStopLocationUpdatesFuture; private final ScheduledExecutorService mStopLocationUpdatesExecutor = Executors.newSingleThreadScheduledExecutor(); 

Agrego un evento a esta colección de esta manera:

  public void appendGeoEvent(LocationGeoEvent event) { synchronized (mEventsMonitor) { mUpdateGeoEvents.add(event); scheduleSaveEvents(); } } 

Lo mismo ocurre con el evento RSSI

Ahora, el método scheduleSaveEvents tiene este aspecto:

  private void scheduleSaveEvents() { synchronized (mSaveDataExecutor) { if (!mSaveDataScheduled) { mSaveDataScheduled = true; mSaveDataExecutor.schedule( new Runnable() { @Override public void run() { synchronized (mSaveDataExecutor) { saveEvents(false); mSaveDataScheduled = false; } } }, 30, TimeUnit.SECONDS); } } } 

El problema es que necesito sincronizar el otro método que detiene las actualizaciones. Se desencadena así:

  private void scheduleStopLocationUpdates() { synchronized (mStopLocationUpdatesExecutor) { if (mScheduledStopLocationUpdatesFuture != null) mScheduledStopLocationUpdatesFuture.cancel(true); mScheduledStopLocationUpdatesFuture = mStopLocationUpdatesExecutor.schedule( new Runnable() { @Override public void run() { synchronized (mStopLocationUpdatesExecutor) { stopLocationUpdates(); saveEvents(true); cleanAllReadingsData(); } } }, 45, TimeUnit.SECONDS); } } 

En el método saveEvents hago:

  private void saveEvents(boolean locationUpdatesAboutToStop) { synchronized (mEventsMonitor) { if (mUpdateGeoEvents.size() > 0 || mUpdateRSSIEvents.size() > 0) { //do something with the data from buffenetworking collection arrayLists and with the boolean locationUpdatesAboutToStop mUpdateGeoEvents.clear(); mUpdateRSSIEvents.clear(); } } } 

¿Hay alguna manera de refactorizar esta simplier a RxJava usando Kotlin?

ACTUALIZAR

Aquí está mi método appendRSSIevents:

  private fun appendRSSIEvent(event: LocationRSSIEvent) { synchronized(mEventsMonitor) { if (!shouldSkipRSSIData(event.nexoIdentifier)) { mUpdateRSSIEvents.add(event) acknowledgeDevice(event.nexoIdentifier) scheduleSaveEvents() startLocationUpdates() } else removeExpinetworkingData() } } 

Puede almacenar en búfer los dos flujos de datos y luego combinarlos para savelos. Además, puede usar el activador de almacenamiento intermedio para detener las actualizaciones también.

 PublishSubject<LocationGeoEvent> mUpdateGeoEventsSubject = PublishSubject.create(); PublishSubject<LocationRSSIEvent> mUpdateRSSIEventsSubject = PublishSubject.create(); public void appendGeoEvent(LocationGeoEvent event) { mUpdateGeoEventsSubject.onNext( event ); triggerSave.onNext( Boolean.TRUE ); } 

y lo mismo para la alimentación RSS.

Ahora necesitamos desencadenantes que se utilizarán para conducir el paso de guardado.

 PublishSubject<Boolean> triggerSave = PublishSubject.create(); PublishSubject<Boolean> triggerStopAndSave = PublishSubject.create(); Observable<Boolean> normalSaveTrigger = triggerSave.debounce( 30, TimeUnit.SECONDS ); Observable<Boolean> trigger = Observable.merge( normalSaveTrigger, triggerStopAndSave ); 

El trigger observable se dispara cuando el process de salvar normal se dispara o si estamos deteniendo el guardado.

 private void saveEvents( List<LocationGeoEvent> geo, List<LocationRSSIEvent> rss, boolean locationUpdatesAboutToStop) { synchronized (mEventsMonitor) { if (geo.size() > 0 || rss.size() > 0) { //do something with the data from buffenetworking collection arrayLists and with the boolean locationUpdatesAboutToStop } } } private void scheduleStopLocationUpdates() { stopLocationUpdates(); triggerStopAndSave.onNext( Boolean.FALSE ); cleanAllReadingsData(); } Observable.zip( mUpdateGeoEventsSubject.buffer( trigger ), mUpdateRSSIEventsSubject.buffer( trigger ), trigger, (geo, rss, trgr) -> saveEvents(geo, rss, trgr) ) .subscribe(); 

Todavía necesitará una cierta sintonía con respecto a multi-threading y security. El primer paso sería convertir los distintos temas en SerializedSubject s para que múltiples hilos puedan emitir events.

Si desea que saveEvents ejecute en un planificador en particular, deberá agregar una estructura de datos intermedia, una triple, para pasar los parameters a través del operador observeOn() o aplique el operador observeOn() a cada uno de los arguments zip() .

  • El método de callback a menudo para reenviar el evento a Observable?
  • Cómo señalar un observable para producir más datos
  • Encadenando Observables para evitar suscripciones múltiples
  • ¿Cómo especificar la versión de RxJava al usar RxKotlin?
  • HttpException no capturado por onError ()
  • La function de extensión no crea un nuevo object Observable
  • ¿Hay alguna manera de cambiar mi método a la stream Observable que será una cadena de modificadores?
  • ¿Cómo se usa Flowable.generate de Kotlin?
  • ¿Cómo puedo fusionar una sola <Lista <Lista <T >>> en una Lista <T> con RxJava 2?
  • Retrofit-Vertx con RxJava2 en Kotlin IllegalStateException message == null
  • RxJava 1.x .zip () no funciona en RxJava 2.0