La function de extensión no crea un nuevo object Observable

Estoy teniendo un comportamiento inesperado con kotlin y rxjava. Creé una function de extensión para cargar imágenes usando picasso

fun Picasso.loadBitmap(url: String) : Observable<Bitmap> = Observable.create<Bitmap> { emitter -> Log.d("picasso load bitmap", "me ${this}") try { val bitmap = load(url).centerCrop() .resize(100, 100) .transform(CircleTransformer()) .get() emitter.onNext(bitmap) emitter.onComplete() } catch (e: IOException) { emitter.onError(e) } } 

Estoy llamando a esto varias veces en un intervalo cerrado (casi al mismo time) como este,

 picasso.loadBitmap(place.image_url) .subscribeOn(Schedulers.io()) .retryWhen { error -> error.zipWith(Observable.range(1, 5), BiFunction<Throwable, Int, RetryWrapper> { t1, t2 -> RetryWrapper(t2.toLong(), t1) }) .flatMap { if(it.delay < 4){ Log.d(TAG, "retry no. ${it.delay} for ${place.image_url}") Observable.timer(it.delay * 5, TimeUnit.SECONDS) } else { Log.d(TAG, "DMD ${place.image_url}") Observable.error { it.error } } } } .subscribe ( { bitmap -> markers.find { it.place.id == place.id }?.let { it.marker.icon = IconFactory.getInstance(context).fromBitmap(bitmap) } }, { Log.e(TAG, "error decoding ${place.image_url}", it) }) 

Espero que cada vez que se loadBitmap , cree un nuevo observable. Pero obtuve esto en los loggings

 09-28 11:17:00.022 31694-32276/? D/picasso load bitmap: me com.squareup.picasso.Picasso@c894e26 09-28 11:17:00.068 31694-32277/? D/picasso load bitmap: me com.squareup.picasso.Picasso@c894e26 09-28 11:17:00.069 31694-31959/? D/picasso load bitmap: me com.squareup.picasso.Picasso@c894e26 09-28 11:17:00.108 31694-32278/? D/picasso load bitmap: me com.squareup.picasso.Picasso@c894e26 09-28 11:17:00.112 31694-32251/? D/picasso load bitmap: me com.squareup.picasso.Picasso@c894e26 09-28 11:17:00.125 31694-32260/? D/picasso load bitmap: me com.squareup.picasso.Picasso@c894e26 09-28 11:17:00.162 31694-31794/? D/picasso load bitmap: me com.squareup.picasso.Picasso@c894e26 09-28 11:17:00.192 31694-32280/? D/picasso load bitmap: me com.squareup.picasso.Picasso@c894e26 09-28 11:17:00.195 31694-32279/? D/picasso load bitmap: me com.squareup.picasso.Picasso@c894e26 09-28 11:17:00.219 31694-32281/? D/picasso load bitmap: me com.squareup.picasso.Picasso@c894e26 09-28 11:17:04.828 31694-32262/? D/picasso load bitmap: me com.squareup.picasso.Picasso@c894e26 09-28 11:17:14.885 31694-31793/? D/picasso load bitmap: me com.squareup.picasso.Picasso@c894e26 09-28 11:17:29.928 31694-32269/? D/picasso load bitmap: me com.squareup.picasso.Picasso@c894e26 

El observable es el mismo para todas loadBitmap llamadas a loadBitmap . Necesito que tengan su propio observable porque si no, cuando retryWhen , cuando falle, no retryWhen al siguiente error. Espero que tenga sentido.

Poner lo observable dentro de un flatmap o flatmap no cambiará nada.

EDITAR mi código

 override fun render(state: MainState) { map?.let { map -> val newMarkers: MutableList<PlaceMarker> = mutableListOf() for(place in state.places) { var placeMarker = placeMarkers.find { it.place.id == place.id } if(placeMarker != null && map.markers.contains(placeMarker.marker)) { newMarkers.add(placeMarker) placeMarkers.remove(placeMarker) } else { if(placeMarker != null) placeMarkers.remove(placeMarker) val option = MarkerOptions() option.position = LatLng(place.latitude, place.longitude) option.snippet = place.name placeMarker = PlaceMarker(place, map.addMarker(option)) newMarkers.add(placeMarker) picasso.loadBitmap(place.image_url) .subscribeOn(Schedulers.io()) .retryWhen { error -> error.zipWith(Observable.range(1, 5), BiFunction<Throwable, Int, RetryWrapper> { t1, t2 -> RetryWrapper(t2.toLong(), t1) }) .flatMap { if(it.delay < 4){ Log.d(TAG, "retry no. ${it.delay} for ${place.image_url}") Observable.timer(it.delay * 5, TimeUnit.SECONDS) } else { Log.d(TAG, "DMD ${place.image_url}") Observable.error { it.error } } } } .subscribe ( { bitmap -> placeMarkers.find { it.place.id == place.id }?.let { it.marker.icon = IconFactory.getInstance(context).fromBitmap(bitmap) bitmap.recycle() } }, { Log.e(TAG, "error decoding ${place.image_url}", it) }) } } placeMarkers.forEach { it.marker.remove() } placeMarkers.clear() placeMarkers.addAll(newMarkers) } } 

Estoy usando MVP, solo para que lo veas un poco más. Entonces, esa es una function dentro de una VISTA, el renderizado se activará después de que el MODEL haya terminado de get datos del server.

Tienes que tener cuidado aquí. La palabra key this in

 Log.d("picasso load bitmap", "me ${this}") 

no apunta al Observable sino al tipo de receptor. En tu caso, Picasso . Lo ves en tu log me com.squareup.picasso.Picasso@c894e26

La buena noticia es que obtiene una nueva instancia de Observable para cada llamada de loadBitmap . Puedes verificar esto por:

 val observable = picasso.loadBitmap(place.image_url) Log.d("observable for picasso", "$observable") observable.subscribeOn(Schedulers.io())... 

loadBitmap ve, llama al loadBitmap siempre en la misma instancia de picasso , por eso obtiene el mismo resultado para esa class. Pero cada llamada individual a loadBitmap crea un nuevo Observable

Entonces tu código está bien.

  • Enfrentando problemas con la implementación de Rx Java con la architecture de flujo en Android kotlin
  • Kotlin no puede inferir el tipo cuando utiliza la reference de método en Flowable
  • No se puede llamar al operador desde () en Observable en Android Kotlin
  • Completable.andThen resultados en "other is null"
  • RxJava2 switchIfEmpty y verificando una ejecución
  • RxJava Observable.create envolver suscripciones observables
  • ¿Por qué tengo un Log de salida no deseado al fusionar 2 observables en otro Observable, que los almacena temporalmente cada 10 segundos?
  • Cómo retrasar onError () en RxJava 2 y Android?
  • RxJava salida diferente entre Flowable y Observable con window y Groupby
  • TestScheduler no funciona (Kotlin + RxJava2 + mockito)
  • kotlin consiguiendo un suscriptor para observar un observable usando RxJava2