Articles of rx java2

¿Hay alguna manera de cambiar mi método a la stream Observable que será una cadena de modificadores?

Originalmente tengo una function como esta: private fun rxSaveEvents( events: List<LocationEvent>, locationUpdatesAboutToStop: Boolean): Boolean { synchronized(mEventsMonitor) { if (events.isNotEmpty()) { val locationEvents = LocationEvents() segregateLocationEvents(events, locationEvents) scoreReadings(locationEvents) sendLocationEventsInternal(locationEvents, locationUpdatesAboutToStop) Completable.fromCallable { updateLocationEventsToDb(locationEvents) }.subscribeOn(Schedulers.io()).subscribe() } if (locationUpdatesAboutToStop) { stopLocationUpdates() Completable.fromCallable { cleanAllReadingsData() }.subscribeOn(Schedulers.io()).subscribe() Completable.fromCallable { removeExpinetworkingData() }.subscribeOn(Schedulers.io()).subscribe() } locationEvents.clear() return true } } Esta function se […]

Retrofit 2, Rx 2 y llamadas asincrónicas

Estoy comenzando con Retrofit y Rx y creé una API sonriente, con tal configuration (Kodein): bind<CallAdapter.Factory>() with singleton { RxJava2CallAdapterFactory.create() } bind<RetrofitInterface>() with singleton { val retrofit = Retrofit.Builder() .client(instance()) .baseUrl(instance<String>("apiRoot")) .addCallAdapterFactory(instance()) .addConverterFactory(instance()) .build() retrofit.create(RetrofitInterface::class.java) } Ahora parece que cuando llamo a mis methods API creados por Retrofit, se suscriben en el hilo actual en […]

Fusionar observables dependientes

Tengo dos Observable que emiten el mismo tipo de datos. Pero ambos dependen del último valor emitido por el otro para su operador de escaneo como valor inicial. Necesito fusionarlos. No puedo hacer que uno se suscriba al otro porque mis Observable se disparan por diferentes valores. Estoy pensando en que los sujetos deberían estar […]

Observable.fromCallable () implementación con exception

Estoy tratando de ejecutar una function que arroja excepciones. Observable.fromCallable { foo() } .subscribeOn(Schedulers.io()) .observeOn(AndroidSchedulers.mainThread()) .take(1) .onErrorReturn { /* onError operation */ } .subscribe { } y quiero hacer alguna otra operación si tiene éxito sin ningún error. ¿Cómo puedo hacer eso?

¿Por qué tengo un Log de salida no deseado al fusionar 2 observables en otro Observable, que los almacena temporalmente cada 10 segundos?

Estoy tratando de simular la fusión de 2 secuencias observables diferentes, que están emitiendo algún object por segundo. Este object tiene el mismo padre en esas 2 secuencias. Pensé, que en la console aparecerá un nuevo object con valor después de 1 segundo. Sin embargo, cuando imprimo esos objects, obtengo objects que saltearon la emisión […]

Descargar un file desde un control remoto y savelo en un dispositivo Android

Lo que bash hacer es desacoplar cómo se descarga el file desde donde lo guardo, también conocido como almacenamiento. El código que funciona bien: El siguiente código descarga un file del control remoto y finalmente lo guarda en el almacenamiento. private fun downloadFile(url: URL): Observable<Int> { return Observable.create(fun(emitter) { var input: InputStream? = null var […]

RxJava2 switchIfEmpty y verificando una ejecución

así que imagina que tengo un método de build así: LocalDatabase: public Observable<PoiObject> getPoiObject() { return Observable.defer { PoiObject object = poiDao.getPoiObject(); if(object == null) { return Observable.empty(); } else { return Observable.just(object); } } } ahora, tengo otro método en otro lugar que dice así: Servicio: public Observable<PoiObject> getPoiObject() { return localDatabase.getPoiObject() } public […]

RxJava2: onComplete no llamado con flatMapIterable

Aquí hay un pequeño fragment de código: val subject = BehaviorSubject.createDefault(emptyList<Int>()) subject.onNext(Arrays.asList(1, 2, 3)) subject.flatMapIterable { list: List<Int> -> list } .subscribeBy( onNext = { l("on next", it) }, onComplete = { l("on complete") } ) ¿Por qué onComplete no llama aquí? ¿Qué debería hacer para trabajar este código? Porque en el código original no […]

Encadenando Observables para evitar suscripciones múltiples

Tengo algunos problemas para encadenar estos observables. Tengo func1() que arroja una exception en caso de error. En el onError tengo func2() que hace una recuperación de database y func3() que hace una database de guardado modificando el documento recuperado. ¿Cómo puedo encadenar esto para evitar suscripciones múltiples? Observable.fromCallable { func1() } .subscribeOn(Schedulers.io()) .observeOn(AndroidSchedulers.mainThread()) .take(1) […]

RxJava2 + Retrofit pantalla negra en request de datos

Estoy enfrentando un problema con una aplicación de Android mientras trato de recuperar algunos datos Api utilizando RxJava2 y Retrofit2 (en Kotlin). Una vez que ejecuto la llamada, la pantalla se vuelve negra y la aplicación ya no responde, sin ningún post de logging. La URL de llamada que estoy usando es la siguiente: api/apartments?projection={"id":1,"address":1} […]