Implementando la búsqueda que empuja los resultados a la list tan pronto como estén disponibles usando rxJava

Necesito implementar una búsqueda en un gran set de datos que puede tomar algún time para completarse en dispositivos mobilees. Por lo tanto, quiero mostrar cada resultado coincidente tan pronto como esté disponible.

Necesito search todos los datos disponibles de un almacén de datos que decida si los obtiene de la networking o del dispositivo. Esta llamada es un Observable . Tan pronto como los datos de ese Observable estén disponibles, quiero recorrerlos, aplicar un pnetworkingicado de búsqueda y notificar a cualquier observador sobre cualquier coincidencia encontrada.

Hasta ahora, mi idea era usar un PublishSubject para suscribirme y llamar a su function onNext cada vez que la búsqueda encuentre una nueva coincidencia. Sin embargo, parece que no puedo lograr el comportamiento deseado.

Estoy usando el data binding MVVM + Android y quiero mostrar todas las inputs coincidentes en un RecyclerView así que para cada evento onNext que es recibido por el model de vista de observación tengo que llamar notifyItemRangeInserted en el adaptador de RecyclerView.

 class MySearch(val dataStore: MyDataStore) { private val searchSubject = PublishSubject.create<List<MyDto>>() fun findEntries(query: String): Observable<List<MyDto>> { return searchSubject.doOnSubscribe { // dataStore.fetchAll returns an Observable<List<MyDto>> dataStore.fetchAll.doOnNext { myDtos -> if (query.isNotBlank()) { search(query, myDtos) } else { searchSubject.onNext(myDtos) } }.subscribe(searchSubject) } } private fun(query: String, data: List<MyDto>) { data.forEach { if (it.matches(query)) { // in real life I cache a few results and don't send each single item searchSubject.onNext(listOf(it)) } } } fun MyDto.matches(query: String): Boolean // stub } 

 class MyViewModel(val mySearch: MySearch, val viewNotifications: Observer<Pair<Int, Int>>): BaseObservable() { var displayItems: List<MyItemViewModel> = listOf() fun loadData(query: String): Subscription { return mySearch.findEntries(query) .observeOn(AndroidSchedulers.mainThread()) .doOnNext(this::onSearchResult) .doOnCompleted(viewNotifications::onCompleted) .doOnError(viewNotifications::onError) .subscribe() } private fun onSearchResult(List<MyDto> data) { val lastIndex = displayItems.lastIndex displayItems = data.map { createItem(it) } notifyChange() viewNotifications.onNext(Pair(lastIndex, data.count())) } private fun createItem(dto: MyDto): MyItemViewModel // stub } 

El problema que tengo con el código anterior es que con una consulta vacía MyViewModel::onSearchResult se llama 3 veces seguidas y cuando la consulta no está vacía MyViewModel::onSearchResult no se llama en absoluto. Sospecho que el problema se encuentra en algún lugar en la forma en que he nested los Observables en findEntries o que me suscribo mal / findEntries datos de un hilo equivocado.

¿Alguien tiene una idea sobre esto?