testing unitaria con apache storm y cassandra: topología local que no consume el post de kafka

Tengo una unidad / testing de integración configurada para publicar events en una queue de Kafka. También tengo una topología de tormenta local que está configurada para consumir events en la queue kafka, hacer algunas transformaciones en un perno y luego savelas en la database de cassandra. La testing está configurada de la siguiente manera:

class StormPersistorTest{ lateinit var cluster: LocalCluster private val zk = ZookeeperHelper() @Before fun setup() { cluster = runBlocking { // same code as in main function val localCluster = LocalCluster() val persistorTopology = PersistorsTopology() val conf = persistorTopology.createLocalConfiguration() conf.setDebug(true) val topology = persistorTopology.createTopology(arrayOf(), conf) localCluster.submitTopology(persistorTopology.name, conf, topology) localCluster } zk.setUp() // creates a ZkClient } @After fun tearDown(){ cluster.killTopology(GraphPersistorsTopology.APPNAME); cluster.shutdown() zk.tearDown() // closes the ZkClient } @Test fun EventPersistorTest(){ val payload = getEventSerializer().initialize().serialize(generateTestEvent()).toByteBuffer() val dataMessage = DataMessage .newBuilder() .setId(null) .setPayload(payload) .build() KafkaIntegrationTestHelpers.enqueueToKafka(topicName, dataMessage, getDataMessageSerializer()) Thread.sleep(5000) Assert.assertEquals(1, countRowsInTable("cassandra_table")) } 

Para la aplicación, tengo una function principal en el package que ejecuta la topología:

 fun main(args: Array<String>) { val persistorTopology = GraphPersistorsTopology() val conf = persistorTopology.createLocalConfiguration() val topology = persistorTopology.createTopology(arrayOf(), conf) val cluster = LocalCluster() cluster.submitTopology(persistorTopology.name, conf, topology) } } 

Lo que he estado observando es que si ejecuto la function principal para iniciar la topología en una instancia separada (y comentar el bloque async junto con las primeras 2 líneas en tearDown ), entonces la topología podrá consumir correctamente el post del kafka queue. Por otro lado, si no ejecuto la function principal para iniciar la topología y usar el bloque async , entonces no arroja un error, pero el método nextTuple ni siquiera se ejecuta, lo que falla mi testing.

Tenga en count que había intentado ejecutar el código en runBlocking sin runBlocking con el runBlocking . Pensé que tal vez la razón por la que la function principal funcionó fue porque la topología de tormenta se inició en otro hilo.