La function de extensión no crea un nuevo object Observable

Estoy teniendo un comportamiento inesperado con kotlin y rxjava. Creé una function de extensión para cargar imágenes usando picasso

fun Picasso.loadBitmap(url: String) : Observable<Bitmap> = Observable.create<Bitmap> { emitter -> Log.d("picasso load bitmap", "me ${this}") try { val bitmap = load(url).centerCrop() .resize(100, 100) .transform(CircleTransformer()) .get() emitter.onNext(bitmap) emitter.onComplete() } catch (e: IOException) { emitter.onError(e) } } 

Estoy llamando a esto varias veces en un intervalo cerrado (casi al mismo time) como este,

 picasso.loadBitmap(place.image_url) .subscribeOn(Schedulers.io()) .retryWhen { error -> error.zipWith(Observable.range(1, 5), BiFunction<Throwable, Int, RetryWrapper> { t1, t2 -> RetryWrapper(t2.toLong(), t1) }) .flatMap { if(it.delay < 4){ Log.d(TAG, "retry no. ${it.delay} for ${place.image_url}") Observable.timer(it.delay * 5, TimeUnit.SECONDS) } else { Log.d(TAG, "DMD ${place.image_url}") Observable.error { it.error } } } } .subscribe ( { bitmap -> markers.find { it.place.id == place.id }?.let { it.marker.icon = IconFactory.getInstance(context).fromBitmap(bitmap) } }, { Log.e(TAG, "error decoding ${place.image_url}", it) }) 

Espero que cada vez que se loadBitmap , cree un nuevo observable. Pero obtuve esto en los loggings

 09-28 11:17:00.022 31694-32276/? D/picasso load bitmap: me com.squareup.picasso.Picasso@c894e26 09-28 11:17:00.068 31694-32277/? D/picasso load bitmap: me com.squareup.picasso.Picasso@c894e26 09-28 11:17:00.069 31694-31959/? D/picasso load bitmap: me com.squareup.picasso.Picasso@c894e26 09-28 11:17:00.108 31694-32278/? D/picasso load bitmap: me com.squareup.picasso.Picasso@c894e26 09-28 11:17:00.112 31694-32251/? D/picasso load bitmap: me com.squareup.picasso.Picasso@c894e26 09-28 11:17:00.125 31694-32260/? D/picasso load bitmap: me com.squareup.picasso.Picasso@c894e26 09-28 11:17:00.162 31694-31794/? D/picasso load bitmap: me com.squareup.picasso.Picasso@c894e26 09-28 11:17:00.192 31694-32280/? D/picasso load bitmap: me com.squareup.picasso.Picasso@c894e26 09-28 11:17:00.195 31694-32279/? D/picasso load bitmap: me com.squareup.picasso.Picasso@c894e26 09-28 11:17:00.219 31694-32281/? D/picasso load bitmap: me com.squareup.picasso.Picasso@c894e26 09-28 11:17:04.828 31694-32262/? D/picasso load bitmap: me com.squareup.picasso.Picasso@c894e26 09-28 11:17:14.885 31694-31793/? D/picasso load bitmap: me com.squareup.picasso.Picasso@c894e26 09-28 11:17:29.928 31694-32269/? D/picasso load bitmap: me com.squareup.picasso.Picasso@c894e26 

El observable es el mismo para todas loadBitmap llamadas a loadBitmap . Necesito que tengan su propio observable porque si no, cuando retryWhen , cuando falle, no retryWhen al siguiente error. Espero que tenga sentido.

Poner lo observable dentro de un flatmap o flatmap no cambiará nada.

EDITAR mi código

 override fun render(state: MainState) { map?.let { map -> val newMarkers: MutableList<PlaceMarker> = mutableListOf() for(place in state.places) { var placeMarker = placeMarkers.find { it.place.id == place.id } if(placeMarker != null && map.markers.contains(placeMarker.marker)) { newMarkers.add(placeMarker) placeMarkers.remove(placeMarker) } else { if(placeMarker != null) placeMarkers.remove(placeMarker) val option = MarkerOptions() option.position = LatLng(place.latitude, place.longitude) option.snippet = place.name placeMarker = PlaceMarker(place, map.addMarker(option)) newMarkers.add(placeMarker) picasso.loadBitmap(place.image_url) .subscribeOn(Schedulers.io()) .retryWhen { error -> error.zipWith(Observable.range(1, 5), BiFunction<Throwable, Int, RetryWrapper> { t1, t2 -> RetryWrapper(t2.toLong(), t1) }) .flatMap { if(it.delay < 4){ Log.d(TAG, "retry no. ${it.delay} for ${place.image_url}") Observable.timer(it.delay * 5, TimeUnit.SECONDS) } else { Log.d(TAG, "DMD ${place.image_url}") Observable.error { it.error } } } } .subscribe ( { bitmap -> placeMarkers.find { it.place.id == place.id }?.let { it.marker.icon = IconFactory.getInstance(context).fromBitmap(bitmap) bitmap.recycle() } }, { Log.e(TAG, "error decoding ${place.image_url}", it) }) } } placeMarkers.forEach { it.marker.remove() } placeMarkers.clear() placeMarkers.addAll(newMarkers) } } 

Estoy usando MVP, solo para que lo veas un poco más. Entonces, esa es una function dentro de una VISTA, el renderizado se activará después de que el MODEL haya terminado de get datos del server.

Tienes que tener cuidado aquí. La palabra key this in

 Log.d("picasso load bitmap", "me ${this}") 

no apunta al Observable sino al tipo de receptor. En tu caso, Picasso . Lo ves en tu log me com.squareup.picasso.Picasso@c894e26

La buena noticia es que obtiene una nueva instancia de Observable para cada llamada de loadBitmap . Puedes verificar esto por:

 val observable = picasso.loadBitmap(place.image_url) Log.d("observable for picasso", "$observable") observable.subscribeOn(Schedulers.io())... 

loadBitmap ve, llama al loadBitmap siempre en la misma instancia de picasso , por eso obtiene el mismo resultado para esa class. Pero cada llamada individual a loadBitmap crea un nuevo Observable

Entonces tu código está bien.

  • Retrofit 2, Rx 2 y llamadas asincrónicas
  • Kotlin no puede inferir el tipo cuando utiliza la reference de método en Flowable
  • RxJava2 cómo separar diferentes implementaciones de emisor observable
  • Cómo controlar el flujo sin .flatMap, que rompe una stream reactiva que impide que operadores como distinctUntilChanged trabajen en toda la secuencia
  • Proguard: ¿Qué regla puedo agregar para evitar no puedo encontrar la class referenceda?
  • Convert Maybe to Single de otra fuente si Maybe completa
  • Cómo fusionar 2 flujos separados, almacenar los datos rellenos y subscribirlos después de un time corto
  • RxJava zipCon error IDE en Kotlin con Android Studio 3.0
  • Confundido sobre la asignación de la variable RxJava
  • Extraño comportamiento de componer y ObservableTransformer en RxJava cuando el genérico pasado se extiende
  • Cómo hacer un event handling errores en rxjava2 en android