在同一个开发机器上运行许多Netty客户端和服务器

我正在编写一个应用程序,其中客户端和服务器都使用 Netty 编写,而且服务器(显然)应该能够同时支持许多客户端。我试图通过创建共享同一个 EventLoopGroup 的 1000 个客户端来测试它,并在单台机器上运行所有内容。

最初,有些客户端有时会因为超时而无法连接。增加客户端上的 CONNECT_TIMEOUT_MILLIS 并在服务器上将 SO_BACKLOG 设置为 numberOfClients 之后,这个问题得到了解决。但是,我仍然会遇到 connection reset by peer 以及下面的异常:

 io.netty.channel.AbstractChannel$AnnotatedConnectException: syscall:getsockopt(..) failed: Connection refused: localhost/127.0.0.1:8080 at io.netty.channel.unix.Socket.finishConnect(..)(Unknown Source) Caused by: io.netty.channel.unix.Errors$NativeConnectException: syscall:getsockopt(..) failed: Connection refused ... 1 more 

这些错误有时会出现在客户端(特别是当我增加客户端数量时)。服务器端 LoggingHandler 的输出中似乎看不到来自这些通道所绑定的客户端端口的任何连接尝试。改用 Nio* 类型而不是 Epoll* 类型也没有帮助。

还有其他选项需要设置,以允许更多的连接(可能在服务器端,如果它真的是拒绝/重置连接)?

为了简化情况,我删除了自己的业务逻辑,所以客户端只是通过 WebSocket 连接,并在握手成功后关闭通道。据我了解,Netty 通常可以毫无问题地处理 10000 个并发的 WebSocket 连接。

ulimit -n是1000000, ulimit -u是772794,所以两者都不应该成为问题。

这里是代码(在Kotlin中,但是Java的翻译应该是清楚的):

 package netty

import io.netty.bootstrap.Bootstrap
import io.netty.bootstrap.ServerBootstrap
import io.netty.channel.*
import io.netty.channel.epoll.EpollEventLoopGroup
import io.netty.channel.epoll.EpollServerSocketChannel
import io.netty.channel.epoll.EpollSocketChannel
import io.netty.handler.codec.http.HttpClientCodec
import io.netty.handler.codec.http.HttpObjectAggregator
import io.netty.handler.codec.http.HttpServerCodec
import io.netty.handler.codec.http.websocketx.WebSocketClientHandshakerFactory
import io.netty.handler.codec.http.websocketx.WebSocketClientProtocolHandler
import io.netty.handler.codec.http.websocketx.WebSocketServerProtocolHandler
import io.netty.handler.codec.http.websocketx.WebSocketVersion
import io.netty.handler.codec.http.websocketx.extensions.compression.WebSocketClientCompressionHandler
import io.netty.handler.codec.http.websocketx.extensions.compression.WebSocketServerCompressionHandler
import io.netty.handler.logging.LogLevel
import io.netty.handler.logging.LoggingHandler
import io.netty.util.concurrent.Promise
import org.junit.Test
import java.net.URI
import java.nio.channels.ClosedChannelException

/**
 * Load test that starts one WebSocket server and [numberOfClients] WebSocket clients in the
 * same JVM, waits for every client to finish its handshake, and then closes everything.
 *
 * The deprecation suppression is needed because the client handler overrides
 * `exceptionCaught`, which is deprecated on `ChannelHandler`.
 */
@Suppress("OverridingDeprecatedMember")
class NettyTest {
    /** Wraps a plain lambda in a [ChannelInitializer] so pipelines can be configured inline. */
    private fun channelInitializer(f: (Channel) -> Unit) = object : ChannelInitializer<Channel>() {
        override fun initChannel(ch: Channel) {
            f(ch)
        }
    }

    private val numberOfClients = 10000
    private val maxHttpContentLength = 65536

    // Single source of truth for the endpoint: used for bind, connect and the handshake URI.
    private val host = "localhost"
    private val port = 8080

    /**
     * Starts the server, connects all clients concurrently, waits for every handshake and
     * reports how many clients failed.
     *
     * @throws AssertionError if at least one client failed to connect or to complete the
     *   WebSocket handshake; the first failure is attached as the cause.
     */
    @Test
    fun manyClients() {
        val bossLoopGroup = EpollEventLoopGroup(1)
        val workerLoopGroup = EpollEventLoopGroup()
        val clientLoopGroup = EpollEventLoopGroup()
        try {
            val serverChannel = startServer(bossLoopGroup, workerLoopGroup)
            println("Server started")
            try {
                val url = URI("ws://$host:$port/")
                // Kick off every connect first; the futures are awaited afterwards so that
                // all clients are in flight at the same time.
                val clients = List(numberOfClients) { clientIndex ->
                    connectClient(clientIndex, clientLoopGroup, url)
                }
                val failures = clients.mapNotNull { (handshake, connectFuture) ->
                    awaitAndClose(handshake, connectFuture)
                }
                if (failures.isNotEmpty()) {
                    throw AssertionError(
                        "${failures.size} of $numberOfClients clients failed",
                        failures.first(),
                    )
                }
            } finally {
                serverChannel.close().sync()
            }
        } finally {
            // The groups are shut down even when bind() or a client fails, so no event loop
            // threads outlive the test. Start all shutdowns first, then wait for them.
            listOf(workerLoopGroup, bossLoopGroup, clientLoopGroup)
                .map { it.shutdownGracefully() }
                .forEach { it.awaitUninterruptibly() }
        }
    }

