¿Por qué tengo un Log de salida no deseado al fusionar 2 observables en otro Observable, que los almacena temporalmente cada 10 segundos?

Estoy tratando de simular la fusión de 2 secuencias observables diferentes, que están emitiendo algún object por segundo. Este object tiene el mismo padre en esas 2 secuencias.

Pensé, que en la console aparecerá un nuevo object con valor después de 1 segundo. Sin embargo, cuando imprimo esos objects, obtengo objects que saltearon la emisión anterior. Así que como object con valor 1, 3, 5, 7, etc.

Sin embargo, en el buffer, que fusiona esos dos, parece que solo amortigua las emisiones de 2, 4, 6, 8 etc.

Aquí está mi código:

override fun onCreate(savedInstanceState: Bundle?) { super.onCreate(savedInstanceState) setContentView(R.layout.activity_main) var counter = 0 var counter2 = 0 val periodicSomeClass1 = Observable.interval(1, TimeUnit.SECONDS) .flatMap( { counter++ Observable.just(SomeClass1("$counter", counter)) } ) val periodicSomeClass2 = Observable.interval(1, TimeUnit.SECONDS) .flatMap( { counter2++ Observable.just(SomeClass2(counter2.toDouble())) } ) periodicSomeClass1.subscribe { t: SomeClass1 -> Log.v("periodicSomeClass1", t.toString()) } periodicSomeClass2.subscribe { t: SomeClass2 -> Log.v("periodicSomeClass2", t.toString()) } Observable.merge(periodicSomeClass1, periodicSomeClass2) .buffer(10, TimeUnit.SECONDS) .doOnSubscribe { Log.v("bufferObservable", "STARTED") } .subscribe { t: MutableList<Parent> -> Log.v("bufferObservable", "onNext") t.forEach { Log.v("onNext", it.toString()) } } } 

Y esto es lo que tengo en la salida de logging con la primera emsission buffer / merge:

  V/periodicSomeClass1: SomeClass1(a=1, b=1) V/periodicSomeClass2: SomeClass2(a=1.0) V/periodicSomeClass1: SomeClass1(a=3, b=3) V/periodicSomeClass2: SomeClass2(a=3.0) V/periodicSomeClass1: SomeClass1(a=5, b=5) V/periodicSomeClass2: SomeClass2(a=5.0) V/periodicSomeClass1: SomeClass1(a=7, b=7) V/periodicSomeClass2: SomeClass2(a=7.0) V/periodicSomeClass1: SomeClass1(a=9, b=9) V/periodicSomeClass2: SomeClass2(a=9.0) V/periodicSomeClass1: SomeClass1(a=11, b=11) V/periodicSomeClass2: SomeClass2(a=11.0) V/periodicSomeClass1: SomeClass1(a=13, b=13) V/periodicSomeClass2: SomeClass2(a=13.0) V/periodicSomeClass1: SomeClass1(a=15, b=15) V/periodicSomeClass2: SomeClass2(a=15.0) V/periodicSomeClass1: SomeClass1(a=17, b=17) V/periodicSomeClass2: SomeClass2(a=17.0) V/periodicSomeClass1: SomeClass1(a=19, b=19) V/periodicSomeClass2: SomeClass2(a=19.0) V/bufferObservable: onNext V/onNext: SomeClass1(a=2, b=2) V/onNext: SomeClass2(a=2.0) V/onNext: SomeClass1(a=4, b=4) V/onNext: SomeClass2(a=4.0) V/onNext: SomeClass1(a=6, b=6) V/onNext: SomeClass2(a=6.0) V/onNext: SomeClass1(a=8, b=8) V/onNext: SomeClass2(a=8.0) V/onNext: SomeClass1(a=10, b=10) V/onNext: SomeClass2(a=10.0) V/onNext: SomeClass1(a=12, b=12) V/onNext: SomeClass2(a=12.0) V/onNext: SomeClass1(a=14, b=14) V/onNext: SomeClass2(a=14.0) V/onNext: SomeClass1(a=16, b=16) V/onNext: SomeClass2(a=16.0) V/onNext: SomeClass1(a=18, b=18) 

Al igual que sospechaste correctamente en el comentario de tu otra pregunta , este problema sí está relacionado.

1.) Se suscribe dos veces a sus dos fonts observables, una directamente para cada una de ellas y otra vez a través de la suscripción a la merge d Observable.

2.) Entonces tiene cuatro Observables funcionando en total, donde dos de ellos están incrementando (y leyendo de) el counter y los otros dos están incrementando (y leyendo de) el counter2 .

3.) Para cada uno de estos pares, los dos intervalos se compensan ligeramente y cada flatMap del primero de cada par ve el valor n , lo incrementa a n+1 y lo imprime. Luego, poco después, aparece la otra instancia y ve n+1 , se incrementa a n+2 , imprime eso, etc.

Finalmente, el buffer oculta que todo esto sucede entrelazado porque imprime todos los valores pares después de todos los impares.

Cualquier solución realmente depende de lo que quiera lograr: ¿se trata simplemente de un ejemplo de patio de recreo o modela algún problema real?

  • RxJava 1.x .zip () no funciona en RxJava 2.0
  • ¿Cómo crear un intervalo infinito observable que emitirá un nuevo object cada intervalo de time?
  • Kotlin y RxJava2 zip operator - Ninguna de las siguientes funciones puede invocarse con los arguments suministrados
  • Confundido sobre la asignación de la variable RxJava
  • RxJava2 observable no procesando en Siguiente cuando hay un cambio
  • Kotlin: Cómo convertir la testing que usa Thread.sleep a RxJava TestScheduler
  • RxJava2 Tal vez devuelva Observable vacío si no hay elemento
  • Enfrentando problemas con la implementación de Rx Java con la architecture de flujo en Android kotlin
  • RxJava2 cómo separar diferentes implementaciones de emisor observable
  • Mockito never () no funciona con andThen rxjava2
  • Referencia de constructor de Kotlin con generics