RxJava salida diferente entre Flowable y Observable con window y Groupby

Estoy usando RxJava2 con un código que se networkinguce a algo como esto:

val whitespaceRegex = Regex("\\s+") val queryRegex = Regex("query=([^&]+)", RegexOption.IGNORE_CASE) val dateTimeFormatter = DateTimeFormatter.ISO_OFFSET_DATE_TIME @JvmStatic fun main(args: Array<String>) { val cnt = AtomicLong() val templateStr = "|date| /ignonetworking/ query=|query|" val random = ThreadLocalRandom.current() var curDate = ZonedDateTime.of(LocalDate.of(2016, Month.JANUARY, 1), LocalTime.MIDNIGHT, ZoneId.of("UTC")) val generator = Flowable.generate<String> { emitter -> // normally these are read from a file, this is for the example val next = cnt.incrementAndGet() if (next % 3000 == 0L) { curDate = curDate.plusDays(1) } if (next < 100000) { val curStr = templateStr .replace("|date|", dateTimeFormatter.format(curDate)) .replace("|query|", random.nextInt(1, 1000).toString()) emitter.onNext(curStr) } else { emitter.onComplete() } } val source = generator .map { line -> val cols = line.split(whitespaceRegex) val queryRaw = queryRegex.find(cols[2])?.groupValues?.get(1) ?: "" val query = URLDecoder.decode(queryRaw, Charsets.UTF_8.name()).toLowerCase().replace(whitespaceRegex, " ").trim() val date = dateTimeFormatter.parse(cols[0]) Pair(LocalDate.from(date), query) } .share() source .window(source.map { it.first }.distinctUntilChanged()) .flatMap { window -> window .groupBy { pair -> pair } .flatMap({ grouping -> grouping .count() .map { Pair(grouping.key, it) }.toFlowable() }) } .subscribe({ println("Result: $it}") }, { it.printStackTrace() }, { println("Done") }) } 

Cuando uso Observable.generate funciona bien, pero con Flowable.generate no hay salida. Esto es contar cuántas consultas se produjeron en un día determinado. El día aumenta secuencialmente, así que formulo una window de cada día, luego cuento las consultas con un grupo Por. ¿Debo hacer esto de manera diferente con Flowable?

Como mencionó akarnokd, esto se debió a que flatMap tenía un valor máximo pnetworkingeterminado de 128. Encontré este problema, https://github.com/ReactiveX/RxJava/issues/5126 , que describe el motivo con más detalle. Esto soluciona el problema:

  val cnt = AtomicLong() val templateStr = "|date| /ignonetworking/ query=|query|" val random = ThreadLocalRandom.current() var curDate = ZonedDateTime.of(LocalDate.of(2016, Month.JANUARY, 1), LocalTime.MIDNIGHT, ZoneId.of("UTC")) val generator = Flowable.generate<String> { emitter -> val next = cnt.incrementAndGet() if (next % 3000 == 0L) { curDate = curDate.plusDays(1) } if (next < 1000000) { val curStr = templateStr .replace("|date|", dateTimeFormatter.format(curDate)) .replace("|query|", random.nextInt(1, 1000).toString()) emitter.onNext(curStr) } else { emitter.onComplete() } } val source = generator .map { line -> val cols = line.split(whitespaceRegex) val queryRaw = queryRegex.find(cols[2])?.groupValues?.get(1) ?: "" val query = URLDecoder.decode(queryRaw, Charsets.UTF_8.name()).toLowerCase().replace(whitespaceRegex, " ").trim() val date = dateTimeFormatter.parse(cols[0]) Pair(LocalDate.from(date), query) } .share() source .window(source.map { it.first }.distinctUntilChanged().doOnEach({println("Win: $it")})) .flatMap( { window -> window .groupBy { pair -> pair } .flatMap({ grouping -> grouping .count() .map { Pair(grouping.key, it) }.toFlowable() // fix is here }, Int.MAX_VALUE) // and here }, Int.MAX_VALUE) .subscribe({ println("Result: $it}") }, { it.printStackTrace() }, { println("Done") }) 
  • Pruebas unitarias Rxjava observables que tienen un retraso
  • Inferencia de tipo Observable.combineLatest en kotlin
  • Cómo pasar nulo a un Observable con tipo anulable en RxJava 2 y Kotlin
  • Kotlin no puede inferir el tipo cuando utiliza la reference de método en Flowable
  • Aplicando transformación a cada elemento en Single <List <T >>
  • ¿Cómo soluciono el error de inferencia de tipo en un Completable transformado utilizando RxLifecycle.bindToLifecycle ()?
  • No se puede llamar al operador desde () en Observable en Android Kotlin
  • Android Architecture Components Room ViewModel CompleteableFormAction
  • Single.zip completando antes de Success
  • Sitio de Android con RxJava manejar resultado de consulta vacío
  • Espere hasta que dos observables emitan verdadero