RxJava- Gire Observable en Iterator, Stream o Sequence

Sé que esto rompe muchas reglas de Rx, pero realmente me gusta RxJava-JDBC y también mis compañeros de equipo. Las bases de datos relacionales son esenciales para lo que hacemos y también lo es Rx.

Sin embargo, hay ocasiones en las que no queremos emitir como un Observable<ResultSet> pero preferiríamos tener un Java 8 Stream<ResultSet> o Kotlin Sequence<ResultSet> basado en pull. Pero estamos muy acostumbrados a la biblioteca RxJava-JDBC que solo devuelve un Observable<ResultSet> .

Por lo tanto, me pregunto si hay una forma de que pueda convertir un Observable<ResultSet> en una Sequence<ResultSet> usando una function de extensión, y no hacer ninguna recolección intermediaria o llamadas toBlocking() . Debajo está todo lo que tengo hasta ahora, pero mi cabeza está girando ahora tratando de conectar sistemas basados ​​en empujar y tirar, y no puedo almacenar en el búfer ya que el ResultSet es con estado con cada llamada a onNext() . ¿Es esto una tarea imposible?

 import rx.Observable import rx.Subscriber import java.sql.ResultSet fun Observable<ResultSet>.asSequence() = object: Iterator<ResultSet>, Subscriber<ResultSet>() { private var isComplete = false override fun onCompleted() { isComplete = true } override fun onError(e: Throwable?) { throw UnsupportedOperationException() } override fun onNext(rs: ResultSet?) { throw UnsupportedOperationException() } override fun hasNext(): Boolean { throw UnsupportedOperationException() } override fun next(): ResultSet { throw UnsupportedOperationException() } }.asSequence() 

    No estoy seguro de que sea la forma más fácil de lograr lo que quieres, pero puedes probar este código. Convierte un Observable en un Iterator al crear una queue de locking y publicar todos los events desde el Observable hasta esta queue. Iterable extrae events de la queue y bloquea si no hay ninguno. Luego modifica su propio estado dependiendo del evento actual recibido.

     class ObservableIterator<T>( observable: Observable<T>, scheduler: Scheduler ) : Iterator<T>, Closeable { private val queue = LinkedBlockingQueue<Notification<T>>() private var cached: Notification<T>? = null private var completed: Boolean = false private val subscription = observable .materialize() .subscribeOn(scheduler) .subscribe({ queue.put(it) }) override fun hasNext(): Boolean { cacheNext() return !completed } override fun next(): T { cacheNext() val notification = cached ?: throw NoSuchElementException() check(notification.isOnNext) cached = null return notification.value } private fun cacheNext() { if (completed) { return } if (cached == null) { queue.take().let { notification -> if (notification.isOnError) { completed = true throw RuntimeException(notification.throwable) } else if (notification.isOnCompleted) { completed = true } else { cached = notification } } } } override fun close() { subscription.unsubscribe() completed = true cached = null } } 

    Puede usar la siguiente function auxiliar:

     fun <T> Observable<T>.asSequence() = Sequence { toBlocking().getIterator() } 

    El observable se suscribirá cuando la secuencia devuelta se llame para el iterador.

    Si un observable emite elementos en el mismo hilo al que se suscribió (como Observable.just por ejemplo), poblará el búfer del iterador antes de que tenga la oportunidad de ser devuelto. En este caso, es posible que deba dirigir la suscripción a la secuencia diferente con una llamada para subscribeOn :

     observable.subscribeOn(scheduler).asSequence() 

    Sin embargo, aunque toBlocking().getIterator() no almacena todos los resultados, podría amortiguar algunos de ellos si el iterador no los consume a time. Eso podría ser un problema si un ResultSet alguna manera expira cuando llega el siguiente ResultSet .