RxJava2 Publicado

Cuál es la diferencia entre

ObservableTransformer { Observable.merge( it.ofType(x).compose(transformerherex), it.ofType(y).compose(transformerherey) ) } 

y

 ObservableTransformer { it.publish{ shanetworking -> Observable.merge( shanetworking.ofType(x).compose(transformerherex), shanetworking.ofType(y).compose(transformerherey) ) } } 

cuando ejecuto mi código usando estos dos, obtuve los mismos resultados. Qué hace publicar aquí

La diferencia es que el transformador superior se suscribirá dos veces en sentido ascendente para una suscripción única desde el nivel inferior, duplicando los efectos secundarios del flujo ascendente que generalmente no se desean:

 Observable<Object> mixedSource = Observable.<Object>just("a", 1, "b", 2, "c", 3) .doOnSubscribe(s -> System.out.println("Subscribed!")); mixedSource.compose(f -> Observable.merge( f.ofType(Integer.class).compose(g -> g.map(v -> v + 1)), f.ofType(String.class).compose(g -> g.map(v -> v.toUpperCase())) ) ) .subscribe(System.out::println); 

imprimirá

 Subscribed! 2 3 4 Subscribed! A B C 

El efecto secundario representado aquí es la printing Subscribed! Dependiendo del trabajo real en una fuente real, eso podría significar el envío de un correo electrónico dos veces, recuperar las filas de una tabla dos veces. Con este ejemplo particular, puede ver que incluso si los valores de origen están intercalados en su tipo, la salida los contiene por separado.

Por el contrario, publish(Function) establecerá una suscripción a la fuente por un abonado final, por lo que cualquier efecto secundario en la fuente solo se producirá una vez.

 mixedSource.publish(f -> Observable.merge( f.ofType(Integer.class).compose(g -> g.map(v -> v + 1)), f.ofType(String.class).compose(g -> g.map(v -> v.toUpperCase())) ) ) .subscribe(System.out::println); 

que imprime

 Subscribed! A 2 B 3 C 4 

porque la fuente está suscrita una vez y cada elemento es multidifusión a los dos "arms" de .ofType().compose() .

publish operador de publish convierte su Observable en Connectable Observable .

Veamos qué significa Connectable Observable : supongamos que desea suscribirse a un time múltiple observable y desea entregar los mismos artículos a cada suscriptor. Necesita usar Connectable Observable .

Ejemplo:

 var period = TimeSpan.FromSeconds(1); var observable = Observable.Interval(period).Publish(); observable.Connect(); observable.Subscribe(i => Console.WriteLine("first subscription : {0}", i)); Thread.Sleep(period); observable.Subscribe(i => Console.WriteLine("second subscription : {0}", i)); 

salida:

 first subscription : 0 first subscription : 1 second subscription : 1 first subscription : 2 second subscription : 2 

En este caso, somos lo suficientemente rápidos para suscribirnos antes de que se publique el primer elemento, pero solo en la primera suscripción. La segunda suscripción se suscribe tarde y pierde la primera publicación.

Podríamos mover la invocación del método Connect () hasta que todas las suscripciones se hayan realizado. De esa forma, incluso con la llamada a Thread.Sleep, no nos suscribiremos realmente al subyacente hasta después de que se hayan realizado ambas suscripciones. Esto se haría de la siguiente manera:

 var period = TimeSpan.FromSeconds(1); var observable = Observable.Interval(period).Publish(); observable.Subscribe(i => Console.WriteLine("first subscription : {0}", i)); Thread.Sleep(period); observable.Subscribe(i => Console.WriteLine("second subscription : {0}", i)); observable.Connect(); 

salida:

 first subscription : 0 second subscription : 0 first subscription : 1 second subscription : 1 first subscription : 2 second subscription : 2 

Entonces, usando Completable Observable, tenemos una manera de controlar cuándo dejar que Observable emita elementos.

Ejemplo tomado de: http://www.introtorx.com/Content/v1.0.10621.0/14_HotAndColdObservables.html#PublishAndConnect

EDITAR Según la diapositiva número 180 en este enlace :

Otra de las características de la publicación es que si un observador comienza a observar después de 10 segundos de haber comenzado a emitir elementos observables, el observador obtiene solo los elementos que fueron emitidos después de 10 segundos (en el momento de la suscripción) no todos los artículos. Por lo que a los lados, como pude entender, la publicación se está utilizando para events de interfaz de usuario. Y tiene sentido que cualquier observador solo reciba los events que se hayan realizado después de que se haya suscrito, NO todos los events ocurridos anteriormente.

Espero eso ayude.

  • JsonArray a la class de datos de Kotlin con Retrofit (se esperaba BEGIN_OBJECT pero era BEGIN_ARRAY)
  • El método RXjava2 en fromCallable no se puede exceder
  • RxJava Observable.create envolver suscripciones observables
  • rxjava2 - ejemplo simple de ejecutar tareas en un grupo de subprocesss, suscribirse en un solo hilo
  • Asunto de RxJava: escuche el tipo diferente que el que emite
  • Mockito never () no funciona con andThen rxjava2
  • RxJava Debounce onNext ()
  • Cómo recordar el estado con los operadores de rebashs en RxJava2
  • Cómo retrasar onError () en RxJava 2 y Android?
  • La function de extensión no crea un nuevo object Observable
  • Java genérico para Kotlin genérico. Retorno genérico del método