¿Cómo se coordina una list de ejecuciones Completables con RxJava?

Tengo una list de BlockingTasks que quiero ejecutar de forma secuencial:

 class BlockingTaskManager { val tasks = mutableListOf<BlockingTask>() fun run () : Completable { /* What can I put here in order to run all tasks and return a completable according with the requirements described below? */ } } interface BlockingTask { fun run () : Completable fun getDescription(): String fun blockPipe(): Boolean } 

Requisitos

  • Si algún Completable en la tubería termina con un error, toda la ejecución debería detenerse.
  • Cuando finaliza un Completable devuelto por run() , ya sea con error o éxito, se debe emitir un logging que contenga la timestamp y la cadena devuelta por getDescription() ;

Pensé en utilizar Completable.concat { tasks } , pero no veo cómo puedo llamar a currentIterationTask.run() y cómo fallar el conducto si run() produce un error, así como el logging de currentInterationTask.getDescription() .

Otra idea fue usar andThen() , pero no estoy seguro de si funcionará según sea necesario:

  tasks.forEachIndexed { idx, task -> var isBroken = false task.run() .doOnComplete { logTaskCompletedSuccessfully(task) } .doOnError { error -> logTaskFailed(task, error) isBroken = task.blockPipe() } .andThen { if (!isBroken && idx < tasks.size) { tasks[idx+1] } } } 

Puedes hacerlo de varias maneras:

  fun run(): Completable { return Completable.concat(tasks.map { task -> task.run() .doOnComplete { logTaskCompletedSuccessfully(task) } .doOnError { error -> logTaskFailed(task, error) } }) } 

el operador concat la ejecución secuencial, el error de algunos de los Completable s detendrá el flujo con onError() y obtendrá la capacidad de logging con éxito o error.

en cuanto al blockPipe() , si entiendo correctamente que se utiliza para señalar que la tarea ha fallado y que debe interrumpir la transmisión, si ese es el caso, me parece innecesario, si falla en la tarea, ejecute Exception lugar de levantando una bandera, y la exception romperá la stream con onError .


Otra opción es utilizar un enfoque más reactivo en lugar de iterar las tareas en la list, iterarlo con Observable .
comience con Observable , que itera la list de tareas, y luego flatMap cada tarea en Completable . la ejecución secuencial se logra aquí ya que no está aplicando ningún Progtwigdor a cada Completable , y de ahí que se mantenga el order de ejecución.

  fun run(): Completable { return Observable.fromIterable(tasks) .flatMapCompletable { task -> task.run() .doOnComplete { logTaskCompletedSuccessfully(task) } .doOnError { error -> logTaskFailed(task, error) } } .subscribeOn(Schedulers.newThread() // Schedulers.io()) } 
  • Cómo burlarse del repository reactivo que devuelve Observable
  • RxJava2 observable no procesando en Siguiente cuando hay un cambio
  • Cómo controlar el flujo sin .flatMap, que rompe una stream reactiva que impide que operadores como distinctUntilChanged trabajen en toda la secuencia
  • kotlin consiguiendo un suscriptor para observar un observable usando RxJava2
  • RxJava2: onComplete no llamado con flatMapIterable
  • Tipo de devolución diferente en RxJava 2 (actualización desde RxJava1)
  • Reescriba el código de Java en Kotlin utilizando la reference de function.
  • ¿Cómo escalar dinámicamente el rebote de la stream de emisión de ráfagas?
  • Cómo pasar nulo a un Observable con tipo anulable en RxJava 2 y Kotlin
  • Cómo contar el time de ejecución de lo observable
  • Descargar un file desde un control remoto y savelo en un dispositivo Android