RxKotlin – matriz de observadores dynamics

Estoy usando RxKotlin junto con Retrofit 2

Estoy intentando descubrir cómo tener una list dinámica de observadores en una sola operación.

El primer observador debe activar la operación, y todos los observadores adicionales deben esperar hasta que la operación se complete / falla

Una vez que la operación se completa, necesito realizar la manipulación de datos (Almacenar en caching / Memoria) y luego notificar a todos los observadores.

Esto es lo que hice:

class UserManager { val observers = ArrayList<Observer<ArrayList<User>>>() var isFetchingUsers = false fun getUsers(observer: Observer<ArrayList<User>>) { observers.add(observer) if (isFetchingUsers) { return } api.getUserList.observeOn(AndroidSchedulers.mainThread()).subscribe(object : Observer<UserListResponse> { override fun onNext(response: UserListResponse) { // Do some manipulations on the response and notify all observers.forEach { it.onNext(response.getUsers()) } } override fun onError(e: Throwable) { observers.forEach { it.onError(Throwable()) } } override fun onComplete() { isFetchingUsers = false observers.clear() } override fun onSubscribe(d: Disposable) { } }) } } 

Aquí está la creación observable de Retrofit (esta está en Java …)

  /** * Get users */ public Observable<UserListResponse> getUserList() { return mService.getUserList().subscribeOn(Schedulers.io()); } 

Estoy seguro de que hay una mejor manera de hacerlo

¡Gracias!

Puede usar el operador de share() en un observable. Solo la primera suscripción hará que el observable realice su process de creación. Una vez que el último suscriptor se da de baja, el observable hará su process de destrucción.

 Observable<Long> v; ... Observable<Long> shanetworkingObservable = v .doOnSubscribe( () -> logger.debug("subscribe") ) .doOnUnsubscribe( () -> logger.debug("unsubscribe") ) .share(); ... Subscription v1 = shanetworkingObservable.subscribe(); Subscription v2 = shanetworkingObservable.subscribe(); ... v1.unsubscribe(); v2.unsubscribe(); 

Verá que la operación de suscripción solo ocurre una vez. Debería ver que solo una operación de suscripción pasa al observable original y solo una cancelación.

  • RxAndroid, cómo detectar si observable ha finalizado la emisión
  • Spring 5 and Kotlin 1.1 Coroutines: Type rx.Scheduler no presente
  • Comportamiento con Kotlin Higher-Order Functions e interfaces de método único?
  • Para una function de Kotlin utilizada como expresión, ¿hay una forma concisa de operar y devolver un valor?
  • Agregar subscribeOn () está cambiando el tipo de retorno de observable
  • ¿Cómo hacer un grupo? ¿Por qué coleccionar usando RxJava y Kotlin?
  • Cómo pasar nulo a un Observable con tipo anulable en RxJava 2 y Kotlin
  • No se puede cambiar el text de ActionMenuItemView con RxKotlin
  • RxKotlin flattenAsObservable (): no coincide con el método de reference
  • ¿Puedo crear un método de extensión Kotlin para agregar una suscripción rxJava a una suscripción compuesta?
  • Repetir acciones en estado con RxJava