¿Cómo crear caching / versión caliente de rx.Single?

El RxJava v1.0.13 introdujo un nuevo tipo de Observable: rx.Single . Encaja perfectamente con el model de request-respuesta, pero carece de los efectos secundarios estándar que presentan operadores como doOnNext (). Entonces, es mucho más difícil hacer que sucedan cosas múltiples como resultado.

Mi idea era replace doOnNext () con múltiples suscripciones a la misma instancia única. Pero esto puede hacer que el trabajo de underlaying se haga varias veces: una vez por cada suscripción.

Ejemplo de rx.Single implementación:

private class WorkerSubscribe<SomeData>() : Single.OnSubscribe<SomeData> { override fun call(sub: SingleSubscriber<in SomeData>) { try { val result = fetchSomeData() sub.onSuccess(result) } catch(t: Throwable) { sub.onError(t) } } } val single = Single.create<SomeData>(WorkerSubscribe()) 

Uso:

 single.subscribe({}, {}) single.subscribe({}, {}) // Data is fetched for the second time 

¿Es posible crear una instancia de Single que no vaya a fetchSomeData () varias veces incluso cuando se llama a single.subscribe () varias veces, pero guarda en caching y devuelve el mismo resultado?

Necesita RxJava Asunto : BehaviorSubject o AsyncSubject

Solo necesitaba un comportamiento similar y encontré alguna solución.

Puede convertir la cache() aplicación Single a Observable cache() y luego convertirla de nuevo a Single .

 yourSingle.toObservable().cacheWithInitialCapacity(1).toSingle() 

Uso cacheWithInitialCapacity(1) lugar de solo cache() como optimization: Single nunca emitirá más de un elemento.

También es una buena idea proporcionar la implementación de Transformer

 public class SingleUtils { public static <T> Single.Transformer<T, T> cached() { return single -> single.toObservable() .cacheWithInitialCapacity(1) .toSingle(); } } 

para que pueda usar el almacenamiento en caching donde quiera llamando solo

 yourSingle.compose(SingleUtils.cached()) 

Editar: a partir de rxJava 1.2.2 se ha agregado ( https://github.com/ReactiveX/RxJava/releases/tag/v1.2.2 )

Implementado exactamente de esta manera ( https://github.com/ReactiveX/RxJava/pull/4757 )

Verifique el operador de cache() . Se supone que debe almacenar en caching las emisiones del Observable y reproducirlas en los Subscriber subsiguientes.

Puede crear un BehaviorSubject / ReplaySubject / AsyncSubject – y luego llamar a Single en él.

Hice una solución, que no estoy contento con ella, pero funciona:

 public class Network { private Object data; private Single<Object> dataSingle; public Single<Object> getData { if (data == null) { if (dataSingle == null) { dataSingle = Single.create(...) .doOnSuccess( data -> this.data = data;) .sibscribeOn(..); } return dataSingle; } else { return Single.just(data); } } } 
  • Función rxjava :: zip devuelve un resultado vacío
  • Causado por: rx.exceptions.MissingBackpressureException
  • ¿Cómo podemos implementar Observable.flatMapCompletable?
  • No se puede hacer que la API invoque urlfetch.Fetch en un hilo que no sea el hilo de request original ni un hilo creado por ThreadManager
  • Usando RxJava para unir datos locales con datos remotos (o en caching)
  • Kotlin y RxJava2 zip operator - Ninguna de las siguientes funciones puede invocarse con los arguments suministrados
  • Realm Turn Transaction en observable
  • Filter rxjava no funciona
  • Kotlin y RxJava: ¿por qué mi Single.zip () no se está comstackndo?
  • Observable.fromCallable () implementación con exception
  • Mockito querido pero no invocado