¿Por qué tengo un Log de salida no deseado al fusionar 2 observables en otro Observable, que los almacena temporalmente cada 10 segundos?

Estoy tratando de simular la fusión de 2 secuencias observables diferentes, que están emitiendo algún object por segundo. Este object tiene el mismo padre en esas 2 secuencias.

Pensé, que en la console aparecerá un nuevo object con valor después de 1 segundo. Sin embargo, cuando imprimo esos objects, obtengo objects que saltearon la emisión anterior. Así que como object con valor 1, 3, 5, 7, etc.

Sin embargo, en el buffer, que fusiona esos dos, parece que solo amortigua las emisiones de 2, 4, 6, 8 etc.

Aquí está mi código:

override fun onCreate(savedInstanceState: Bundle?) { super.onCreate(savedInstanceState) setContentView(R.layout.activity_main) var counter = 0 var counter2 = 0 val periodicSomeClass1 = Observable.interval(1, TimeUnit.SECONDS) .flatMap( { counter++ Observable.just(SomeClass1("$counter", counter)) } ) val periodicSomeClass2 = Observable.interval(1, TimeUnit.SECONDS) .flatMap( { counter2++ Observable.just(SomeClass2(counter2.toDouble())) } ) periodicSomeClass1.subscribe { t: SomeClass1 -> Log.v("periodicSomeClass1", t.toString()) } periodicSomeClass2.subscribe { t: SomeClass2 -> Log.v("periodicSomeClass2", t.toString()) } Observable.merge(periodicSomeClass1, periodicSomeClass2) .buffer(10, TimeUnit.SECONDS) .doOnSubscribe { Log.v("bufferObservable", "STARTED") } .subscribe { t: MutableList<Parent> -> Log.v("bufferObservable", "onNext") t.forEach { Log.v("onNext", it.toString()) } } } 

Y esto es lo que tengo en la salida de logging con la primera emsission buffer / merge:

  V/periodicSomeClass1: SomeClass1(a=1, b=1) V/periodicSomeClass2: SomeClass2(a=1.0) V/periodicSomeClass1: SomeClass1(a=3, b=3) V/periodicSomeClass2: SomeClass2(a=3.0) V/periodicSomeClass1: SomeClass1(a=5, b=5) V/periodicSomeClass2: SomeClass2(a=5.0) V/periodicSomeClass1: SomeClass1(a=7, b=7) V/periodicSomeClass2: SomeClass2(a=7.0) V/periodicSomeClass1: SomeClass1(a=9, b=9) V/periodicSomeClass2: SomeClass2(a=9.0) V/periodicSomeClass1: SomeClass1(a=11, b=11) V/periodicSomeClass2: SomeClass2(a=11.0) V/periodicSomeClass1: SomeClass1(a=13, b=13) V/periodicSomeClass2: SomeClass2(a=13.0) V/periodicSomeClass1: SomeClass1(a=15, b=15) V/periodicSomeClass2: SomeClass2(a=15.0) V/periodicSomeClass1: SomeClass1(a=17, b=17) V/periodicSomeClass2: SomeClass2(a=17.0) V/periodicSomeClass1: SomeClass1(a=19, b=19) V/periodicSomeClass2: SomeClass2(a=19.0) V/bufferObservable: onNext V/onNext: SomeClass1(a=2, b=2) V/onNext: SomeClass2(a=2.0) V/onNext: SomeClass1(a=4, b=4) V/onNext: SomeClass2(a=4.0) V/onNext: SomeClass1(a=6, b=6) V/onNext: SomeClass2(a=6.0) V/onNext: SomeClass1(a=8, b=8) V/onNext: SomeClass2(a=8.0) V/onNext: SomeClass1(a=10, b=10) V/onNext: SomeClass2(a=10.0) V/onNext: SomeClass1(a=12, b=12) V/onNext: SomeClass2(a=12.0) V/onNext: SomeClass1(a=14, b=14) V/onNext: SomeClass2(a=14.0) V/onNext: SomeClass1(a=16, b=16) V/onNext: SomeClass2(a=16.0) V/onNext: SomeClass1(a=18, b=18) 

Al igual que sospechaste correctamente en el comentario de tu otra pregunta , este problema sí está relacionado.

1.) Se suscribe dos veces a sus dos fonts observables, una directamente para cada una de ellas y otra vez a través de la suscripción a la merge d Observable.

2.) Entonces tiene cuatro Observables funcionando en total, donde dos de ellos están incrementando (y leyendo de) el counter y los otros dos están incrementando (y leyendo de) el counter2 .

3.) Para cada uno de estos pares, los dos intervalos se compensan ligeramente y cada flatMap del primero de cada par ve el valor n , lo incrementa a n+1 y lo imprime. Luego, poco después, aparece la otra instancia y ve n+1 , se incrementa a n+2 , imprime eso, etc.

Finalmente, el buffer oculta que todo esto sucede entrelazado porque imprime todos los valores pares después de todos los impares.

Cualquier solución realmente depende de lo que quiera lograr: ¿se trata simplemente de un ejemplo de patio de recreo o modela algún problema real?

  • Ingrese los valores propios para combinarlos
  • ¿Cómo puedo fusionar una sola <Lista <Lista <T >>> en una Lista <T> con RxJava 2?
  • RxJava Observable.create envolver suscripciones observables
  • Cómo manejar el event handling errores en un solo lugar en rxjava usando wrapper
  • ¿Por qué mi configuration de RxJava está bloqueando mi subprocess de interfaz de usuario? Trabajar con la callback BluetoothAdapter.startLeScan
  • ¿Cómo soluciono el error de inferencia de tipo en un Completable transformado utilizando RxLifecycle.bindToLifecycle ()?
  • Encadenando Observables para evitar suscripciones múltiples
  • Proguard: ¿Qué regla puedo agregar para evitar no puedo encontrar la class referenceda?
  • Prueba RxJava2 Flowable Query Room
  • RxJava manejando múltiples devoluciones de llamada dentro de un observable
  • Problema de encadenamiento Completable after flatMapCompletable