¿Cómo puedo unir dos RxJava2 Obvervables por key?

Tengo dos observables sin clasificar de diferentes types. Ambos types comparten una key común. Me gustaría unirlos en un nuevo observable que emita pares de elementos correspondientes y no puedo encontrar la manera de hacerlo.

Tenga en count que algunas de las keys pueden estar ausentes. Estaría bien si no se eliminan los pares completos, pero tener un null en lugar de la pieza faltante sería aún mejor.

Entrada 1:

 Entity(id = 2), Entity(id = 1), Entity(id = 4) 

Entrada 2:

 Dto(id = 3), Dto(id = 2), Dto(id = 1) 

Salida esperada (en cualquier order):

 Pair(Entity(id = 1), Dto(id = 1)), Pair(Entity(id = 2), Dto(id = 2)), Pair(null, Dto(id = 3)), Pair(Entity(id = 4), null) 

Primero, Observable.merge las secuencias juntas: esto le proporciona una secuencia de todos los elementos. (En el siguiente código, he usado una class personalizada para labelr cada flujo).

Luego, para cada elemento en la secuencia, intente hacerla coincidir con un elemento previamente observado del otro tipo, y genere el par. Si no, guárdalo para que coincida más tarde.

Finalmente, una vez que se realiza la secuencia, los elementos restantes sin coincidencia no se combinarán con nada, por lo que se pueden emitir sin emparejar.

 import io.reactivex.Observable data class Entity(val id: Int) data class Dto(val id: Int) sealed class Either<out A, out B> data class Left<A>(val value: A) : Either<A, Nothing>() data class Right<B>(val value: B) : Either<Nothing, B>() fun <A : Any, B : Any, C> joinById(a: Observable<A>, idA: (A) -> C, b: Observable<B>, idB : (B) -> C): Observable<Pair<A?, B?>> { val unmatchedA = mutableMapOf<C, A>() val unmatchedB = mutableMapOf<C, B>() val merged = Observable.mergeDelayError(a.map(::Left), b.map(::Right)).flatMap { latest -> when (latest) { is Left -> { val id = idA(latest.value) unmatchedB.remove(id)?.let { return@flatMap Observable.just(latest.value to it) } unmatchedA.put(id, latest.value) } is Right -> { val id = idB(latest.value) unmatchedA.remove(id)?.let { return@flatMap Observable.just(it to latest.value) } unmatchedB.put(id, latest.value) } } Observable.empty<Nothing>() } return Observable.concat(merged, Observable.create { emitter -> unmatchedA.values.forEach { emitter.onNext(it to null) } unmatchedB.values.forEach { emitter.onNext(null to it) } emitter.onComplete() }) } fun main(args: Array<String>) { val entities = Observable.just(Entity(2), Entity(1), Entity(4)) val dtos = Observable.just(Dto(3), Dto(2), Dto(1)) joinById(entities, Entity::id, dtos, Dto::id).blockingForEach(::println) } 
 (Entity(id=2), Dto(id=2)) (Entity(id=1), Dto(id=1)) (Entity(id=4), null) (null, Dto(id=3)) 

Tenga en count que esto puede tener un comportamiento extraño si los identificadores se repiten dentro de una transmisión y, dependiendo de la estructura de las transmisiones, es posible que esto termine almacenando muchos elementos en la memory.

  • Cómo manejar el event handling errores en un solo lugar en rxjava usando wrapper
  • Obtiene N últimos objects emitidos por observables en RxJava2
  • ¿Cómo puedo indicar explícitamente la finalización de un Flowable en RxJava?
  • La function de extensión no crea un nuevo object Observable
  • ¿Hay alguna manera de cambiar mi método a la stream Observable que será una cadena de modificadores?
  • RxJava2 observable no procesando en Siguiente cuando hay un cambio
  • Error "No se puede combinar Dex" al usar Room + Kotlin
  • EXCEPCIÓN FATAL: RxCachedThreadScheduler-1 cuando el gatillo se deshace. ¿Por qué?
  • Proguard: ¿Qué regla puedo agregar para evitar no puedo encontrar la class referenceda?
  • Problema de encadenamiento Completable after flatMapCompletable
  • Observables opcionales en combinar