¿Por qué tengo un Log de salida no deseado al fusionar 2 observables en otro Observable, que los almacena temporalmente cada 10 segundos?

Estoy tratando de simular la fusión de 2 secuencias observables diferentes, que están emitiendo algún object por segundo. Este object tiene el mismo padre en esas 2 secuencias.

Pensé, que en la console aparecerá un nuevo object con valor después de 1 segundo. Sin embargo, cuando imprimo esos objects, obtengo objects que saltearon la emisión anterior. Así que como object con valor 1, 3, 5, 7, etc.

Sin embargo, en el buffer, que fusiona esos dos, parece que solo amortigua las emisiones de 2, 4, 6, 8 etc.

Aquí está mi código:

override fun onCreate(savedInstanceState: Bundle?) { super.onCreate(savedInstanceState) setContentView(R.layout.activity_main) var counter = 0 var counter2 = 0 val periodicSomeClass1 = Observable.interval(1, TimeUnit.SECONDS) .flatMap( { counter++ Observable.just(SomeClass1("$counter", counter)) } ) val periodicSomeClass2 = Observable.interval(1, TimeUnit.SECONDS) .flatMap( { counter2++ Observable.just(SomeClass2(counter2.toDouble())) } ) periodicSomeClass1.subscribe { t: SomeClass1 -> Log.v("periodicSomeClass1", t.toString()) } periodicSomeClass2.subscribe { t: SomeClass2 -> Log.v("periodicSomeClass2", t.toString()) } Observable.merge(periodicSomeClass1, periodicSomeClass2) .buffer(10, TimeUnit.SECONDS) .doOnSubscribe { Log.v("bufferObservable", "STARTED") } .subscribe { t: MutableList<Parent> -> Log.v("bufferObservable", "onNext") t.forEach { Log.v("onNext", it.toString()) } } } 

Y esto es lo que tengo en la salida de logging con la primera emsission buffer / merge:

  V/periodicSomeClass1: SomeClass1(a=1, b=1) V/periodicSomeClass2: SomeClass2(a=1.0) V/periodicSomeClass1: SomeClass1(a=3, b=3) V/periodicSomeClass2: SomeClass2(a=3.0) V/periodicSomeClass1: SomeClass1(a=5, b=5) V/periodicSomeClass2: SomeClass2(a=5.0) V/periodicSomeClass1: SomeClass1(a=7, b=7) V/periodicSomeClass2: SomeClass2(a=7.0) V/periodicSomeClass1: SomeClass1(a=9, b=9) V/periodicSomeClass2: SomeClass2(a=9.0) V/periodicSomeClass1: SomeClass1(a=11, b=11) V/periodicSomeClass2: SomeClass2(a=11.0) V/periodicSomeClass1: SomeClass1(a=13, b=13) V/periodicSomeClass2: SomeClass2(a=13.0) V/periodicSomeClass1: SomeClass1(a=15, b=15) V/periodicSomeClass2: SomeClass2(a=15.0) V/periodicSomeClass1: SomeClass1(a=17, b=17) V/periodicSomeClass2: SomeClass2(a=17.0) V/periodicSomeClass1: SomeClass1(a=19, b=19) V/periodicSomeClass2: SomeClass2(a=19.0) V/bufferObservable: onNext V/onNext: SomeClass1(a=2, b=2) V/onNext: SomeClass2(a=2.0) V/onNext: SomeClass1(a=4, b=4) V/onNext: SomeClass2(a=4.0) V/onNext: SomeClass1(a=6, b=6) V/onNext: SomeClass2(a=6.0) V/onNext: SomeClass1(a=8, b=8) V/onNext: SomeClass2(a=8.0) V/onNext: SomeClass1(a=10, b=10) V/onNext: SomeClass2(a=10.0) V/onNext: SomeClass1(a=12, b=12) V/onNext: SomeClass2(a=12.0) V/onNext: SomeClass1(a=14, b=14) V/onNext: SomeClass2(a=14.0) V/onNext: SomeClass1(a=16, b=16) V/onNext: SomeClass2(a=16.0) V/onNext: SomeClass1(a=18, b=18) 

Al igual que sospechaste correctamente en el comentario de tu otra pregunta , este problema sí está relacionado.

1.) Se suscribe dos veces a sus dos fonts observables, una directamente para cada una de ellas y otra vez a través de la suscripción a la merge d Observable.

2.) Entonces tiene cuatro Observables funcionando en total, donde dos de ellos están incrementando (y leyendo de) el counter y los otros dos están incrementando (y leyendo de) el counter2 .

3.) Para cada uno de estos pares, los dos intervalos se compensan ligeramente y cada flatMap del primero de cada par ve el valor n , lo incrementa a n+1 y lo imprime. Luego, poco después, aparece la otra instancia y ve n+1 , se incrementa a n+2 , imprime eso, etc.

Finalmente, el buffer oculta que todo esto sucede entrelazado porque imprime todos los valores pares después de todos los impares.

Cualquier solución realmente depende de lo que quiera lograr: ¿se trata simplemente de un ejemplo de patio de recreo o modela algún problema real?

  • Espere hasta que dos observables emitan verdadero
  • Para una function de Kotlin utilizada como expresión, ¿hay una forma concisa de operar y devolver un valor?
  • Mockito never () no funciona con andThen rxjava2
  • Referencia de constructor de Kotlin con generics
  • Excepción causada por: java.lang.ClassNotFoundException: org.reactivestreams.Publisher
  • rx-java2 Schedulers.io () steel invoca el método de mainThread
  • Cómo hacer un event handling errores en rxjava2 en android
  • Cómo recordar el estado con los operadores de rebashs en RxJava2
  • Cómo burlarse del repository reactivo que devuelve Observable
  • Encadenando Observables para evitar suscripciones múltiples
  • EXCEPCIÓN FATAL: RxCachedThreadScheduler-1 cuando el gatillo se deshace. ¿Por qué?