¿Cómo escalar dinámicamente el rebote de la stream de emisión de ráfagas?

Tengo una transmisión en búfer, esperando una cantidad pnetworkingeterminada de time de silencio, antes de publicar una list de elementos que se han almacenado temporalmente:

INTEGERS .share() .buffer(INTEGERS.debounce(DEBOUNCE_TIME,TimeUnit.MILLISECONDS,scheduler)) .map { durations -> ... } 

Me gustaría hacer que DEBOUNCE_TIME se ajuste dinámicamente según el promedio de los elementos almacenados en el búfer, pero me está costando trabajo encontrar la manera de lograrlo.

Puede aplazar el rebote, tomar un elemento y desencadenar la repetición una vez que se haya determinado el nuevo valor de rebote:

 int DEBOUNCE_TIME = 100; AtomicInteger debounceTime = new AtomicInteger(DEBOUNCE_TIME); PublishSubject<Integer> mayRepeat = PublishSubject.create(); AtomicInteger counter = new AtomicInteger(); Observable<Integer> INTEGERS = Observable.fromArray(10, 20, 200, 250, 300, 550, 600, 650, 700, 1200) .flatMap(v -> Observable.timer(v, TimeUnit.MILLISECONDS) .map(w -> counter.incrementAndGet())); INTEGERS.publish(o -> o.buffer( Observable.defer(() -> o.debounce( debounceTime.get(), TimeUnit.MILLISECONDS) ) .take(1) .repeatWhen(v -> v.zipWith(mayRepeat, (a, b) -> b)) ) ) .map(list -> { int nextDebounce = Math.min(100, list.size() * 100); debounceTime.set(nextDebounce); mayRepeat.onNext(1); return list; }) .blockingSubscribe(System.out::println); 

Esto imprime:

 [1, 2] [3, 4, 5] [6, 7, 8, 9] [10] 
  • ¿Puedo crear un método de extensión Kotlin para agregar una suscripción rxJava a una suscripción compuesta?
  • ¿Expresar "súper" generics en los types funcionales de Kotlin?
  • Comportamiento con Kotlin Higher-Order Functions e interfaces de método único?
  • No se puede 'observar en' hilo principal con RxKotlin
  • cómo implementar Switch usando Data binding en android
  • Para una function de Kotlin utilizada como expresión, ¿hay una forma concisa de operar y devolver un valor?
  • RxAndroid, cómo detectar si observable ha finalizado la emisión
  • Cómo notificar a Observable cuando finalice CountdownTimer
  • RxKotlin - Single.just () no se emite al suscribirse TestSubscriber
  • RxJava- Gire Observable en Iterator, Stream o Sequence
  • Cómo pasar nulo a un Observable con tipo anulable en RxJava 2 y Kotlin