¿Cómo continuar el procesamiento después de que ocurra un error en RxJava 2?

Tengo un PublishSubject y un Subscriber que utilizo para procesar una (posiblemente) secuencia infinita de datos preprocesados. El problema es que algunos de los elementos pueden contener algún error. Me gustaría ignorarlos y continuar el process. ¿Como lo puedo hacer? He intentado algo como esto:

  val subject = PublishSubject.create<String>() subject.retry().subscribe({ println("next: $it") }, { println("error") }, { println("complete") }) subject.onNext("foo") subject.onNext("bar") subject.onError(RuntimeException()) subject.onNext("wom") subject.onComplete() 

Mi problema es que ninguno de los methods de event handling errores me ayuda aquí:

onErrorResumeNext() : ordera a un Observable que emita una secuencia de elementos si encuentra un error

onErrorReturn( ) – ordera a un observable que emita un elemento en particular cuando encuentra un error

onExceptionResumeNext( ) : ordera a un observable que continúe emitiendo elementos después de que encuentra una exception (pero no otra variedad de throwable)

retry( ) – si una fuente Observable emite un error, vuelva a suscribirse con la esperanza de que se complete sin error

retryWhen( ) – si una fuente observable emite un error, pase ese error a otro observable para determinar si volverá a suscribirse a la fuente

Intenté retry() por ejemplo, pero cuelga mi process después del error indefinidamente.

También probé onErrorResumeNext() pero no funciona como se esperaba:

  val backupSubject = PublishSubject.create<String>() val subject = PublishSubject.create<String>() var currentSubject = subject subject.onErrorResumeNext(backupSubject).subscribe({ println("next: $it") }, { println("error") currentSubject = backupSubject }, { println("complete") }) backupSubject.subscribe({ println("backup") }, { println("backup error") }) currentSubject.onNext("foo") currentSubject.onNext("bar") currentSubject.onError(RuntimeException()) currentSubject.onNext("wom") currentSubject.onComplete() 

Esto solo imprime foo y bar .

Si desea continuar procesando después de un error, significa que su error es un valor como su String y debe continuar en onNext . Para garantizar la security del tipo en este caso, debe usar alguna forma de contenedor que puede tomar un valor normal o un error; por ejemplo, io.reactivex.Notification<T> está disponible en RxJava 2:

 PublishSubject<Notification<String>> subject = PublishSubject.create(); subject.subscribe(System.out::println); subject.onNext(Notification.createOnNext("Hello")); subject.onNext(Notification.<String>createOnError(new RuntimeException("oops"))); subject.onNext(Notification.createOnNext("World")); 
  • ¿Cómo soluciono el error de inferencia de tipo en un Completable transformado utilizando RxLifecycle.bindToLifecycle ()?
  • Para una function de Kotlin utilizada como expresión, ¿hay una forma concisa de operar y devolver un valor?
  • Cómo hacer un event handling errores en rxjava2 en android
  • RxJava zipCon error IDE en Kotlin con Android Studio 3.0
  • Confundido sobre la asignación de la variable RxJava
  • RxJava salida diferente entre Flowable y Observable con window y Groupby
  • TestScheduler no funciona (Kotlin + RxJava2 + mockito)
  • Cómo contar el time de ejecución de lo observable
  • RxJava2 switchIfEmpty y verificando una ejecución
  • ¿Cómo crear un intervalo infinito observable que emitirá un nuevo object cada intervalo de time?
  • Retrofit 2, Rx 2 y llamadas asincrónicas