¿Cómo crear caching / versión caliente de rx.Single?

El RxJava v1.0.13 introdujo un nuevo tipo de Observable: rx.Single . Encaja perfectamente con el model de request-respuesta, pero carece de los efectos secundarios estándar que presentan operadores como doOnNext (). Entonces, es mucho más difícil hacer que sucedan cosas múltiples como resultado.

Mi idea era replace doOnNext () con múltiples suscripciones a la misma instancia única. Pero esto puede hacer que el trabajo de underlaying se haga varias veces: una vez por cada suscripción.

Ejemplo de rx.Single implementación:

private class WorkerSubscribe<SomeData>() : Single.OnSubscribe<SomeData> { override fun call(sub: SingleSubscriber<in SomeData>) { try { val result = fetchSomeData() sub.onSuccess(result) } catch(t: Throwable) { sub.onError(t) } } } val single = Single.create<SomeData>(WorkerSubscribe()) 

Uso:

 single.subscribe({}, {}) single.subscribe({}, {}) // Data is fetched for the second time 

¿Es posible crear una instancia de Single que no vaya a fetchSomeData () varias veces incluso cuando se llama a single.subscribe () varias veces, pero guarda en caching y devuelve el mismo resultado?

Necesita RxJava Asunto : BehaviorSubject o AsyncSubject

Solo necesitaba un comportamiento similar y encontré alguna solución.

Puede convertir la cache() aplicación Single a Observable cache() y luego convertirla de nuevo a Single .

 yourSingle.toObservable().cacheWithInitialCapacity(1).toSingle() 

Uso cacheWithInitialCapacity(1) lugar de solo cache() como optimization: Single nunca emitirá más de un elemento.

También es una buena idea proporcionar la implementación de Transformer

 public class SingleUtils { public static <T> Single.Transformer<T, T> cached() { return single -> single.toObservable() .cacheWithInitialCapacity(1) .toSingle(); } } 

para que pueda usar el almacenamiento en caching donde quiera llamando solo

 yourSingle.compose(SingleUtils.cached()) 

Editar: a partir de rxJava 1.2.2 se ha agregado ( https://github.com/ReactiveX/RxJava/releases/tag/v1.2.2 )

Implementado exactamente de esta manera ( https://github.com/ReactiveX/RxJava/pull/4757 )

Verifique el operador de cache() . Se supone que debe almacenar en caching las emisiones del Observable y reproducirlas en los Subscriber subsiguientes.

Puede crear un BehaviorSubject / ReplaySubject / AsyncSubject – y luego llamar a Single en él.

Hice una solución, que no estoy contento con ella, pero funciona:

 public class Network { private Object data; private Single<Object> dataSingle; public Single<Object> getData { if (data == null) { if (dataSingle == null) { dataSingle = Single.create(...) .doOnSuccess( data -> this.data = data;) .sibscribeOn(..); } return dataSingle; } else { return Single.just(data); } } } 
  • Crear un observable común que evite múltiples llamadas
  • retryWhen () no llama a lo que está dentro de Observable.just ()
  • Kotlin y RxJava: ¿por qué mi Single.zip () no se está comstackndo?
  • Recuperar respuesta JSON de rxjava / retrofit request POST
  • Clasificación de objects alfanuméricamente
  • ¿Cómo puedo pausar que un evento fluya a través de un observable?
  • MissingMethodInvocationException probando una class abierta en Kotlin
  • Cómo recordar el estado con los operadores de rebashs en RxJava2
  • RxJava Observable.create envolver suscripciones observables
  • Cómo get un tipo correcto al devolver Template <T?> Desde una function estática con null
  • Comportamiento con Kotlin Higher-Order Functions e interfaces de método único?