RxJava salida diferente entre Flowable y Observable con window y Groupby

Estoy usando RxJava2 con un código que se networkinguce a algo como esto:

val whitespaceRegex = Regex("\\s+") val queryRegex = Regex("query=([^&]+)", RegexOption.IGNORE_CASE) val dateTimeFormatter = DateTimeFormatter.ISO_OFFSET_DATE_TIME @JvmStatic fun main(args: Array<String>) { val cnt = AtomicLong() val templateStr = "|date| /ignonetworking/ query=|query|" val random = ThreadLocalRandom.current() var curDate = ZonedDateTime.of(LocalDate.of(2016, Month.JANUARY, 1), LocalTime.MIDNIGHT, ZoneId.of("UTC")) val generator = Flowable.generate<String> { emitter -> // normally these are read from a file, this is for the example val next = cnt.incrementAndGet() if (next % 3000 == 0L) { curDate = curDate.plusDays(1) } if (next < 100000) { val curStr = templateStr .replace("|date|", dateTimeFormatter.format(curDate)) .replace("|query|", random.nextInt(1, 1000).toString()) emitter.onNext(curStr) } else { emitter.onComplete() } } val source = generator .map { line -> val cols = line.split(whitespaceRegex) val queryRaw = queryRegex.find(cols[2])?.groupValues?.get(1) ?: "" val query = URLDecoder.decode(queryRaw, Charsets.UTF_8.name()).toLowerCase().replace(whitespaceRegex, " ").trim() val date = dateTimeFormatter.parse(cols[0]) Pair(LocalDate.from(date), query) } .share() source .window(source.map { it.first }.distinctUntilChanged()) .flatMap { window -> window .groupBy { pair -> pair } .flatMap({ grouping -> grouping .count() .map { Pair(grouping.key, it) }.toFlowable() }) } .subscribe({ println("Result: $it}") }, { it.printStackTrace() }, { println("Done") }) } 

Cuando uso Observable.generate funciona bien, pero con Flowable.generate no hay salida. Esto es contar cuántas consultas se produjeron en un día determinado. El día aumenta secuencialmente, así que formulo una window de cada día, luego cuento las consultas con un grupo Por. ¿Debo hacer esto de manera diferente con Flowable?

Como mencionó akarnokd, esto se debió a que flatMap tenía un valor máximo pnetworkingeterminado de 128. Encontré este problema, https://github.com/ReactiveX/RxJava/issues/5126 , que describe el motivo con más detalle. Esto soluciona el problema:

  val cnt = AtomicLong() val templateStr = "|date| /ignonetworking/ query=|query|" val random = ThreadLocalRandom.current() var curDate = ZonedDateTime.of(LocalDate.of(2016, Month.JANUARY, 1), LocalTime.MIDNIGHT, ZoneId.of("UTC")) val generator = Flowable.generate<String> { emitter -> val next = cnt.incrementAndGet() if (next % 3000 == 0L) { curDate = curDate.plusDays(1) } if (next < 1000000) { val curStr = templateStr .replace("|date|", dateTimeFormatter.format(curDate)) .replace("|query|", random.nextInt(1, 1000).toString()) emitter.onNext(curStr) } else { emitter.onComplete() } } val source = generator .map { line -> val cols = line.split(whitespaceRegex) val queryRaw = queryRegex.find(cols[2])?.groupValues?.get(1) ?: "" val query = URLDecoder.decode(queryRaw, Charsets.UTF_8.name()).toLowerCase().replace(whitespaceRegex, " ").trim() val date = dateTimeFormatter.parse(cols[0]) Pair(LocalDate.from(date), query) } .share() source .window(source.map { it.first }.distinctUntilChanged().doOnEach({println("Win: $it")})) .flatMap( { window -> window .groupBy { pair -> pair } .flatMap({ grouping -> grouping .count() .map { Pair(grouping.key, it) }.toFlowable() // fix is here }, Int.MAX_VALUE) // and here }, Int.MAX_VALUE) .subscribe({ println("Result: $it}") }, { it.printStackTrace() }, { println("Done") }) 
  • No se pudo deducir Kotlin y RxJava
  • RxJava2 cómo separar diferentes implementaciones de emisor observable
  • Invoque RxJava2 cancelable / desechable del hilo correcto
  • Observables opcionales en combinar
  • Cómo get el último valor emitido de observable
  • Asunto de RxJava: escuche el tipo diferente que el que emite
  • Para una function de Kotlin utilizada como expresión, ¿hay una forma concisa de operar y devolver un valor?
  • ¿Cuál es la diferencia entre llaves y soportes normales en RxJava con Kotlin?
  • Cómo retrasar onError () en RxJava 2 y Android?
  • Single.zip completando antes de Success
  • RxJava 2 requiere un tipo de retorno Observable diferente de RxJava 1 (Kotlin)