¿Es correcto el comportamiento de las corutinas?

Estoy empezando a kotlin y no estoy seguro de haber implementado un patrón de productor y consumidor por kotlin coroutines correctamente?

@RestrictsSuspension interface Producer<in T> { suspend fun yield(value: T); } fun <T> produce(context: CoroutineContext = EmptyCoroutineContext, building: suspend Producer<T>.() -> Unit): Supplier<T> { val (NOT_READY, READY, DONE) = arrayOf(-1, 2, 3); val producer = object : Producer<T>, Continuation<Unit>, Supplier<T> { var value: T? = null; var step: Continuation<Unit>? = null; var state: Int = NOT_READY; override fun get(): T { when (state) { READY -> return pop(); DONE -> throw NoSuchElementException(); } val step = step!!; this.step = null; step.resume(Unit); return get(); } private fun pop(): T { state = NOT_READY; val it = value as T; value = null; return it; } override fun resume(value: Unit) { state = DONE; } override suspend fun yield(value: T) { this.state = READY; this.value = value; return suspendCoroutine<Unit> { step = it; }; } override fun resumeWithException(exception: Throwable) { throw exception; } override val context = EmptyCoroutineContext; }; producer.step = building.createCoroutine(producer, producer).run { return@run context[Async]?.start(this) ?: this; }; return producer; } 

entonces puedo usar el patrón de productor y consumidor de la siguiente manera:

 var steps = ArrayBlockingQueue<Int>(1); fun <T> ArrayBlockingQueue<T>.await() = this.poll(100, MILLISECONDS); val it = produce { steps.add(1); yield("foo"); steps.add(2); }; steps.await();//return null; it.get();//return "foo" steps.await();//return 1; it.get();//throws NoSuchElementException steps.await();//return 2; 

ENTONCES escribo un context Async para resume una Continuation por primera vez solamente. Envío el rest de los resume al sistema kotlin.

  interface Async : CoroutineContext.Element { companion object Key : CoroutineContext.Key<Async>; fun <T> initialize(completion: Continuation<T>): Continuation<T>; fun start(completion: Continuation<Unit>): Continuation<Unit> { return initialize(completion).apply { resume(Unit); }; } } open class ThreadPoolCoroutineContext : AbstractCoroutineContextElement(Async), Async { override fun <T> initialize(completion: Continuation<T>): Continuation<T> { val pool by lazy(ForkJoinPool::commonPool); return object : Continuation<T> { override val context: CoroutineContext = EmptyCoroutineContext; var task: Future<*>? = null; override fun resume(value: T) { dispatcher(completion::resume).invoke(value); } override fun resumeWithException(exception: Throwable) { dispatcher(completion::resumeWithException).invoke(exception); } private fun <T> dispatcher(resume: (T) -> Unit): (T) -> Unit { when (task) { null -> return { value -> task = pool.submit { resume(value) }; } else -> return { task!!.get(); } } } }; } } object CommonPool : ThreadPoolCoroutineContext(); 

ENTONCES puedo comenzar una coroutine por primera vez usando CommonPool :

 val it = produce(CommonPool) { steps.add(1); yield("foo"); steps.add(2); }; steps.await();//return 1; it.get();//return "foo" steps.await();//return null; it.get();//throws NoSuchElementException steps.await();//return 2; 

¿Alguien podría decirme alguna sugerencia si mi suposition es incorrecta? ¿Cuál es una buena forma de implementar una corutina personalizada? Sé que buildSequence es otro uso del patrón productor y consumidor , pero el mío tiene un poco diferente con eso.