ReactiveStreams NPE cuando se utiliza publishOn con Publisher personalizado

Cuando uso Reactive Streams ( https://github.com/reactor/reactor-core ) con un Publisher personalizado en combinación con la function publishOn , siempre obtengo un NPE. ¿Qué pasa con mi código? ¿Utilizo el Publisher de una manera incorrecta?

 Flux.from(MyPublisher()) .publishOn(Schedulers.single()) .subscribe { println("<-- $it received") } class MyPublisher : Publisher<Int> { override fun subscribe(sub: Subscriber<in Int>) { while (true) { Thread.sleep(300) sub.onNext(1) } } } 

La exception es:

 Exception in thread "main" java.lang.NullPointerException at reactor.core.publisher.FluxPublishOn$PublishOnSubscriber.onNext(FluxPublishOn.java:212) at org.guenhter.kotlin.hello.MyPublisher.subscribe(HelloWorld.kt:18) at reactor.core.publisher.FluxSource.subscribe(FluxSource.java:52) at reactor.core.publisher.FluxPublishOn.subscribe(FluxPublishOn.java:96) at reactor.core.publisher.Flux.subscribe(Flux.java:6447) at reactor.core.publisher.Flux.subscribeWith(Flux.java:6614) at reactor.core.publisher.Flux.subscribe(Flux.java:6440) at reactor.core.publisher.Flux.subscribe(Flux.java:6404) at reactor.core.publisher.Flux.subscribe(Flux.java:6347) at org.guenhter.kotlin.hello.HelloWorldKt.main(HelloWorld.kt:11) 

Publisher está definido por el estándar "streams reactivos" y tiene una serie de requisitos. Uno de estos requisitos es que se debe llamar a Subscriber.onSubscribe antes que cualquiera de los otros methods para seguir el protocolo.

Como no ha hecho esto, significa que probablemente algo no se haya inicializado correctamente, lo que causa que el NPE esté dentro de la class del reactor.

Sin embargo, incluso si resuelve este problema, el estándar está diseñado para ser reactivo, lo que significa que solo emite datos cuando el suscriptor lo solicita. Dado que usted le enviará datos independientemente de eso, probablemente causará una exception más adelante en la línea. Use Flux.create para crear un emisor que pueda manejar adecuadamente las requestes en lugar de crear su propia implementación de Publisher.