Cómo fusionar 2 flujos separados, almacenar los datos rellenos y subscribirlos después de un time corto

Estoy intentando poner a testing una situación como esta:

Tengo 2 classess que solo se extiende desde el mismo Parent.

Estoy creando y Observables de la list de elementos para cada class:

val listSomeClass1 = ArrayList<SomeClass1>() val listSomeClass2 = ArrayList<SomeClass2>() fun populateJust1() { listSomeClass1.add(SomeClass1("23", 23)) listSomeClass1.add(SomeClass1("24", 24)) listSomeClass1.add(SomeClass1("25", 25)) } fun populateJust2() { listSomeClass2.add(SomeClass2(23.00)) listSomeClass2.add(SomeClass2(24.00)) listSomeClass2.add(SomeClass2(25.00)) } populateItemsSomeClass1() populateItemsSomeClass2() 

Ahora puedo crear 2 observables:

  val someClass1Observable = Observable.fromIterable(listSomeClass1) val someClass2Observable = Observable.fromIterable(listSomeClass2) 

Y aquí, quiero fusionar la emisión de ellos, almacenarla y suscribirme a ella después de 10 segundos:

  Observable.merge(someClass1Observable, someClass2Observable) .buffer(10, TimeUnit.SECONDS) .doOnSubscribe { Log.v("parentObservable", "STARTED") } .subscribe { t: MutableList<Parent> -> Log.v("parentObservable", "onNext") t.forEach { Log.v("onNext", it.toString()) } } 

Sin embargo, el observable no comienza después de 10 segundos como esperaba, y acaba de comenzar inmediatamente con estos datos listos. Cómo simular algo así, que reuniré 2 flujos separados y después de 10 segundos podré get los datos recostackdos

Debo señalar que no quiero usar ningún Asunto.

ACTUALIZAR

He hecho algo así:

  val list1 = listOf(SomeClass1("1", 1), SomeClass1("2", 2), SomeClass1("3", 3)) val list2 = listOf(SomeClass2(5.00), SomeClass2(4.00), SomeClass2(6.00)) val someClass1Observable = Observable .fromIterable(list1) .zipWith(Observable.interval(2, TimeUnit.SECONDS), BiFunction { item: SomeClass1, _: Long -> item }) val someClass2Observable = Observable .fromIterable(list2) .zipWith(Observable.interval(1, TimeUnit.SECONDS), BiFunction { item: SomeClass2, _: Long -> item }) someClass1Observable.subscribe { Log.v("someClass1", it.toString()) } someClass2Observable.subscribe { Log.v("someClass2", it.toString()) } Observable.merge(someClass1Observable, someClass2Observable) .buffer(10, TimeUnit.SECONDS) .delay(10, TimeUnit.SECONDS) .doOnSubscribe { Log.v("parentObservable", "STARTED") } .subscribe { t: MutableList<Parent> -> Log.v("parentObservable", "onNext") t.forEach { Log.v("onNext", it.toString()) } } Thread.sleep(13000) someClass1Observable.subscribe { Log.v("someClass1", it.toString()) } someClass2Observable.subscribe { Log.v("someClass2", it.toString()) } 

Aquí, quiero simplemente simular 2 secuencias infinitas de algunos ClassClass1 y someclass2 Observables y lo mismo para la fusión Observable.

Una vez más, quiero tener la capacidad de combinar esas 2 secuencias, búfer de datos poblados y hacer algo con él después de 10 segundos. Si después de 10 segundos esos 2 flujos volverán a poblar algunos datos, la fusión de Observable debería limpiar el búfer anterior, y debería volver a almacenar los datos nuevos y emitir después de 10 segundos, y así sucesivamente, infinito. Sin embargo, mi código no está funcionando como esperaba, ¿qué cambios debo hacer para hacerlo tal como lo describí?