RxJava Observable a Completable, cómo evitar toBlocking ()

Actualmente estoy usando RxJava en Android con Kotlin, pero tengo un problema y no puedo resolverlo sin usar toBlocking ().

Tengo un método en el service al empleado que devuelve un Observable>:

fun all(): Observable<List<Employee>> 

Esto es todo y es bueno ya que este Observable emite la nueva list de empleados cada vez que un empleado cambia. Pero me gustaría generar un file PDF de los empleados, que obviamente no necesita ejecutarse cada vez que un empleado cambia. Además, me gustaría devolver un object Completable de mi método generador de PDF. Quiero agregar un encabezado a mi PDF, y luego repetir a través de los empleados y calcular el salario de cada empleado, que también devuelve un Observable, y este es el lugar donde estoy usando para Bloquear en este momento. Mi enfoque actual es este:

 private fun generatePdf(outputStream: OutputStream): Completable { return employeeService.all().map { employees -> try { addHeaderToPDF() for (i in employees) { val calculated = employeeService.calculateWage(i.id).toBlocking().first() // Print calculated to PDF.... } addFooterToPDF() return @map Completable.complete() } catch (e: Exception) { return @map Completable.error(e) } }.first().toCompletable() 

¿Hay alguna manera de hacer que este código sea un poco más limpio usando RxJava?

¡Gracias por adelantado!

Descargo de responsabilidad: esta respuesta es un trabajo en progreso.


Premisa básica: si tiene blocking en la transmisión, lo está haciendo mal.

Nota: Ningún estado debe abandonar la lambda observable.

Paso 1: transmitir todo el set de datos

La input es un flujo de empleados. Para cada empleado necesita get un salario. Vamos a hacerlo en una stream.

 /** * @param employeesObservable * Stream of employees we're interested in. * @param wageProvider * Transformation function which takes an employee and returns a [Single] of their wage. * @return * Observable stream spitting individual [Pair]s of employees and their wages. */ fun getEmployeesAndWagesObservable( employeesObservable: Observable<Employee>, wageProvider: Function<Employee, Single<Int>> ): Observable<Pair<Employee, Int>>? { val employeesAndWagesObservable: Observable<Pair<Employee, Int>> // Each Employee from the original stream will be converted // to a Single<Pair<Employee, Int>> via flatMapSingle operator. // Remember, we need a stream and Single is a stream. employeesAndWagesObservable = employeesObservable.flatMapSingle { employee -> // We need to get a source of wage value for current employee. // That source emits a single Int or errors. val wageForEmployeeSingle: Single<Int> = wageProvider.apply(employee) // Once the wage from said source is loaded... val employeeAndWageSingle: Single<Pair<Employee, Int> = wageForEmployeeSingle.map { wage -> // ... construct a Pair<Employee, Int> employee to wage } // This code is not executed now. It will be executed for each Employee // after the original Observable<Employee> starts spitting out items. // After subscribing to the resulting observable. return@flatMapSingle employeeAndWageSingle } return employeesAndWagesObservable } 

Qué va a pasar cuando te suscribas:

  1. Tome un empleado de la fuente.
  2. Obtener salario de un empleado.
  3. Escupir un par de empleados y su salario.

Esto se repite hasta que los employeesObservable Señales onComplete o algo falla con onError .

Operadores usados:

  • flatMapSingle : Convierte un valor real en una nueva stream única de algún valor transformado.
  • map : convierte un valor real en algún otro valor real (sin secuencias anidadas).

Así es cómo lo conectarías a tu código:

 fun doStuff() { val employeesObservable = employeeService.all() val wageProvider = Function<Employee, Single<Int>> { employee -> // Don't listen to changes. Take first wage and use that. employeeService.calculateWage(employee.id).firstOrError() } val employeesAndWagesObservable = getEmployeesAndWagesObservable(employeesObservable, wageProvider) // Subscribe... } 

Operadores usados:

  • primero : toma el primer elemento de observable y conviértelo en una transmisión única.
  • time de espera : una buena idea sería .timeout el salario si lo obtiene a través de la networking.

Próximos pasos

Opción 1: terminar aquí

No suscribirse, llame

 val blockingIterable = employeesAndWagesObservable.blockingIterable() blockingIterable.forEach { ... } 

y procesar cada elemento de forma síncrona. Siéntese, descubra los próximos pasos, vea presentaciones, lea ejemplos.

Opción 2: agregar capas

  1. .map cada uno de estos Pair<Employee, Int> en algún bloque de creación de PDF abstracto.
  2. Convierta las impresoras de encabezado y pie de página en Observables a través de Observable.fromCallable { ... } , haga que también devuelvan bloques de construcción de PDF.
  3. Combine todos estos de forma secuencial a través de Observable.concat(headerObs, employeeDataObs, footerObs)
  4. .subscribe a este resultado y comience a escribir los bloques de creación de PDF en una .subscribe de PDF.
  5. QUE HACER:
    • Descubre una forma de inicializar el escritor de PDF de forma perezosa en la suscripción (no antes de comstackr la transmisión),
    • Eliminar salida por error,
    • Cerrar flujo de salida en completo o por error.

Se me ocurrió esto:

  return employeeService.all().first() .doOnSubscribe { addHeaderToPDF() } .flatMapIterable { it } .flatMap { employeeService.calculateWage(it.id).first() } .doOnNext { printEmployeeWage(it) } .doOnCompleted { addFooterToPDF } .toCompletable() 

¿Es así como se supone que debe hacerse? 🙂

  • Mockito querido pero no invocado
  • ¿Cuándo no usar el Observable de RxJava?
  • ¿Cómo puedo agregar de manera condicional una operación asincrónica en medio de una transmisión de RxJava?
  • Fusionar datos de diferentes Observables y elegir diferentes estrategias de búsqueda, según la disponibilidad de datos
  • Llamada asincrónica para cada elemento dentro de una colección
  • La biblioteca de Android no puede comstackr kotlin
  • Android: uso de generics para tener 1 llamada de actualización de RxJava que devuelve varios types usando la misma interfaz
  • Error de compilation de error de coincidencia de tipo Kotlin: Requiere éxito <T>, Encontrado MyError
  • RxJava - ¿Entradas de keyboard de contrapresión?
  • ¿Por qué mi configuration de RxJava está bloqueando mi subprocess de interfaz de usuario? Trabajar con la callback BluetoothAdapter.startLeScan
  • RxJava: cómo devolver el tipo correcto de nulo