Crear un observable común que evite múltiples llamadas

Tengo un método que lee / escribe un post.

fun sendMessage(message: String): Observable<MESSAGE> { return readMessage().doOnSubscribe { socket.write(message) } } 

readMessage() devuelve un tema de publicación que emite un valor de un elemento observable en caliente de una secuencia ( socket.read() ).

 protected fun readMessage(): Observable<MESSAGE> { if (messageSubject == null) { messageSubject = PublishSubject.create() socket.read() .flatMap { return@flatMap flowTransformer.runLoop(it) } .flatMap { //Do some stuff } .subscribe(messageSubject) } return messageSubject } 

Llamo a sendMessage() en un punto diferente y varias veces en la misma cadena.

  sendMessage("Message1").flatMap { sendMessage("Message2") }.flatMap { sendMessage("Message 3") }.subscribe({ //next }, { //error }) 

El problema es cuando llamo sendMessage() Es posible que todavía no me haya suscrito al editor (por lo que la respuesta del post es drop). Me temo que si uso ReplaySubject emitirá demasiados posts, porque uso mucho sendMessage() .

Algún time el readObservable del primer sendMessage lee todo el siguiente post. Y es un problema porque la operación de análisis es intensiva de la CPU.

¿Cómo podría mejorar esa cadena?