Pausa / Reanudar un timer / retraso en RX

Estoy intentando pausar / reanudar una operación retrasada usando rx-Java , y sorprendentemente no puedo encontrar ningún detalle sobre cómo hacerlo.

Obviamente, sé cómo hacerlo creando un hilo de Timer específico y haciendo un seguimiento de la hora, pero estoy buscando una forma más elegante y más reactiva.

Tengo tres observables diferentes, playDetected , uno para pauseDetected y uno para stopDetected . Quiero emitir algo después de un cierto retraso de PLAY , pero pause cuando emite mi pausa observable, y reanudo cuando obtengo otra PLAY

Lo que tengo hasta ahora: (está escrito en kotlin pero Java , pseudocódigo o cualquier otro idioma servirán para una respuesta)

 val playSubscription = playDetected .delay(DELAY, SECONDS, schedulers.computation) .subscribe { emitFinalEvent(it) } stopDetected.subscribe { playSubscription.unsubscribe() } 

Mi retraso funciona, y cuando detecto un STOP , elimina con éxito el retraso para que el próximo PLAY pueda volver a iniciarlo. ¿Pero cómo pausar y reanudar cuando pauseDetected emite algo?

Así es como terminé haciéndolo:

 playDetected .doOnNext { if (trackIsDifferent(it)) resetTimer() trackPlaying.set(it.track) } .switchMap { state -> interval(1, SECONDS, schedulers.computation) .doOnNext { currentTimer.incrementAndGet() } .takeUntil(merge(pauseDetected, stopDetected.doOnNext { resetTimer() })) .filter { currentTimer.get() == DELAY } .map { state } }.subscribe { emitFinalEvent(it)) } 

con:

 private val trackPlaying = AtomicReference<Track>() private val currentTimer = AtomicLong() private fun resetTimer() { currentTimer.set(0) } private fun trackIsDifferent(payload: StateWithTrack) = payload.track != trackPlaying.get() 

Hace algún time, también estaba buscando soluciones de "timer" RX, pero ninguna de ellas cumplió con mis expectativas. Así que allí puedes encontrar mi propia solución:

 AtomicLong elapsedTime = new AtomicLong(); AtomicBoolean resumed = new AtomicBoolean(); AtomicBoolean stopped = new AtomicBoolean(); public Flowable<Long> startTimer() { //Create and starts timper resumed.set(true); stopped.set(false); return Flowable.interval(1, TimeUnit.SECONDS) .takeWhile(tick -> !stopped.get()) .filter(tick -> resumed.get()) .map(tick -> elapsedTime.addAndGet(1000)); } public void pauseTimer() { resumed.set(false); } public void resumeTimer() { resumed.set(true); } public void stopTimer() { stopped.set(true); } public void addToTimer(int seconds) { elapsedTime.addAndGet(seconds * 1000); } 

En caso de retraso, simplemente sobrecarga la function de interval con retraso.