Al utilizar tanto publishOn como subscribeOn en un flujo, no ocurre nada

Siempre que utilizo tanto subscribeOn como publishOn, nada se imprime. Si uso solo uno, se imprimirá. Si uso subscribeOn (Schedulers.immediate ()) o elástico, funciona. ¿Alguna idea de por qué es eso?

Tenía entendido que publishOn afecta a qué hilo se publica y se suscribe al hilo que ejecuta el suscriptor. ¿Podría señalarme en la dirección correcta?

fun test() { val testPublisher = EmitterProcessor.create<String>().connect() testPublisher .publishOn(Schedulers.elastic()) .map { it -> println("map on ${Thread.currentThread().name}") it } .subscribeOn(Schedulers.parallel()) .subscribe { println("subscribe on ${Thread.currentThread().name}") } testPublisher.onNext("a") testPublisher.onNext("b") testPublisher.onNext("c") Thread.sleep(5000) println("---") } 

subscribeOn lugar de influenciar dónde se produce la suscripción . Es decir, el evento inicial que desencadena la fuente para emitir elementos. Por otro lado, el gancho onNext Subscriber está influenciado por la publishOn más publishOn en la cadena (al igual que su map ).

Pero EmitterProcessor , como la mayoría de los Processor , está más avanzado y puede robar trabajo. No estoy seguro de por qué no se imprime nada en su caja (su muestra convertida a Java funciona en mi máquina), pero apuesto a que tiene algo que ver con ese procesador.

Este código demostraría mejor subscribeOn publishOn vs publishOn :

 Flux.just("a", "b", "c") //this is where subscription triggers data production //this is influenced by subscribeOn .doOnNext(v -> System.out.println("before publishOn: " + Thread.currentThread().getName())) .publishOn(Schedulers.elastic()) //the rest is influenced by publishOn .doOnNext(v -> System.out.println("after publishOn: " + Thread.currentThread().getName())) .subscribeOn(Schedulers.parallel()) .subscribe(v -> System.out.println("received " + v + " on " + Thread.currentThread().getName())); Thread.sleep(5000); 

Esto se imprime:

 before publishOn: parallel-1 before publishOn: parallel-1 before publishOn: parallel-1 after publishOn: elastic-2 received a on elastic-2 after publishOn: elastic-2 received b on elastic-2 after publishOn: elastic-2 received c on elastic-2 
  • Cargue la configuration xml del muelle de forma dinámica
  • ¿Cómo usar los methods Spring Data JPA que devuelven un Stream en un bloque try-with-resources en Kotlin?
  • Spring @Autowire no trabaja en Kotlin
  • Cómo crear consultas HQL usando campos de class extendida
  • Usando @ EnableNeo4jRepositories (basePackageClasses = "myApp") en Kotlin
  • ¿Puedo escribir una function de extensión de Kotlin que use un bean de spring autoconectado?
  • Cómo usar la dependency injection en las testings de Spek
  • La desconnection no funciona en la aplicación Spring Boot (no es compatible con el método POST)
  • Spring Boot 2 y Kotlin (con Maven)
  • Probar los methods @Async de devolución de vacío de Testing
  • Spring Injecting util: map en Kotlin con tipo de security