Llamar a un RxJava Single In Kotlin Lambda

Estoy tratando de jugar con la nueva biblioteca de la sala en el emparejamiento con RxJava.

He encontrado una manera de usar un Single para insert elementos en el hilo de background de esta manera, dentro de una actividad:

 Single.fromCallable { AppDatabase.getInMemoryDatabase(this).taskDao().insertAll(task) } .subscribeOn(Schedulers.newThread()) .subscribe() 

Ahora, tengo un RecyclerView con tareas que tiene una checkbox que puede usar para marcar un artículo como completo o no. Lo que quiero hacer es actualizar el ítem cada vez que se marque / desmarque. Pegaré todo el ViewHolder para completarlo, pero tenga en count específicamente el lambda en bindTask() :

 inner class TaskViewHolder(view: View?) : RecyclerView.ViewHolder(view) { val descriptionTextView = view?.findViewById(R.id.task_description) as? TextView val completedCheckBox = view?.findViewById(R.id.task_completed) as? CheckBox fun bindTask(task: Task) { descriptionTextView?.text = task.description completedCheckBox?.isChecked = task.completed completedCheckBox?.setOnCheckedChangeListener { _, isChecked -> tasks[adapterPosition].completed = isChecked Single.fromCallable { itemView.context.taskDao().update(tasks[adapterPosition]) } .subscribeOn(Schedulers.newThread()) .subscribe() } } } 

Esto funciona para el primer elemento que verifico, pero después de eso no puedo hacer clic en ninguna otra checkbox. Pensé que el Single se destruiría a sí mismo, pero quizás no puedo hacer esto dentro de la lambda. ¿Debo sacar el Single de alguna manera?

Crearía Observable usando Observable.create , Observable.create ese emisor usando lambda, y setOnCheckedChangeListener siguientes elementos dentro de setOnCheckedChangeListener usando emitter.onNext()

 class TaskViewHolder(view: View) : RecyclerView.ViewHolder(view) { private lateinit var emitter: ObservableEmitter<Task> private val disposable: Disposable = Observable.create(ObservableOnSubscribe<Task> { e -> emitter = e }) .subscribeOn(Schedulers.newThread()) .observeOn(Schedulers.newThread()) .subscribe({ itemView.context.taskDao().update(it) }) val descriptionTextView = view?.findViewById(R.id.task_description) as? TextView val completedCheckBox = view?.findViewById(R.id.task_completed) as? CheckBox fun bindTask(task: Task) { descriptionTextView?.text = task.description completedCheckBox?.isChecked = task.completed completedCheckBox?.setOnCheckedChangeListener { _, isChecked -> tasks[adapterPosition].completed = isChecked emitter.onNext(tasks[adapterPosition]) } } } 

No lo he probado pero esto debería funcionar

 class TaskViewHolder(view: View?) : RecyclerView.ViewHolder(view) { val descriptionTextView: TextView? = null val completedCheckBox: CheckBox? = null fun bindTask(task: Task) { descriptionTextView?.text = task.description completedCheckBox?.isChecked = task.completed completedCheckBox?.setOnCheckedChangeListener { _, isChecked -> tasks[adapterPosition].completed = isChecked itemView.context.taskDao().update(tasks[adapterPosition]) .subscribeOn(Schedulers.io()) .observeOn(AndroidSchedulers.mainThread()) .subscribe({ changeCount -> Timber.i("%,d item(s) updated", changeCount) }, { error -> Timber.e(error, "update failed") }) } } } interface TaskDao { fun update(task: Task): Flowable<Int> } 

En lugar de crear un nuevo Single , uso la funcionalidad o sala RxJava

  • La function de extensión no crea un nuevo object Observable
  • ¿Cómo continuar el procesamiento después de que ocurra un error en RxJava 2?
  • RxAndroid - Manejar errores con el operador Zip
  • ¿Cómo crear un intervalo infinito observable que emitirá un nuevo object cada intervalo de time?
  • Utilice la unidad de Kotlin (o cualquier otro object) escriba en el layout de Android
  • ¿Hay alguna manera de cambiar mi método a la stream Observable que será una cadena de modificadores?
  • Cómo controlar el flujo sin .flatMap, que rompe una stream reactiva que impide que operadores como distinctUntilChanged trabajen en toda la secuencia
  • ¿Comportamiento incorrecto de Maybe <List <T >> en Room?
  • Enfrentando problemas con la implementación de Rx Java con la architecture de flujo en Android kotlin
  • RxJava 1.x .zip () no funciona en RxJava 2.0
  • Asunto de RxJava: escuche el tipo diferente que el que emite