Skip to content

Commit

Permalink
Adapt tests
Browse files Browse the repository at this point in the history
  • Loading branch information
violetagg committed Apr 11, 2020
1 parent 0ed8701 commit 6e05202
Show file tree
Hide file tree
Showing 19 changed files with 208 additions and 160 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ private void doTestPublisherSenderOnCompleteFlushInProgress(boolean useScheduler
DisposableServer server =
HttpServer.create()
.port(0)
.tcpConfiguration(tcpServer -> tcpServer.doOnConnection(conn -> { conn.addHandler(new LineBasedFrameDecoder(10)); }))
.doOnConnection(conn -> conn.addHandler(new LineBasedFrameDecoder(10)))
.handle((req, res) ->
req.receive()
.asString()
Expand All @@ -92,11 +92,11 @@ private void doTestPublisherSenderOnCompleteFlushInProgress(boolean useScheduler
}
Mono<Integer> code =
HttpClient.create()
.tcpConfiguration(tcpClient -> tcpClient.doOnConnected(conn -> {
.doOnConnected(conn -> {
if (handler != null) {
conn.addHandlerLast(handler);
}
}))
})
.port(server.address().getPort())
.wiretap(true)
.post()
Expand Down
5 changes: 2 additions & 3 deletions src/test/java/reactor/netty/channel/FluxReceiveTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -99,9 +99,8 @@ public void testByteBufsReleasedWhenTimeoutUsingHandlers() {
routes.get("/forward", (req, res) ->
HttpClient.create()
.port(server1.address().getPort())
.tcpConfiguration(tcpClient ->
tcpClient.doOnConnected(c ->
c.addHandlerFirst(new ReadTimeoutHandler(50, TimeUnit.MILLISECONDS))))
.doOnConnected(c ->
c.addHandlerFirst(new ReadTimeoutHandler(50, TimeUnit.MILLISECONDS)))
.get()
.uri("/target")
.responseContent()
Expand Down
12 changes: 10 additions & 2 deletions src/test/java/reactor/netty/http/HttpResourcesTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,17 +15,21 @@
*/
package reactor.netty.http;

import java.net.SocketAddress;
import java.time.Duration;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Supplier;

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.EventLoopGroup;
import io.netty.resolver.AddressResolverGroup;
import org.junit.Before;
import org.junit.Test;
import reactor.core.publisher.Mono;
import reactor.netty.Connection;
import reactor.netty.ConnectionObserver;
import reactor.netty.resources.ConnectionProvider;
import reactor.netty.resources.LoopResources;
import reactor.netty.transport.TransportConfig;

import static org.assertj.core.api.Assertions.assertThat;

Expand Down Expand Up @@ -58,8 +62,12 @@ public boolean isDisposed() {
};

ConnectionProvider poolResources = new ConnectionProvider() {

@Override
public Mono<? extends Connection> acquire(Bootstrap bootstrap) {
public Mono<? extends Connection> acquire(TransportConfig config,
ConnectionObserver observer,
Supplier<? extends SocketAddress> remoteAddress,
AddressResolverGroup<?> resolverGroup) {
return Mono.never();
}

Expand Down
6 changes: 4 additions & 2 deletions src/test/java/reactor/netty/http/HttpTests.java
Original file line number Diff line number Diff line change
Expand Up @@ -707,7 +707,7 @@ public void testHttpNoSslH2Fails() {
.wiretap(true)
.bind()
).verifyErrorMessage("Configured H2 protocol without TLS. Use" +
" a clear-text h2 protocol via HttpServer#protocol or configure TLS" +
" a Clear-Text H2 protocol via HttpServer#protocol or configure TLS" +
" via HttpServer#secure");
}

Expand All @@ -723,7 +723,9 @@ public void testHttpSslH2CFails() throws Exception {
.handle((req, res) -> res.sendString(Mono.just("Hello")))
.wiretap(true)
.bind()
).verifyErrorMessage("Configured H2 Clear-Text protocol with TLS. Use the non clear-text h2 protocol via HttpServer#protocol or disable TLS via HttpServer#tcpConfiguration(tcp -> tcp.noSSL())");
).verifyErrorMessage("Configured H2 Clear-Text protocol with TLS. Use" +
" the non Clear-Text H2 protocol via HttpServer#protocol or disable TLS" +
" via HttpServer#noSSL())");
}

@Test(expected = IllegalArgumentException.class)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@
import reactor.netty.ByteBufFlux;
import reactor.netty.http.client.HttpClient;
import reactor.netty.http.server.HttpServer;
import reactor.netty.tcp.TcpServer;

import javax.net.ssl.SSLException;
import java.net.InetSocketAddress;
Expand Down Expand Up @@ -84,7 +83,7 @@ protected void checkTlsTimer(String name, String[] tags, long expectedCount) {

@Test
public void testIssue896() throws Exception {
disposableServer = httpServer.tcpConfiguration(TcpServer::noSSL)
disposableServer = httpServer.noSSL()
.bindNow();

CountDownLatch latch = new CountDownLatch(1);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -172,7 +172,7 @@ private Mono<Tuple2<String, HttpHeaders>> sendRequest(
boolean wiretap) {
HttpClient client =
HttpClient.create()
.tcpConfiguration(tcpClient -> tcpClient.proxy(proxyOptions))
.proxy(proxyOptions)
.doOnResponse((res, conn) -> {
ChannelHandler handler = conn.channel().pipeline().get(NettyPipeline.ProxyLoggingHandler);
res.responseHeaders()
Expand Down
80 changes: 33 additions & 47 deletions src/test/java/reactor/netty/http/client/HttpClientTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -161,15 +161,6 @@ public void abort() {
pool.dispose();
}

private DefaultFullHttpResponse response() {
DefaultFullHttpResponse r =
new DefaultFullHttpResponse(HttpVersion.HTTP_1_1,
HttpResponseStatus.ACCEPTED);
r.headers()
.set(HttpHeaderNames.CONTENT_LENGTH, 0);
return r;
}

/** This ensures that non-default values for the HTTP request line are visible for parsing. */
@Test
public void postVisibleToOnRequest() {
Expand Down Expand Up @@ -489,14 +480,14 @@ public void testUserAgent() {

@Test
public void gettingOptionsDuplicates() {
HttpClient client = HttpClient.create()
.tcpConfiguration(tcpClient -> tcpClient.host("example.com"))
.wiretap(true)
.port(123)
.compress(true);
assertThat(client.tcpConfiguration())
.isNotSameAs(HttpClient.DEFAULT_TCP_CLIENT)
.isNotSameAs(client.tcpConfiguration());
HttpClient client1 = HttpClient.create();
HttpClient client2 = client1.host("example.com")
.wiretap(true)
.port(123)
.compress(true);
assertThat(client2)
.isNotSameAs(client1)
.isNotSameAs(((HttpClientConnect) client2).duplicate());
}

@Test
Expand Down Expand Up @@ -944,7 +935,7 @@ public void testIssue407_1() throws Exception {
.trustManager(InsecureTrustManagerFactory.INSTANCE)));

AtomicReference<Channel> ch1 = new AtomicReference<>();
StepVerifier.create(client.tcpConfiguration(tcpClient -> tcpClient.doOnConnected(c -> ch1.set(c.channel())))
StepVerifier.create(client.doOnConnected(c -> ch1.set(c.channel()))
.get()
.uri("/1")
.responseContent()
Expand All @@ -955,7 +946,7 @@ public void testIssue407_1() throws Exception {
.verify(Duration.ofSeconds(30));

AtomicReference<Channel> ch2 = new AtomicReference<>();
StepVerifier.create(client.tcpConfiguration(tcpClient -> tcpClient.doOnConnected(c -> ch2.set(c.channel())))
StepVerifier.create(client.doOnConnected(c -> ch2.set(c.channel()))
.post()
.uri("/2")
.send(ByteBufFlux.fromString(Mono.just("test")))
Expand All @@ -968,7 +959,7 @@ public void testIssue407_1() throws Exception {

AtomicReference<Channel> ch3 = new AtomicReference<>();
StepVerifier.create(
client.tcpConfiguration(tcpClient -> tcpClient.doOnConnected(c -> ch3.set(c.channel())))
client.doOnConnected(c -> ch3.set(c.channel()))
.secure(spec -> spec.sslContext(
SslContextBuilder.forClient()
.trustManager(InsecureTrustManagerFactory.INSTANCE))
Expand Down Expand Up @@ -1007,10 +998,10 @@ public void testIssue407_2() throws Exception {
ConnectionProvider provider = ConnectionProvider.create("testIssue407_2", 1);
HttpClient client =
createHttpClientForContextWithAddress(provider)
.tcpConfiguration(tcpClient -> tcpClient.secure(spec -> spec.sslContext(clientSslContextBuilder)));
.secure(spec -> spec.sslContext(clientSslContextBuilder));

AtomicReference<Channel> ch1 = new AtomicReference<>();
StepVerifier.create(client.tcpConfiguration(tcpClient -> tcpClient.doOnConnected(c -> ch1.set(c.channel())))
StepVerifier.create(client.doOnConnected(c -> ch1.set(c.channel()))
.get()
.uri("/1")
.responseContent()
Expand All @@ -1021,7 +1012,7 @@ public void testIssue407_2() throws Exception {
.verify(Duration.ofSeconds(30));

AtomicReference<Channel> ch2 = new AtomicReference<>();
StepVerifier.create(client.tcpConfiguration(tcpClient -> tcpClient.doOnConnected(c -> ch2.set(c.channel())))
StepVerifier.create(client.doOnConnected(c -> ch2.set(c.channel()))
.post()
.uri("/2")
.send(ByteBufFlux.fromString(Mono.just("test")))
Expand All @@ -1034,10 +1025,9 @@ public void testIssue407_2() throws Exception {

AtomicReference<Channel> ch3 = new AtomicReference<>();
StepVerifier.create(
client.tcpConfiguration(tcpClient ->
tcpClient.doOnConnected(c -> ch3.set(c.channel()))
.secure(spec -> spec.sslContext(clientSslContextBuilder)
.defaultConfiguration(SslProvider.DefaultConfigurationType.TCP)))
client.doOnConnected(c -> ch3.set(c.channel()))
.secure(spec -> spec.sslContext(clientSslContextBuilder)
.defaultConfiguration(SslProvider.DefaultConfigurationType.TCP))
.post()
.uri("/3")
.responseContent()
Expand Down Expand Up @@ -1131,7 +1121,7 @@ public void doOnError() {
requestError1.set(req.currentContext().getOrDefault("test", "empty")))
.doOnResponseError((res, err) ->
responseError1.set(res.currentContext().getOrDefault("test", "empty")))
.mapConnect((c, b) -> c.subscriberContext(Context.of("test", "success")))
.mapConnect((c) -> c.subscriberContext(Context.of("test", "success")))
.get()
.uri("/")
.responseContent()
Expand All @@ -1154,7 +1144,7 @@ public void doOnError() {
requestError2.set(req.currentContext().getOrDefault("test", "empty"))
,(res, err) ->
responseError2.set(res.currentContext().getOrDefault("test", "empty")))
.mapConnect((c, b) -> c.subscriberContext(Context.of("test", "success")))
.mapConnect((c) -> c.subscriberContext(Context.of("test", "success")))
.get()
.uri("/")
.responseContent()
Expand All @@ -1178,7 +1168,7 @@ public void withConnector() {
.bindNow();

Mono<String> content = createHttpClientForContextWithPort()
.mapConnect((c, b) -> c.subscriberContext(Context.of("test", "success")))
.mapConnect((c) -> c.subscriberContext(Context.of("test", "success")))
.post()
.uri("/")
.send((req, out) -> {
Expand Down Expand Up @@ -1425,7 +1415,7 @@ private void doTestIssue600(boolean withLoop) {
HttpClient client;
if (withLoop) {
client = createHttpClientForContextWithAddress(pool)
.tcpConfiguration(tcpClient -> tcpClient.runOn(loop));
.runOn(loop);
}
else {
client = createHttpClientForContextWithAddress(pool);
Expand Down Expand Up @@ -1476,13 +1466,12 @@ public void testChannelGroupClosesAllConnections() throws Exception {

Flux.just("/never", "/delay10", "/delay1")
.flatMap(s ->
client.tcpConfiguration(
tcpClient -> tcpClient.doOnConnected(c -> {
client.doOnConnected(c -> {
c.onDispose()
.subscribe(null, null, latch2::countDown);
group.add(c.channel());
latch1.countDown();
}))
})
.get()
.uri(s)
.responseContent()
Expand Down Expand Up @@ -1540,11 +1529,10 @@ public void testIssue632() throws Exception {

CountDownLatch latch = new CountDownLatch(1);
createHttpClientForContextWithPort()
.tcpConfiguration(tcpClient ->
tcpClient.doOnConnected(conn ->
.doOnConnected(conn ->
conn.channel()
.closeFuture()
.addListener(future -> latch.countDown())))
.addListener(future -> latch.countDown()))
.get()
.uri("/")
.responseContent()
Expand Down Expand Up @@ -1648,16 +1636,15 @@ public void httpClientResponseConfigInjectAttributes() {
.initialBufferSize(10)
.failOnMissingResponse(true)
.parseHttpAfterConnectRequest(true))
.tcpConfiguration(tcp ->
tcp.doOnConnected(c -> {
.doOnConnected(c -> {
channelRef.set(c.channel());
HttpClientCodec codec = c.channel()
.pipeline()
.get(HttpClientCodec.class);
HttpObjectDecoder decoder = (HttpObjectDecoder) getValueReflection(codec, "inboundHandler", 1);
chunkSize.set((Integer) getValueReflection(decoder, "maxChunkSize", 2));
validate.set((Boolean) getValueReflection(decoder, "validateHeaders", 2));
}))
})
.post()
.uri("/")
.send(ByteBufFlux.fromString(Mono.just("bodysample")))
Expand Down Expand Up @@ -2093,11 +2080,11 @@ public void testIssue988() {
ConnectionProvider provider = ConnectionProvider.create("testIssue988", 1);
HttpClient client =
createHttpClientForContextWithAddress(provider)
.tcpConfiguration(tcpClient -> tcpClient.wiretap("testIssue988", LogLevel.INFO)
.metrics(true));
.wiretap("testIssue988", LogLevel.INFO)
.metrics(true);

AtomicReference<Channel> ch1 = new AtomicReference<>();
StepVerifier.create(client.tcpConfiguration(tcpClient -> tcpClient.doOnConnected(c -> ch1.set(c.channel())))
StepVerifier.create(client.doOnConnected(c -> ch1.set(c.channel()))
.get()
.uri("/1")
.responseContent()
Expand All @@ -2108,7 +2095,7 @@ public void testIssue988() {
.verify(Duration.ofSeconds(30));

AtomicReference<Channel> ch2 = new AtomicReference<>();
StepVerifier.create(client.tcpConfiguration(tcpClient -> tcpClient.doOnConnected(c -> ch2.set(c.channel())))
StepVerifier.create(client.doOnConnected(c -> ch2.set(c.channel()))
.post()
.uri("/2")
.send(ByteBufFlux.fromString(Mono.just("test")))
Expand All @@ -2121,9 +2108,8 @@ public void testIssue988() {

AtomicReference<Channel> ch3 = new AtomicReference<>();
StepVerifier.create(
client.tcpConfiguration(tcpClient ->
tcpClient.doOnConnected(c -> ch3.set(c.channel()))
.wiretap("testIssue988", LogLevel.ERROR))
client.doOnConnected(c -> ch3.set(c.channel()))
.wiretap("testIssue988", LogLevel.ERROR)
.post()
.uri("/3")
.responseContent()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ public void nettyNetChannelAcceptsNettyChannelHandlers() throws Exception {
public void postUpload() throws Exception {
HttpClient client =
HttpClient.create()
.tcpConfiguration(tcpClient -> tcpClient.host("localhost"))
.host("localhost")
.port(getPort())
.wiretap(true);

Expand Down Expand Up @@ -125,7 +125,7 @@ public void simpleTest404_1() {
HttpClient client =
HttpClient.create(pool)
.port(getPort())
.tcpConfiguration(tcpClient -> tcpClient.host("localhost"))
.host("localhost")
.wiretap(true);
doSimpleTest404(client);
doSimpleTest404(client);
Expand All @@ -151,7 +151,7 @@ public void disableChunkForced() {
AtomicReference<HttpHeaders> headers = new AtomicReference<>();
Tuple2<HttpResponseStatus, String> r =
HttpClient.newConnection()
.tcpConfiguration(tcpClient -> tcpClient.host("localhost"))
.host("localhost")
.port(getPort())
.headers(h -> h.set(HttpHeaderNames.TRANSFER_ENCODING, HttpHeaderValues.CHUNKED))
.wiretap(true)
Expand All @@ -175,7 +175,7 @@ public void disableChunkForced2() {
AtomicReference<HttpHeaders> headers = new AtomicReference<>();
Tuple2<HttpResponseStatus, String> r =
HttpClient.newConnection()
.tcpConfiguration(tcpClient -> tcpClient.host("localhost"))
.host("localhost")
.port(getPort())
.wiretap(true)
.doAfterRequest((req, connection) -> headers.set(req.requestHeaders()))
Expand Down Expand Up @@ -231,7 +231,7 @@ public void disableChunkImplicitDefault() {
ConnectionProvider p = ConnectionProvider.create("disableChunkImplicitDefault", 1);
HttpClient client =
HttpClient.create(p)
.tcpConfiguration(tcpClient -> tcpClient.host("localhost"))
.host("localhost")
.port(getPort())
.wiretap(true);

Expand Down
Loading

0 comments on commit 6e05202

Please sign in to comment.