¿Cómo crear caching / versión caliente de rx.Single?

El RxJava v1.0.13 introdujo un nuevo tipo de Observable: rx.Single . Encaja perfectamente con el model de request-respuesta, pero carece de los efectos secundarios estándar que presentan operadores como doOnNext (). Entonces, es mucho más difícil hacer que sucedan cosas múltiples como resultado.

Mi idea era replace doOnNext () con múltiples suscripciones a la misma instancia única. Pero esto puede hacer que el trabajo de underlaying se haga varias veces: una vez por cada suscripción.

Ejemplo de rx.Single implementación:

private class WorkerSubscribe<SomeData>() : Single.OnSubscribe<SomeData> { override fun call(sub: SingleSubscriber<in SomeData>) { try { val result = fetchSomeData() sub.onSuccess(result) } catch(t: Throwable) { sub.onError(t) } } } val single = Single.create<SomeData>(WorkerSubscribe()) 

Uso:

 single.subscribe({}, {}) single.subscribe({}, {}) // Data is fetched for the second time 

¿Es posible crear una instancia de Single que no vaya a fetchSomeData () varias veces incluso cuando se llama a single.subscribe () varias veces, pero guarda en caching y devuelve el mismo resultado?

Necesita RxJava Asunto : BehaviorSubject o AsyncSubject

Solo necesitaba un comportamiento similar y encontré alguna solución.

Puede convertir la cache() aplicación Single a Observable cache() y luego convertirla de nuevo a Single .

 yourSingle.toObservable().cacheWithInitialCapacity(1).toSingle() 

Uso cacheWithInitialCapacity(1) lugar de solo cache() como optimization: Single nunca emitirá más de un elemento.

También es una buena idea proporcionar la implementación de Transformer

 public class SingleUtils { public static <T> Single.Transformer<T, T> cached() { return single -> single.toObservable() .cacheWithInitialCapacity(1) .toSingle(); } } 

para que pueda usar el almacenamiento en caching donde quiera llamando solo

 yourSingle.compose(SingleUtils.cached()) 

Editar: a partir de rxJava 1.2.2 se ha agregado ( https://github.com/ReactiveX/RxJava/releases/tag/v1.2.2 )

Implementado exactamente de esta manera ( https://github.com/ReactiveX/RxJava/pull/4757 )

Verifique el operador de cache() . Se supone que debe almacenar en caching las emisiones del Observable y reproducirlas en los Subscriber subsiguientes.

Puede crear un BehaviorSubject / ReplaySubject / AsyncSubject – y luego llamar a Single en él.

Hice una solución, que no estoy contento con ella, pero funciona:

 public class Network { private Object data; private Single<Object> dataSingle; public Single<Object> getData { if (data == null) { if (dataSingle == null) { dataSingle = Single.create(...) .doOnSuccess( data -> this.data = data;) .sibscribeOn(..); } return dataSingle; } else { return Single.just(data); } } } 
  • Cómo recordar el estado con los operadores de rebashs en RxJava2
  • Aplicando transformación a cada elemento en Single <List <T >>
  • EXCEPCIÓN FATAL: RxCachedThreadScheduler-1 cuando el gatillo se deshace. ¿Por qué?
  • JavaRX: cómo devolver el valor en caching inmediatamente y en paralelo hacer una request de networking
  • Extender RxJava Observable en Kotlin con eliminación adecuada
  • Reemplazar el doble para cada uno por observable
  • Cambiar Observable desde AutoCompleteTextView a EditText
  • Cómo llenar una vista de list usando Kotlin, retroadaptación y RXjava
  • Kotlin cuádruple, quíntuple, etc. para desestructurar
  • ¿Cómo se simula emitir 2 streams infinitas observables y tener otras observables que las fusionan y amortiguan cada 10 segundos?
  • Combina Rx Singles en Observables recursivamente