Crear un observable común que evite múltiples llamadas

Tengo un método que lee / escribe un post.

fun sendMessage(message: String): Observable<MESSAGE> { return readMessage().doOnSubscribe { socket.write(message) } } 

readMessage() devuelve un tema de publicación que emite un valor de un elemento observable en caliente de una secuencia ( socket.read() ).

 protected fun readMessage(): Observable<MESSAGE> { if (messageSubject == null) { messageSubject = PublishSubject.create() socket.read() .flatMap { return@flatMap flowTransformer.runLoop(it) } .flatMap { //Do some stuff } .subscribe(messageSubject) } return messageSubject } 

Llamo a sendMessage() en un punto diferente y varias veces en la misma cadena.

  sendMessage("Message1").flatMap { sendMessage("Message2") }.flatMap { sendMessage("Message 3") }.subscribe({ //next }, { //error }) 

El problema es cuando llamo sendMessage() Es posible que todavía no me haya suscrito al editor (por lo que la respuesta del post es drop). Me temo que si uso ReplaySubject emitirá demasiados posts, porque uso mucho sendMessage() .

Algún time el readObservable del primer sendMessage lee todo el siguiente post. Y es un problema porque la operación de análisis es intensiva de la CPU.

¿Cómo podría mejorar esa cadena?

Puede crear un object PublishSubject con un tamaño de búfer

 public void ReplaySubjectBufferExample() { var bufferSize = 2; var subject = new ReplaySubject<string>(bufferSize); subject.OnNext("a"); subject.OnNext("b"); subject.OnNext("c"); subject.Subscribe(Console.WriteLine); subject.OnNext("d"); } 

Entonces, de esta manera, puede controlar la cantidad de elementos que se reproducirán. Source ReplaySubject

  • llamar parte de la secuencia una vez con múltiples suscriptores?
  • Cómo upload el valor de retorno de un constructor
  • ¿Por qué mi configuration de RxJava está bloqueando mi subprocess de interfaz de usuario? Trabajar con la callback BluetoothAdapter.startLeScan
  • Observable.combineLa causa de error más reciente después de actualizar a RxJava 2.xx - no se puede inferir el tipo
  • Cómo fusionar 2 flujos separados, almacenar los datos rellenos y subscribirlos después de un time corto
  • Comportamiento de RxJava Schedulers.immediate () mientras testings unitarias
  • Espere hasta que dos observables emitan verdadero
  • La reconnection de RxJava en observable conectable no funciona
  • No se llama a ninguno de los suscriptores de RxJava onNext / onError / onComplete al encadenar Observables creado desde Observable.create ()
  • Tipo de devolución diferente en RxJava 2 (actualización desde RxJava1)
  • Kolin - Lista de artículos para el artículo de lists