Observable con el valor de LatestFrom

Implementé un pseudooperador llamado "FilterByLatestFrom" como una function de extensión para kotlin.

Escribí el siguiente código usando este operador:

fun testFilterByLatestFromOperator(){ val observableA : Observable<Int> = Observable.fromArray(1,2,3,4,5,6,7,8,9,10) val observableC : PublishSubject<Int> = PublishSubject.create() val observableB : Observable<Int> = Observable.just(2).mergeWith(observableC) observableB.subscribe { println("observableB onNext: $it") } observableA .subscribe({ println("Original : $it")}) observableA.filterByLatestFrom(observableB, BiFunction { aVal, bVal -> aVal%bVal==0 }) .subscribe({ println("Result A : $it") }) observableC.onNext(3) observableA.filterByLatestFrom(observableB, BiFunction { aVal, bVal -> aVal%bVal==0 }) .subscribe({ println("Result AC : $it") }) } 

la salida es:

 observableB onNext: 2 Original : 1 Original : 2 Original : 3 Original : 4 Original : 5 Original : 6 Original : 7 Original : 8 Original : 9 Original : 10 Result A : 2 Result A : 4 Result A : 6 Result A : 8 Result A : 10 observableB onNext: 3 Result AC : 2 Result AC : 4 Result AC : 6 Result AC : 8 Result AC : 10 

Quiero que el operador del filter filtre obsA según el último valor del observable B. Funciona para el primer bloque, pero cuando agrego On-next con un nuevo valor no cambia el resultado (use el mismo último valor del observable original).

este es el FilterByLatestFrom impl (fue diseñado para ser utilizado desde Java también (con componer):

 class FilterByLatestFrom<T,U>(private val observable: Observable<T>, private val biFunction: BiFunction<U, T, Boolean>){ fun filter() : ObservableTransformer<U,U> = ObservableTransformer { it .withLatestFrom( observable, BiFunction<U,T,Pair<U,Boolean>> { u, t -> Pair(u,biFunction.apply(u,t)) }) .filter { it.second } .map { it.first } } } fun <T,U> Observable<U>.filterByLatestFrom(observable: Observable<T>, biFunction: BiFunction<U, T, Boolean>) : Observable<U> = this.compose(FilterByLatestFrom(observable,biFunction).filter()) 

¿Qué me estoy perdiendo?

EDIT: Creo que encontré el problema: el PublishSubject debe ser BehaviorSubject en su lugar. y la function de fusión debe ser conacat para prometer que obsC emitirá después de obsB.

    Su pseudooperador filterByLatestFrom está bien, el problema está dentro de las testings, PublishSubject emitirá solo los elementos subsiguientes, así que cuando PublishSubject en su última suscripción ('resultado AC'), observableB emitirá solo 2, ya que observableC ya emitió 3 y lo hará no reproducirlo en observableB (usando la merge ).

    Simplemente mueva the observableC.onNext(3) a después de la última suscripción (última línea) y debería ver el comportamiento esperado.

    EDITAR: también cambiando a PublishSubject como si resolviera el mismo problema (el sujeto volverá a reproducir el último valor de la nueva suscripción)