Skip to content

Commit

Permalink
Hardened HttpClient Tests Further (#29648)
Browse files Browse the repository at this point in the history
Hardened HttpClient Tests Further
  • Loading branch information
alzimmermsft authored Jun 27, 2022
1 parent 110f930 commit 4a59fa7
Show file tree
Hide file tree
Showing 8 changed files with 153 additions and 229 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,8 @@
import com.azure.core.http.HttpRequest;
import com.azure.core.http.HttpResponse;
import com.github.tomakehurst.wiremock.WireMockServer;
import com.github.tomakehurst.wiremock.client.WireMock;
import com.github.tomakehurst.wiremock.core.WireMockConfiguration;
import com.github.tomakehurst.wiremock.http.Fault;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeAll;
Expand All @@ -23,26 +23,27 @@
import reactor.test.StepVerifierOptions;

import java.io.IOException;
import java.io.OutputStream;
import java.net.MalformedURLException;
import java.net.ServerSocket;
import java.net.Socket;
import java.net.URL;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.concurrent.CountDownLatch;

import static com.github.tomakehurst.wiremock.client.WireMock.aResponse;
import static com.github.tomakehurst.wiremock.client.WireMock.get;
import static com.github.tomakehurst.wiremock.client.WireMock.post;
import static org.junit.jupiter.api.Assertions.assertArrayEquals;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;

@DisabledForJreRange(max = JRE.JAVA_11)
public class JdkAsyncHttpClientTests {

private static final byte[] SHORT_BODY = "hi there".getBytes(StandardCharsets.UTF_8);
private static final byte[] LONG_BODY = createLongBody();

private static final StepVerifierOptions EMPTY_INITIAL_REQUEST_OPTIONS = StepVerifierOptions.create()
.initialRequest(0);

private static WireMockServer server;

@BeforeAll
Expand All @@ -52,13 +53,11 @@ public static void beforeClass() {
.disableRequestJournal()
.gzipDisabled(true));

server.stubFor(
WireMock.get("/short").willReturn(WireMock.aResponse().withBody(SHORT_BODY)));
server.stubFor(WireMock.get("/long").willReturn(WireMock.aResponse().withBody(LONG_BODY)));
server.stubFor(WireMock.get("/error")
.willReturn(WireMock.aResponse().withBody("error").withStatus(500)));
server.stubFor(
WireMock.post("/shortPost").willReturn(WireMock.aResponse().withBody(SHORT_BODY)));
server.stubFor(get("/short").willReturn(aResponse().withBody(SHORT_BODY)));
server.stubFor(get("/long").willReturn(aResponse().withBody(LONG_BODY)));
server.stubFor(get("/error").willReturn(aResponse().withBody("error").withStatus(500)));
server.stubFor(post("/shortPost").willReturn(aResponse().withBody(SHORT_BODY)));
server.stubFor(get("/connectionClose").willReturn(aResponse().withFault(Fault.RANDOM_DATA_THEN_CLOSE)));
server.start();
}

Expand All @@ -81,18 +80,18 @@ public void testFlowableResponseLongBodyAsByteArrayAsync() {

@Test
public void testMultipleSubscriptionsEmitsError() {
HttpResponse response = getResponse("/short").block();
Mono<byte[]> response = getResponse("/short").cache().flatMap(HttpResponse::getBodyAsByteArray);

// Subscription:1
StepVerifier.create(response.getBodyAsByteArray())
StepVerifier.create(response)
.assertNext(Assertions::assertNotNull)
.expectComplete()
.verify(Duration.ofSeconds(20));

// Subscription:2
// Getting the bytes of an JDK HttpClient response closes the stream on first read.
// Subsequent reads will return an IllegalStateException due to the stream being closed.
StepVerifier.create(response.getBodyAsByteArray())
StepVerifier.create(response)
.expectNextCount(0)
.expectError(IllegalStateException.class)
.verify(Duration.ofSeconds(20));
Expand All @@ -113,10 +112,7 @@ public void testFlowableWhenServerReturnsBodyAndNoErrorsWhenHttp500Returned() {

@Test
public void testFlowableBackpressure() {
StepVerifierOptions stepVerifierOptions = StepVerifierOptions.create();
stepVerifierOptions.initialRequest(0);

StepVerifier.create(getResponse("/long").flatMapMany(HttpResponse::getBody), stepVerifierOptions)
StepVerifier.create(getResponse("/long").flatMapMany(HttpResponse::getBody), EMPTY_INITIAL_REQUEST_OPTIONS)
.expectNextCount(0)
.thenRequest(1)
.expectNextCount(1)
Expand Down Expand Up @@ -162,44 +158,12 @@ public void testRequestBodyEndsInErrorShouldPropagateToResponse() {

@Test
public void testServerShutsDownSocketShouldPushErrorToContentFlowable() {
Assertions.assertTimeout(Duration.ofMillis(5000), () -> {
CountDownLatch latch = new CountDownLatch(1);
try (ServerSocket ss = new ServerSocket(0)) {
Mono.fromCallable(() -> {
latch.countDown();
Socket socket = ss.accept();
// give the client time to get request across
Thread.sleep(500);
// respond but don't send the complete response
byte[] bytes = new byte[1024];
int n = socket.getInputStream().read(bytes);
String response = "HTTP/1.1 200 OK\r\n" //
+ "Content-Type: text/plain\r\n" //
+ "Content-Length: 10\r\n" //
+ "\r\n" //
+ "zi";
OutputStream out = socket.getOutputStream();
out.write(response.getBytes());
out.flush();
// kill the socket with HTTP response body incomplete
socket.close();
return 1;
}).subscribeOn(Schedulers.boundedElastic()).subscribe();
//
latch.await();
HttpClient client = new JdkAsyncHttpClientBuilder().build();
HttpRequest request = new HttpRequest(HttpMethod.GET,
new URL("http://localhost:" + ss.getLocalPort() + "/ioException"));

HttpResponse response = client.send(request).block();

assertNotNull(response);
assertEquals(200, response.getStatusCode());

StepVerifier.create(response.getBodyAsByteArray())
.verifyError(IOException.class);
}
});
HttpClient client = new JdkAsyncHttpClientBuilder().build();

HttpRequest request = new HttpRequest(HttpMethod.GET, url(server, "/connectionClose"));

StepVerifier.create(client.send(request).flatMap(HttpResponse::getBodyAsByteArray))
.verifyError(IOException.class);
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,9 @@ public class NettyAsyncHttpClientTests {

static final String TEST_HEADER = "testHeader";

private static final StepVerifierOptions EMPTY_INITIAL_REQUEST_OPTIONS = StepVerifierOptions.create()
.initialRequest(0);

private static WireMockServer server;

@BeforeAll
Expand All @@ -100,7 +103,7 @@ public static void beforeClass() {
server.stubFor(get(NO_DOUBLE_UA_PATH).willReturn(aResponse()
.withTransformers(NettyAsyncHttpClientResponseTransformer.NAME)));
server.stubFor(get(IO_EXCEPTION_PATH).willReturn(aResponse().withStatus(200).but()
.withFault(Fault.MALFORMED_RESPONSE_CHUNK)));
.withFault(Fault.RANDOM_DATA_THEN_CLOSE)));
server.stubFor(get(RETURN_HEADERS_AS_IS_PATH).willReturn(aResponse()
.withTransformers(NettyAsyncHttpClientResponseTransformer.NAME)));

Expand Down Expand Up @@ -168,10 +171,7 @@ public void testDispose() {
public void testCancel() {
NettyAsyncHttpResponse response = getResponse(LONG_BODY_PATH);
//
StepVerifierOptions stepVerifierOptions = StepVerifierOptions.create();
stepVerifierOptions.initialRequest(0);
//
StepVerifier.create(response.getBody(), stepVerifierOptions)
StepVerifier.create(response.getBody(), EMPTY_INITIAL_REQUEST_OPTIONS)
.expectNextCount(0)
.thenRequest(1)
.expectNextCount(1)
Expand All @@ -194,10 +194,7 @@ public void testFlowableWhenServerReturnsBodyAndNoErrorsWhenHttp500Returned() {
public void testFlowableBackpressure() {
HttpResponse response = getResponse(LONG_BODY_PATH);
//
StepVerifierOptions stepVerifierOptions = StepVerifierOptions.create();
stepVerifierOptions.initialRequest(0);
//
StepVerifier.create(response.getBody(), stepVerifierOptions)
StepVerifier.create(response.getBody(), EMPTY_INITIAL_REQUEST_OPTIONS)
.expectNextCount(0)
.thenRequest(1)
.expectNextCount(1)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@
import java.nio.ByteBuffer;
import java.time.Duration;

import static com.azure.core.http.okhttp.TestUtils.createQuietDispatcher;

public class OkHttpAsyncHttpClientHttpClientTests extends HttpClientTests {
private static WireMockServer server;

Expand Down Expand Up @@ -50,6 +52,7 @@ protected HttpClient createHttpClient() {
@Test
public void testVerySlowFluxGetsInterruptedByOkHttpInternals() {
HttpClient httpClient = new OkHttpAsyncHttpClientBuilder()
.dispatcher(createQuietDispatcher(IllegalStateException.class, "blocking read"))
.callTimeout(Duration.ofMillis(1000)) // this caps full req-res round trip.
.build();

Expand All @@ -72,6 +75,7 @@ public void testVerySlowFluxGetsInterruptedByOkHttpInternals() {
@Test
public void testUnresponsiveFluxGetsInterruptedInFluxRequestBody() {
HttpClient httpClient = new OkHttpAsyncHttpClientBuilder()
.dispatcher(createQuietDispatcher(IllegalStateException.class, "blocking read"))
.callTimeout(Duration.ofMillis(1000)) // this caps full req-res round trip.
.build();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,16 @@

package com.azure.core.http.okhttp;

import com.azure.core.exception.UnexpectedLengthException;
import com.azure.core.http.HttpClient;
import com.azure.core.test.RestProxyTestsWireMockServer;
import com.azure.core.test.implementation.RestProxyTests;
import com.github.tomakehurst.wiremock.WireMockServer;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;

import static com.azure.core.http.okhttp.TestUtils.createQuietDispatcher;

public class OkHttpAsyncHttpClientRestProxyTests extends RestProxyTests {
private static WireMockServer server;

Expand All @@ -33,6 +36,8 @@ protected int getWireMockPort() {

@Override
protected HttpClient createHttpClient() {
return new OkHttpAsyncHttpClientBuilder().build();
return new OkHttpAsyncHttpClientBuilder()
.dispatcher(createQuietDispatcher(UnexpectedLengthException.class, "request body emitted"))
.build();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import com.azure.core.http.HttpResponse;
import com.github.tomakehurst.wiremock.WireMockServer;
import com.github.tomakehurst.wiremock.core.WireMockConfiguration;
import com.github.tomakehurst.wiremock.http.Fault;
import okhttp3.Dispatcher;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.Assertions;
Expand All @@ -23,32 +24,31 @@
import reactor.test.StepVerifierOptions;

import java.io.IOException;
import java.io.OutputStream;
import java.net.MalformedURLException;
import java.net.ServerSocket;
import java.net.Socket;
import java.net.URL;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CountDownLatch;

import static com.azure.core.http.okhttp.TestUtils.createQuietDispatcher;
import static com.github.tomakehurst.wiremock.client.WireMock.aResponse;
import static com.github.tomakehurst.wiremock.client.WireMock.get;
import static com.github.tomakehurst.wiremock.client.WireMock.post;
import static org.junit.jupiter.api.Assertions.assertArrayEquals;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertLinesMatch;
import static org.junit.jupiter.api.Assertions.assertNotNull;

public class OkHttpAsyncHttpClientTests {
static final String RETURN_HEADERS_AS_IS_PATH = "/returnHeadersAsIs";

private static final byte[] SHORT_BODY = "hi there".getBytes(StandardCharsets.UTF_8);
private static final byte[] LONG_BODY = createLongBody();

private static final StepVerifierOptions EMPTY_INITIAL_REQUEST_OPTIONS = StepVerifierOptions.create()
.initialRequest(0);

private static WireMockServer server;

@BeforeAll
Expand All @@ -65,6 +65,7 @@ public static void beforeClass() {
server.stubFor(post("/shortPost").willReturn(aResponse().withBody(SHORT_BODY)));
server.stubFor(get(RETURN_HEADERS_AS_IS_PATH).willReturn(aResponse()
.withTransformers(OkHttpAsyncHttpClientResponseTransformer.NAME)));
server.stubFor(get("/connectionClose").willReturn(aResponse().withFault(Fault.RANDOM_DATA_THEN_CLOSE)));

server.start();
}
Expand Down Expand Up @@ -120,10 +121,7 @@ public void testFlowableWhenServerReturnsBodyAndNoErrorsWhenHttp500Returned() {

@Test
public void testFlowableBackpressure() {
StepVerifierOptions stepVerifierOptions = StepVerifierOptions.create();
stepVerifierOptions.initialRequest(0);

StepVerifier.create(getResponse("/long").flatMapMany(HttpResponse::getBody), stepVerifierOptions)
StepVerifier.create(getResponse("/long").flatMapMany(HttpResponse::getBody), EMPTY_INITIAL_REQUEST_OPTIONS)
.expectNextCount(0)
.thenRequest(1)
.expectNextCount(1)
Expand All @@ -136,7 +134,10 @@ public void testFlowableBackpressure() {

@Test
public void testRequestBodyIsErrorShouldPropagateToResponse() {
HttpClient client = new OkHttpAsyncClientProvider().createInstance();
HttpClient client = new OkHttpAsyncHttpClientBuilder()
.dispatcher(createQuietDispatcher(RuntimeException.class, "boo"))
.build();

HttpRequest request = new HttpRequest(HttpMethod.POST, url(server, "/shortPost"))
.setHeader("Content-Length", "123")
.setBody(Flux.error(new RuntimeException("boo")));
Expand All @@ -148,7 +149,10 @@ public void testRequestBodyIsErrorShouldPropagateToResponse() {

@Test
public void testRequestBodyEndsInErrorShouldPropagateToResponse() {
HttpClient client = new OkHttpAsyncClientProvider().createInstance();
HttpClient client = new OkHttpAsyncHttpClientBuilder()
.dispatcher(createQuietDispatcher(RuntimeException.class, "boo"))
.build();

String contentChunk = "abcdefgh";
int repetitions = 1000;
HttpRequest request = new HttpRequest(HttpMethod.POST, url(server, "/shortPost"))
Expand All @@ -169,44 +173,12 @@ public void testRequestBodyEndsInErrorShouldPropagateToResponse() {

@Test
public void testServerShutsDownSocketShouldPushErrorToContentFlowable() {
Assertions.assertTimeout(Duration.ofMillis(5000), () -> {
CountDownLatch latch = new CountDownLatch(1);
try (ServerSocket ss = new ServerSocket(0)) {
Mono.fromCallable(() -> {
latch.countDown();
Socket socket = ss.accept();
// give the client time to get request across
Thread.sleep(500);
// respond but don't send the complete response
byte[] bytes = new byte[1024];
int n = socket.getInputStream().read(bytes);
String response = "HTTP/1.1 200 OK\r\n" //
+ "Content-Type: text/plain\r\n" //
+ "Content-Length: 10\r\n" //
+ "\r\n" //
+ "zi";
OutputStream out = socket.getOutputStream();
out.write(response.getBytes());
out.flush();
// kill the socket with HTTP response body incomplete
socket.close();
return 1;
}).subscribeOn(Schedulers.boundedElastic()).subscribe();
//
latch.await();
HttpClient client = new OkHttpAsyncHttpClientBuilder().build();
HttpRequest request = new HttpRequest(HttpMethod.GET,
new URL("http://localhost:" + ss.getLocalPort() + "/ioException"));

HttpResponse response = client.send(request).block();

assertNotNull(response);
assertEquals(200, response.getStatusCode());
HttpClient client = new OkHttpAsyncClientProvider().createInstance();

HttpRequest request = new HttpRequest(HttpMethod.GET, url(server, "/connectionClose"));

StepVerifier.create(response.getBodyAsByteArray())
.verifyError(IOException.class);
}
});
StepVerifier.create(client.send(request).flatMap(HttpResponse::getBodyAsByteArray))
.verifyError(IOException.class);
}

@Test
Expand Down Expand Up @@ -296,14 +268,14 @@ private static byte[] createLongBody() {
return longBody;
}

private void checkBodyReceived(byte[] expectedBody, String path) {
private static void checkBodyReceived(byte[] expectedBody, String path) {
HttpClient client = new OkHttpAsyncHttpClientBuilder().build();
StepVerifier.create(doRequest(client, path).flatMap(HttpResponse::getBodyAsByteArray))
.assertNext(bytes -> assertArrayEquals(expectedBody, bytes))
.verifyComplete();
}

private Mono<HttpResponse> doRequest(HttpClient client, String path) {
private static Mono<HttpResponse> doRequest(HttpClient client, String path) {
HttpRequest request = new HttpRequest(HttpMethod.GET, url(server, path));
return client.send(request);
}
Expand Down
Loading

0 comments on commit 4a59fa7

Please sign in to comment.