¿Cómo se coordina una list de ejecuciones Completables con RxJava?

Tengo una list de BlockingTasks que quiero ejecutar de forma secuencial:

 class BlockingTaskManager { val tasks = mutableListOf<BlockingTask>() fun run () : Completable { /* What can I put here in order to run all tasks and return a completable according with the requirements described below? */ } } interface BlockingTask { fun run () : Completable fun getDescription(): String fun blockPipe(): Boolean } 

Requisitos

  • Si algún Completable en la tubería termina con un error, toda la ejecución debería detenerse.
  • Cuando finaliza un Completable devuelto por run() , ya sea con error o éxito, se debe emitir un logging que contenga la timestamp y la cadena devuelta por getDescription() ;

Pensé en utilizar Completable.concat { tasks } , pero no veo cómo puedo llamar a currentIterationTask.run() y cómo fallar el conducto si run() produce un error, así como el logging de currentInterationTask.getDescription() .

Otra idea fue usar andThen() , pero no estoy seguro de si funcionará según sea necesario:

  tasks.forEachIndexed { idx, task -> var isBroken = false task.run() .doOnComplete { logTaskCompletedSuccessfully(task) } .doOnError { error -> logTaskFailed(task, error) isBroken = task.blockPipe() } .andThen { if (!isBroken && idx < tasks.size) { tasks[idx+1] } } } 

Puedes hacerlo de varias maneras:

  fun run(): Completable { return Completable.concat(tasks.map { task -> task.run() .doOnComplete { logTaskCompletedSuccessfully(task) } .doOnError { error -> logTaskFailed(task, error) } }) } 

el operador concat la ejecución secuencial, el error de algunos de los Completable s detendrá el flujo con onError() y obtendrá la capacidad de logging con éxito o error.

en cuanto al blockPipe() , si entiendo correctamente que se utiliza para señalar que la tarea ha fallado y que debe interrumpir la transmisión, si ese es el caso, me parece innecesario, si falla en la tarea, ejecute Exception lugar de levantando una bandera, y la exception romperá la stream con onError .


Otra opción es utilizar un enfoque más reactivo en lugar de iterar las tareas en la list, iterarlo con Observable .
comience con Observable , que itera la list de tareas, y luego flatMap cada tarea en Completable . la ejecución secuencial se logra aquí ya que no está aplicando ningún Progtwigdor a cada Completable , y de ahí que se mantenga el order de ejecución.

  fun run(): Completable { return Observable.fromIterable(tasks) .flatMapCompletable { task -> task.run() .doOnComplete { logTaskCompletedSuccessfully(task) } .doOnError { error -> logTaskFailed(task, error) } } .subscribeOn(Schedulers.newThread() // Schedulers.io()) } 
  • Para una function de Kotlin utilizada como expresión, ¿hay una forma concisa de operar y devolver un valor?
  • RxJava2 switchIfEmpty y verificando una ejecución
  • Excepción causada por: java.lang.ClassNotFoundException: org.reactivestreams.Publisher
  • manejar onKeyDown usando RxAndroid
  • Observables opcionales en combinar
  • Retrofit-Vertx con RxJava2 en Kotlin IllegalStateException message == null
  • Cambie Flowable <List <Obj1 >> a Flowable <List <Obj2 >> en la habitación
  • Java genérico para Kotlin genérico. Retorno genérico del método
  • Llamar a un RxJava Single In Kotlin Lambda
  • RxJava2 + Retrofit pantalla negra en request de datos
  • Cómo retrasar onError () en RxJava 2 y Android?