Cómo recordar el estado con los operadores de rebashs en RxJava2

Tengo un cliente de networking que puede reanudar las interrupciones, pero necesita el último post para hacerlo cuando hay un nuevo bash.

Ejemplo en Kotlin:

fun requestOrResume(last: Message? = null): Flowable<Message> = Flowable.create({ emitter -> val connection = if (last != null) client.start() else client.resumeFrom(last.id) while (!emitter.isDisposed) { val msg = connection.nextMessage() emitter.onNext(msg) } }, BackpressureStrategy.MISSING) requestOrResume() .retryWhen { it.flatMap { Flowable.timer(5, SECONDS) } } // how to pass the resume data when there is a retry? 

Pregunta : como puede ver, necesito el último post recibido para preparar la llamada al curriculum vitae. ¿Cómo puedo realizar un seguimiento para que cuando haya un rebash esté disponible para hacer la request de currículum?

Una posible solución puede ser crear una class de titular que solo contenga una reference al último post y se actualice cuando se reciba un nuevo post. De esta forma, cuando hay un rebash, el último post puede getse del titular. Ejemplo:

 class MsgHolder(var last: Message? = null) fun request(): Flowable<Message> { val holder = MsgHolder() return Flowable.create({ emitter -> val connection = if (holder.last != null) client.start() else client.resumeFrom(holder.last.id) while (!emitter.isDisposed) { val msg = connection.nextMessage() holder.last = msg // <-- update holder reference emitter.onNext(msg) } }, BackpressureStrategy.MISSING) } 

Creo que esto podría funcionar, pero se siente como un truco (¿problemas de synchronization de hilos?).

¿Hay una mejor manera de realizar un seguimiento del estado para que esté disponible para rebashs?

Tenga en count que, a less que vuelva a lanzar un envoltorio alnetworkingedor de su último elemento (no demasiado funcionalmente diferente de su solución existente de "hackear" pero de una manera más fea), ningún operador de event handling errores puede recuperar el último elemento sin ayuda externa porque solo tienen acceso a las streams de Throwable . En cambio, vea si el siguiente enfoque recursivo se adapta a sus necesidades:

 fun retryWithLast(seed: Flowable<Message>): Flowable<Message> { val last$ = seed.last().cache(); return seed.onErrorResumeNext { it.flatMap { retryWithLast(last$.flatMap { requestOrResume(it) }) } }; } retryWithLast(requestOrResume()); 

La mayor distinción es save en caching el valor final del último bash en un Observable con cache lugar de hacerlo manualmente en un valor. Tenga en count también que la recursión en el manejador de errores significa que retryWithLast continuará extendiendo la transmisión si los bashs subsiguientes continúan fallando.

Observe de cerca el operador de buffer() : link Puede usarlo así:

 requestOrResume() .buffer(2) 

A partir de ahora, su Flowable devolverá List<Message> con dos últimos objects

  • Convert Maybe to Single de otra fuente si Maybe completa
  • Cambio observable a condición cumplida - RxJava2
  • Single.zip completando antes de Success
  • RxJava 2 requiere un tipo de retorno Observable diferente de RxJava 1 (Kotlin)
  • Cómo get el último valor emitido de observable
  • Kotlin: Cómo convertir la testing que usa Thread.sleep a RxJava TestScheduler
  • RxJava2 Tal vez devuelva Observable vacío si no hay elemento
  • RxJava2: onComplete no llamado con flatMapIterable
  • ¿Por qué mi configuration de RxJava está bloqueando mi subprocess de interfaz de usuario? Trabajar con la callback BluetoothAdapter.startLeScan
  • Escuchar posts y escribir commands en un flujo observable
  • No se puede llamar al operador desde () en Observable en Android Kotlin