RxJava cómo crear Observable a partir de una Suscripción

Estoy buscando una manera de crear Observable después de procesar el resultado en la subscribe .

Dado que tengo este Observable de productRepo.list() que es Retrofit que vuelve Observable<Response<ProductResponse>> .

 productRepo .list() .retry(3) .subscribe { response -> if (response.isSuccessful) { response.body().apply { cache.saveProducts(data) } } } 

El objective de esto es save el resultado en el cache DB local. Esto más otra llamada muy similar llenan DB local con datos remotos de API.

Después de completar las dos llamadas, quería cargar los datos del cache .

No quiero combinar ambos observables de ninguna manera. Solo quiero ejecutar alguna tarea después.

Quiero este manejo como una unidad en el gráfico de llamadas de Rx, de modo que haga Call1 y Call2 al mismo time, y una vez que Call1 y Call2 completen, ejecute Task3. ¿Cuál es la mejor manera en este escenario? Realmente prefiero si el suscriptor de cada llamada está separado.

¿ flatMap es la mejor opción aquí?

Como has mencionado,

Realmente prefiero si el suscriptor de cada llamada está separado.

Supongamos que tenemos dos observables

 val call1 = Observable.from(arrayOf(1,2,3,4,5,6,7,8)) val call2 = Observable.from(arrayOf(2,4,6,8)) 

Si utilizamos únicamente Observable.zip como sigue, solo puede tener un único suscriptor para ambos Call1 y Call2.

 Observable.zip(call1,call2) {c1, c2 -> Pair(c1,c2) }.subscribe(task3Subscriber) 

Si usamos tres suscriptores separados de la siguiente manera, la transmisión Call1 y Call2 se activará dos veces .

 call1.subscribe(call1Subscriber) call2.subscribe(call2Subscriber) Observable.zip(call1,call2) {c1, c2 -> Pair(c1,c2) }.subscribe(task3Subscriber) 

Por lo tanto, necesitamos usar .share().cacheWithInitialCapacity(1) para hacer los trucos

 val call1 = Observable.from(arrayOf(1,2,3,4,5,6,7,8)) .share() .cacheWithInitialCapacity(1) val call2 = Observable.from(arrayOf(2,4,6,8)) .share() .cacheWithInitialCapacity(1) val task3Signal = Observable.zip(call1,call2){ c1, c2 -> c1 + c2 } call1.subscribe(call1Subscriber) call2.subscribe(call2Subscriber) task3Signal.subscribe(task3Subscriber) 

También puede probar / probar su concepto del gráfico Rx a partir de un simple caso de testing.

 class SimpleJUnitTest { @Test fun test(){ val call1 = Observable.from(arrayOf(1,2,3,4,5,6,7,8)) .doOnNext { println("call1 doOnNext $it") } .share() .cacheWithInitialCapacity(1) val call2 = Observable.from(arrayOf(2,4,6,8)) .doOnNext { println("call2 doOnNext $it") } .share() .cacheWithInitialCapacity(1) val task3Signal = Observable.zip(call1,call2){ c1, c2 -> println("task3Signal c1:$c1, c2: $c2") c1 + c2 } val testSubscriber1 = TestSubscriber<Int>() val testSubscriber2 = TestSubscriber<Int>() val testSubscriber3 = TestSubscriber<Int>() call1.subscribe(testSubscriber1) call2.subscribe(testSubscriber2) task3Signal.subscribe(testSubscriber3) testSubscriber1.assertReceivedOnNext(listOf(1,2,3,4,5,6,7,8)) testSubscriber2.assertReceivedOnNext(listOf(2,4,6,8)) testSubscriber3.assertReceivedOnNext(listOf(3,6,9,12)) testSubscriber1.assertValueCount(8) testSubscriber2.assertValueCount(4) testSubscriber3.assertValueCount(4) } } 

Salida:

 call1 doOnNext 1 call1 doOnNext 2 call1 doOnNext 3 call1 doOnNext 4 call1 doOnNext 5 call1 doOnNext 6 call1 doOnNext 7 call1 doOnNext 8 call2 doOnNext 2 call2 doOnNext 4 call2 doOnNext 6 call2 doOnNext 8 task3Signal c1:1, c2: 2 task3Signal c1:2, c2: 4 task3Signal c1:3, c2: 6 task3Signal c1:4, c2: 8 
 .doOnNext() 

es su respuesta, porque devolverá su respuesta final o cada respuesta si son múltiples. Pruebalo.

Eche un vistazo a Zip . Haz algo como, Observable.zip (firstObservable, secondObservable, ….. {Task 3}

  • RxJava Observable a Completable, cómo evitar toBlocking ()
  • La subclass de class sellada de Kotlin debe convertirse a la class base si se proporciona como RxJava Observable
  • Cómo consultar Realm en el background Subprocess usando RxJava2
  • RxJava- Gire Observable en Iterator, Stream o Sequence
  • Fusionar datos de diferentes Observables y elegir diferentes estrategias de búsqueda, según la disponibilidad de datos
  • Cómo burlarse del repository reactivo que devuelve Observable
  • RX java / Android cómo lograr este brindis en cada clic usando el operador de rebote
  • ¿Por qué tengo un Log de salida no deseado al fusionar 2 observables en otro Observable, que los almacena temporalmente cada 10 segundos?
  • Mockito querido pero no invocado
  • RxJava: onBackpressureBlock () comportamiento extraño
  • Convirtiendo un Observable en un Flujo con contrapresión en RxJava2