RxJava cómo crear Observable a partir de una Suscripción

Estoy buscando una manera de crear Observable después de procesar el resultado en la subscribe .

Dado que tengo este Observable de productRepo.list() que es Retrofit que vuelve Observable<Response<ProductResponse>> .

 productRepo .list() .retry(3) .subscribe { response -> if (response.isSuccessful) { response.body().apply { cache.saveProducts(data) } } } 

El objective de esto es save el resultado en el cache DB local. Esto más otra llamada muy similar llenan DB local con datos remotos de API.

Después de completar las dos llamadas, quería cargar los datos del cache .

No quiero combinar ambos observables de ninguna manera. Solo quiero ejecutar alguna tarea después.

Quiero este manejo como una unidad en el gráfico de llamadas de Rx, de modo que haga Call1 y Call2 al mismo time, y una vez que Call1 y Call2 completen, ejecute Task3. ¿Cuál es la mejor manera en este escenario? Realmente prefiero si el suscriptor de cada llamada está separado.

¿ flatMap es la mejor opción aquí?

Solutions Collecting From Web of "RxJava cómo crear Observable a partir de una Suscripción"

Como has mencionado,

Realmente prefiero si el suscriptor de cada llamada está separado.

Supongamos que tenemos dos observables

 val call1 = Observable.from(arrayOf(1,2,3,4,5,6,7,8)) val call2 = Observable.from(arrayOf(2,4,6,8)) 

Si utilizamos únicamente Observable.zip como sigue, solo puede tener un único suscriptor para ambos Call1 y Call2.

 Observable.zip(call1,call2) {c1, c2 -> Pair(c1,c2) }.subscribe(task3Subscriber) 

Si usamos tres suscriptores separados de la siguiente manera, la transmisión Call1 y Call2 se activará dos veces .

 call1.subscribe(call1Subscriber) call2.subscribe(call2Subscriber) Observable.zip(call1,call2) {c1, c2 -> Pair(c1,c2) }.subscribe(task3Subscriber) 

Por lo tanto, necesitamos usar .share().cacheWithInitialCapacity(1) para hacer los trucos

 val call1 = Observable.from(arrayOf(1,2,3,4,5,6,7,8)) .share() .cacheWithInitialCapacity(1) val call2 = Observable.from(arrayOf(2,4,6,8)) .share() .cacheWithInitialCapacity(1) val task3Signal = Observable.zip(call1,call2){ c1, c2 -> c1 + c2 } call1.subscribe(call1Subscriber) call2.subscribe(call2Subscriber) task3Signal.subscribe(task3Subscriber) 

También puede probar / probar su concepto del gráfico Rx a partir de un simple caso de testing.

 class SimpleJUnitTest { @Test fun test(){ val call1 = Observable.from(arrayOf(1,2,3,4,5,6,7,8)) .doOnNext { println("call1 doOnNext $it") } .share() .cacheWithInitialCapacity(1) val call2 = Observable.from(arrayOf(2,4,6,8)) .doOnNext { println("call2 doOnNext $it") } .share() .cacheWithInitialCapacity(1) val task3Signal = Observable.zip(call1,call2){ c1, c2 -> println("task3Signal c1:$c1, c2: $c2") c1 + c2 } val testSubscriber1 = TestSubscriber<Int>() val testSubscriber2 = TestSubscriber<Int>() val testSubscriber3 = TestSubscriber<Int>() call1.subscribe(testSubscriber1) call2.subscribe(testSubscriber2) task3Signal.subscribe(testSubscriber3) testSubscriber1.assertReceivedOnNext(listOf(1,2,3,4,5,6,7,8)) testSubscriber2.assertReceivedOnNext(listOf(2,4,6,8)) testSubscriber3.assertReceivedOnNext(listOf(3,6,9,12)) testSubscriber1.assertValueCount(8) testSubscriber2.assertValueCount(4) testSubscriber3.assertValueCount(4) } } 

Salida:

 call1 doOnNext 1 call1 doOnNext 2 call1 doOnNext 3 call1 doOnNext 4 call1 doOnNext 5 call1 doOnNext 6 call1 doOnNext 7 call1 doOnNext 8 call2 doOnNext 2 call2 doOnNext 4 call2 doOnNext 6 call2 doOnNext 8 task3Signal c1:1, c2: 2 task3Signal c1:2, c2: 4 task3Signal c1:3, c2: 6 task3Signal c1:4, c2: 8 
 .doOnNext() 

es su respuesta, porque devolverá su respuesta final o cada respuesta si son múltiples. Pruebalo.

Eche un vistazo a Zip . Haz algo como, Observable.zip (firstObservable, secondObservable, ….. {Task 3}