From 6e0520299da4868f880c1b4d8ee5772be4fd791a Mon Sep 17 00:00:00 2001 From: Violeta Georgieva Date: Sat, 11 Apr 2020 15:29:32 +0300 Subject: [PATCH] Adapt tests --- .../channel/ChannelOperationsHandlerTest.java | 6 +- .../netty/channel/FluxReceiveTest.java | 5 +- .../reactor/netty/http/HttpResourcesTest.java | 12 ++- .../java/reactor/netty/http/HttpTests.java | 6 +- .../netty/http/HttpsMetricsHandlerTests.java | 3 +- .../http/client/HttpClientProxyTest.java | 2 +- .../netty/http/client/HttpClientTest.java | 80 ++++++++----------- .../http/client/HttpClientWithTomcatTest.java | 10 +-- .../netty/http/client/WebsocketTest.java | 8 +- .../http/server/ConnectionInfoTests.java | 10 +-- .../netty/http/server/HttpServerTests.java | 40 +++++----- .../PooledConnectionProviderTest.java | 66 +++++++++++---- .../netty/tcp/BlockingConnectionTest.java | 50 +++++++++--- .../reactor/netty/tcp/SslProviderTests.java | 6 +- .../reactor/netty/tcp/TcpClientTests.java | 13 +-- .../reactor/netty/tcp/TcpMetricsTests.java | 23 +++--- .../reactor/netty/tcp/TcpResourcesTest.java | 11 ++- .../reactor/netty/tcp/TcpServerTests.java | 15 ++-- .../ByteBufAllocatorMetricsTest.java | 2 +- 19 files changed, 208 insertions(+), 160 deletions(-) diff --git a/src/test/java/reactor/netty/channel/ChannelOperationsHandlerTest.java b/src/test/java/reactor/netty/channel/ChannelOperationsHandlerTest.java index 5c67705ace..16c282a69e 100644 --- a/src/test/java/reactor/netty/channel/ChannelOperationsHandlerTest.java +++ b/src/test/java/reactor/netty/channel/ChannelOperationsHandlerTest.java @@ -76,7 +76,7 @@ private void doTestPublisherSenderOnCompleteFlushInProgress(boolean useScheduler DisposableServer server = HttpServer.create() .port(0) - .tcpConfiguration(tcpServer -> tcpServer.doOnConnection(conn -> { conn.addHandler(new LineBasedFrameDecoder(10)); })) + .doOnConnection(conn -> conn.addHandler(new LineBasedFrameDecoder(10))) .handle((req, res) -> req.receive() .asString() @@ -92,11 +92,11 @@ private void doTestPublisherSenderOnCompleteFlushInProgress(boolean useScheduler } Mono code = HttpClient.create() - .tcpConfiguration(tcpClient -> tcpClient.doOnConnected(conn -> { + .doOnConnected(conn -> { if (handler != null) { conn.addHandlerLast(handler); } - })) + }) .port(server.address().getPort()) .wiretap(true) .post() diff --git a/src/test/java/reactor/netty/channel/FluxReceiveTest.java b/src/test/java/reactor/netty/channel/FluxReceiveTest.java index 5532d93afc..7eef397dce 100644 --- a/src/test/java/reactor/netty/channel/FluxReceiveTest.java +++ b/src/test/java/reactor/netty/channel/FluxReceiveTest.java @@ -99,9 +99,8 @@ public void testByteBufsReleasedWhenTimeoutUsingHandlers() { routes.get("/forward", (req, res) -> HttpClient.create() .port(server1.address().getPort()) - .tcpConfiguration(tcpClient -> - tcpClient.doOnConnected(c -> - c.addHandlerFirst(new ReadTimeoutHandler(50, TimeUnit.MILLISECONDS)))) + .doOnConnected(c -> + c.addHandlerFirst(new ReadTimeoutHandler(50, TimeUnit.MILLISECONDS))) .get() .uri("/target") .responseContent() diff --git a/src/test/java/reactor/netty/http/HttpResourcesTest.java b/src/test/java/reactor/netty/http/HttpResourcesTest.java index b9532a5548..2423df9fc1 100644 --- a/src/test/java/reactor/netty/http/HttpResourcesTest.java +++ b/src/test/java/reactor/netty/http/HttpResourcesTest.java @@ -15,17 +15,21 @@ */ package reactor.netty.http; +import java.net.SocketAddress; import java.time.Duration; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.function.Supplier; -import io.netty.bootstrap.Bootstrap; import io.netty.channel.EventLoopGroup; +import io.netty.resolver.AddressResolverGroup; import org.junit.Before; import org.junit.Test; import reactor.core.publisher.Mono; import reactor.netty.Connection; +import reactor.netty.ConnectionObserver; import reactor.netty.resources.ConnectionProvider; import reactor.netty.resources.LoopResources; +import reactor.netty.transport.TransportConfig; import static org.assertj.core.api.Assertions.assertThat; @@ -58,8 +62,12 @@ public boolean isDisposed() { }; ConnectionProvider poolResources = new ConnectionProvider() { + @Override - public Mono acquire(Bootstrap bootstrap) { + public Mono acquire(TransportConfig config, + ConnectionObserver observer, + Supplier remoteAddress, + AddressResolverGroup resolverGroup) { return Mono.never(); } diff --git a/src/test/java/reactor/netty/http/HttpTests.java b/src/test/java/reactor/netty/http/HttpTests.java index bde626df73..00b0dd441b 100644 --- a/src/test/java/reactor/netty/http/HttpTests.java +++ b/src/test/java/reactor/netty/http/HttpTests.java @@ -707,7 +707,7 @@ public void testHttpNoSslH2Fails() { .wiretap(true) .bind() ).verifyErrorMessage("Configured H2 protocol without TLS. Use" + - " a clear-text h2 protocol via HttpServer#protocol or configure TLS" + + " a Clear-Text H2 protocol via HttpServer#protocol or configure TLS" + " via HttpServer#secure"); } @@ -723,7 +723,9 @@ public void testHttpSslH2CFails() throws Exception { .handle((req, res) -> res.sendString(Mono.just("Hello"))) .wiretap(true) .bind() - ).verifyErrorMessage("Configured H2 Clear-Text protocol with TLS. Use the non clear-text h2 protocol via HttpServer#protocol or disable TLS via HttpServer#tcpConfiguration(tcp -> tcp.noSSL())"); + ).verifyErrorMessage("Configured H2 Clear-Text protocol with TLS. Use" + + " the non Clear-Text H2 protocol via HttpServer#protocol or disable TLS" + + " via HttpServer#noSSL())"); } @Test(expected = IllegalArgumentException.class) diff --git a/src/test/java/reactor/netty/http/HttpsMetricsHandlerTests.java b/src/test/java/reactor/netty/http/HttpsMetricsHandlerTests.java index a7c21534ed..a48a1c0d6b 100644 --- a/src/test/java/reactor/netty/http/HttpsMetricsHandlerTests.java +++ b/src/test/java/reactor/netty/http/HttpsMetricsHandlerTests.java @@ -26,7 +26,6 @@ import reactor.netty.ByteBufFlux; import reactor.netty.http.client.HttpClient; import reactor.netty.http.server.HttpServer; -import reactor.netty.tcp.TcpServer; import javax.net.ssl.SSLException; import java.net.InetSocketAddress; @@ -84,7 +83,7 @@ protected void checkTlsTimer(String name, String[] tags, long expectedCount) { @Test public void testIssue896() throws Exception { - disposableServer = httpServer.tcpConfiguration(TcpServer::noSSL) + disposableServer = httpServer.noSSL() .bindNow(); CountDownLatch latch = new CountDownLatch(1); diff --git a/src/test/java/reactor/netty/http/client/HttpClientProxyTest.java b/src/test/java/reactor/netty/http/client/HttpClientProxyTest.java index b0fca63ea5..12f29731c8 100644 --- a/src/test/java/reactor/netty/http/client/HttpClientProxyTest.java +++ b/src/test/java/reactor/netty/http/client/HttpClientProxyTest.java @@ -172,7 +172,7 @@ private Mono> sendRequest( boolean wiretap) { HttpClient client = HttpClient.create() - .tcpConfiguration(tcpClient -> tcpClient.proxy(proxyOptions)) + .proxy(proxyOptions) .doOnResponse((res, conn) -> { ChannelHandler handler = conn.channel().pipeline().get(NettyPipeline.ProxyLoggingHandler); res.responseHeaders() diff --git a/src/test/java/reactor/netty/http/client/HttpClientTest.java b/src/test/java/reactor/netty/http/client/HttpClientTest.java index a127ccbaf0..1225a311bb 100644 --- a/src/test/java/reactor/netty/http/client/HttpClientTest.java +++ b/src/test/java/reactor/netty/http/client/HttpClientTest.java @@ -161,15 +161,6 @@ public void abort() { pool.dispose(); } - private DefaultFullHttpResponse response() { - DefaultFullHttpResponse r = - new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, - HttpResponseStatus.ACCEPTED); - r.headers() - .set(HttpHeaderNames.CONTENT_LENGTH, 0); - return r; - } - /** This ensures that non-default values for the HTTP request line are visible for parsing. */ @Test public void postVisibleToOnRequest() { @@ -489,14 +480,14 @@ public void testUserAgent() { @Test public void gettingOptionsDuplicates() { - HttpClient client = HttpClient.create() - .tcpConfiguration(tcpClient -> tcpClient.host("example.com")) - .wiretap(true) - .port(123) - .compress(true); - assertThat(client.tcpConfiguration()) - .isNotSameAs(HttpClient.DEFAULT_TCP_CLIENT) - .isNotSameAs(client.tcpConfiguration()); + HttpClient client1 = HttpClient.create(); + HttpClient client2 = client1.host("example.com") + .wiretap(true) + .port(123) + .compress(true); + assertThat(client2) + .isNotSameAs(client1) + .isNotSameAs(((HttpClientConnect) client2).duplicate()); } @Test @@ -944,7 +935,7 @@ public void testIssue407_1() throws Exception { .trustManager(InsecureTrustManagerFactory.INSTANCE))); AtomicReference ch1 = new AtomicReference<>(); - StepVerifier.create(client.tcpConfiguration(tcpClient -> tcpClient.doOnConnected(c -> ch1.set(c.channel()))) + StepVerifier.create(client.doOnConnected(c -> ch1.set(c.channel())) .get() .uri("/1") .responseContent() @@ -955,7 +946,7 @@ public void testIssue407_1() throws Exception { .verify(Duration.ofSeconds(30)); AtomicReference ch2 = new AtomicReference<>(); - StepVerifier.create(client.tcpConfiguration(tcpClient -> tcpClient.doOnConnected(c -> ch2.set(c.channel()))) + StepVerifier.create(client.doOnConnected(c -> ch2.set(c.channel())) .post() .uri("/2") .send(ByteBufFlux.fromString(Mono.just("test"))) @@ -968,7 +959,7 @@ public void testIssue407_1() throws Exception { AtomicReference ch3 = new AtomicReference<>(); StepVerifier.create( - client.tcpConfiguration(tcpClient -> tcpClient.doOnConnected(c -> ch3.set(c.channel()))) + client.doOnConnected(c -> ch3.set(c.channel())) .secure(spec -> spec.sslContext( SslContextBuilder.forClient() .trustManager(InsecureTrustManagerFactory.INSTANCE)) @@ -1007,10 +998,10 @@ public void testIssue407_2() throws Exception { ConnectionProvider provider = ConnectionProvider.create("testIssue407_2", 1); HttpClient client = createHttpClientForContextWithAddress(provider) - .tcpConfiguration(tcpClient -> tcpClient.secure(spec -> spec.sslContext(clientSslContextBuilder))); + .secure(spec -> spec.sslContext(clientSslContextBuilder)); AtomicReference ch1 = new AtomicReference<>(); - StepVerifier.create(client.tcpConfiguration(tcpClient -> tcpClient.doOnConnected(c -> ch1.set(c.channel()))) + StepVerifier.create(client.doOnConnected(c -> ch1.set(c.channel())) .get() .uri("/1") .responseContent() @@ -1021,7 +1012,7 @@ public void testIssue407_2() throws Exception { .verify(Duration.ofSeconds(30)); AtomicReference ch2 = new AtomicReference<>(); - StepVerifier.create(client.tcpConfiguration(tcpClient -> tcpClient.doOnConnected(c -> ch2.set(c.channel()))) + StepVerifier.create(client.doOnConnected(c -> ch2.set(c.channel())) .post() .uri("/2") .send(ByteBufFlux.fromString(Mono.just("test"))) @@ -1034,10 +1025,9 @@ public void testIssue407_2() throws Exception { AtomicReference ch3 = new AtomicReference<>(); StepVerifier.create( - client.tcpConfiguration(tcpClient -> - tcpClient.doOnConnected(c -> ch3.set(c.channel())) - .secure(spec -> spec.sslContext(clientSslContextBuilder) - .defaultConfiguration(SslProvider.DefaultConfigurationType.TCP))) + client.doOnConnected(c -> ch3.set(c.channel())) + .secure(spec -> spec.sslContext(clientSslContextBuilder) + .defaultConfiguration(SslProvider.DefaultConfigurationType.TCP)) .post() .uri("/3") .responseContent() @@ -1131,7 +1121,7 @@ public void doOnError() { requestError1.set(req.currentContext().getOrDefault("test", "empty"))) .doOnResponseError((res, err) -> responseError1.set(res.currentContext().getOrDefault("test", "empty"))) - .mapConnect((c, b) -> c.subscriberContext(Context.of("test", "success"))) + .mapConnect((c) -> c.subscriberContext(Context.of("test", "success"))) .get() .uri("/") .responseContent() @@ -1154,7 +1144,7 @@ public void doOnError() { requestError2.set(req.currentContext().getOrDefault("test", "empty")) ,(res, err) -> responseError2.set(res.currentContext().getOrDefault("test", "empty"))) - .mapConnect((c, b) -> c.subscriberContext(Context.of("test", "success"))) + .mapConnect((c) -> c.subscriberContext(Context.of("test", "success"))) .get() .uri("/") .responseContent() @@ -1178,7 +1168,7 @@ public void withConnector() { .bindNow(); Mono content = createHttpClientForContextWithPort() - .mapConnect((c, b) -> c.subscriberContext(Context.of("test", "success"))) + .mapConnect((c) -> c.subscriberContext(Context.of("test", "success"))) .post() .uri("/") .send((req, out) -> { @@ -1425,7 +1415,7 @@ private void doTestIssue600(boolean withLoop) { HttpClient client; if (withLoop) { client = createHttpClientForContextWithAddress(pool) - .tcpConfiguration(tcpClient -> tcpClient.runOn(loop)); + .runOn(loop); } else { client = createHttpClientForContextWithAddress(pool); @@ -1476,13 +1466,12 @@ public void testChannelGroupClosesAllConnections() throws Exception { Flux.just("/never", "/delay10", "/delay1") .flatMap(s -> - client.tcpConfiguration( - tcpClient -> tcpClient.doOnConnected(c -> { + client.doOnConnected(c -> { c.onDispose() .subscribe(null, null, latch2::countDown); group.add(c.channel()); latch1.countDown(); - })) + }) .get() .uri(s) .responseContent() @@ -1540,11 +1529,10 @@ public void testIssue632() throws Exception { CountDownLatch latch = new CountDownLatch(1); createHttpClientForContextWithPort() - .tcpConfiguration(tcpClient -> - tcpClient.doOnConnected(conn -> + .doOnConnected(conn -> conn.channel() .closeFuture() - .addListener(future -> latch.countDown()))) + .addListener(future -> latch.countDown())) .get() .uri("/") .responseContent() @@ -1648,8 +1636,7 @@ public void httpClientResponseConfigInjectAttributes() { .initialBufferSize(10) .failOnMissingResponse(true) .parseHttpAfterConnectRequest(true)) - .tcpConfiguration(tcp -> - tcp.doOnConnected(c -> { + .doOnConnected(c -> { channelRef.set(c.channel()); HttpClientCodec codec = c.channel() .pipeline() @@ -1657,7 +1644,7 @@ public void httpClientResponseConfigInjectAttributes() { HttpObjectDecoder decoder = (HttpObjectDecoder) getValueReflection(codec, "inboundHandler", 1); chunkSize.set((Integer) getValueReflection(decoder, "maxChunkSize", 2)); validate.set((Boolean) getValueReflection(decoder, "validateHeaders", 2)); - })) + }) .post() .uri("/") .send(ByteBufFlux.fromString(Mono.just("bodysample"))) @@ -2093,11 +2080,11 @@ public void testIssue988() { ConnectionProvider provider = ConnectionProvider.create("testIssue988", 1); HttpClient client = createHttpClientForContextWithAddress(provider) - .tcpConfiguration(tcpClient -> tcpClient.wiretap("testIssue988", LogLevel.INFO) - .metrics(true)); + .wiretap("testIssue988", LogLevel.INFO) + .metrics(true); AtomicReference ch1 = new AtomicReference<>(); - StepVerifier.create(client.tcpConfiguration(tcpClient -> tcpClient.doOnConnected(c -> ch1.set(c.channel()))) + StepVerifier.create(client.doOnConnected(c -> ch1.set(c.channel())) .get() .uri("/1") .responseContent() @@ -2108,7 +2095,7 @@ public void testIssue988() { .verify(Duration.ofSeconds(30)); AtomicReference ch2 = new AtomicReference<>(); - StepVerifier.create(client.tcpConfiguration(tcpClient -> tcpClient.doOnConnected(c -> ch2.set(c.channel()))) + StepVerifier.create(client.doOnConnected(c -> ch2.set(c.channel())) .post() .uri("/2") .send(ByteBufFlux.fromString(Mono.just("test"))) @@ -2121,9 +2108,8 @@ public void testIssue988() { AtomicReference ch3 = new AtomicReference<>(); StepVerifier.create( - client.tcpConfiguration(tcpClient -> - tcpClient.doOnConnected(c -> ch3.set(c.channel())) - .wiretap("testIssue988", LogLevel.ERROR)) + client.doOnConnected(c -> ch3.set(c.channel())) + .wiretap("testIssue988", LogLevel.ERROR) .post() .uri("/3") .responseContent() diff --git a/src/test/java/reactor/netty/http/client/HttpClientWithTomcatTest.java b/src/test/java/reactor/netty/http/client/HttpClientWithTomcatTest.java index 4e82bfb0ca..f27b6a5536 100644 --- a/src/test/java/reactor/netty/http/client/HttpClientWithTomcatTest.java +++ b/src/test/java/reactor/netty/http/client/HttpClientWithTomcatTest.java @@ -91,7 +91,7 @@ public void nettyNetChannelAcceptsNettyChannelHandlers() throws Exception { public void postUpload() throws Exception { HttpClient client = HttpClient.create() - .tcpConfiguration(tcpClient -> tcpClient.host("localhost")) + .host("localhost") .port(getPort()) .wiretap(true); @@ -125,7 +125,7 @@ public void simpleTest404_1() { HttpClient client = HttpClient.create(pool) .port(getPort()) - .tcpConfiguration(tcpClient -> tcpClient.host("localhost")) + .host("localhost") .wiretap(true); doSimpleTest404(client); doSimpleTest404(client); @@ -151,7 +151,7 @@ public void disableChunkForced() { AtomicReference headers = new AtomicReference<>(); Tuple2 r = HttpClient.newConnection() - .tcpConfiguration(tcpClient -> tcpClient.host("localhost")) + .host("localhost") .port(getPort()) .headers(h -> h.set(HttpHeaderNames.TRANSFER_ENCODING, HttpHeaderValues.CHUNKED)) .wiretap(true) @@ -175,7 +175,7 @@ public void disableChunkForced2() { AtomicReference headers = new AtomicReference<>(); Tuple2 r = HttpClient.newConnection() - .tcpConfiguration(tcpClient -> tcpClient.host("localhost")) + .host("localhost") .port(getPort()) .wiretap(true) .doAfterRequest((req, connection) -> headers.set(req.requestHeaders())) @@ -231,7 +231,7 @@ public void disableChunkImplicitDefault() { ConnectionProvider p = ConnectionProvider.create("disableChunkImplicitDefault", 1); HttpClient client = HttpClient.create(p) - .tcpConfiguration(tcpClient -> tcpClient.host("localhost")) + .host("localhost") .port(getPort()) .wiretap(true); diff --git a/src/test/java/reactor/netty/http/client/WebsocketTest.java b/src/test/java/reactor/netty/http/client/WebsocketTest.java index 596fcda33a..bd07da3570 100644 --- a/src/test/java/reactor/netty/http/client/WebsocketTest.java +++ b/src/test/java/reactor/netty/http/client/WebsocketTest.java @@ -46,7 +46,6 @@ import reactor.netty.http.websocket.WebsocketInbound; import reactor.netty.http.websocket.WebsocketOutbound; import reactor.netty.resources.ConnectionProvider; -import reactor.netty.tcp.TcpClient; import reactor.test.StepVerifier; import reactor.util.Logger; import reactor.util.Loggers; @@ -941,10 +940,9 @@ public void testIssue460() { .handle((req, res) -> res.sendWebsocket((in, out) -> Mono.never())) .bindNow(); - TcpClient client = TcpClient.create() - .remoteAddress(() -> new InetSocketAddress("not a valid host name", 42)); - HttpClient httpClient = HttpClient.from(client); - StepVerifier.create(httpClient.websocket() + HttpClient client = HttpClient.create() + .remoteAddress(() -> new InetSocketAddress("not a valid host name", 42)); + StepVerifier.create(client.websocket() .connect()) .expectError(UnknownHostException.class) .verify(Duration.ofSeconds(5)); diff --git a/src/test/java/reactor/netty/http/server/ConnectionInfoTests.java b/src/test/java/reactor/netty/http/server/ConnectionInfoTests.java index 9b4c82c274..54e6034d91 100644 --- a/src/test/java/reactor/netty/http/server/ConnectionInfoTests.java +++ b/src/test/java/reactor/netty/http/server/ConnectionInfoTests.java @@ -40,7 +40,6 @@ import reactor.netty.Connection; import reactor.netty.DisposableServer; import reactor.netty.NettyPipeline; -import reactor.netty.channel.BootstrapHandlers; import reactor.netty.http.client.HttpClient; import reactor.netty.tcp.TcpClient; @@ -396,15 +395,10 @@ public void httpsUserAddedSslHandler() throws SSLException { clientRequestHeaders -> {}, serverRequest -> Assertions.assertThat(serverRequest.scheme()).isEqualTo("https"), httpClient -> httpClient.secure(ssl -> ssl.sslContext(clientSslContext)), - httpServer -> httpServer.tcpConfiguration(tcpServer -> { - tcpServer = tcpServer.bootstrap(serverBootstrap -> - BootstrapHandlers.updateConfiguration(serverBootstrap, NettyPipeline.SslHandler, (connectionObserver, channel) -> { + httpServer -> httpServer.doOnChannelInit((channel, observer, remoteAddress) -> { SslHandler sslHandler = serverSslContext.newHandler(channel.alloc()); channel.pipeline().addFirst(NettyPipeline.SslHandler, sslHandler); - })); - - return tcpServer; - }), + }), true); } diff --git a/src/test/java/reactor/netty/http/server/HttpServerTests.java b/src/test/java/reactor/netty/http/server/HttpServerTests.java index 670e169fd9..4975817448 100644 --- a/src/test/java/reactor/netty/http/server/HttpServerTests.java +++ b/src/test/java/reactor/netty/http/server/HttpServerTests.java @@ -147,7 +147,7 @@ public void releaseInboundChannelOnNonKeepAliveRequest() { Flux.range(0, 100) .concatMap(n -> HttpClient.create() .port(disposableServer.address().getPort()) - .tcpConfiguration(TcpClient::noSSL) + .noSSL() .wiretap(true) .keepAlive(false) .post() @@ -170,7 +170,7 @@ public void testRestart() { .bindAddress(() -> new InetSocketAddress(8080)); HttpClient client1 = HttpClient.create() .port(8080) - .tcpConfiguration(tcpClient -> tcpClient.host("localhost")); + .host("localhost"); HttpClient client2 = HttpClient.create() .baseUrl("http://localhost:8080"); doTestRestart(server1, client1); @@ -321,8 +321,8 @@ public void flushOnComplete() { Flux client = HttpClient.create() .port(disposableServer.address().getPort()) .wiretap(true) - .tcpConfiguration(tcp -> tcp.doOnConnected(res -> - res.addHandler(new LineBasedFrameDecoder(10)))) + .doOnConnected(res -> + res.addHandler(new LineBasedFrameDecoder(10))) .get() .uri("/") .responseContent() @@ -410,13 +410,13 @@ public void keepAlive() throws URISyntaxException { @Test public void gettingOptionsDuplicates() { - HttpServer server = HttpServer.create() - .port(123) - .host(("example.com")) - .compress(true); - assertThat(server.tcpConfiguration().configure()) - .isNotSameAs(HttpServer.DEFAULT_TCP_SERVER) - .isNotSameAs(server.tcpConfiguration().configure()); + HttpServer server1 = HttpServer.create(); + HttpServer server2 = server1.port(123) + .host(("example.com")) + .compress(true); + assertThat(server2) + .isNotSameAs(server1) + .isNotSameAs(((HttpServerBind) server2).duplicate()); } @Test @@ -795,8 +795,7 @@ public void httpServerRequestConfigInjectAttributes() { .validateHeaders(false) .initialBufferSize(10)) .handle((req, resp) -> req.receive().then(resp.sendNotFound())) - .tcpConfiguration(tcp -> - tcp.doOnConnection(c -> { + .doOnConnection(c -> { channelRef.set(c.channel()); HttpServerCodec codec = c.channel() .pipeline() @@ -804,7 +803,7 @@ public void httpServerRequestConfigInjectAttributes() { HttpObjectDecoder decoder = (HttpObjectDecoder) getValueReflection(codec, "inboundHandler", 1); chunkSize.set((Integer) getValueReflection(decoder, "maxChunkSize", 2)); validate.set((Boolean) getValueReflection(decoder, "validateHeaders", 2)); - })) + }) .wiretap(true); disposableServer = server.bindNow(); @@ -956,8 +955,7 @@ public void testIssue525() { disposableServer = HttpServer.create() .port(0) - .tcpConfiguration(tcpServer -> - tcpServer.doOnConnection(c -> c.addHandlerFirst("decompressor", new HttpContentDecompressor()))) + .doOnConnection(c -> c.addHandlerFirst("decompressor", new HttpContentDecompressor())) .handle((req, res) -> res.send(req.receive() .retain())) .wiretap(true) @@ -997,8 +995,7 @@ public void testCustomHandlerInvokedBeforeIOHandler() { disposableServer = HttpServer.create() .port(0) - .tcpConfiguration(tcpServer -> - tcpServer.doOnConnection(c -> c.addHandlerFirst("custom", new ChannelInboundHandlerAdapter() { + .doOnConnection(c -> c.addHandlerFirst("custom", new ChannelInboundHandlerAdapter() { @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { if (msg instanceof HttpRequest) { @@ -1006,7 +1003,7 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception } super.channelRead(ctx, msg); } - }))) + })) .handle((req, res) -> res.sendString( Mono.just(req.requestHeaders().get("test", "not found")))) .wiretap(true) @@ -1596,12 +1593,11 @@ public void testGracefulShutdown() throws Exception { disposableServer = HttpServer.create() .port(0) - .tcpConfiguration(tcpServer -> - tcpServer.runOn(loop) + .runOn(loop) .doOnConnection(c -> { c.onDispose().subscribe(null, null, latch2::countDown); latch1.countDown(); - })) + }) // Register a channel group, when invoking disposeNow() // the implementation will wait for the active requests to finish .channelGroup(new DefaultChannelGroup(new DefaultEventExecutor())) diff --git a/src/test/java/reactor/netty/resources/PooledConnectionProviderTest.java b/src/test/java/reactor/netty/resources/PooledConnectionProviderTest.java index eb5e91e450..eed729fad0 100644 --- a/src/test/java/reactor/netty/resources/PooledConnectionProviderTest.java +++ b/src/test/java/reactor/netty/resources/PooledConnectionProviderTest.java @@ -17,8 +17,10 @@ import java.io.IOException; import java.net.InetSocketAddress; +import java.net.SocketAddress; import java.security.cert.CertificateException; import java.time.Duration; +import java.util.Collections; import java.util.List; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.CountDownLatch; @@ -31,10 +33,14 @@ import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.locks.LockSupport; +import java.util.function.Supplier; -import io.netty.bootstrap.Bootstrap; +import io.netty.channel.Channel; +import io.netty.channel.ChannelFactory; +import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.nio.NioSocketChannel; +import io.netty.handler.logging.LoggingHandler; import io.netty.handler.ssl.SslContextBuilder; import io.netty.handler.ssl.util.SelfSignedCertificate; import io.netty.handler.codec.http.HttpHeaderNames; @@ -49,12 +55,14 @@ import reactor.netty.ConnectionObserver; import reactor.netty.DisposableServer; import reactor.netty.SocketUtils; +import reactor.netty.channel.ChannelMetricsRecorder; import reactor.netty.http.client.HttpClient; import reactor.netty.http.server.HttpServer; import reactor.netty.resources.PooledConnectionProvider.PooledConnection; import reactor.netty.tcp.TcpClient; import reactor.netty.tcp.TcpClientTests; import reactor.netty.tcp.TcpServer; +import reactor.netty.transport.TransportClientConfig; import reactor.pool.InstrumentedPool; import reactor.pool.PoolAcquirePendingLimitException; import reactor.pool.PooledRef; @@ -134,14 +142,41 @@ public void fixedPoolTwoAcquire() final InetSocketAddress address = InetSocketAddress.createUnresolved("localhost", echoServerPort); ConnectionProvider pool = ConnectionProvider.create("fixedPoolTwoAcquire", 2); - Bootstrap bootstrap = new Bootstrap().remoteAddress(address) - .channelFactory(NioSocketChannel::new) - .group(new NioEventLoopGroup(2)); + Supplier remoteAddress = () -> address; + ConnectionObserver observer = ConnectionObserver.emptyListener(); + EventLoopGroup group = new NioEventLoopGroup(2); + TransportClientConfig config = new TransportClientConfig(pool, Collections.emptyMap(), remoteAddress) { + + @Override + protected ChannelFactory connectionFactory(EventLoopGroup elg) { + return NioSocketChannel::new; + } + + @Override + protected LoggingHandler defaultLoggingHandler() { + return null; + } + + @Override + protected LoopResources defaultLoopResources() { + return preferNative -> group; + } + + @Override + protected ChannelMetricsRecorder defaultMetricsRecorder() { + return null; + } + + @Override + protected EventLoopGroup eventLoopGroup() { + return group; + } + }; //fail a couple - StepVerifier.create(pool.acquire(bootstrap)) + StepVerifier.create(pool.acquire(config, observer, remoteAddress, config.resolver())) .verifyErrorMatches(msg -> msg.getMessage().contains("Connection refused")); - StepVerifier.create(pool.acquire(bootstrap)) + StepVerifier.create(pool.acquire(config, observer, remoteAddress, config.resolver())) .verifyErrorMatches(msg -> msg.getMessage().contains("Connection refused")); //start the echo server @@ -149,10 +184,10 @@ public void fixedPoolTwoAcquire() Thread.sleep(100); //acquire 2 - final PooledConnection c1 = (PooledConnection) pool.acquire(bootstrap) + final PooledConnection c1 = (PooledConnection) pool.acquire(config, observer, remoteAddress, config.resolver()) .block(Duration.ofSeconds(30)); assertThat(c1).isNotNull(); - final PooledConnection c2 = (PooledConnection) pool.acquire(bootstrap) + final PooledConnection c2 = (PooledConnection) pool.acquire(config, observer, remoteAddress, config.resolver()) .block(Duration.ofSeconds(30)); assertThat(c2).isNotNull(); @@ -160,7 +195,7 @@ public void fixedPoolTwoAcquire() c2.disposeNow(); - final PooledConnection c3 = (PooledConnection) pool.acquire(bootstrap) + final PooledConnection c3 = (PooledConnection) pool.acquire(config, observer, remoteAddress, config.resolver()) .block(Duration.ofSeconds(30)); assertThat(c3).isNotNull(); @@ -170,7 +205,7 @@ public void fixedPoolTwoAcquire() .MILLISECONDS); - final PooledConnection c4 = (PooledConnection) pool.acquire(bootstrap) + final PooledConnection c4 = (PooledConnection) pool.acquire(config, observer, remoteAddress, config.resolver()) .block(Duration.ofSeconds(30)); assertThat(c4).isNotNull(); @@ -407,7 +442,7 @@ public void testIssue973() { .build(); AtomicReference> pool1 = new AtomicReference<>(); HttpClient.create(provider) - .tcpConfiguration(tcpClient -> tcpClient.doOnConnected(conn -> { + .doOnConnected(conn -> { ConcurrentMap> pools = provider.channelPools; for (InstrumentedPool pool : pools.values()) { @@ -416,7 +451,7 @@ public void testIssue973() { return; } } - })) + }) .wiretap(true) .get() .uri("http://localhost:" + server.port() +"/") @@ -428,7 +463,7 @@ public void testIssue973() { AtomicReference> pool2 = new AtomicReference<>(); HttpClient.create(provider) - .tcpConfiguration(tcpClient -> tcpClient.doOnConnected(conn -> { + .doOnConnected(conn -> { ConcurrentMap> pools = provider.channelPools; for (InstrumentedPool pool : pools.values()) { @@ -437,7 +472,7 @@ public void testIssue973() { return; } } - })) + }) .wiretap(true) .get() .uri("https://example.com/") @@ -469,8 +504,7 @@ public void testIssue1012() throws Exception { HttpClient.create(provider) .port(server.port()) .wiretap(true) - .tcpConfiguration(tcpClient -> - tcpClient.doOnConnected(conn -> conn.channel().closeFuture().addListener(f -> latch.countDown()))); + .doOnConnected(conn -> conn.channel().closeFuture().addListener(f -> latch.countDown())); client.get() .uri("/1") diff --git a/src/test/java/reactor/netty/tcp/BlockingConnectionTest.java b/src/test/java/reactor/netty/tcp/BlockingConnectionTest.java index ef821d057d..6cad28cfe4 100644 --- a/src/test/java/reactor/netty/tcp/BlockingConnectionTest.java +++ b/src/test/java/reactor/netty/tcp/BlockingConnectionTest.java @@ -21,8 +21,6 @@ import java.util.List; import java.util.concurrent.atomic.AtomicReference; -import io.netty.bootstrap.Bootstrap; -import io.netty.bootstrap.ServerBootstrap; import io.netty.channel.Channel; import io.netty.channel.embedded.EmbeddedChannel; import org.junit.Test; @@ -153,23 +151,45 @@ public void simpleServerFromAsyncServer() throws InterruptedException { @Test public void testTimeoutOnStart() { - TcpClient neverStart = new TcpClient(){ + TcpClient neverStart = new TcpClient() { + + @Override + public TcpClientConfig configuration() { + return null; + } + + @Override + protected TcpClient duplicate() { + return null; + } + @Override - public Mono connect(Bootstrap b) { + public Mono connect() { return Mono.never(); } }; assertThatExceptionOfType(IllegalStateException.class) .isThrownBy(() -> neverStart.connectNow(Duration.ofMillis(100))) - .withMessage("TcpClient couldn't be started within 100ms"); + .withMessage(" couldn't be started within 100ms"); } @Test public void testTimeoutOnStop() { - Connection c = new TcpClient(){ + Connection c = new TcpClient() { + + @Override + public TcpClientConfig configuration() { + return null; + } + @Override - public Mono connect(Bootstrap b) { + protected TcpClient duplicate() { + return null; + } + + @Override + public Mono connect() { return Mono.just(NEVER_STOP_CONTEXT); } }.connectNow(); @@ -181,15 +201,21 @@ public Mono connect(Bootstrap b) { @Test public void getContextAddressAndHost() { - DisposableServer c = new TcpServer(){ + DisposableServer c = new TcpServer() { + @Override - public Mono bind(ServerBootstrap b) { - return Mono.just(NEVER_STOP_SERVER); + public TcpServerConfig configuration() { + return null; } @Override - public ServerBootstrap configure() { - return TcpServerBind.INSTANCE.createServerBootstrap(); + protected TcpServer duplicate() { + return null; + } + + @Override + public Mono bind() { + return Mono.just(NEVER_STOP_SERVER); } }.bindNow(); diff --git a/src/test/java/reactor/netty/tcp/SslProviderTests.java b/src/test/java/reactor/netty/tcp/SslProviderTests.java index f36abe60f8..2324ce7bf6 100644 --- a/src/test/java/reactor/netty/tcp/SslProviderTests.java +++ b/src/test/java/reactor/netty/tcp/SslProviderTests.java @@ -49,13 +49,13 @@ public void setUp() throws Exception { protocols = new ArrayList<>(); server = HttpServer.create() .port(0) - .tcpConfiguration(tcpServer -> tcpServer.doOnBind(b -> { - SslProvider ssl = reactor.netty.tcp.SslProvider.findSslSupport(b); + .doOnBind(conf -> { + SslProvider ssl = conf.sslProvider(); if (ssl != null) { protocols.addAll(ssl.sslContext.applicationProtocolNegotiator().protocols()); sslContext = ssl.sslContext; } - })); + }); } @Test diff --git a/src/test/java/reactor/netty/tcp/TcpClientTests.java b/src/test/java/reactor/netty/tcp/TcpClientTests.java index de48064d71..fedfebbf9e 100644 --- a/src/test/java/reactor/netty/tcp/TcpClientTests.java +++ b/src/test/java/reactor/netty/tcp/TcpClientTests.java @@ -141,8 +141,8 @@ public void disableSsl() { TcpClient secureClient = TcpClient.create() .secure(); - assertTrue(secureClient.isSecure()); - assertFalse(secureClient.noSSL().isSecure()); + assertTrue(secureClient.configuration().isSecure()); + assertFalse(secureClient.noSSL().configuration().isSecure()); } @Test @@ -563,10 +563,11 @@ public void writeIdleDoesNotFireWhileDataIsBeingSent() @Test public void gettingOptionsDuplicates() { - TcpClient client = TcpClient.create().host("example.com").port(123); - Assertions.assertThat(client.configure()) - .isNotSameAs(TcpClient.DEFAULT_BOOTSTRAP) - .isNotSameAs(client.configure()); + TcpClient client1 = TcpClient.create(); + TcpClient client2 = client1.host("example.com").port(123); + Assertions.assertThat(client2) + .isNotSameAs(client1) + .isNotSameAs(((TcpClientConnect) client2).duplicate()); } public static final class EchoServer diff --git a/src/test/java/reactor/netty/tcp/TcpMetricsTests.java b/src/test/java/reactor/netty/tcp/TcpMetricsTests.java index 9016a44a17..44b7af34fe 100644 --- a/src/test/java/reactor/netty/tcp/TcpMetricsTests.java +++ b/src/test/java/reactor/netty/tcp/TcpMetricsTests.java @@ -37,7 +37,6 @@ import reactor.netty.Connection; import reactor.netty.DisposableServer; import reactor.netty.SocketUtils; -import reactor.netty.channel.BootstrapHandlers; import reactor.netty.resources.ConnectionProvider; import java.net.InetSocketAddress; @@ -139,18 +138,16 @@ public void testFailedConnect() throws Exception { try { connection = tcpClient.host("127.0.0.1") .port(port) - .doOnConnect(b -> - BootstrapHandlers.updateConfiguration(b, "testFailedConnect", - (o, c) -> - c.pipeline() - .addLast(new ChannelInboundHandlerAdapter() { - - @Override - public void channelUnregistered(ChannelHandlerContext ctx) { - latch.countDown(); - ctx.fireChannelUnregistered(); - } - }))) + .doOnChannelInit((channel, observer, remoteAddress) -> + channel.pipeline() + .addLast(new ChannelInboundHandlerAdapter() { + + @Override + public void channelUnregistered(ChannelHandlerContext ctx) { + latch.countDown(); + ctx.fireChannelUnregistered(); + } + })) .connectNow(); fail("Connect should fail."); } diff --git a/src/test/java/reactor/netty/tcp/TcpResourcesTest.java b/src/test/java/reactor/netty/tcp/TcpResourcesTest.java index a6fb82f434..4fd1e0521c 100644 --- a/src/test/java/reactor/netty/tcp/TcpResourcesTest.java +++ b/src/test/java/reactor/netty/tcp/TcpResourcesTest.java @@ -15,22 +15,26 @@ */ package reactor.netty.tcp; +import java.net.SocketAddress; import java.time.Duration; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.function.Supplier; -import io.netty.bootstrap.Bootstrap; import io.netty.channel.EventLoopGroup; +import io.netty.resolver.AddressResolverGroup; import org.junit.Before; import org.junit.Test; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; import reactor.netty.Connection; +import reactor.netty.ConnectionObserver; import reactor.netty.DisposableServer; import reactor.netty.SocketUtils; import reactor.netty.resources.ConnectionProvider; import reactor.netty.resources.LoopResources; +import reactor.netty.transport.TransportConfig; import static org.assertj.core.api.Assertions.assertThat; import static org.junit.Assert.assertTrue; @@ -66,7 +70,10 @@ public boolean isDisposed() { ConnectionProvider poolResources = new ConnectionProvider() { @Override - public Mono acquire(Bootstrap bootstrap) { + public Mono acquire(TransportConfig config, + ConnectionObserver observer, + Supplier remoteAddress, + AddressResolverGroup resolverGroup) { return Mono.never(); } diff --git a/src/test/java/reactor/netty/tcp/TcpServerTests.java b/src/test/java/reactor/netty/tcp/TcpServerTests.java index 309651f19e..245a0c87c2 100644 --- a/src/test/java/reactor/netty/tcp/TcpServerTests.java +++ b/src/test/java/reactor/netty/tcp/TcpServerTests.java @@ -91,7 +91,7 @@ public class TcpServerTests { final Logger log = Loggers.getLogger(TcpServerTests.class); @Test - public void tcpServerHandlesJsonPojosOverSsl() throws Exception { + public void tcpServerHandlesJsonPojosOverSsl() throws Exception { final CountDownLatch latch = new CountDownLatch(2); SelfSignedCertificate cert = new SelfSignedCertificate(); @@ -309,10 +309,11 @@ public void testIssue462() throws InterruptedException { @Test public void gettingOptionsDuplicates() { - TcpServer server = TcpServer.create().host("example.com").port(123); - Assertions.assertThat(server.configure()) - .isNotSameAs(TcpServerBind.INSTANCE.serverBootstrap) - .isNotSameAs(server.configure()); + TcpServer server1 = TcpServer.create(); + TcpServer server2 = server1.host("example.com").port(123); + Assertions.assertThat(server2) + .isNotSameAs(server1) + .isNotSameAs(((TcpServerBind) server2).duplicate()); } @Test @@ -802,7 +803,7 @@ public void testIssue688() throws Exception { DisposableServer server = TcpServer.create() .port(0) - .observe((connection, newState) -> { + .childObserve((connection, newState) -> { if (newState == ConnectionObserver.State.CONNECTED) { group.add(connection.channel()); connected.countDown(); @@ -841,7 +842,7 @@ public void testHalfClosedConnection() throws Exception { DisposableServer server = TcpServer.create() .port(0) - .option(ChannelOption.ALLOW_HALF_CLOSURE, true) + .childOption(ChannelOption.ALLOW_HALF_CLOSURE, true) .wiretap(true) .handle((in, out) -> in.receive() .asString() diff --git a/src/test/java/reactor/netty/transport/ByteBufAllocatorMetricsTest.java b/src/test/java/reactor/netty/transport/ByteBufAllocatorMetricsTest.java index 543eedbd2a..21ae71487d 100644 --- a/src/test/java/reactor/netty/transport/ByteBufAllocatorMetricsTest.java +++ b/src/test/java/reactor/netty/transport/ByteBufAllocatorMetricsTest.java @@ -80,7 +80,7 @@ public void test() throws Exception { PooledByteBufAllocator alloc = new PooledByteBufAllocator(true); HttpClient.create() .port(server.port()) - .tcpConfiguration(tcpClient -> tcpClient.option(ChannelOption.ALLOCATOR, alloc)) + .option(ChannelOption.ALLOCATOR, alloc) .doOnResponse((res, conn) -> conn.channel() .closeFuture() .addListener(f -> latch.countDown()))