¿Cómo escalar dinámicamente el rebote de la stream de emisión de ráfagas?

Tengo una transmisión en búfer, esperando una cantidad pnetworkingeterminada de time de silencio, antes de publicar una list de elementos que se han almacenado temporalmente:

INTEGERS .share() .buffer(INTEGERS.debounce(DEBOUNCE_TIME,TimeUnit.MILLISECONDS,scheduler)) .map { durations -> ... } 

Me gustaría hacer que DEBOUNCE_TIME se ajuste dinámicamente según el promedio de los elementos almacenados en el búfer, pero me está costando trabajo encontrar la manera de lograrlo.

Puede aplazar el rebote, tomar un elemento y desencadenar la repetición una vez que se haya determinado el nuevo valor de rebote:

 int DEBOUNCE_TIME = 100; AtomicInteger debounceTime = new AtomicInteger(DEBOUNCE_TIME); PublishSubject<Integer> mayRepeat = PublishSubject.create(); AtomicInteger counter = new AtomicInteger(); Observable<Integer> INTEGERS = Observable.fromArray(10, 20, 200, 250, 300, 550, 600, 650, 700, 1200) .flatMap(v -> Observable.timer(v, TimeUnit.MILLISECONDS) .map(w -> counter.incrementAndGet())); INTEGERS.publish(o -> o.buffer( Observable.defer(() -> o.debounce( debounceTime.get(), TimeUnit.MILLISECONDS) ) .take(1) .repeatWhen(v -> v.zipWith(mayRepeat, (a, b) -> b)) ) ) .map(list -> { int nextDebounce = Math.min(100, list.size() * 100); debounceTime.set(nextDebounce); mayRepeat.onNext(1); return list; }) .blockingSubscribe(System.out::println); 

Esto imprime:

 [1, 2] [3, 4, 5] [6, 7, 8, 9] [10] 
  • ¿Cómo puedo agregar de manera condicional una operación asincrónica en medio de una transmisión de RxJava?
  • RxJava - ¿Entradas de keyboard de contrapresión?
  • Obtiene N últimos objects emitidos por observables en RxJava2
  • ¿Puedo crear un método de extensión Kotlin para agregar una suscripción rxJava a una suscripción compuesta?
  • No se puede 'observar en' hilo principal con RxKotlin
  • Cómo notificar a Observable cuando finalice CountdownTimer
  • ¿Cómo leer JSON desde Url usando kotlin Android?
  • RxJava- Gire Observable en Iterator, Stream o Sequence
  • ¿Cómo hacer un grupo? ¿Por qué coleccionar usando RxJava y Kotlin?
  • RxKotlin - Single.just () no se emite al suscribirse TestSubscriber
  • Para una function de Kotlin utilizada como expresión, ¿hay una forma concisa de operar y devolver un valor?