¿Cómo puedo pausar que un evento fluya a través de un observable?

Intento crear un Observable que se pueda pausar de tal forma que los elementos dejen de ser empujados a través de lo observable hasta que no se pause.

En ese punto, me gustaría que reanude el procesamiento de todos los elementos sin procesar. Mi fuente de datos proviene de fuera de la class, así que lo que tengo se ve así:

class Agent { val publisher = PublishSubject.create<Event>() val subscription = createSubscription() fun trackEvent(e: Event) { publisher.onNext(e) } fun pause() { // ??? } fun resume() { // ??? } private fun createSubscription(): Subscription { return publisher .map { stringify(it) } .buffer(10L, TimeUnit.SECONDS, 500) // capture 500 events or 10 seconds worth, whichever comes first. .map { /* create HttpPost request */ } .flatMap { /* send request to server */ } .subscribe { println("Received response: $it") } } } 

Lo que pretendo es que la function de pause detenga los events incluso al ir al server (pero se aferraría a ellos hasta que se resume ). En el momento en que se resume el resume , enviamos los events. (Obviamente, agregamos algo de ayuda adicional para la contrapresión en caso de que tuviéramos demasiados events durante el estado de pausa.

He intentado varios usos de almacenamiento en búfer y windows para hacer que esto funcione, pero en realidad nunca pausa lo observable. En cambio, sucede una de dos cosas:

  1. El evento se descarta por completo (en el caso de anulación de suscripción, filter, etc.)
  2. El evento fluye como si nada hubiera sucedido.

¿Hay algo que pueda hacer para apoyar este caso de uso? ¿O debería escribir esto bajo la expectativa de que uno de los dos resultados anteriores sea lo que sucederá?

El truco es usar otro BehaviorSubject como un evento de cierre para el almacenamiento en búfer adicional:

 val publisher = PublishSubject.create<Event>() fun trackEvent(e: Event) { publisher.onNext(e) isPaused.onNext(isPaused.value) } val isActive = BehaviorSubject.create(true) fun pause() { isActive.onNext(false) } fun resume() { isActive.onNext(true) } private fun createSubscription(): Subscription { return publisher .buffer(10L, TimeUnit.SECONDS, 500) // -> Observable<List<Event>> .buffer({ isActive.filter { it } }) // -> Observable<List<List<Event>>> .flatMap { Observable.from(it) } // -> Observable<List<Event>> .map { /* create HttpPost request */ } .flatMap { /* send request to server */ } .subscribe { println("Received response: $it") } } 

La primera llamada de buffer colocará los events entrantes en intervalos de tamaño especificado o después de que transcurra el time. El segundo buffer cerrará el depósito actual en events emitidos por observable, lo que indica que el Agent no está en pausa ( isActive.filter { it } ). isActive emite un valor en cada evento y dado que isActive es BehaviorSubject , emitirá su último valor a cada nuevo suscriptor. Es decir, en cada segmento emitido por la primera llamada de buffer continuará de inmediato o esperará hasta que se reanude el Agent .

  • Observable con el valor de LatestFrom
  • Operador RxJava para el método de conmutación
  • RxKotlin - matriz de observadores dynamics
  • Kotlin cuádruple, quíntuple, etc. para desestructurar
  • Cómo consultar Realm en el background Subprocess usando RxJava2
  • Cambios en el valor de ObservableField no propagados
  • La subclass de class sellada de Kotlin debe convertirse a la class base si se proporciona como RxJava Observable
  • ¿Cuándo no usar el Observable de RxJava?
  • No se llama a ninguno de los suscriptores de RxJava onNext / onError / onComplete al encadenar Observables creado desde Observable.create ()
  • Reemplazar callbacks llamados externamente con RxJava
  • Kotlin y RxJava2 zip operator - Ninguna de las siguientes funciones puede invocarse con los arguments suministrados