Crear un observable común que evite múltiples llamadas

Tengo un método que lee / escribe un post.

fun sendMessage(message: String): Observable<MESSAGE> { return readMessage().doOnSubscribe { socket.write(message) } } 

readMessage() devuelve un tema de publicación que emite un valor de un elemento observable en caliente de una secuencia ( socket.read() ).

 protected fun readMessage(): Observable<MESSAGE> { if (messageSubject == null) { messageSubject = PublishSubject.create() socket.read() .flatMap { return@flatMap flowTransformer.runLoop(it) } .flatMap { //Do some stuff } .subscribe(messageSubject) } return messageSubject } 

Llamo a sendMessage() en un punto diferente y varias veces en la misma cadena.

  sendMessage("Message1").flatMap { sendMessage("Message2") }.flatMap { sendMessage("Message 3") }.subscribe({ //next }, { //error }) 

El problema es cuando llamo sendMessage() Es posible que todavía no me haya suscrito al editor (por lo que la respuesta del post es drop). Me temo que si uso ReplaySubject emitirá demasiados posts, porque uso mucho sendMessage() .

Algún time el readObservable del primer sendMessage lee todo el siguiente post. Y es un problema porque la operación de análisis es intensiva de la CPU.

¿Cómo podría mejorar esa cadena?

Puede crear un object PublishSubject con un tamaño de búfer

 public void ReplaySubjectBufferExample() { var bufferSize = 2; var subject = new ReplaySubject<string>(bufferSize); subject.OnNext("a"); subject.OnNext("b"); subject.OnNext("c"); subject.Subscribe(Console.WriteLine); subject.OnNext("d"); } 

Entonces, de esta manera, puede controlar la cantidad de elementos que se reproducirán. Source ReplaySubject

  • RxJava Debounce onNext ()
  • rx.Scheduler no se puede proporcionar sin un método @ Provides- o @ Produces-anotado
  • Uso del map RXJava en Kotlin?
  • Rxjava retrofit parse api error para el usuario
  • Pruebas unitarias Rxjava observables que tienen un retraso
  • Recuperar respuesta JSON de rxjava / retrofit request POST
  • RxJava (Kotlin), Observable.amb y PublishSubject no están disparando
  • Orden de configuration de progtwigdores en Rx
  • Kotlin: ¿Qué significa "return @"?
  • Cómo fusionar 2 flujos separados, almacenar los datos rellenos y subscribirlos después de un time corto
  • Prueba de intervalo infinito RxJava