这是反应堆单声道的错误吗?

使用Reactor时我注意到了一些奇怪的行为。 情况是这样的:

  • 拨打其余的API端点,获取一个包装在Mono中的值
  • 用另一个值调用另一个API端点,检索另一个包装在Mono中的值
  • 压缩两个结果

看起来会发生什么是onSubscribe(FluxMap.MapSubscriber)被调用两次的第一个API调用,然后打开两个连接,并产生两个结果。 传递给第二个API调用的结果是非确定性的,取决于第二个API调用是在前两个调用中的第二个调用完成之前还是之后执行。

这是使用Kotlin和Springboot WebClient重现问题的代码示例。 API端点根据路径参数生成单个GUID或多个GUID。 我使用第一个调用的结果中的第一个数字作为第二个调用中的路径参数:

val api = "https://www.uuidgenerator.net/api/guid" val client = WebClient.builder() .baseUrl(api) .build() @Test public fun reactorBug() { val firstResult = callApi().doOnSuccess { r -> println("callApi returned: $r") } val secondResult = callApi(firstResult).doOnSuccess { r -> println("callApi(result) returned: $r") } println(Mono.zip(firstResult, secondResult, { first, second -> "First result is ${first}Second result is $second" }).block()) } private fun callApi(): Mono { println("Calling Api") return client.get().retrieve().bodyToMono() } private fun callApi(number: Int): Mono { println("Calling Api with $number") return client.get().uri("/{number}", number).retrieve().bodyToMono() } private fun callApi(firstResult: Mono): Mono { println("Extracting number from first result") return firstResult .map { guid -> guid.find { c -> c.isDigit() } } .map { Character.getNumericValue(it!!) } .flatMap { i -> callApi(i) } } 

这是解释问题的示例输出:

调用Api
从第一个结果中提取数字
callApi返回:12ec857b-e42c-42ab-a7a2-69beb9a377e3
callApi返回:5eedefa5-73b5-4995-aef3-8621e31b698d < – 此结果不应该发生
调用Api与5 < – 这应该是1,而不是5
callApi(result)返回:
01c64488-6a8c-4400-9094-6729c64a4e1a
0179beae-d2b4-40b6-8489-52fa58deb25f
8f814b1d-594C-4392-a4f5-04d417367add
45891d71-61b2-4d5b-81ad-2cfd8e453377
08edf0c3-3614-402b-8b17-000fdedce1a0
第一个结果是12ec857b-e42c-42ab-a7a2-69beb9a377e3
第二个结果是
01c64488-6a8c-4400-9094-6729c64a4e1a
0179beae-d2b4-40b6-8489-52fa58deb25f
8f814b1d-594C-4392-a4f5-04d417367add
45891d71-61b2-4d5b-81ad-2cfd8e453377
08edf0c3-3614-402b-8b17-000fdedce1a0

编辑的调试输出:

30-01-2018 22:36:11.889 [main] DEBUG osweb.reactive.function.client.debug – onSubscribe(FluxMap.MapSubscriber)
30-01-2018 22:36:11.920 [main] DEBUG osweb.reactive.function.client.debug – 请求(无界)
30-01-2018 22:36:11.924 [main] DEBUG io.netty.util.NetUtil.debug –Djava.net.preferIPv4Stack:false
30-01-2018 22:36:11.925 [main] DEBUG io.netty.util.NetUtil.debug –Djava.net.preferIPv6Addresses:false
30-01-2018 22:36:12.128 [main] DEBUG io.netty.util.NetUtil.debug – Loopback接口:lo(软件环回接口1,127.0.0.1)
30-01-2018 22:36:12.129 [main] DEBUG io.netty.util.NetUtil.debug – 无法从sysctl和file \ proc \ sys \ net \ core \ somaxconn中获取SOMAXCONN。 默认值:200
30-01-2018 22:36:12.146 [main] DEBUG rinrDefaultLoopEpollDetector.debug – 默认epoll支持:false
30-01-2018 22:36:12.156 [main] DEBUG rinresources.DefaultPoolResources.debug – 用于www.uuidgenerator.net/173.255.225.224:443的新http客户端池
30-01-2018 22:36:12.190 [main] DEBUG io.netty.channel.DefaultChannelId.debug –Dio.netty.processId:4232(自动检测)
30-01-2018 22:36:12.396 [main] DEBUG io.netty.channel.DefaultChannelId.debug – -Dio.netty.machineId:78:e4:00:ff:fe:bf:a5:cb(auto-detected )
30-01-2018 22:36:12.447 [main] DEBUG io.netty.buffer.ByteBufUtil.debug – -Dio.netty.allocator.type:pooled
30-01-2018 22:36:12.448 [main] DEBUG io.netty.buffer.ByteBufUtil.debug – -Dio.netty.threadLocalDirectBufferSize:65536
30-01-2018 22:36:12.448 [main] DEBUG io.netty.buffer.ByteBufUtil.debug – -Dio.netty.maxThreadLocalCharBufferSize:16384
30-01-2018 22:36:12.459 [main] DEBUG rincPooledClientContextHandler.debug – 从池中获取现有频道:DefaultPromise @ d23e4a(incomplete)SimpleChannelPool {activeConnections = 1}
30-01-2018 22:36:12.461 [main] DEBUG osweb.reactive.function.client.debug – onSubscribe(FluxMap.MapSubscriber)
30-01-2018 22:36:12.462 [main] DEBUG osweb.reactive.function.client.debug – 请求(无界)
30-01-2018 22:36:12.463 [main] DEBUG rincPooledClientContextHandler.debug – 从池中获取现有通道:DefaultPromise @ c8295b(incomplete)SimpleChannelPool {activeConnections = 1}
30-01-2018 22:36:12.520 [reactor-http-nio-2] DEBUG rinresources.DefaultPoolResources.debug – 创建[id:0x88225196],现在有2个活动连接
30-01-2018 22:36:12.520 [reactor-http-nio-4] DEBUG rinresources.DefaultPoolResources.debug – 创建[id:0x80971ff0],现在有2个活动连接

为什么第一个API调用会发生两次 – 这是一个错误还是Mono的预期行为?

为什么第一个API调用会发生两次

zip将直接订阅firstResult两次,一次通过map-map-flatMap

在这种情况下,你不需要zip ,只需要flatMap了:

 val firstResult = callApi().doOnSuccess { r -> println("callApi returned: $r") } val lastResult = firstResult .flatMap { first -> Mono.just(first) .map { guid -> guid.find { c -> c.isDigit() } } .map { Character.getNumericValue(it!!) } .flatMap { i -> callApi(i) } .map { second -> "First result is ${first}Second result is $second" } } lastResult.block()