¿Cómo se coordina una list de ejecuciones Completables con RxJava?

Tengo una list de BlockingTasks que quiero ejecutar de forma secuencial:

 class BlockingTaskManager { val tasks = mutableListOf<BlockingTask>() fun run () : Completable { /* What can I put here in order to run all tasks and return a completable according with the requirements described below? */ } } interface BlockingTask { fun run () : Completable fun getDescription(): String fun blockPipe(): Boolean } 

Requisitos

  • Si algún Completable en la tubería termina con un error, toda la ejecución debería detenerse.
  • Cuando finaliza un Completable devuelto por run() , ya sea con error o éxito, se debe emitir un logging que contenga la timestamp y la cadena devuelta por getDescription() ;

Pensé en utilizar Completable.concat { tasks } , pero no veo cómo puedo llamar a currentIterationTask.run() y cómo fallar el conducto si run() produce un error, así como el logging de currentInterationTask.getDescription() .

Otra idea fue usar andThen() , pero no estoy seguro de si funcionará según sea necesario:

  tasks.forEachIndexed { idx, task -> var isBroken = false task.run() .doOnComplete { logTaskCompletedSuccessfully(task) } .doOnError { error -> logTaskFailed(task, error) isBroken = task.blockPipe() } .andThen { if (!isBroken && idx < tasks.size) { tasks[idx+1] } } } 

Puedes hacerlo de varias maneras:

  fun run(): Completable { return Completable.concat(tasks.map { task -> task.run() .doOnComplete { logTaskCompletedSuccessfully(task) } .doOnError { error -> logTaskFailed(task, error) } }) } 

el operador concat la ejecución secuencial, el error de algunos de los Completable s detendrá el flujo con onError() y obtendrá la capacidad de logging con éxito o error.

en cuanto al blockPipe() , si entiendo correctamente que se utiliza para señalar que la tarea ha fallado y que debe interrumpir la transmisión, si ese es el caso, me parece innecesario, si falla en la tarea, ejecute Exception lugar de levantando una bandera, y la exception romperá la stream con onError .


Otra opción es utilizar un enfoque más reactivo en lugar de iterar las tareas en la list, iterarlo con Observable .
comience con Observable , que itera la list de tareas, y luego flatMap cada tarea en Completable . la ejecución secuencial se logra aquí ya que no está aplicando ningún Progtwigdor a cada Completable , y de ahí que se mantenga el order de ejecución.

  fun run(): Completable { return Observable.fromIterable(tasks) .flatMapCompletable { task -> task.run() .doOnComplete { logTaskCompletedSuccessfully(task) } .doOnError { error -> logTaskFailed(task, error) } } .subscribeOn(Schedulers.newThread() // Schedulers.io()) } 
  • ¿Por qué mi configuration de RxJava está bloqueando mi subprocess de interfaz de usuario? Trabajar con la callback BluetoothAdapter.startLeScan
  • Obtiene N últimos objects emitidos por observables en RxJava2
  • Cómo señalar un observable para producir más datos
  • Cambie Flowable <List <Obj1 >> a Flowable <List <Obj2 >> en la habitación
  • RxJava2 Tal vez devuelva Observable vacío si no hay elemento
  • Extraño comportamiento de componer y ObservableTransformer en RxJava cuando el genérico pasado se extiende
  • RxJava2 switchIfEmpty y verificando una ejecución
  • rx-java2 Schedulers.io () steel invoca el método de mainThread
  • Cómo fusionar 2 flujos separados, almacenar los datos rellenos y subscribirlos después de un time corto
  • ¿Cómo puedo unir dos RxJava2 Obvervables por key?
  • ¿Cómo especificar la versión de RxJava al usar RxKotlin?