diff --git a/plugin/trino-http-event-listener/src/main/java/io/trino/plugin/httpquery/HttpEventListener.java b/plugin/trino-http-event-listener/src/main/java/io/trino/plugin/httpquery/HttpEventListener.java index 6ddfffd74046..e5dcb57fe908 100644 --- a/plugin/trino-http-event-listener/src/main/java/io/trino/plugin/httpquery/HttpEventListener.java +++ b/plugin/trino-http-event-listener/src/main/java/io/trino/plugin/httpquery/HttpEventListener.java @@ -134,16 +134,18 @@ public void onSuccess(StatusResponse result) { verify(result != null); - if (!(result.getStatusCode() >= 200 && result.getStatusCode() < 300) && attempt < config.getRetryCount()) { - Duration nextDelay = nextDelay(delay); - int nextAttempt = attempt + 1; - - log.warn("QueryId = \"%s\", attempt = %d/%d, URL = %s | Ingest server responded with code %d, will retry after approximately %d seconds", - queryId, attempt + 1, config.getRetryCount() + 1, request.getUri().toString(), - result.getStatusCode(), nextDelay.roundTo(TimeUnit.SECONDS)); - - attemptToSend(request, nextAttempt, nextDelay, queryId); - return; + if (shouldRetry(result)) { + if (attempt < config.getRetryCount()) { + Duration nextDelay = nextDelay(delay); + int nextAttempt = attempt + 1; + + log.warn("QueryId = \"%s\", attempt = %d/%d, URL = %s | Ingest server responded with code %d, will retry after approximately %d seconds", + queryId, attempt + 1, config.getRetryCount() + 1, request.getUri().toString(), + result.getStatusCode(), nextDelay.roundTo(TimeUnit.SECONDS)); + + attemptToSend(request, nextAttempt, nextDelay, queryId); + return; + } } log.debug("QueryId = \"%s\", attempt = %d/%d, URL = %s | Query event delivered successfully", @@ -172,6 +174,30 @@ public void onFailure(Throwable t) (long) delay.getValue(), delay.getUnit()); } + private boolean shouldRetry(StatusResponse response) + { + int statusCode = response.getStatusCode(); + + // 1XX Information, requests can't be split + if (statusCode < 200) { + return false; + } + // 2XX - OK + if (200 <= statusCode && statusCode < 300) { + return false; + } + // 3XX Redirects, not following redirects + if (300 <= statusCode && statusCode <= 400) { + return false; + } + // 4XX - client error, no retry except 408 Request Timeout and 429 Too Many Requests + if (400 <= statusCode && statusCode < 500 && statusCode != 408 && statusCode != 429) { + return false; + } + + return true; + } + private Duration nextDelay(Duration delay) { if (delay.compareTo(Duration.valueOf("0s")) == 0) { diff --git a/plugin/trino-http-event-listener/src/test/java/io/trino/plugin/httpquery/TestHttpEventListener.java b/plugin/trino-http-event-listener/src/test/java/io/trino/plugin/httpquery/TestHttpEventListener.java index 527e68ef3958..a2ac353ec6d3 100644 --- a/plugin/trino-http-event-listener/src/test/java/io/trino/plugin/httpquery/TestHttpEventListener.java +++ b/plugin/trino-http-event-listener/src/test/java/io/trino/plugin/httpquery/TestHttpEventListener.java @@ -35,6 +35,7 @@ import okhttp3.mockwebserver.SocketPolicy; import org.testng.annotations.AfterMethod; import org.testng.annotations.BeforeMethod; +import org.testng.annotations.DataProvider; import org.testng.annotations.Test; import javax.net.ssl.KeyManagerFactory; @@ -356,8 +357,14 @@ public void testNoServerCertificateShouldNotSendRequest() assertNull(recordedRequest, "Handshake should have failed"); } - @Test - public void testServer500ShouldRetry() + @DataProvider(name = "retryStatusCodes") + public static Object[][] retryStatusCodes() + { + return new Object[][] {{503}, {500}, {429}, {408}}; + } + + @Test(dataProvider = "retryStatusCodes") + public void testServerShouldRetry(int responseCode) throws Exception { EventListener eventListener = factory.create(Map.of( @@ -365,7 +372,7 @@ public void testServer500ShouldRetry() "http-event-listener.log-completed", "true", "http-event-listener.connect-retry-count", "1")); - server.enqueue(new MockResponse().setResponseCode(500)); + server.enqueue(new MockResponse().setResponseCode(responseCode)); server.enqueue(new MockResponse().setResponseCode(200)); eventListener.queryCompleted(queryCompleteEvent);