diff --git a/communication/src/main/java/datadog/communication/http/HttpRetryPolicy.java b/communication/src/main/java/datadog/communication/http/HttpRetryPolicy.java
index 48169264d49..23ed4c27525 100644
--- a/communication/src/main/java/datadog/communication/http/HttpRetryPolicy.java
+++ b/communication/src/main/java/datadog/communication/http/HttpRetryPolicy.java
@@ -1,5 +1,8 @@
package datadog.communication.http;
+import java.io.IOException;
+import java.io.InterruptedIOException;
+import java.net.ConnectException;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import javax.annotation.Nullable;
@@ -34,7 +37,7 @@
* instance.
*/
@NotThreadSafe
-public class HttpRetryPolicy {
+public class HttpRetryPolicy implements AutoCloseable {
private static final Logger log = LoggerFactory.getLogger(HttpRetryPolicy.class);
@@ -47,12 +50,35 @@ public class HttpRetryPolicy {
private int retriesLeft;
private long delay;
+ private boolean interrupted;
private final double delayFactor;
+ private final boolean suppressInterrupts;
- private HttpRetryPolicy(int retriesLeft, long delay, double delayFactor) {
+ private HttpRetryPolicy(
+ int retriesLeft, long delay, double delayFactor, boolean suppressInterrupts) {
this.retriesLeft = retriesLeft;
this.delay = delay;
this.delayFactor = delayFactor;
+ this.suppressInterrupts = suppressInterrupts;
+ }
+
+ public boolean shouldRetry(Exception e) {
+ if (e instanceof ConnectException) {
+ return shouldRetry((okhttp3.Response) null);
+ }
+ if (e instanceof InterruptedIOException) {
+ if (suppressInterrupts) {
+ return shouldRetry((okhttp3.Response) null);
+ }
+ }
+ if (e instanceof InterruptedException) {
+ if (suppressInterrupts) {
+ // remember interrupted status to restore the thread's interrupted flag later
+ interrupted = true;
+ return shouldRetry((okhttp3.Response) null);
+ }
+ }
+ return false;
}
public boolean shouldRetry(@Nullable okhttp3.Response response) {
@@ -106,25 +132,52 @@ private long getRateLimitResetTime(okhttp3.Response response) {
}
}
- public long backoff() {
+ long getBackoffDelay() {
long currentDelay = delay;
delay = (long) (delay * delayFactor);
return currentDelay;
}
+ public void backoff() throws IOException {
+ try {
+ Thread.sleep(getBackoffDelay());
+ } catch (InterruptedException e) {
+ if (suppressInterrupts) {
+ // remember interrupted status to restore the thread's interrupted flag later
+ interrupted = true;
+ } else {
+ Thread.currentThread().interrupt();
+ throw new InterruptedIOException("thread interrupted");
+ }
+ }
+ }
+
+ @Override
+ public void close() {
+ if (interrupted) {
+ Thread.currentThread().interrupt();
+ }
+ }
+
public static class Factory {
private final int maxRetries;
private final long initialDelay;
private final double delayFactor;
+ private final boolean retryInterrupts;
public Factory(int maxRetries, int initialDelay, double delayFactor) {
+ this(maxRetries, initialDelay, delayFactor, false);
+ }
+
+ public Factory(int maxRetries, int initialDelay, double delayFactor, boolean retryInterrupts) {
this.maxRetries = maxRetries;
this.initialDelay = initialDelay;
this.delayFactor = delayFactor;
+ this.retryInterrupts = retryInterrupts;
}
public HttpRetryPolicy create() {
- return new HttpRetryPolicy(maxRetries, initialDelay, delayFactor);
+ return new HttpRetryPolicy(maxRetries, initialDelay, delayFactor, retryInterrupts);
}
}
}
diff --git a/communication/src/main/java/datadog/communication/http/OkHttpUtils.java b/communication/src/main/java/datadog/communication/http/OkHttpUtils.java
index 2a69509f31c..e74561f7277 100644
--- a/communication/src/main/java/datadog/communication/http/OkHttpUtils.java
+++ b/communication/src/main/java/datadog/communication/http/OkHttpUtils.java
@@ -11,8 +11,6 @@
import datadog.trace.util.AgentProxySelector;
import java.io.File;
import java.io.IOException;
-import java.io.InterruptedIOException;
-import java.net.ConnectException;
import java.net.InetSocketAddress;
import java.net.Proxy;
import java.nio.ByteBuffer;
@@ -357,16 +355,10 @@ public void writeTo(BufferedSink sink) throws IOException {
}
}
- /**
- * Retries a request in accordance with the provided retry policy. Important:
- * interrupts to a thread executing this method are ignored (the thread's interruption flag is
- * restored on exit)
- */
public static Response sendWithRetries(
- OkHttpClient httpClient, HttpRetryPolicy retryPolicy, Request request) throws IOException {
- boolean interrupted = false;
- try {
-
+ OkHttpClient httpClient, HttpRetryPolicy.Factory retryPolicyFactory, Request request)
+ throws IOException {
+ try (HttpRetryPolicy retryPolicy = retryPolicyFactory.create()) {
while (true) {
try {
Response response = httpClient.newCall(request).execute();
@@ -378,24 +370,13 @@ public static Response sendWithRetries(
} else {
closeQuietly(response);
}
- } catch (ConnectException | InterruptedIOException ex) {
- if (!retryPolicy.shouldRetry(null)) {
+ } catch (Exception ex) {
+ if (!retryPolicy.shouldRetry(ex)) {
throw ex;
}
}
// If we get here, there has been an error, and we still have retries left
- long backoffMs = retryPolicy.backoff();
- try {
- Thread.sleep(backoffMs);
- } catch (InterruptedException e) {
- // remember interrupted status to restore the thread's interrupted flag later
- interrupted = true;
- }
- }
-
- } finally {
- if (interrupted) {
- Thread.currentThread().interrupt();
+ retryPolicy.backoff();
}
}
}
diff --git a/communication/src/test/groovy/datadog/communication/http/HttpRetryPolicyTest.groovy b/communication/src/test/groovy/datadog/communication/http/HttpRetryPolicyTest.groovy
index 6f57de0bd04..a05656176f6 100644
--- a/communication/src/test/groovy/datadog/communication/http/HttpRetryPolicyTest.groovy
+++ b/communication/src/test/groovy/datadog/communication/http/HttpRetryPolicyTest.groovy
@@ -17,10 +17,10 @@ class HttpRetryPolicyTest extends Specification {
when:
while (retry <= maxRetries) {
- def shouldRetry = retryPolicy.shouldRetry()
+ def shouldRetry = retryPolicy.shouldRetry((Response) null)
shouldRetries << shouldRetry
if (shouldRetry) {
- backoffs << retryPolicy.backoff()
+ backoffs << retryPolicy.getBackoffDelay()
}
retry += 1
}
@@ -44,10 +44,10 @@ class HttpRetryPolicyTest extends Specification {
def retryPolicy = new HttpRetryPolicy.Factory(5, 100, 2.0).create()
def responseBuilder = new Response.Builder()
- .code(responseCode)
- .request(GroovyMock(Request))
- .protocol(Protocol.HTTP_1_1)
- .message("")
+ .code(responseCode)
+ .request(GroovyMock(Request))
+ .protocol(Protocol.HTTP_1_1)
+ .message("")
if (rateLimitHeader != null) {
responseBuilder.header("x-ratelimit-reset", rateLimitHeader)
}
@@ -73,4 +73,58 @@ class HttpRetryPolicyTest extends Specification {
500 | null | 5
501 | null | 5
}
+
+ def "test exceptions are retried: #exception with suppress interrupts #suppressInterrupts"() {
+ setup:
+ def retryPolicy = new HttpRetryPolicy.Factory(5, 100, 2.0, suppressInterrupts).create()
+
+ expect:
+ retryPolicy.shouldRetry(exception) == shouldRetry
+
+ where:
+ exception | suppressInterrupts | shouldRetry
+ new NullPointerException() | false | false
+ new IllegalArgumentException() | false | false
+ new ConnectException() | false | true
+ new InterruptedIOException() | false | false
+ new InterruptedIOException() | true | true
+ new InterruptedException() | false | false
+ new InterruptedException() | true | true
+ }
+
+ def "test interrupt flag is preserved when suppressing interrupts"() {
+ setup:
+ def retryPolicy = new HttpRetryPolicy.Factory(5, 100, 2.0, true).create()
+
+ when:
+ retryPolicy.shouldRetry(new InterruptedException())
+ retryPolicy.close()
+
+ then:
+ Thread.interrupted()
+ }
+
+ def "test interrupt flag is preserved if interrupted while backing off"() {
+ setup:
+ boolean[] b = new boolean[2]
+
+ Runnable r = () -> {
+ def retryPolicy = new HttpRetryPolicy.Factory(5, 1000, 2.0, true).create()
+ retryPolicy.backoff()
+
+ b[0] = Thread.currentThread().isInterrupted()
+ retryPolicy.close()
+ b[1] = Thread.interrupted()
+ }
+ Thread t = new Thread(r, "test-http-retry-policy-interrupts")
+
+ when:
+ t.start()
+ t.interrupt()
+ t.join()
+
+ then:
+ !b[0] // before retry policy is closed, the thread should not be interrupted: interrupts are suppressed
+ b[1] // after retry policy is closed, the thread should be interrupted: interrupt flag should be restored
+ }
}
diff --git a/dd-java-agent/agent-ci-visibility/src/main/java/datadog/trace/civisibility/communication/BackendApiFactory.java b/dd-java-agent/agent-ci-visibility/src/main/java/datadog/trace/civisibility/communication/BackendApiFactory.java
index cb4b986c6fc..5fe30e1a9e8 100644
--- a/dd-java-agent/agent-ci-visibility/src/main/java/datadog/trace/civisibility/communication/BackendApiFactory.java
+++ b/dd-java-agent/agent-ci-visibility/src/main/java/datadog/trace/civisibility/communication/BackendApiFactory.java
@@ -23,7 +23,7 @@ public BackendApiFactory(Config config, SharedCommunicationObjects sharedCommuni
}
public @Nullable BackendApi createBackendApi() {
- HttpRetryPolicy.Factory retryPolicyFactory = new HttpRetryPolicy.Factory(5, 100, 2.0);
+ HttpRetryPolicy.Factory retryPolicyFactory = new HttpRetryPolicy.Factory(5, 100, 2.0, true);
if (config.isCiVisibilityAgentlessEnabled()) {
HttpUrl agentlessUrl = getAgentlessUrl();
diff --git a/dd-java-agent/agent-ci-visibility/src/main/java/datadog/trace/civisibility/communication/EvpProxyApi.java b/dd-java-agent/agent-ci-visibility/src/main/java/datadog/trace/civisibility/communication/EvpProxyApi.java
index eea481345d6..fcc482ece68 100644
--- a/dd-java-agent/agent-ci-visibility/src/main/java/datadog/trace/civisibility/communication/EvpProxyApi.java
+++ b/dd-java-agent/agent-ci-visibility/src/main/java/datadog/trace/civisibility/communication/EvpProxyApi.java
@@ -81,9 +81,8 @@ public T post(
final Request request = requestBuilder.post(requestBody).build();
- HttpRetryPolicy retryPolicy = retryPolicyFactory.create();
try (okhttp3.Response response =
- OkHttpUtils.sendWithRetries(httpClient, retryPolicy, request)) {
+ OkHttpUtils.sendWithRetries(httpClient, retryPolicyFactory, request)) {
if (response.isSuccessful()) {
log.debug("Request to {} returned successful response: {}", uri, response.code());
diff --git a/dd-java-agent/agent-ci-visibility/src/main/java/datadog/trace/civisibility/communication/IntakeApi.java b/dd-java-agent/agent-ci-visibility/src/main/java/datadog/trace/civisibility/communication/IntakeApi.java
index a92f1bc5ddb..067969c2789 100644
--- a/dd-java-agent/agent-ci-visibility/src/main/java/datadog/trace/civisibility/communication/IntakeApi.java
+++ b/dd-java-agent/agent-ci-visibility/src/main/java/datadog/trace/civisibility/communication/IntakeApi.java
@@ -85,9 +85,8 @@ public T post(
}
Request request = requestBuilder.build();
- HttpRetryPolicy retryPolicy = retryPolicyFactory.create();
try (okhttp3.Response response =
- OkHttpUtils.sendWithRetries(httpClient, retryPolicy, request)) {
+ OkHttpUtils.sendWithRetries(httpClient, retryPolicyFactory, request)) {
if (response.isSuccessful()) {
log.debug("Request to {} returned successful response: {}", uri, response.code());
InputStream responseBodyStream = response.body().byteStream();
diff --git a/dd-trace-core/src/main/java/datadog/trace/common/writer/ddintake/DDEvpProxyApi.java b/dd-trace-core/src/main/java/datadog/trace/common/writer/ddintake/DDEvpProxyApi.java
index 93346b879a1..8926d17e200 100644
--- a/dd-trace-core/src/main/java/datadog/trace/common/writer/ddintake/DDEvpProxyApi.java
+++ b/dd-trace-core/src/main/java/datadog/trace/common/writer/ddintake/DDEvpProxyApi.java
@@ -92,7 +92,8 @@ public DDEvpProxyApi build() {
? httpClient
: OkHttpUtils.buildHttpClient(proxiedApiUrl, timeoutMillis);
- final HttpRetryPolicy.Factory retryPolicyFactory = new HttpRetryPolicy.Factory(5, 100, 2.0);
+ final HttpRetryPolicy.Factory retryPolicyFactory =
+ new HttpRetryPolicy.Factory(5, 100, 2.0, true);
log.debug("proxiedApiUrl: {}", proxiedApiUrl);
return new DDEvpProxyApi(
@@ -141,9 +142,8 @@ public Response sendSerializedTraces(Payload payload) {
totalTraces += payload.traceCount();
receivedTraces += payload.traceCount();
- HttpRetryPolicy retryPolicy = retryPolicyFactory.create();
try (okhttp3.Response response =
- OkHttpUtils.sendWithRetries(httpClient, retryPolicy, request)) {
+ OkHttpUtils.sendWithRetries(httpClient, retryPolicyFactory, request)) {
if (response.isSuccessful()) {
countAndLogSuccessfulSend(payload.traceCount(), sizeInBytes);
return Response.success(response.code());
diff --git a/dd-trace-core/src/main/java/datadog/trace/common/writer/ddintake/DDIntakeApi.java b/dd-trace-core/src/main/java/datadog/trace/common/writer/ddintake/DDIntakeApi.java
index 2f7b2f44268..165d1a951ab 100644
--- a/dd-trace-core/src/main/java/datadog/trace/common/writer/ddintake/DDIntakeApi.java
+++ b/dd-trace-core/src/main/java/datadog/trace/common/writer/ddintake/DDIntakeApi.java
@@ -43,7 +43,7 @@ public static class DDIntakeApiBuilder {
HttpUrl hostUrl = null;
OkHttpClient httpClient = null;
- HttpRetryPolicy.Factory retryPolicyFactory = new HttpRetryPolicy.Factory(5, 100, 2.0);
+ HttpRetryPolicy.Factory retryPolicyFactory = new HttpRetryPolicy.Factory(5, 100, 2.0, true);
private String apiKey;
@@ -134,9 +134,8 @@ public Response sendSerializedTraces(Payload payload) {
totalTraces += payload.traceCount();
receivedTraces += payload.traceCount();
- HttpRetryPolicy retryPolicy = retryPolicyFactory.create();
try (okhttp3.Response response =
- OkHttpUtils.sendWithRetries(httpClient, retryPolicy, request)) {
+ OkHttpUtils.sendWithRetries(httpClient, retryPolicyFactory, request)) {
if (response.isSuccessful()) {
countAndLogSuccessfulSend(payload.traceCount(), sizeInBytes);
return Response.success(response.code());