RxJava BehaviorSubject no emite el último elemento?

Tengo un RxJava simple, usando ReplaySubject, pude get el resultado, donde se imprimen los 3 numbers.

val observable : Observable<Int> = Observable.just(1, 2, 3) val subject = ReplaySubject.create<Int>() observable.subscribe(subject) subject.subscribe{ result -> System.out.println("Start $result in Subscription Result") } 

Cuando cambio a Comportamiento, espero que se imprima el tercer número, es decir, 3, ya que siempre pensé que el Comportamiento es reproducir el último elemento emitido.

  val observable : Observable<Int> = Observable.just(1, 2, 3) val subject = BehaviorSubject.create<Int>() observable.subscribe(subject) subject.subscribe{ result -> System.out.println("Start $result in Subscription Result") } 

Sin embargo, no imprime nada. ¿Por qué?

¿Extrañé algo importante aquí? En caso afirmativo, avíseme cómo se imprime el último elemento emitido que supuso (es decir, 3).

No imprime nada porque subsription ya ha terminado. Si la suscripción todavía está activa, se imprimirán 3, por ejemplo:

 val o1: Observable<Int> = Observable.just(1, 2, 3) val o2: Observable<Int> = Observable.just(4).delay(100,TimeUnit.MILLISECONDS) val observable: Observable<Int> = Observable.concat(o1, o2) val subject = BehaviorSubject.create<Int>() observable.subscribe(subject) subject.subscribe{ result -> System.out.println("Start $result in Subscription Result") } Thread.sleep(1000) 

Se imprimirán 3 y 4 (después de la demora), con 3 como el evento más reciente antes de la suscripción y 4 después de la suscripción.

Además, como se explica en @akarnokd en la sección de comentarios ReplaySubject.createWithSize(1) puede usarse para reproducir siempre el último elemento incluso después de la finalización observable y si se necesita un solo elemento independientemente del estado de finalización de la secuencia, entonces observable.takeLast(1).subscribe(subject) se puede usar para garantizar que:

 val observable : Observable<Int> = Observable.just(1, 2, 3) val subject = ReplaySubject.createWithSize<Int>(1) observable.takeLast(1).subscribe(subject) //can be moved after subject.subscribe as well subject.subscribe{ result -> System.out.println("Start $result in Subscription Result") } 
  • Cómo recordar el estado con los operadores de rebashs en RxJava2
  • Excepción: blockingConnect no se debe invocar en el subprocess UI a pesar de que subsubscribí en otro subprocess
  • Inferencia de tipo Observable.combineLatest en kotlin
  • Convert Maybe to Single de otra fuente si Maybe completa
  • Cómo burlarse del repository reactivo que devuelve Observable
  • Convierte el código de RxJava a Kotlin correctamente
  • ¿Cómo se coordina una list de ejecuciones Completables con RxJava?
  • Para una function de Kotlin utilizada como expresión, ¿hay una forma concisa de operar y devolver un valor?
  • RxJava Observable.create envolver suscripciones observables
  • Ingrese los valores propios para combinarlos
  • ¿Por qué tengo un Log de salida no deseado al fusionar 2 observables en otro Observable, que los almacena temporalmente cada 10 segundos?