¿Cómo se simula emitir 2 streams infinitas observables y tener otras observables que las fusionan y amortiguan cada 10 segundos?

Quiero poder simular la fusión de dos flujos separados que emiten algunos objects (que extienden el mismo object principal), almacenarlos en búfer utilizando el operador de búfer y emitir los datos recostackdos después de 10 segundos. Quiero que este mecanismo sea infinito, de modo que esta fusión / búfer siempre se invocará siempre que haya una emisión de las 2 streams separadas.

Esto es lo que hice hasta ahora:

val list1 = mutableListOf<SomeClass1>( SomeClass1("1", 1), SomeClass1("2", 2), SomeClass1("3", 3), SomeClass1("4", 4), SomeClass1("5", 5), SomeClass1("6", 6), SomeClass1("7", 7), SomeClass1("8", 8), SomeClass1("9", 9) ) val list2 = mutableListOf<SomeClass2>( SomeClass2(1.00), SomeClass2(2.00), SomeClass2(3.00), SomeClass2(4.00), SomeClass2(5.00), SomeClass2(6.00), SomeClass2(7.00), SomeClass2(8.00), SomeClass2(9.00) ) val someClass1Observable = Observable .fromIterable(list1) .zipWith(Observable.interval(2, TimeUnit.SECONDS), BiFunction { item: SomeClass1, _: Long -> item }) val someClass2Observable = Observable .fromIterable(list2) .zipWith(Observable.interval(2, TimeUnit.SECONDS), BiFunction { item: SomeClass2, _: Long -> item }) someClass1Observable.subscribe { Log.v("someClass1", it.toString()) } someClass2Observable.subscribe { Log.v("someClass2", it.toString()) } Observable.merge(someClass1Observable, someClass2Observable) .buffer(10, TimeUnit.SECONDS) .repeat() .doOnSubscribe { Log.v("parentObservable", "STARTED") } .subscribe { t: MutableList<Parent> -> Log.v("parentObservable", "onNext") t.forEach { Log.v("onNext", it.toString()) } } Thread.sleep(30000) Log.v("AFTER_SLEEP", "AFTER_SLEEP") someClass1Observable.subscribe { Log.v("someClass1", it.toString()) } someClass2Observable.subscribe { Log.v("someClass2", it.toString()) } 

La primera emisión de las 2 streams funciona bien, la combinación / búfer Observable está recolectando emisiones de ellas cada vez después de 10 segundos. Sin embargo, cuando esos 2 flujos terminaron las emisiones y me suscribo de nuevo, el buffer / merge Observable no funciona. ¿Cómo hacer que esto funcione para infinito? ¿Hay una mejor manera de escribir código para esas 2 secuencias separadas que emiten objects que no necesitarán leer valores de una list y en lugar de esto emitirán algún object nuevo cada 2 segundos de intervalo? ¿Cómo hacer que la combinación / buffer Observable funcione infinitamente, quiero decir cada vez que hay una nueva emisión de esas 2 streams observables?