¿Cómo crear caching / versión caliente de rx.Single?

El RxJava v1.0.13 introdujo un nuevo tipo de Observable: rx.Single . Encaja perfectamente con el model de request-respuesta, pero carece de los efectos secundarios estándar que presentan operadores como doOnNext (). Entonces, es mucho más difícil hacer que sucedan cosas múltiples como resultado.

Mi idea era replace doOnNext () con múltiples suscripciones a la misma instancia única. Pero esto puede hacer que el trabajo de underlaying se haga varias veces: una vez por cada suscripción.

Ejemplo de rx.Single implementación:

private class WorkerSubscribe<SomeData>() : Single.OnSubscribe<SomeData> { override fun call(sub: SingleSubscriber<in SomeData>) { try { val result = fetchSomeData() sub.onSuccess(result) } catch(t: Throwable) { sub.onError(t) } } } val single = Single.create<SomeData>(WorkerSubscribe()) 

Uso:

 single.subscribe({}, {}) single.subscribe({}, {}) // Data is fetched for the second time 

¿Es posible crear una instancia de Single que no vaya a fetchSomeData () varias veces incluso cuando se llama a single.subscribe () varias veces, pero guarda en caching y devuelve el mismo resultado?

Necesita RxJava Asunto : BehaviorSubject o AsyncSubject

Solo necesitaba un comportamiento similar y encontré alguna solución.

Puede convertir la cache() aplicación Single a Observable cache() y luego convertirla de nuevo a Single .

 yourSingle.toObservable().cacheWithInitialCapacity(1).toSingle() 

Uso cacheWithInitialCapacity(1) lugar de solo cache() como optimization: Single nunca emitirá más de un elemento.

También es una buena idea proporcionar la implementación de Transformer

 public class SingleUtils { public static <T> Single.Transformer<T, T> cached() { return single -> single.toObservable() .cacheWithInitialCapacity(1) .toSingle(); } } 

para que pueda usar el almacenamiento en caching donde quiera llamando solo

 yourSingle.compose(SingleUtils.cached()) 

Editar: a partir de rxJava 1.2.2 se ha agregado ( https://github.com/ReactiveX/RxJava/releases/tag/v1.2.2 )

Implementado exactamente de esta manera ( https://github.com/ReactiveX/RxJava/pull/4757 )

Verifique el operador de cache() . Se supone que debe almacenar en caching las emisiones del Observable y reproducirlas en los Subscriber subsiguientes.

Puede crear un BehaviorSubject / ReplaySubject / AsyncSubject – y luego llamar a Single en él.

Hice una solución, que no estoy contento con ella, pero funciona:

 public class Network { private Object data; private Single<Object> dataSingle; public Single<Object> getData { if (data == null) { if (dataSingle == null) { dataSingle = Single.create(...) .doOnSuccess( data -> this.data = data;) .sibscribeOn(..); } return dataSingle; } else { return Single.just(data); } } } 
  • RxJava 2 requiere un tipo de retorno Observable diferente de RxJava 1 (Kotlin)
  • Cómo recordar el estado con los operadores de rebashs en RxJava2
  • Kolin - Lista de artículos para el artículo de lists
  • Crear un observable común que evite múltiples llamadas
  • Android JUnit testing bloques indefinidamente cuando observó Observable en AndroidSchedulers.mainThread ()
  • No se puede hacer que la API invoque urlfetch.Fetch en un hilo que no sea el hilo de request original ni un hilo creado por ThreadManager
  • RxJava- Gire Observable en Iterator, Stream o Sequence
  • Función de extensión de llamada dentro de la class Java como cualquier operador de RX
  • Adaptador personalizado de Moshi con RxAndroid & Retrofit & Kotlin
  • ¿Puedo crear un método de extensión Kotlin para agregar una suscripción rxJava a una suscripción compuesta?
  • ¿Cómo crear un intervalo infinito observable que emitirá un nuevo object cada intervalo de time?