Skip to content

Commit

Permalink
Reactive Rest Client closing connections after server failures
Browse files Browse the repository at this point in the history
  • Loading branch information
Sgitario authored and pedroh-pereira committed Nov 14, 2022
1 parent 5696d7a commit 741ced8
Show file tree
Hide file tree
Showing 4 changed files with 64 additions and 11 deletions.
Original file line number Diff line number Diff line change
@@ -1,7 +1,10 @@
package io.quarkus.rest.client.reactive;

import static io.restassured.RestAssured.given;
import static org.assertj.core.api.Assertions.assertThat;
import static org.awaitility.Awaitility.await;

import java.time.Duration;
import java.util.Map;
import java.util.Set;

Expand Down Expand Up @@ -72,4 +75,23 @@ void shouldMapQueryParamsWithSpecialCharacters() {
assertThat(map.get("p5")).isEqualTo("5");
assertThat(map.get("p6")).isEqualTo("6");
}

/**
* Test to reproduce https://github.com/quarkusio/quarkus/issues/28818.
*/
@Test
void shouldCloseConnectionsWhenFailures() {
// It's using 30 seconds because it's the default timeout to release connections. This timeout should not be taken into
// account when there are failures, and we should be able to call 3 times to the service without waiting.
await().atMost(Duration.ofSeconds(30))
.until(() -> {
for (int call = 0; call < 3; call++) {
given()
.when().get("/hello/callClientForImageInfo")
.then()
.statusCode(500);
}
return true;
});
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import javax.ws.rs.GET;
import javax.ws.rs.POST;
import javax.ws.rs.Path;
import javax.ws.rs.Produces;
import javax.ws.rs.core.MediaType;

import org.eclipse.microprofile.rest.client.inject.RegisterRestClient;
Expand All @@ -23,4 +24,10 @@ public interface HelloClient2 {
@GET
@Path("delay")
Uni<String> delay();

@POST
@Path("/imageInfo")
@Consumes("image/gif")
@Produces(MediaType.TEXT_PLAIN)
String imageInfo(byte[] imageFile);
}
Original file line number Diff line number Diff line change
Expand Up @@ -78,4 +78,23 @@ public Uni<String> delay() {
return Uni.createFrom().item("Hello")
.onItem().delayIt().by(Duration.ofMillis(500));
}

@Path("callClientForImageInfo")
@GET
public String callClientForImageInfo() {
int size = 1024 * 1024 * 5;

byte[] buffer = new byte[size];

//Should provoke 415 Unsupported Media Type
return client2.imageInfo(buffer);
}

@POST
@Consumes({ "image/jpeg", "image/png" })
@Path("/imageInfo")
@Produces(MediaType.TEXT_PLAIN)
public String imageInfo(byte[] imageFile) {
throw new IllegalStateException("This method should never be invoked");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import javax.ws.rs.core.HttpHeaders;
import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.MultivaluedMap;
import javax.ws.rs.core.Response;
import javax.ws.rs.core.Variant;

import org.jboss.logging.Logger;
Expand Down Expand Up @@ -243,6 +244,11 @@ public void handle(HttpClientResponse clientResponse) {
}
}

if (Response.Status.Family.familyOf(status) != Response.Status.Family.SUCCESSFUL) {
httpClientRequest.connection().close();
requestContext.resume();
}

if (isResponseMultipart(requestContext)) {
QuarkusMultipartResponseDecoder multipartDecoder = new QuarkusMultipartResponseDecoder(
clientResponse);
Expand Down Expand Up @@ -366,17 +372,16 @@ public void handle(Buffer buffer) {
requestContext.resume(t);
}
}
})
.onFailure(new Handler<>() {
@Override
public void handle(Throwable failure) {
if (failure instanceof IOException) {
requestContext.resume(new ProcessingException(failure));
} else {
requestContext.resume(failure);
}
}
});
}).onFailure(new Handler<>() {
@Override
public void handle(Throwable failure) {
if (failure instanceof IOException) {
requestContext.resume(new ProcessingException(failure));
} else {
requestContext.resume(failure);
}
}
});
}

private boolean isResponseMultipart(RestClientRequestContext requestContext) {
Expand Down

0 comments on commit 741ced8

Please sign in to comment.