    /** Binds the WebSocket server on [host]:[port] and returns its server channel. */
    private fun startServer(bossLoopGroup: EventLoopGroup, workerLoopGroup: EventLoopGroup): Channel {
        val serverChannelFactory = ChannelFactory<ServerChannel> { EpollServerSocketChannel() }
        return ServerBootstrap()
            .channelFactory(serverChannelFactory)
            .group(bossLoopGroup, workerLoopGroup)
            .handler(LoggingHandler(LogLevel.DEBUG))
            .childHandler(channelInitializer { ch ->
                ch.pipeline().addLast(
                    HttpServerCodec(),
                    HttpObjectAggregator(maxHttpContentLength),
                    WebSocketServerCompressionHandler(),
                    WebSocketServerProtocolHandler("/", null, true, maxHttpContentLength)/*, myServerHandler*/
                )
            })
            // NOTE(review): on Linux the kernel silently caps the listen backlog at
            // net.core.somaxconn, so this value may not take full effect unless that sysctl
            // is raised as well - worth checking when clients see "Connection refused".
            .option(ChannelOption.SO_BACKLOG, numberOfClients)
            .bind(host, port)
            .sync()
            .channel()
    }

    /**
     * Starts connecting one client and returns its handshake promise together with the
     * connect future. The promise is completed exactly once: with the channel on
     * `HANDSHAKE_COMPLETE`, or with a failure on an exception or a premature close.
     */
    private fun connectClient(
        clientIndex: Int,
        clientLoopGroup: EventLoopGroup,
        url: URI,
    ): Pair<Promise<Channel>, ChannelFuture> {
        val clientChannelFactory = ChannelFactory<Channel> { EpollSocketChannel() }
        val handshaker = WebSocketClientHandshakerFactory.newHandshaker(url, WebSocketVersion.V13, null, true, null)
        val promise = clientLoopGroup.next().newPromise<Channel>()
        val connectFuture = Bootstrap()
            .channelFactory(clientChannelFactory)
            .group(clientLoopGroup)
            .handler(channelInitializer { ch ->
                ch.pipeline().addLast(
                    HttpClientCodec(),
                    HttpObjectAggregator(maxHttpContentLength),
                    WebSocketClientCompressionHandler.INSTANCE,
                    WebSocketClientProtocolHandler(handshaker, true),
                    object : ChannelInboundHandlerAdapter() {
                        override fun userEventTriggered(ctx: ChannelHandlerContext, evt: Any) {
                            if (evt == WebSocketClientProtocolHandler.ClientHandshakeStateEvent.HANDSHAKE_COMPLETE) {
                                // try* instead of set*: a second completion attempt must not
                                // throw IllegalStateException inside the event loop.
                                if (promise.trySuccess(ctx.channel())) {
                                    println("Client $clientIndex handshake successful")
                                }
                            }
                        }

                        override fun channelInactive(ctx: ChannelHandlerContext) {
                            // A close before the handshake would otherwise leave the
                            // promise pending and block the test forever.
                            promise.tryFailure(ClosedChannelException())
                            super.channelInactive(ctx)
                        }

                        override fun exceptionCaught(ctx: ChannelHandlerContext, cause: Throwable) {
                            promise.tryFailure(cause)
                        }
                    })
            })
            .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 120000)
            .connect(host, port)
        return Pair(promise, connectFuture)
    }

    /**
     * Waits for one client to connect and finish its handshake, then closes its channel.
     *
     * @return the failure cause, or `null` when the client succeeded.
     */
    private fun awaitAndClose(handshake: Promise<Channel>, connectFuture: ChannelFuture): Throwable? =
        try {
            // await() rather than sync(): one failing client must not abort the loop and
            // leave the remaining channels unclosed. If the connect itself failed, the
            // handshake promise will never complete, so it is not awaited in that case.
            connectFuture.await()
            connectFuture.cause() ?: handshake.await().cause()
        } finally {
            connectFuture.channel().close().sync()
        }
}

Interesting Posts