Manejo de errores de suscripción anidada RX2.0

Estoy tratando de descubrir cómo captan las excepciones las transmisiones RX. En particular, si tengo una suscripción dentro de una suscripción y la suscripción interna recibe una exception de la que es observable, ¿debo manejar eso en onError para suscripción interna o puedo manejarlo en onerror de suscripción principal?

observableThing1.subscribe({ _ -> observableThing2.subscribe({ _ -> // Something horrible happens here an an exception is throw by observableThing2 }) }, { error -> // Expecting I can handle all errors at the top most subscription }) 

En su caso, necesitaría manejar onError para cada subscription :

  observable1Thing().subscribe({ obs1Result -> observable2Thing().subscribe({ obs2Result -> }, { obs2Error -> }) }, { obs1error -> }) 

Si combina los observables con un operador como flatMap , solo necesitaría una subcription y, por lo tanto, manejaría solo un caso de onError . Qué operador usarías depende de cómo quieras combinar los observables .

Ejemplo:

  obs1().flatMap { obs2() }.subscribe({ }, { error -> }) 
  • Kotlin no puede inferir el tipo cuando utiliza la reference de método en Flowable
  • rxjava2 - ejemplo simple de ejecutar tareas en un grupo de subprocesss, suscribirse en un solo hilo
  • RxJava salida diferente entre Flowable y Observable con window y Groupby
  • Cómo retrasar onError () en RxJava 2 y Android?
  • RxJava 2 requiere un tipo de retorno Observable diferente de RxJava 1 (Kotlin)
  • Escuchar posts y escribir commands en un flujo observable
  • ¿Por qué mi configuration de RxJava está bloqueando mi subprocess de interfaz de usuario? Trabajar con la callback BluetoothAdapter.startLeScan
  • RxJava Observable.create envolver suscripciones observables
  • Android Architecture Components Room ViewModel CompleteableFormAction
  • Aplicando transformación a cada elemento en Single <List <T >>
  • Kotlin: Cómo convertir la testing que usa Thread.sleep a RxJava TestScheduler