Oyente dentro del productor

Estoy tratando de crear producer para un oyente. Mi código se ve así

 suspend fun foo() = produce{ someEvent.addListener { this.send(it) } } 

Pero Suspension functions can be called only within coroutine un error. Suspension functions can be called only within coroutine que tiene sentido. Mi pregunta es. ¿Hay alguna manera de implementar este patrón usando corutinas?

Existen varias forms de implementarlo, dependiendo de lo que intente lograr:

Si desea recibir solo el evento más reciente , debe usar un canal combinado y offer método de offer que siempre tenga éxito:

 fun foo() = produce<T>(capacity = Channel.CONFLATED) { someEvent.addListener { offer(it) } } 

Si es crítico recibir todos los events, entonces sus elecciones dependen del comportamiento del productor de su evento. La pregunta key a considerar aquí es qué sucede si el productor de su evento comienza a producir muchos events "sin parar". La mayoría de los productores de events "sincrónicos", como regla general, no admiten una señal de contrapresión explícita, pero aún soportan una señal de contrapresión implícita: disminuirán la velocidad si sus oyentes son lentos o bloquean el hilo. Por lo tanto, generalmente, la siguiente solución funciona perfectamente para los productores de events sincrónicos:

 fun foo() = produce<T>() { someEvent.addListener { runBlocking { send(it) } } } 

También puede especificar alguna capacity = xxx positiva capacity = xxx como parámetro para produce generador como una optimization del performance si tiene casos en los que se produce un lote de events a la vez y no quiere bloquear al productor, sino dejar que el consumidor los maneje en su propio ritmo.

En el raro caso en que su productor no comprenda una señal de contrapresión de locking implícita (cuando se trata de un tipo de artilugio de subprocesss múltiples que produce violentamente events sin synchronization interna), entonces puede usar un canal con capacidad ilimitada con offer , pero tenga cuidado. corre el riesgo de quedarse sin memory si el productor deja atrás al consumidor:

 fun foo() = produce<T>(capacity = Channel.UNLIMITED) { someEvent.addListener { offer(it) } } 

Si su productor admite una señal explícita de contrapresión (como las streams reactivas funcionales), entonces debe usar un adaptador especial para transferir adecuadamente su señal de contrapresión a / desde corutinas. La biblioteca kotlinx.coroutines tiene una serie de modules de integración kotlinx.coroutines con varias bibliotecas reactivas para este propósito. Mira aquí .

Nota: no debe marcar su function foo con el modificador de suspend . La invocación de foo no suspende al invocador de todos modos. Inmediatamente (sincrónicamente) inicia una corrutina de productor.

Para aprender más sobre corutinas y diferentes types de canales, recomiendo estudiar la guía sobre kotlinx.coroutines .