Combina Rx Singles en Observables recursivamente

Digamos que tengo un Single llamado s_0 que puede emitir un elemento t_0 del tipo T o fail (que sería un Single<T> en algunos idiomas). Es decir:

 s_0: -- t_0 // Success OR s_0: -- X // Failure 

Las instancias de tipo T tienen un método next() que también devuelve un Single opcional del tipo T ( Single<T>? In Kotlin ). Este comportamiento conduce a una cadena de instancias Single capaz de emitir una cadena de instancias T , donde cada s_i individual puede emitir un elemento t_i capaz de devolver un siguiente single s_i+1 , que emitirá un elemento t_i+1 y así sucesivamente, hasta el último elemento t_n-1 no devuelve uno o ninguno de los singles falla:

 s_0: -- t_0 ↓ s_1: -- t_1 ↓ s_2: -- t_2 ... ↓ s_n-1: -- t_n-1 ↓ null OR s_0: -- t_0 ↓ s_1: -- t_1 ↓ s_2: -- t_2 ... ↓ s_i: -- X 

Estoy buscando una forma elegante de get un Observable o del tipo T capaz de emitir todos los elementos de la cadena iniciada por s_0 termina satisfactoriamente cuando no hay más singles en la cadena o falla si falla alguno:

 o: -- t_0 -- t_1 -- t_2 -- ... -- t_n-1 --o // Success OR o: -- t_0 -- t_1 -- t_2 -- ... --X // Failure 

Por elegante , me refiero a algo tan simple como esto (en Kotlin ):

 // Get single somehow (out of the scope of this question) val s0: Single<T> = provideSingle() // Get observable val o: Observable<T> = s0.chain() // Define extension method fun Single<T>.chain(): Observable<T> { /* Implement here */ } // Element interface interface T { fun next(): Single<T>? } 

¿Cuál es la aplicabilidad de esto?

Este escenario se puede encontrar al consumir una API REST con pagination, donde las instancias Single se pueden utilizar para recuperar páginas individuales, que a su vez pueden proporcionar instancias Single capaces de emitir páginas posteriores.

No lo he probado, pero basado en una solución que codigo hace un time a un problema de pagination similar que traduje a Kotlin.

 fun Single<T>.chain(): Observable<T> = toObservable() .concatWith { it.next()?.chain() ?: Observable.empty() } 

La key para get ese encadenamiento "recursivo" es el concatWith operador llamando recursivamente al método de chain

 public class Q44535765 { public static void main(String[] args) { Maybe<Element> first = get(); first.toObservable() .compose(o -> chain(o)) .doOnError(e -> System.out.println(e)) .subscribe( e -> System.out.println(e), e -> System.out.println("fail"), () -> System.out.println("complete")); } static Maybe<Element> get() { return Maybe.just( () -> If.<Maybe<Element>> that(Math.random() > 0.1) .tobe(() -> get()) .orbe(() -> If.<Maybe<Element>> that(Math.random() > 0.5) .tobe(() -> Maybe.empty()) .orbe(() -> null) .result()) .result()); } static Observable<Element> chain(Observable<Element> s) { return s.concatMap( e -> Observable.just(e) .concatWith(e.next() .toObservable() .compose(o -> chain(o)))); } interface Element { Maybe<Element> next(); } } 

Mientras que If es mi class de utilidades, puede en lugar de hacerlo if...else...

  • Cómo pasar nulo a un Observable con tipo anulable en RxJava 2 y Kotlin
  • Asunto de RxJava: escuche el tipo diferente que el que emite
  • Pausa / Reanudar un timer / retraso en RX
  • Cómo burlarse del repository reactivo que devuelve Observable
  • RxJava2 cómo separar diferentes implementaciones de emisor observable
  • RxJava - ¿Entradas de keyboard de contrapresión?
  • RxJava: cómo devolver el tipo correcto de nulo
  • RXJava ... emiten un observable cada segundo
  • IncompatibleClassChangeError: Class 'java.lang.VirtualMachineError' no implementa la interfaz 'java.lang.CharSequence'
  • Cómo comprimir algunos observables en lenguaje Kotlin con RxAndroid
  • RxJava Observable a Completable, cómo evitar toBlocking ()