¿Cómo escalar dinámicamente el rebote de la stream de emisión de ráfagas?

Tengo una transmisión en búfer, esperando una cantidad pnetworkingeterminada de time de silencio, antes de publicar una list de elementos que se han almacenado temporalmente:

INTEGERS .share() .buffer(INTEGERS.debounce(DEBOUNCE_TIME,TimeUnit.MILLISECONDS,scheduler)) .map { durations -> ... } 

Me gustaría hacer que DEBOUNCE_TIME se ajuste dinámicamente según el promedio de los elementos almacenados en el búfer, pero me está costando trabajo encontrar la manera de lograrlo.

Puede aplazar el rebote, tomar un elemento y desencadenar la repetición una vez que se haya determinado el nuevo valor de rebote:

 int DEBOUNCE_TIME = 100; AtomicInteger debounceTime = new AtomicInteger(DEBOUNCE_TIME); PublishSubject<Integer> mayRepeat = PublishSubject.create(); AtomicInteger counter = new AtomicInteger(); Observable<Integer> INTEGERS = Observable.fromArray(10, 20, 200, 250, 300, 550, 600, 650, 700, 1200) .flatMap(v -> Observable.timer(v, TimeUnit.MILLISECONDS) .map(w -> counter.incrementAndGet())); INTEGERS.publish(o -> o.buffer( Observable.defer(() -> o.debounce( debounceTime.get(), TimeUnit.MILLISECONDS) ) .take(1) .repeatWhen(v -> v.zipWith(mayRepeat, (a, b) -> b)) ) ) .map(list -> { int nextDebounce = Math.min(100, list.size() * 100); debounceTime.set(nextDebounce); mayRepeat.onNext(1); return list; }) .blockingSubscribe(System.out::println); 

Esto imprime:

 [1, 2] [3, 4, 5] [6, 7, 8, 9] [10] 
  • ¿Cómo leer JSON desde Url usando kotlin Android?
  • ¿Expresar "súper" generics en los types funcionales de Kotlin?
  • Rx-Kotlin awaitTerminalEvent nunca se sube a Completo
  • No se puede 'observar en' hilo principal con RxKotlin
  • RxKotlin flattenAsObservable (): no coincide con el método de reference
  • OnComplete nunca se llamó con toSortedList () y groupBy ()
  • RxJava2 Tal vez devuelva Observable vacío si no hay elemento
  • Inyectar constructor y object complementario
  • ¿Cómo corotines de Kotlin son mejores que RxKotlin?
  • Obtiene N últimos objects emitidos por observables en RxJava2
  • RxKotlin - matriz de observadores dynamics