Rx Java Retrofit con flatMap se ejecuta solo una vez

Estoy tratando de get un observable de una secuencia de button, click flatmap con otro observable de la modificación para solicitar un punto final, ¡pero solo se ejecutará una vez! Cuando hago clic de nuevo, no paso por el método de map plano.

Es extraño porque cuando el map plano devuelve otros observables, funciona bien, pero con la actualización uno solo se ejecuta una vez.

override fun onCreate(savedInstanceState: Bundle?) { super.onCreate(savedInstanceState) setContentView(R.layout.activity_main) var retrofit:Retrofit = Retrofit.Builder() .baseUrl("<SERVER_IP>") .addConverterFactory(GsonConverterFactory.create()) .addCallAdapterFactory(RxJavaCallAdapterFactory.createWithScheduler(Schedulers.io())) .build() var testService:TestService = retrofit.create(TestService::class.java) var buttonObservable:Observable<Void> = RxView.clicks(btnRequest) buttonObservable .observeOn(Schedulers.newThread()) .subscribeOn(AndroidSchedulers.mainThread()) .flatMap { Log.i("debug", "run flatmap") var request:Request = Request() request.accessToken = "<acess_token>" testService.test(request) } .subscribeOn(Schedulers.io()) .observeOn(AndroidSchedulers.mainThread()) .subscribe({ user -> Log.i("debug", "user ") }, { err -> Log.d("debug", "err: ${err.message}") }) } 

Interfaz de testing:

 interface TestService { @POST("protected") fun test(@Body request: Request) : Observable<User> } 

Clase de request:

 class Request { var accessToken:String = "" } 

¡¡¡¡Gracias por adelantado!!!!

EDITAR: Al final, lo que sucedía era que se activaba onError, lo que detiene la transmisión.

El siguiente código lo resuelve:

 buttonObservable .observeOn(Schedulers.newThread()) .subscribeOn(AndroidSchedulers.mainThread()) .flatMap { try { Log.i("debug", "run flatmap") var blaService = retrofit.create(TestService::class.java) var request: Request = Request() blaService.test(request) } catch(t:Throwable) { Observable.empty<Void>() } } .subscribeOn(Schedulers.io()) .observeOn(AndroidSchedulers.mainThread()) .subscribe({ user -> Log.i("debug", "user ") }, { err -> Log.d("debug", "err: ${err.message}") }, { Log.i("debug", "onComplete") }) 

Pero no es la mejor manera de hacerlo, ¿hay una forma correcta de continuar la transmisión después de presionar onError o onComplete?