Cambio observable a condición cumplida – RxJava2

Usando RxJava2 RxKotlin and Room , necesito consultar la database para una búsqueda abierta. Eso significa que busco una búsqueda que contenga un atributo llamado closed con valor false . Una vez que se ha encontrado la búsqueda, debe cambiar la consulta a esa búsqueda en particular.

Tengo 2 methods para esas consultas:

 getOpenHunt(teamId:String): Flowable<List<Hunt>> getHunt(huntId:String): Flowable<List<Hunt>> 

Ambos devuelven una List porque, de lo contrario, la consulta se bloquea cuando no se encuentra la búsqueda.

Mi idea es algo así como

 fun queryHunt(teamId:String):Flowable<Optional<Hunt>>{ getOpenHunt(teamId) .map<Optional<Hunt>> { Optional.create(it.firstOrNull()) } .switchToFlowableIf ( it is Optional.Some, getHunt(it.element().id) } //With switchToFlowableIf's being fun <E:Any> switchToFlowableIf(condition: (E)->Boolean, newFlowable: Flowable<E>): Flowable<E> //It should unsubscribe from getOpenHunt and subscribe to newFlowable 

Como reference, aquí está mi class Optional

 sealed class Optional<out T> { class Some<out T>(val element: T) : Optional<T>() object None : Optional<Nothing>() fun element(): T? { return when (this) { is Optional.None -> null is Optional.Some -> element } } companion object { fun <T> create(element: T?): Optional<T> { return if (element != null) { Optional.Some(element) } else { Optional.None } } } } 

¿Existe un método similar ya construido en RxJava2? Si no, ¿cómo lo implementarías?

    Lo resolví mirando onErrorResumeNext. Copie pegado algún código y modificado a mis necesidades. No creo que esto sea perfecto, pero cumple su function. Comente si encuentra algunos posibles errores.

     public final class FlowableOnPnetworkingicateNext<T> extends AbstractFlowableWithUpstream<T, T> { private final Pnetworkingicate<? super T> pnetworkingicate; private final Function<? super T, ? extends Publisher<? extends T>> next; public FlowableOnPnetworkingicateNext(Flowable<T> source, Pnetworkingicate<? super T> pnetworkingicate, Function<? super T, ? extends Publisher<? extends T>> next) { super(source); this.pnetworkingicate = pnetworkingicate; this.next = next; } @Override protected void subscribeActual(Subscriber<? super T> s) { OnPnetworkingicateNextSubscriber<T> parent = new OnPnetworkingicateNextSubscriber<>(s, next, pnetworkingicate); s.onSubscribe(parent.arbiter); source.subscribe(parent); } static final class OnPnetworkingicateNextSubscriber<T> implements FlowableSubscriber<T> { private final Subscriber<? super T> actual; private final Pnetworkingicate<? super T> pnetworkingicate; private final SubscriptionArbiter arbiter; private final Function<? super T, ? extends Publisher<? extends T>> nextSupplier; private boolean switched = false; OnPnetworkingicateNextSubscriber(Subscriber<? super T> actual, Function<? super T, ? extends Publisher<? extends T>> nextSupplier, Pnetworkingicate<? super T> pnetworkingicate) { this.actual = actual; this.pnetworkingicate = pnetworkingicate; this.nextSupplier = nextSupplier; this.arbiter = new SubscriptionArbiter(); } @Override public void onSubscribe(Subscription s) { arbiter.setSubscription(s); } @Override public void onNext(T t) { try { if (!switched && pnetworkingicate.test(t)) { Publisher<? extends T> p; p = nextSupplier.apply(t); p.subscribe(this); switched = true; } else { actual.onNext(t); } } catch (Exception e) { actual.onError(e); } } @Override public void onError(Throwable t) { actual.onError(t); } @Override public void onComplete() { actual.onComplete(); } } } 

    Usando Kotlin, escribí una function de extensión:

     @CheckReturnValue @BackpressureSupport(BackpressureKind.FULL) @SchedulerSupport(SchedulerSupport.NONE) fun <E, T : Flowable<E>> T.onPnetworkingicateResumeNext(pnetworkingicate: Pnetworkingicate<E>, resumeFunction: io.reactivex.functions.Function<E, Publisher<E>>): Flowable<E> { return RxJavaPlugins.onAssembly<E>(FlowableOnPnetworkingicateNext<E>(this, pnetworkingicate, resumeFunction )) } 

    Y ahora lo estoy usando así:

     override fun getOpenHunt(teamId: String): Flowable<Optional<Hunt>> { return created().getOpenHunt(teamId) .map { Optional.create(it.firstOrNull()) } .onPnetworkingicateResumeNext(pnetworkingicate = Pnetworkingicate { it.element() != null }, resumeFunction = Function { created() .getHunt(it.element()!!.id) .map<Optional<Hunt>> { Optional.create(it.firstOrNull()) } }) } 

    Lo haría de esa manera reutilizando tus methods:

     getOpenHunt(teamId:String): Flowable<List<Hunt>> getHunt(huntId:String): Flowable<List<Hunt>> getOpenHunt(yourId) // return your Flowable<List<Hunt>> .flatMapIterable { it } // get each result of open hunts .flatMapMaybe { getHunt(it.id) } // querie each result .toList() // back to a Maybe/Single<List<Hunt>> .toFlowable() // back to a Flowable else it wont emit more data .subscribeBy(onNext = { /** process your results **/ }, onError = { /** process if no results found **/ } ) 

    Eso significa que consulta su database si hay una input que coincida con su condición. Luego obtiene el resultado, lo divide en elementos individuales, ejecuta varias consultas usando sus resultados. Si se encuentran resultados, lo convierte nuevamente en una Single<List<Hunt>> . Como quieres mantenerte suscrito, debes convertirlo nuevamente a un .toFlowable() usando .toFlowable()

    Por cierto, cambiar significa que el valor de retorno cambia, pero en su caso es la misma List<Hunt> .

    Todo el flujo no tiene ningún sentido para mí, ya que también puedes get los resultados de getOpenHunt e iterar a través de él.