¿Cómo puedo unir dos RxJava2 Obvervables por key?

Tengo dos observables sin clasificar de diferentes types. Ambos types comparten una key común. Me gustaría unirlos en un nuevo observable que emita pares de elementos correspondientes y no puedo encontrar la manera de hacerlo.

Tenga en count que algunas de las keys pueden estar ausentes. Estaría bien si no se eliminan los pares completos, pero tener un null en lugar de la pieza faltante sería aún mejor.

Entrada 1:

 Entity(id = 2), Entity(id = 1), Entity(id = 4) 

Entrada 2:

 Dto(id = 3), Dto(id = 2), Dto(id = 1) 

Salida esperada (en cualquier order):

 Pair(Entity(id = 1), Dto(id = 1)), Pair(Entity(id = 2), Dto(id = 2)), Pair(null, Dto(id = 3)), Pair(Entity(id = 4), null) 

Primero, Observable.merge las secuencias juntas: esto le proporciona una secuencia de todos los elementos. (En el siguiente código, he usado una class personalizada para labelr cada flujo).

Luego, para cada elemento en la secuencia, intente hacerla coincidir con un elemento previamente observado del otro tipo, y genere el par. Si no, guárdalo para que coincida más tarde.

Finalmente, una vez que se realiza la secuencia, los elementos restantes sin coincidencia no se combinarán con nada, por lo que se pueden emitir sin emparejar.

 import io.reactivex.Observable data class Entity(val id: Int) data class Dto(val id: Int) sealed class Either<out A, out B> data class Left<A>(val value: A) : Either<A, Nothing>() data class Right<B>(val value: B) : Either<Nothing, B>() fun <A : Any, B : Any, C> joinById(a: Observable<A>, idA: (A) -> C, b: Observable<B>, idB : (B) -> C): Observable<Pair<A?, B?>> { val unmatchedA = mutableMapOf<C, A>() val unmatchedB = mutableMapOf<C, B>() val merged = Observable.mergeDelayError(a.map(::Left), b.map(::Right)).flatMap { latest -> when (latest) { is Left -> { val id = idA(latest.value) unmatchedB.remove(id)?.let { return@flatMap Observable.just(latest.value to it) } unmatchedA.put(id, latest.value) } is Right -> { val id = idB(latest.value) unmatchedA.remove(id)?.let { return@flatMap Observable.just(it to latest.value) } unmatchedB.put(id, latest.value) } } Observable.empty<Nothing>() } return Observable.concat(merged, Observable.create { emitter -> unmatchedA.values.forEach { emitter.onNext(it to null) } unmatchedB.values.forEach { emitter.onNext(null to it) } emitter.onComplete() }) } fun main(args: Array<String>) { val entities = Observable.just(Entity(2), Entity(1), Entity(4)) val dtos = Observable.just(Dto(3), Dto(2), Dto(1)) joinById(entities, Entity::id, dtos, Dto::id).blockingForEach(::println) } 
 (Entity(id=2), Dto(id=2)) (Entity(id=1), Dto(id=1)) (Entity(id=4), null) (null, Dto(id=3)) 

Tenga en count que esto puede tener un comportamiento extraño si los identificadores se repiten dentro de una transmisión y, dependiendo de la estructura de las transmisiones, es posible que esto termine almacenando muchos elementos en la memory.

  • ¿Cómo especificar la versión de RxJava al usar RxKotlin?
  • Excepción: blockingConnect no se debe invocar en el subprocess UI a pesar de que subsubscribí en otro subprocess
  • Cómo recordar el estado con los operadores de rebashs en RxJava2
  • ¿Cómo crear un intervalo infinito observable que emitirá un nuevo object cada intervalo de time?
  • RxJava2 Publicado
  • RxJava manejando múltiples devoluciones de llamada dentro de un observable
  • RxJava 2 requiere un tipo de retorno Observable diferente de RxJava 1 (Kotlin)
  • Aplicando transformación a cada elemento en Single <List <T >>
  • No se puede llamar al operador desde () en Observable en Android Kotlin
  • RxJava2 Tal vez devuelva Observable vacío si no hay elemento
  • llamar parte de la secuencia una vez con múltiples suscriptores?