¿Cómo puedo pausar que un evento fluya a través de un observable?

Intento crear un Observable que se pueda pausar de tal forma que los elementos dejen de ser empujados a través de lo observable hasta que no se pause.

En ese punto, me gustaría que reanude el procesamiento de todos los elementos sin procesar. Mi fuente de datos proviene de fuera de la class, así que lo que tengo se ve así:

class Agent { val publisher = PublishSubject.create<Event>() val subscription = createSubscription() fun trackEvent(e: Event) { publisher.onNext(e) } fun pause() { // ??? } fun resume() { // ??? } private fun createSubscription(): Subscription { return publisher .map { stringify(it) } .buffer(10L, TimeUnit.SECONDS, 500) // capture 500 events or 10 seconds worth, whichever comes first. .map { /* create HttpPost request */ } .flatMap { /* send request to server */ } .subscribe { println("Received response: $it") } } } 

Lo que pretendo es que la function de pause detenga los events incluso al ir al server (pero se aferraría a ellos hasta que se resume ). En el momento en que se resume el resume , enviamos los events. (Obviamente, agregamos algo de ayuda adicional para la contrapresión en caso de que tuviéramos demasiados events durante el estado de pausa.

He intentado varios usos de almacenamiento en búfer y windows para hacer que esto funcione, pero en realidad nunca pausa lo observable. En cambio, sucede una de dos cosas:

  1. El evento se descarta por completo (en el caso de anulación de suscripción, filter, etc.)
  2. El evento fluye como si nada hubiera sucedido.

¿Hay algo que pueda hacer para apoyar este caso de uso? ¿O debería escribir esto bajo la expectativa de que uno de los dos resultados anteriores sea lo que sucederá?

El truco es usar otro BehaviorSubject como un evento de cierre para el almacenamiento en búfer adicional:

 val publisher = PublishSubject.create<Event>() fun trackEvent(e: Event) { publisher.onNext(e) isPaused.onNext(isPaused.value) } val isActive = BehaviorSubject.create(true) fun pause() { isActive.onNext(false) } fun resume() { isActive.onNext(true) } private fun createSubscription(): Subscription { return publisher .buffer(10L, TimeUnit.SECONDS, 500) // -> Observable<List<Event>> .buffer({ isActive.filter { it } }) // -> Observable<List<List<Event>>> .flatMap { Observable.from(it) } // -> Observable<List<Event>> .map { /* create HttpPost request */ } .flatMap { /* send request to server */ } .subscribe { println("Received response: $it") } } 

La primera llamada de buffer colocará los events entrantes en intervalos de tamaño especificado o después de que transcurra el time. El segundo buffer cerrará el depósito actual en events emitidos por observable, lo que indica que el Agent no está en pausa ( isActive.filter { it } ). isActive emite un valor en cada evento y dado que isActive es BehaviorSubject , emitirá su último valor a cada nuevo suscriptor. Es decir, en cada segmento emitido por la primera llamada de buffer continuará de inmediato o esperará hasta que se reanude el Agent .

  • ¿Expresar "súper" generics en los types funcionales de Kotlin?
  • RxKotlin - matriz de observadores dynamics
  • RxJava 2 requiere un tipo de retorno Observable diferente de RxJava 1 (Kotlin)
  • RxJava Observable a Completable, cómo evitar toBlocking ()
  • ¿Hay alguna manera de cambiar mi método a la stream Observable que será una cadena de modificadores?
  • Cómo probar el observador?
  • Argumento de tipo explícito Kotlin y RxJava
  • Datos de caching de request HTTP en RxJava
  • RxJava - ¿Entradas de keyboard de contrapresión?
  • Solo la primera testing pasa con TestScheduler cuando se ejecutan varias testings (Kotlin)
  • Clasificación de objects alfanuméricamente