RxJava: different output from window and groupBy with Flowable vs. Observable

I'm using RxJava 2 with the following code:

```kotlin
import io.reactivex.Flowable
import java.net.URLDecoder
import java.time.LocalDate
import java.time.LocalTime
import java.time.Month
import java.time.ZoneId
import java.time.ZonedDateTime
import java.time.format.DateTimeFormatter
import java.util.concurrent.ThreadLocalRandom
import java.util.concurrent.atomic.AtomicLong

val whitespaceRegex = Regex("\\s+")
val queryRegex = Regex("query=([^&]+)", RegexOption.IGNORE_CASE)
val dateTimeFormatter: DateTimeFormatter = DateTimeFormatter.ISO_OFFSET_DATE_TIME

fun main(args: Array<String>) {
    val cnt = AtomicLong()
    val templateStr = "|date| /ignored/ query=|query|"
    val random = ThreadLocalRandom.current()
    var curDate = ZonedDateTime.of(LocalDate.of(2016, Month.JANUARY, 1), LocalTime.MIDNIGHT, ZoneId.of("UTC"))
    val generator = Flowable.generate<String> { emitter ->
        // normally these are read from a file, this is for the example
        val next = cnt.incrementAndGet()
        if (next % 3000 == 0L) {
            curDate = curDate.plusDays(1) // move to the next day every 3000 lines
        }
        if (next < 100000) {
            val curStr = templateStr
                .replace("|date|", dateTimeFormatter.format(curDate))
                .replace("|query|", random.nextInt(1, 1000).toString())
            emitter.onNext(curStr)
        } else {
            emitter.onComplete()
        }
    }

    val source = generator
        .map { line ->
            // parse "<date> /ignored/ query=<q>" into (day, normalized query)
            val cols = line.split(whitespaceRegex)
            val queryRaw = queryRegex.find(cols[2])?.groupValues?.get(1) ?: ""
            val query = URLDecoder.decode(queryRaw, Charsets.UTF_8.name()).toLowerCase().replace(whitespaceRegex, " ").trim()
            val date = dateTimeFormatter.parse(cols[0])
            Pair(LocalDate.from(date), query)
        }
        .share()

    source
        // open a new window every time the date changes
        .window(source.map { it.first }.distinctUntilChanged())
        .flatMap { window ->
            window
                .groupBy { pair -> pair }
                .flatMap({ grouping ->
                    grouping
                        .count()
                        .map { Pair(grouping.key, it) }.toFlowable()
                })
        }
        .subscribe({ println("Result: $it") }, { it.printStackTrace() }, { println("Done") })
}
```

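When I use Observable.generate it works fine, but with Flowable.generate there is no output. The goal is to count how many times each query occurs per day. The date only ever increases, so I create one window per day and then count the queries in it with groupBy. Do I need to handle Flowable differently?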
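To sanity-check either version it helps to know what the pipeline should print. Below is a plain-Kotlin sketch of the same per-day counting (not RxJava code), assuming the lines are already parsed into Pair<LocalDate, String> and arrive in date order; the helper names runsBy and dailyQueryCounts are hypothetical. Splitting on consecutive equal dates plays the role of the distinctUntilChanged window boundary, and groupingBy { it }.eachCount() plays the role of groupBy followed by count().

```kotlin
import java.time.LocalDate

// Hypothetical helper: splits an ordered list into runs of consecutive items sharing a key.
// For date-ordered input this mirrors window(source.map { it.first }.distinctUntilChanged()).
fun <T, K> runsBy(items: List<T>, key: (T) -> K): List<List<T>> {
    val runs = mutableListOf<MutableList<T>>()
    for (item in items) {
        val current = runs.lastOrNull()
        if (current != null && key(current.last()) == key(item)) {
            current.add(item)
        } else {
            runs.add(mutableListOf(item)) // key changed: start a new run
        }
    }
    return runs
}

// Hypothetical helper: counts each (date, query) pair inside its daily run,
// like groupBy { pair -> pair } followed by count().
fun dailyQueryCounts(records: List<Pair<LocalDate, String>>): List<Pair<Pair<LocalDate, String>, Int>> =
    runsBy(records) { it.first }.flatMap { day ->
        day.groupingBy { it }.eachCount().toList()
    }

fun main() {
    val d1 = LocalDate.of(2016, 1, 1)
    val d2 = LocalDate.of(2016, 1, 2)
    val records = listOf(d1 to "a", d1 to "b", d1 to "a", d2 to "a")
    dailyQueryCounts(records).forEach { println("Result: $it") }
    // Result: ((2016-01-01, a), 2)
    // Result: ((2016-01-01, b), 1)
    // Result: ((2016-01-02, a), 1)
}
```

Unlike the reactive version, this keeps a whole day in memory, so it is only meant as a reference for small inputs.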

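As akarnokd pointed out, this is caused by the default maxConcurrency of Flowable.flatMap, which is 128 (Flowable.bufferSize()). Observable.flatMap has no limit by default, which is why the Observable version works. I found this issue, https://github.com/ReactiveX/RxJava/issues/5126, which describes the cause in more detail. Each day contains far more than 128 distinct queries, and a group only completes when its daily window completes, so the inner flatMap never gets to subscribe to the remaining groups and the pipeline stalls. Passing Int.MAX_VALUE as maxConcurrency fixed the problem: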

```kotlin
// Imports and the top-level whitespaceRegex, queryRegex and dateTimeFormatter
// declarations are the same as in the question.
fun main(args: Array<String>) {
    val cnt = AtomicLong()
    val templateStr = "|date| /ignored/ query=|query|"
    val random = ThreadLocalRandom.current()
    var curDate = ZonedDateTime.of(LocalDate.of(2016, Month.JANUARY, 1), LocalTime.MIDNIGHT, ZoneId.of("UTC"))
    val generator = Flowable.generate<String> { emitter ->
        val next = cnt.incrementAndGet()
        if (next % 3000 == 0L) {
            curDate = curDate.plusDays(1)
        }
        if (next < 1000000) {
            val curStr = templateStr
                .replace("|date|", dateTimeFormatter.format(curDate))
                .replace("|query|", random.nextInt(1, 1000).toString())
            emitter.onNext(curStr)
        } else {
            emitter.onComplete()
        }
    }

    val source = generator
        .map { line ->
            val cols = line.split(whitespaceRegex)
            val queryRaw = queryRegex.find(cols[2])?.groupValues?.get(1) ?: ""
            val query = URLDecoder.decode(queryRaw, Charsets.UTF_8.name()).toLowerCase().replace(whitespaceRegex, " ").trim()
            val date = dateTimeFormatter.parse(cols[0])
            Pair(LocalDate.from(date), query)
        }
        .share()

    source
        .window(source.map { it.first }.distinctUntilChanged().doOnEach({ println("Win: $it") }))
        .flatMap({ window ->
            window
                .groupBy { pair -> pair }
                .flatMap({ grouping ->
                    grouping
                        .count()
                        .map { Pair(grouping.key, it) }.toFlowable()
                }, Int.MAX_VALUE) // fix is here: no concurrency limit on the inner flatMap over the groups
        }, Int.MAX_VALUE) // and here: same for the outer flatMap over the windows
        .subscribe({ println("Result: $it") }, { it.printStackTrace() }, { println("Done") })
}
```
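The same effect can be reproduced without windows or dates. This is a minimal sketch, assuming RxJava 2.x (io.reactivex) on the classpath; countPerKey is a hypothetical helper name. It groups the numbers 0 to 999 into 300 keys, more than Flowable.bufferSize() (128 unless changed through the rx2.buffer-size system property), and counts each group. Since count() only emits once its group completes, and groups only complete when range() completes, every group has to be subscribed at the same time.

```kotlin
import io.reactivex.Flowable

// Assumes RxJava 2.x. Groups 0..999 by (n % keyCount) and counts the members of each group.
fun countPerKey(keyCount: Int): Map<Int, Long> =
    Flowable.range(0, 1000)
        .groupBy { it % keyCount }
        .flatMap({ group ->
            // count() emits only after the group completes, i.e. after range() completes
            group.count().map { Pair(group.key!!, it) }.toFlowable()
        }, Int.MAX_VALUE) // without this, flatMap subscribes to at most Flowable.bufferSize() groups at once
        .toList()
        .blockingGet()
        .toMap()

fun main() {
    val counts = countPerKey(300)
    println(counts.size) // 300
    println(counts[0])   // 4 (0, 300, 600, 900)
    println(counts[299]) // 3 (299, 599, 899)
}
```

Dropping the Int.MAX_VALUE argument is expected to make this Flowable version stall once there are more groups than the limit, while the equivalent Observable code should not. Int.MAX_VALUE removes the bound entirely; that is cheap here because each group only holds a counter, but with expensive inner work a finite bound at least as large as the expected number of groups per window may be preferable.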