¿Cómo escalar dinámicamente el rebote de la stream de emisión de ráfagas?

Tengo una transmisión en búfer, esperando una cantidad pnetworkingeterminada de time de silencio, antes de publicar una list de elementos que se han almacenado temporalmente:

INTEGERS .share() .buffer(INTEGERS.debounce(DEBOUNCE_TIME,TimeUnit.MILLISECONDS,scheduler)) .map { durations -> ... } 

Me gustaría hacer que DEBOUNCE_TIME se ajuste dinámicamente según el promedio de los elementos almacenados en el búfer, pero me está costando trabajo encontrar la manera de lograrlo.

Puede aplazar el rebote, tomar un elemento y desencadenar la repetición una vez que se haya determinado el nuevo valor de rebote:

 int DEBOUNCE_TIME = 100; AtomicInteger debounceTime = new AtomicInteger(DEBOUNCE_TIME); PublishSubject<Integer> mayRepeat = PublishSubject.create(); AtomicInteger counter = new AtomicInteger(); Observable<Integer> INTEGERS = Observable.fromArray(10, 20, 200, 250, 300, 550, 600, 650, 700, 1200) .flatMap(v -> Observable.timer(v, TimeUnit.MILLISECONDS) .map(w -> counter.incrementAndGet())); INTEGERS.publish(o -> o.buffer( Observable.defer(() -> o.debounce( debounceTime.get(), TimeUnit.MILLISECONDS) ) .take(1) .repeatWhen(v -> v.zipWith(mayRepeat, (a, b) -> b)) ) ) .map(list -> { int nextDebounce = Math.min(100, list.size() * 100); debounceTime.set(nextDebounce); mayRepeat.onNext(1); return list; }) .blockingSubscribe(System.out::println); 

Esto imprime:

 [1, 2] [3, 4, 5] [6, 7, 8, 9] [10] 
  • OnComplete nunca se llamó con toSortedList () y groupBy ()
  • Confusión de la syntax de Kotlin lambda
  • RxJava2 Tal vez devuelva Observable vacío si no hay elemento
  • Rx-Kotlin awaitTerminalEvent nunca se sube a Completo
  • Llamada asincrónica para cada elemento dentro de una colección
  • ¿Cómo hacer un grupo? ¿Por qué coleccionar usando RxJava y Kotlin?
  • Repetir acciones en estado con RxJava
  • Cómo notificar a Observable cuando finalice CountdownTimer
  • Inyectar constructor y object complementario
  • RxKotlin collectInto () MutableList usando references de método
  • Para una function de Kotlin utilizada como expresión, ¿hay una forma concisa de operar y devolver un valor?