RxJava2 Publicado

Cuál es la diferencia entre

ObservableTransformer { Observable.merge( it.ofType(x).compose(transformerherex), it.ofType(y).compose(transformerherey) ) } 

y

 ObservableTransformer { it.publish{ shanetworking -> Observable.merge( shanetworking.ofType(x).compose(transformerherex), shanetworking.ofType(y).compose(transformerherey) ) } } 

cuando ejecuto mi código usando estos dos, obtuve los mismos resultados. Qué hace publicar aquí

La diferencia es que el transformador superior se suscribirá dos veces en sentido ascendente para una suscripción única desde el nivel inferior, duplicando los efectos secundarios del flujo ascendente que generalmente no se desean:

 Observable<Object> mixedSource = Observable.<Object>just("a", 1, "b", 2, "c", 3) .doOnSubscribe(s -> System.out.println("Subscribed!")); mixedSource.compose(f -> Observable.merge( f.ofType(Integer.class).compose(g -> g.map(v -> v + 1)), f.ofType(String.class).compose(g -> g.map(v -> v.toUpperCase())) ) ) .subscribe(System.out::println); 

imprimirá

 Subscribed! 2 3 4 Subscribed! A B C 

El efecto secundario representado aquí es la printing Subscribed! Dependiendo del trabajo real en una fuente real, eso podría significar el envío de un correo electrónico dos veces, recuperar las filas de una tabla dos veces. Con este ejemplo particular, puede ver que incluso si los valores de origen están intercalados en su tipo, la salida los contiene por separado.

Por el contrario, publish(Function) establecerá una suscripción a la fuente por un abonado final, por lo que cualquier efecto secundario en la fuente solo se producirá una vez.

 mixedSource.publish(f -> Observable.merge( f.ofType(Integer.class).compose(g -> g.map(v -> v + 1)), f.ofType(String.class).compose(g -> g.map(v -> v.toUpperCase())) ) ) .subscribe(System.out::println); 

que imprime

 Subscribed! A 2 B 3 C 4 

porque la fuente está suscrita una vez y cada elemento es multidifusión a los dos "arms" de .ofType().compose() .

publish operador de publish convierte su Observable en Connectable Observable .

Veamos qué significa Connectable Observable : supongamos que desea suscribirse a un time múltiple observable y desea entregar los mismos artículos a cada suscriptor. Necesita usar Connectable Observable .

Ejemplo:

 var period = TimeSpan.FromSeconds(1); var observable = Observable.Interval(period).Publish(); observable.Connect(); observable.Subscribe(i => Console.WriteLine("first subscription : {0}", i)); Thread.Sleep(period); observable.Subscribe(i => Console.WriteLine("second subscription : {0}", i)); 

salida:

 first subscription : 0 first subscription : 1 second subscription : 1 first subscription : 2 second subscription : 2 

En este caso, somos lo suficientemente rápidos para suscribirnos antes de que se publique el primer elemento, pero solo en la primera suscripción. La segunda suscripción se suscribe tarde y pierde la primera publicación.

Podríamos mover la invocación del método Connect () hasta que todas las suscripciones se hayan realizado. De esa forma, incluso con la llamada a Thread.Sleep, no nos suscribiremos realmente al subyacente hasta después de que se hayan realizado ambas suscripciones. Esto se haría de la siguiente manera:

 var period = TimeSpan.FromSeconds(1); var observable = Observable.Interval(period).Publish(); observable.Subscribe(i => Console.WriteLine("first subscription : {0}", i)); Thread.Sleep(period); observable.Subscribe(i => Console.WriteLine("second subscription : {0}", i)); observable.Connect(); 

salida:

 first subscription : 0 second subscription : 0 first subscription : 1 second subscription : 1 first subscription : 2 second subscription : 2 

Entonces, usando Completable Observable, tenemos una manera de controlar cuándo dejar que Observable emita elementos.

Ejemplo tomado de: http://www.introtorx.com/Content/v1.0.10621.0/14_HotAndColdObservables.html#PublishAndConnect

EDITAR Según la diapositiva número 180 en este enlace :

Otra de las características de la publicación es que si un observador comienza a observar después de 10 segundos de haber comenzado a emitir elementos observables, el observador obtiene solo los elementos que fueron emitidos después de 10 segundos (en el momento de la suscripción) no todos los artículos. Por lo que a los lados, como pude entender, la publicación se está utilizando para events de interfaz de usuario. Y tiene sentido que cualquier observador solo reciba los events que se hayan realizado después de que se haya suscrito, NO todos los events ocurridos anteriormente.

Espero eso ayude.

  • Fusionar observables dependientes
  • ¿Cómo puedo fusionar una sola <Lista <Lista <T >>> en una Lista <T> con RxJava 2?
  • Cómo señalar un observable para producir más datos
  • Cambio observable a condición cumplida - RxJava2
  • Cómo pasar nulo a un Observable con tipo anulable en RxJava 2 y Kotlin
  • Android Architecture Components Room ViewModel CompleteableFormAction
  • ¿Cuál es la diferencia entre llaves y soportes normales en RxJava con Kotlin?
  • No se pudo deducir Kotlin y RxJava
  • Cómo recordar el estado con los operadores de rebashs en RxJava2
  • Room - SELECT query, get o default
  • rx-java2 Schedulers.io () steel invoca el método de mainThread