Cómo recordar el estado con los operadores de rebashs en RxJava2

Tengo un cliente de networking que puede reanudar las interrupciones, pero necesita el último post para hacerlo cuando hay un nuevo bash.

Ejemplo en Kotlin:

fun requestOrResume(last: Message? = null): Flowable<Message> = Flowable.create({ emitter -> val connection = if (last != null) client.start() else client.resumeFrom(last.id) while (!emitter.isDisposed) { val msg = connection.nextMessage() emitter.onNext(msg) } }, BackpressureStrategy.MISSING) requestOrResume() .retryWhen { it.flatMap { Flowable.timer(5, SECONDS) } } // how to pass the resume data when there is a retry? 

Pregunta : como puede ver, necesito el último post recibido para preparar la llamada al curriculum vitae. ¿Cómo puedo realizar un seguimiento para que cuando haya un rebash esté disponible para hacer la request de currículum?

Una posible solución puede ser crear una class de titular que solo contenga una reference al último post y se actualice cuando se reciba un nuevo post. De esta forma, cuando hay un rebash, el último post puede getse del titular. Ejemplo:

 class MsgHolder(var last: Message? = null) fun request(): Flowable<Message> { val holder = MsgHolder() return Flowable.create({ emitter -> val connection = if (holder.last != null) client.start() else client.resumeFrom(holder.last.id) while (!emitter.isDisposed) { val msg = connection.nextMessage() holder.last = msg // <-- update holder reference emitter.onNext(msg) } }, BackpressureStrategy.MISSING) } 

Creo que esto podría funcionar, pero se siente como un truco (¿problemas de synchronization de hilos?).

¿Hay una mejor manera de realizar un seguimiento del estado para que esté disponible para rebashs?

Tenga en count que, a less que vuelva a lanzar un envoltorio alnetworkingedor de su último elemento (no demasiado funcionalmente diferente de su solución existente de "hackear" pero de una manera más fea), ningún operador de event handling errores puede recuperar el último elemento sin ayuda externa porque solo tienen acceso a las streams de Throwable . En cambio, vea si el siguiente enfoque recursivo se adapta a sus necesidades:

 fun retryWithLast(seed: Flowable<Message>): Flowable<Message> { val last$ = seed.last().cache(); return seed.onErrorResumeNext { it.flatMap { retryWithLast(last$.flatMap { requestOrResume(it) }) } }; } retryWithLast(requestOrResume()); 

La mayor distinción es save en caching el valor final del último bash en un Observable con cache lugar de hacerlo manualmente en un valor. Tenga en count también que la recursión en el manejador de errores significa que retryWithLast continuará extendiendo la transmisión si los bashs subsiguientes continúan fallando.

Observe de cerca el operador de buffer() : link Puede usarlo así:

 requestOrResume() .buffer(2) 

A partir de ahora, su Flowable devolverá List<Message> con dos últimos objects

  • Android Architecture Components Room ViewModel CompleteableFormAction
  • Sitio de Android con RxJava manejar resultado de consulta vacío
  • Manejo de errores de suscripción anidada RX2.0
  • consultas de múltiples dominios de Android con RXJava
  • No se pudo deducir Kotlin y RxJava
  • ¿Cómo continuar el procesamiento después de que ocurra un error en RxJava 2?
  • No se puede 'observar en' hilo principal con RxKotlin
  • ¿Cómo puedo unir dos RxJava2 Obvervables por key?
  • Enfrentando problemas con la implementación de Rx Java con la architecture de flujo en Android kotlin
  • Cómo get el último valor emitido de observable
  • Convert Maybe to Single de otra fuente si Maybe completa