RxJava – ¿Entradas de keyboard de contrapresión?

Aquí hay un problema divertido de RxJava.

Quiero usar los operadores de contrapresión de RxJava para search rápidamente una input mecanografiada mientras se escribe cada carácter, al igual que hace Google en su página de búsqueda. Revisé la documentation de Backpressure y se me ocurrió esto (estoy usando RxJavaFX / RxKotlinFX para aprovechar JavaFX).

val myControl: Node = ... val burstyMulticast = myControl.events(KeyEvent.KEY_TYPED).publish().refCount() val burstyDebounced = burstyMulticast.debounce(200, TimeUnit.MILLISECONDS) val burstyBuffenetworking = burstyMulticast.buffer(burstyDebounced) burstyBuffenetworking .flatMap { it.toObservable().map { it.character }.networkinguce("") { x,y -> x + y } } .subscribe { println(it) } 

Esto funciona genial Si escribo "Hola" contra el control, emitirá la String "Hola" después de 200 ms sin escribir. Pero si quiero que esto realmente sea más receptivo, en realidad debería tener algún tipo de acumulación progresiva para cada input de keyboard. Entonces mi salida de console realmente debería verse así:

 H He Hel Hell Hello 

Esas deberían ser todas mis emisiones cuando escribo la palabra "Hola", y los 200 ms definen cuánto time debe transcurrir antes de que se restablezca la acumulación. ¿Cómo hago esto?

Puede usar throttleLast en lugar de denunciar.

throttleLast emitirá los elementos más recientes emitidos por un Observable dentro de intervalos de time periódicos

throttleLast o / p console

 H Hel Hello 

¡Lo tengo! Descubrí que en realidad no quiero el buffer() , sino un switchMap() y un scan() dentro de él. Uso una multidifusión para controlar tanto la emisión de restablecimiento del timer introducida en switchMap() como la scan() que concatenará infinitamente los caracteres typescripts hasta que switchMap() mate para restablecerlo.

 val burstyMulticast = events(KeyEvent.KEY_TYPED).publish().refCount().map { it.character } burstyMulticast.throttleLast(1000, TimeUnit.MILLISECONDS).startWith("") .switchMap { burstyMulticast.scan { x,y -> x + y } }.subscribe { println(it) } 
  • Cómo comprimir algunos observables en lenguaje Kotlin con RxAndroid
  • ¿Expresar "súper" generics en los types funcionales de Kotlin?
  • Excepción causada por: java.lang.ClassNotFoundException: org.reactivestreams.Publisher
  • ¿Cómo corotines de Kotlin son mejores que RxKotlin?
  • RxKotlin - Single.just () no se emite al suscribirse TestSubscriber
  • Cómo pasar nulo a un Observable con tipo anulable en RxJava 2 y Kotlin
  • No se puede 'observar en' hilo principal con RxKotlin
  • Confusión de la syntax de Kotlin lambda
  • cómo implementar Switch usando Data binding en android
  • RxJava (Kotlin), Observable.amb y PublishSubject no están disparando
  • Inyectar constructor y object complementario