¿Por qué tengo un Log de salida no deseado al fusionar 2 observables en otro Observable, que los almacena temporalmente cada 10 segundos?

Estoy tratando de simular la fusión de 2 secuencias observables diferentes, que están emitiendo algún object por segundo. Este object tiene el mismo padre en esas 2 secuencias.

Pensé, que en la console aparecerá un nuevo object con valor después de 1 segundo. Sin embargo, cuando imprimo esos objects, obtengo objects que saltearon la emisión anterior. Así que como object con valor 1, 3, 5, 7, etc.

Sin embargo, en el buffer, que fusiona esos dos, parece que solo amortigua las emisiones de 2, 4, 6, 8 etc.

Aquí está mi código:

override fun onCreate(savedInstanceState: Bundle?) { super.onCreate(savedInstanceState) setContentView(R.layout.activity_main) var counter = 0 var counter2 = 0 val periodicSomeClass1 = Observable.interval(1, TimeUnit.SECONDS) .flatMap( { counter++ Observable.just(SomeClass1("$counter", counter)) } ) val periodicSomeClass2 = Observable.interval(1, TimeUnit.SECONDS) .flatMap( { counter2++ Observable.just(SomeClass2(counter2.toDouble())) } ) periodicSomeClass1.subscribe { t: SomeClass1 -> Log.v("periodicSomeClass1", t.toString()) } periodicSomeClass2.subscribe { t: SomeClass2 -> Log.v("periodicSomeClass2", t.toString()) } Observable.merge(periodicSomeClass1, periodicSomeClass2) .buffer(10, TimeUnit.SECONDS) .doOnSubscribe { Log.v("bufferObservable", "STARTED") } .subscribe { t: MutableList<Parent> -> Log.v("bufferObservable", "onNext") t.forEach { Log.v("onNext", it.toString()) } } } 

Y esto es lo que tengo en la salida de logging con la primera emsission buffer / merge:

  V/periodicSomeClass1: SomeClass1(a=1, b=1) V/periodicSomeClass2: SomeClass2(a=1.0) V/periodicSomeClass1: SomeClass1(a=3, b=3) V/periodicSomeClass2: SomeClass2(a=3.0) V/periodicSomeClass1: SomeClass1(a=5, b=5) V/periodicSomeClass2: SomeClass2(a=5.0) V/periodicSomeClass1: SomeClass1(a=7, b=7) V/periodicSomeClass2: SomeClass2(a=7.0) V/periodicSomeClass1: SomeClass1(a=9, b=9) V/periodicSomeClass2: SomeClass2(a=9.0) V/periodicSomeClass1: SomeClass1(a=11, b=11) V/periodicSomeClass2: SomeClass2(a=11.0) V/periodicSomeClass1: SomeClass1(a=13, b=13) V/periodicSomeClass2: SomeClass2(a=13.0) V/periodicSomeClass1: SomeClass1(a=15, b=15) V/periodicSomeClass2: SomeClass2(a=15.0) V/periodicSomeClass1: SomeClass1(a=17, b=17) V/periodicSomeClass2: SomeClass2(a=17.0) V/periodicSomeClass1: SomeClass1(a=19, b=19) V/periodicSomeClass2: SomeClass2(a=19.0) V/bufferObservable: onNext V/onNext: SomeClass1(a=2, b=2) V/onNext: SomeClass2(a=2.0) V/onNext: SomeClass1(a=4, b=4) V/onNext: SomeClass2(a=4.0) V/onNext: SomeClass1(a=6, b=6) V/onNext: SomeClass2(a=6.0) V/onNext: SomeClass1(a=8, b=8) V/onNext: SomeClass2(a=8.0) V/onNext: SomeClass1(a=10, b=10) V/onNext: SomeClass2(a=10.0) V/onNext: SomeClass1(a=12, b=12) V/onNext: SomeClass2(a=12.0) V/onNext: SomeClass1(a=14, b=14) V/onNext: SomeClass2(a=14.0) V/onNext: SomeClass1(a=16, b=16) V/onNext: SomeClass2(a=16.0) V/onNext: SomeClass1(a=18, b=18) 

Al igual que sospechaste correctamente en el comentario de tu otra pregunta , este problema sí está relacionado.

1.) Se suscribe dos veces a sus dos fonts observables, una directamente para cada una de ellas y otra vez a través de la suscripción a la merge d Observable.

2.) Entonces tiene cuatro Observables funcionando en total, donde dos de ellos están incrementando (y leyendo de) el counter y los otros dos están incrementando (y leyendo de) el counter2 .

3.) Para cada uno de estos pares, los dos intervalos se compensan ligeramente y cada flatMap del primero de cada par ve el valor n , lo incrementa a n+1 y lo imprime. Luego, poco después, aparece la otra instancia y ve n+1 , se incrementa a n+2 , imprime eso, etc.

Finalmente, el buffer oculta que todo esto sucede entrelazado porque imprime todos los valores pares después de todos los impares.

Cualquier solución realmente depende de lo que quiera lograr: ¿se trata simplemente de un ejemplo de patio de recreo o modela algún problema real?

  • El método de callback a menudo para reenviar el evento a Observable?
  • Retrofit 2, Rx 2 y llamadas asincrónicas
  • Excepción causada por: java.lang.ClassNotFoundException: org.reactivestreams.Publisher
  • ¿Comportamiento incorrecto de Maybe <List <T >> en Room?
  • HttpException no capturado por onError ()
  • Cómo hacer que la function regrese Observable
  • Prueba RxJava2 Flowable Query Room
  • EXCEPCIÓN FATAL: RxCachedThreadScheduler-1 cuando el gatillo se deshace. ¿Por qué?
  • Single.zip completando antes de Success
  • Reescriba el código de Java en Kotlin utilizando la reference de function.
  • ¿Cómo se simula emitir 2 streams infinitas observables y tener otras observables que las fusionan y amortiguan cada 10 segundos?