RxJava2 Publicado

Cuál es la diferencia entre

ObservableTransformer { Observable.merge( it.ofType(x).compose(transformerherex), it.ofType(y).compose(transformerherey) ) } 

y

 ObservableTransformer { it.publish{ shanetworking -> Observable.merge( shanetworking.ofType(x).compose(transformerherex), shanetworking.ofType(y).compose(transformerherey) ) } } 

cuando ejecuto mi código usando estos dos, obtuve los mismos resultados. Qué hace publicar aquí

La diferencia es que el transformador superior se suscribirá dos veces en sentido ascendente para una suscripción única desde el nivel inferior, duplicando los efectos secundarios del flujo ascendente que generalmente no se desean:

 Observable<Object> mixedSource = Observable.<Object>just("a", 1, "b", 2, "c", 3) .doOnSubscribe(s -> System.out.println("Subscribed!")); mixedSource.compose(f -> Observable.merge( f.ofType(Integer.class).compose(g -> g.map(v -> v + 1)), f.ofType(String.class).compose(g -> g.map(v -> v.toUpperCase())) ) ) .subscribe(System.out::println); 

imprimirá

 Subscribed! 2 3 4 Subscribed! A B C 

El efecto secundario representado aquí es la printing Subscribed! Dependiendo del trabajo real en una fuente real, eso podría significar el envío de un correo electrónico dos veces, recuperar las filas de una tabla dos veces. Con este ejemplo particular, puede ver que incluso si los valores de origen están intercalados en su tipo, la salida los contiene por separado.

Por el contrario, publish(Function) establecerá una suscripción a la fuente por un abonado final, por lo que cualquier efecto secundario en la fuente solo se producirá una vez.

 mixedSource.publish(f -> Observable.merge( f.ofType(Integer.class).compose(g -> g.map(v -> v + 1)), f.ofType(String.class).compose(g -> g.map(v -> v.toUpperCase())) ) ) .subscribe(System.out::println); 

que imprime

 Subscribed! A 2 B 3 C 4 

porque la fuente está suscrita una vez y cada elemento es multidifusión a los dos "arms" de .ofType().compose() .

publish operador de publish convierte su Observable en Connectable Observable .

Veamos qué significa Connectable Observable : supongamos que desea suscribirse a un time múltiple observable y desea entregar los mismos artículos a cada suscriptor. Necesita usar Connectable Observable .

Ejemplo:

 var period = TimeSpan.FromSeconds(1); var observable = Observable.Interval(period).Publish(); observable.Connect(); observable.Subscribe(i => Console.WriteLine("first subscription : {0}", i)); Thread.Sleep(period); observable.Subscribe(i => Console.WriteLine("second subscription : {0}", i)); 

salida:

 first subscription : 0 first subscription : 1 second subscription : 1 first subscription : 2 second subscription : 2 

En este caso, somos lo suficientemente rápidos para suscribirnos antes de que se publique el primer elemento, pero solo en la primera suscripción. La segunda suscripción se suscribe tarde y pierde la primera publicación.

Podríamos mover la invocación del método Connect () hasta que todas las suscripciones se hayan realizado. De esa forma, incluso con la llamada a Thread.Sleep, no nos suscribiremos realmente al subyacente hasta después de que se hayan realizado ambas suscripciones. Esto se haría de la siguiente manera:

 var period = TimeSpan.FromSeconds(1); var observable = Observable.Interval(period).Publish(); observable.Subscribe(i => Console.WriteLine("first subscription : {0}", i)); Thread.Sleep(period); observable.Subscribe(i => Console.WriteLine("second subscription : {0}", i)); observable.Connect(); 

salida:

 first subscription : 0 second subscription : 0 first subscription : 1 second subscription : 1 first subscription : 2 second subscription : 2 

Entonces, usando Completable Observable, tenemos una manera de controlar cuándo dejar que Observable emita elementos.

Ejemplo tomado de: http://www.introtorx.com/Content/v1.0.10621.0/14_HotAndColdObservables.html#PublishAndConnect

EDITAR Según la diapositiva número 180 en este enlace :

Otra de las características de la publicación es que si un observador comienza a observar después de 10 segundos de haber comenzado a emitir elementos observables, el observador obtiene solo los elementos que fueron emitidos después de 10 segundos (en el momento de la suscripción) no todos los artículos. Por lo que a los lados, como pude entender, la publicación se está utilizando para events de interfaz de usuario. Y tiene sentido que cualquier observador solo reciba los events que se hayan realizado después de que se haya suscrito, NO todos los events ocurridos anteriormente.

Espero eso ayude.

  • RxJava Observable.create envolver suscripciones observables
  • Para una function de Kotlin utilizada como expresión, ¿hay una forma concisa de operar y devolver un valor?
  • No se pudo deducir Kotlin y RxJava
  • Buscando con RxJava no funciona
  • Reescriba el código de Java en Kotlin utilizando la reference de function.
  • ¿Cómo soluciono el error de inferencia de tipo en un Completable transformado utilizando RxLifecycle.bindToLifecycle ()?
  • RxJava y Retrofit usando Kotlin
  • Escuchar posts y escribir commands en un flujo observable
  • Utilice la unidad de Kotlin (o cualquier otro object) escriba en el layout de Android
  • Problema de encadenamiento Completable after flatMapCompletable
  • No se puede 'observar en' hilo principal con RxKotlin