Escuchar posts y escribir commands en un flujo observable

Tengo un método que abre una connection ( Single<Connection> ) y otro que devuelve bytes ( Observable<Byte> ) que debe escribirse en la connection cuando está abierto:

 interface Connection { fun read(): Observable<Byte> fun write(command: Byte) } fun openConnection(): Single<Connection> // opens a single connection fun toBeWritten(): Observable<Byte> // output messages from elsewhere 

Problema : deseo hacer un flujo observable que lea los posts entrantes y al mismo time se suscriba a toBeWritten() y escriba los valores emitidos en la connection.

Hasta ahora lo he conseguido trabajando con el siguiente truco: combinar el flujo de input con la salida usando withLatestFrom y escribiendo con doOnNext .

 openConnection() .toObservable() .flatMap { connection -> connection.read() .withLatestFrom( toBeWritten() .doOnNext { connection.write(it) } .startWith(0), BiFunction { readMe: Byte, writeMe: Byte -> readMe } ) } .retry() .subscribe { handleReadMessage(it) } 

El startWith(0) después de doOnNext es necesario, o no se leerá nada hasta que toBeWritten() emita algo.

Pregunta : ¿hay una mejor solución para este problema? este se siente hackish .

Bono : ¿desventajas de esta solución? (¿pérdidas de memory? ¿etc.?)