From 4cce4589334aea408a7883c922b181da41d9d70e Mon Sep 17 00:00:00 2001 From: Nikita Tkachenko Date: Wed, 17 Apr 2024 12:42:49 +0200 Subject: [PATCH 1/2] Do not abort CI Visibility spans dispatch on interrupt --- .../communication/http/OkHttpUtils.java | 53 ++++++++++++------- 1 file changed, 34 insertions(+), 19 deletions(-) diff --git a/communication/src/main/java/datadog/communication/http/OkHttpUtils.java b/communication/src/main/java/datadog/communication/http/OkHttpUtils.java index 2cbd59cceae..2a69509f31c 100644 --- a/communication/src/main/java/datadog/communication/http/OkHttpUtils.java +++ b/communication/src/main/java/datadog/communication/http/OkHttpUtils.java @@ -11,6 +11,7 @@ import datadog.trace.util.AgentProxySelector; import java.io.File; import java.io.IOException; +import java.io.InterruptedIOException; import java.net.ConnectException; import java.net.InetSocketAddress; import java.net.Proxy; @@ -356,31 +357,45 @@ public void writeTo(BufferedSink sink) throws IOException { } } + /** + * Retries a request in accordance with the provided retry policy. Important: + * interrupts to a thread executing this method are ignored (the thread's interruption flag is + * restored on exit) + */ public static Response sendWithRetries( OkHttpClient httpClient, HttpRetryPolicy retryPolicy, Request request) throws IOException { - while (true) { - try { - okhttp3.Response response = httpClient.newCall(request).execute(); - if (response.isSuccessful()) { - return response; - } - if (!retryPolicy.shouldRetry(response)) { - return response; - } else { - closeQuietly(response); + boolean interrupted = false; + try { + + while (true) { + try { + Response response = httpClient.newCall(request).execute(); + if (response.isSuccessful()) { + return response; + } + if (!retryPolicy.shouldRetry(response)) { + return response; + } else { + closeQuietly(response); + } + } catch (ConnectException | InterruptedIOException ex) { + if (!retryPolicy.shouldRetry(null)) { + throw ex; + } } - } catch (ConnectException ex) { - if (!retryPolicy.shouldRetry(null)) { - throw ex; + // If we get here, there has been an error, and we still have retries left + long backoffMs = retryPolicy.backoff(); + try { + Thread.sleep(backoffMs); + } catch (InterruptedException e) { + // remember interrupted status to restore the thread's interrupted flag later + interrupted = true; } } - // If we get here, there has been an error, and we still have retries left - long backoffMs = retryPolicy.backoff(); - try { - Thread.sleep(backoffMs); - } catch (InterruptedException e) { + + } finally { + if (interrupted) { Thread.currentThread().interrupt(); - throw new IOException(e); } } } From 1ec21aadbc2e70f270c5c48ffa16b8a1fd6b6228 Mon Sep 17 00:00:00 2001 From: Nikita Tkachenko Date: Wed, 17 Apr 2024 14:51:13 +0200 Subject: [PATCH 2/2] Encapsulate interrupts suppression in retry policy --- .../communication/http/HttpRetryPolicy.java | 61 +++++++++++++++-- .../communication/http/OkHttpUtils.java | 31 ++------- .../http/HttpRetryPolicyTest.groovy | 66 +++++++++++++++++-- .../communication/BackendApiFactory.java | 2 +- .../communication/EvpProxyApi.java | 3 +- .../civisibility/communication/IntakeApi.java | 3 +- .../common/writer/ddintake/DDEvpProxyApi.java | 6 +- .../common/writer/ddintake/DDIntakeApi.java | 5 +- 8 files changed, 131 insertions(+), 46 deletions(-) diff --git a/communication/src/main/java/datadog/communication/http/HttpRetryPolicy.java b/communication/src/main/java/datadog/communication/http/HttpRetryPolicy.java index 48169264d49..23ed4c27525 100644 --- a/communication/src/main/java/datadog/communication/http/HttpRetryPolicy.java +++ b/communication/src/main/java/datadog/communication/http/HttpRetryPolicy.java @@ -1,5 +1,8 @@ package datadog.communication.http; +import java.io.IOException; +import java.io.InterruptedIOException; +import java.net.ConnectException; import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.TimeUnit; import javax.annotation.Nullable; @@ -34,7 +37,7 @@ * instance. */ @NotThreadSafe -public class HttpRetryPolicy { +public class HttpRetryPolicy implements AutoCloseable { private static final Logger log = LoggerFactory.getLogger(HttpRetryPolicy.class); @@ -47,12 +50,35 @@ public class HttpRetryPolicy { private int retriesLeft; private long delay; + private boolean interrupted; private final double delayFactor; + private final boolean suppressInterrupts; - private HttpRetryPolicy(int retriesLeft, long delay, double delayFactor) { + private HttpRetryPolicy( + int retriesLeft, long delay, double delayFactor, boolean suppressInterrupts) { this.retriesLeft = retriesLeft; this.delay = delay; this.delayFactor = delayFactor; + this.suppressInterrupts = suppressInterrupts; + } + + public boolean shouldRetry(Exception e) { + if (e instanceof ConnectException) { + return shouldRetry((okhttp3.Response) null); + } + if (e instanceof InterruptedIOException) { + if (suppressInterrupts) { + return shouldRetry((okhttp3.Response) null); + } + } + if (e instanceof InterruptedException) { + if (suppressInterrupts) { + // remember interrupted status to restore the thread's interrupted flag later + interrupted = true; + return shouldRetry((okhttp3.Response) null); + } + } + return false; } public boolean shouldRetry(@Nullable okhttp3.Response response) { @@ -106,25 +132,52 @@ private long getRateLimitResetTime(okhttp3.Response response) { } } - public long backoff() { + long getBackoffDelay() { long currentDelay = delay; delay = (long) (delay * delayFactor); return currentDelay; } + public void backoff() throws IOException { + try { + Thread.sleep(getBackoffDelay()); + } catch (InterruptedException e) { + if (suppressInterrupts) { + // remember interrupted status to restore the thread's interrupted flag later + interrupted = true; + } else { + Thread.currentThread().interrupt(); + throw new InterruptedIOException("thread interrupted"); + } + } + } + + @Override + public void close() { + if (interrupted) { + Thread.currentThread().interrupt(); + } + } + public static class Factory { private final int maxRetries; private final long initialDelay; private final double delayFactor; + private final boolean retryInterrupts; public Factory(int maxRetries, int initialDelay, double delayFactor) { + this(maxRetries, initialDelay, delayFactor, false); + } + + public Factory(int maxRetries, int initialDelay, double delayFactor, boolean retryInterrupts) { this.maxRetries = maxRetries; this.initialDelay = initialDelay; this.delayFactor = delayFactor; + this.retryInterrupts = retryInterrupts; } public HttpRetryPolicy create() { - return new HttpRetryPolicy(maxRetries, initialDelay, delayFactor); + return new HttpRetryPolicy(maxRetries, initialDelay, delayFactor, retryInterrupts); } } } diff --git a/communication/src/main/java/datadog/communication/http/OkHttpUtils.java b/communication/src/main/java/datadog/communication/http/OkHttpUtils.java index 2a69509f31c..e74561f7277 100644 --- a/communication/src/main/java/datadog/communication/http/OkHttpUtils.java +++ b/communication/src/main/java/datadog/communication/http/OkHttpUtils.java @@ -11,8 +11,6 @@ import datadog.trace.util.AgentProxySelector; import java.io.File; import java.io.IOException; -import java.io.InterruptedIOException; -import java.net.ConnectException; import java.net.InetSocketAddress; import java.net.Proxy; import java.nio.ByteBuffer; @@ -357,16 +355,10 @@ public void writeTo(BufferedSink sink) throws IOException { } } - /** - * Retries a request in accordance with the provided retry policy. Important: - * interrupts to a thread executing this method are ignored (the thread's interruption flag is - * restored on exit) - */ public static Response sendWithRetries( - OkHttpClient httpClient, HttpRetryPolicy retryPolicy, Request request) throws IOException { - boolean interrupted = false; - try { - + OkHttpClient httpClient, HttpRetryPolicy.Factory retryPolicyFactory, Request request) + throws IOException { + try (HttpRetryPolicy retryPolicy = retryPolicyFactory.create()) { while (true) { try { Response response = httpClient.newCall(request).execute(); @@ -378,24 +370,13 @@ public static Response sendWithRetries( } else { closeQuietly(response); } - } catch (ConnectException | InterruptedIOException ex) { - if (!retryPolicy.shouldRetry(null)) { + } catch (Exception ex) { + if (!retryPolicy.shouldRetry(ex)) { throw ex; } } // If we get here, there has been an error, and we still have retries left - long backoffMs = retryPolicy.backoff(); - try { - Thread.sleep(backoffMs); - } catch (InterruptedException e) { - // remember interrupted status to restore the thread's interrupted flag later - interrupted = true; - } - } - - } finally { - if (interrupted) { - Thread.currentThread().interrupt(); + retryPolicy.backoff(); } } } diff --git a/communication/src/test/groovy/datadog/communication/http/HttpRetryPolicyTest.groovy b/communication/src/test/groovy/datadog/communication/http/HttpRetryPolicyTest.groovy index 6f57de0bd04..a05656176f6 100644 --- a/communication/src/test/groovy/datadog/communication/http/HttpRetryPolicyTest.groovy +++ b/communication/src/test/groovy/datadog/communication/http/HttpRetryPolicyTest.groovy @@ -17,10 +17,10 @@ class HttpRetryPolicyTest extends Specification { when: while (retry <= maxRetries) { - def shouldRetry = retryPolicy.shouldRetry() + def shouldRetry = retryPolicy.shouldRetry((Response) null) shouldRetries << shouldRetry if (shouldRetry) { - backoffs << retryPolicy.backoff() + backoffs << retryPolicy.getBackoffDelay() } retry += 1 } @@ -44,10 +44,10 @@ class HttpRetryPolicyTest extends Specification { def retryPolicy = new HttpRetryPolicy.Factory(5, 100, 2.0).create() def responseBuilder = new Response.Builder() - .code(responseCode) - .request(GroovyMock(Request)) - .protocol(Protocol.HTTP_1_1) - .message("") + .code(responseCode) + .request(GroovyMock(Request)) + .protocol(Protocol.HTTP_1_1) + .message("") if (rateLimitHeader != null) { responseBuilder.header("x-ratelimit-reset", rateLimitHeader) } @@ -73,4 +73,58 @@ class HttpRetryPolicyTest extends Specification { 500 | null | 5 501 | null | 5 } + + def "test exceptions are retried: #exception with suppress interrupts #suppressInterrupts"() { + setup: + def retryPolicy = new HttpRetryPolicy.Factory(5, 100, 2.0, suppressInterrupts).create() + + expect: + retryPolicy.shouldRetry(exception) == shouldRetry + + where: + exception | suppressInterrupts | shouldRetry + new NullPointerException() | false | false + new IllegalArgumentException() | false | false + new ConnectException() | false | true + new InterruptedIOException() | false | false + new InterruptedIOException() | true | true + new InterruptedException() | false | false + new InterruptedException() | true | true + } + + def "test interrupt flag is preserved when suppressing interrupts"() { + setup: + def retryPolicy = new HttpRetryPolicy.Factory(5, 100, 2.0, true).create() + + when: + retryPolicy.shouldRetry(new InterruptedException()) + retryPolicy.close() + + then: + Thread.interrupted() + } + + def "test interrupt flag is preserved if interrupted while backing off"() { + setup: + boolean[] b = new boolean[2] + + Runnable r = () -> { + def retryPolicy = new HttpRetryPolicy.Factory(5, 1000, 2.0, true).create() + retryPolicy.backoff() + + b[0] = Thread.currentThread().isInterrupted() + retryPolicy.close() + b[1] = Thread.interrupted() + } + Thread t = new Thread(r, "test-http-retry-policy-interrupts") + + when: + t.start() + t.interrupt() + t.join() + + then: + !b[0] // before retry policy is closed, the thread should not be interrupted: interrupts are suppressed + b[1] // after retry policy is closed, the thread should be interrupted: interrupt flag should be restored + } } diff --git a/dd-java-agent/agent-ci-visibility/src/main/java/datadog/trace/civisibility/communication/BackendApiFactory.java b/dd-java-agent/agent-ci-visibility/src/main/java/datadog/trace/civisibility/communication/BackendApiFactory.java index cb4b986c6fc..5fe30e1a9e8 100644 --- a/dd-java-agent/agent-ci-visibility/src/main/java/datadog/trace/civisibility/communication/BackendApiFactory.java +++ b/dd-java-agent/agent-ci-visibility/src/main/java/datadog/trace/civisibility/communication/BackendApiFactory.java @@ -23,7 +23,7 @@ public BackendApiFactory(Config config, SharedCommunicationObjects sharedCommuni } public @Nullable BackendApi createBackendApi() { - HttpRetryPolicy.Factory retryPolicyFactory = new HttpRetryPolicy.Factory(5, 100, 2.0); + HttpRetryPolicy.Factory retryPolicyFactory = new HttpRetryPolicy.Factory(5, 100, 2.0, true); if (config.isCiVisibilityAgentlessEnabled()) { HttpUrl agentlessUrl = getAgentlessUrl(); diff --git a/dd-java-agent/agent-ci-visibility/src/main/java/datadog/trace/civisibility/communication/EvpProxyApi.java b/dd-java-agent/agent-ci-visibility/src/main/java/datadog/trace/civisibility/communication/EvpProxyApi.java index eea481345d6..fcc482ece68 100644 --- a/dd-java-agent/agent-ci-visibility/src/main/java/datadog/trace/civisibility/communication/EvpProxyApi.java +++ b/dd-java-agent/agent-ci-visibility/src/main/java/datadog/trace/civisibility/communication/EvpProxyApi.java @@ -81,9 +81,8 @@ public T post( final Request request = requestBuilder.post(requestBody).build(); - HttpRetryPolicy retryPolicy = retryPolicyFactory.create(); try (okhttp3.Response response = - OkHttpUtils.sendWithRetries(httpClient, retryPolicy, request)) { + OkHttpUtils.sendWithRetries(httpClient, retryPolicyFactory, request)) { if (response.isSuccessful()) { log.debug("Request to {} returned successful response: {}", uri, response.code()); diff --git a/dd-java-agent/agent-ci-visibility/src/main/java/datadog/trace/civisibility/communication/IntakeApi.java b/dd-java-agent/agent-ci-visibility/src/main/java/datadog/trace/civisibility/communication/IntakeApi.java index a92f1bc5ddb..067969c2789 100644 --- a/dd-java-agent/agent-ci-visibility/src/main/java/datadog/trace/civisibility/communication/IntakeApi.java +++ b/dd-java-agent/agent-ci-visibility/src/main/java/datadog/trace/civisibility/communication/IntakeApi.java @@ -85,9 +85,8 @@ public T post( } Request request = requestBuilder.build(); - HttpRetryPolicy retryPolicy = retryPolicyFactory.create(); try (okhttp3.Response response = - OkHttpUtils.sendWithRetries(httpClient, retryPolicy, request)) { + OkHttpUtils.sendWithRetries(httpClient, retryPolicyFactory, request)) { if (response.isSuccessful()) { log.debug("Request to {} returned successful response: {}", uri, response.code()); InputStream responseBodyStream = response.body().byteStream(); diff --git a/dd-trace-core/src/main/java/datadog/trace/common/writer/ddintake/DDEvpProxyApi.java b/dd-trace-core/src/main/java/datadog/trace/common/writer/ddintake/DDEvpProxyApi.java index 93346b879a1..8926d17e200 100644 --- a/dd-trace-core/src/main/java/datadog/trace/common/writer/ddintake/DDEvpProxyApi.java +++ b/dd-trace-core/src/main/java/datadog/trace/common/writer/ddintake/DDEvpProxyApi.java @@ -92,7 +92,8 @@ public DDEvpProxyApi build() { ? httpClient : OkHttpUtils.buildHttpClient(proxiedApiUrl, timeoutMillis); - final HttpRetryPolicy.Factory retryPolicyFactory = new HttpRetryPolicy.Factory(5, 100, 2.0); + final HttpRetryPolicy.Factory retryPolicyFactory = + new HttpRetryPolicy.Factory(5, 100, 2.0, true); log.debug("proxiedApiUrl: {}", proxiedApiUrl); return new DDEvpProxyApi( @@ -141,9 +142,8 @@ public Response sendSerializedTraces(Payload payload) { totalTraces += payload.traceCount(); receivedTraces += payload.traceCount(); - HttpRetryPolicy retryPolicy = retryPolicyFactory.create(); try (okhttp3.Response response = - OkHttpUtils.sendWithRetries(httpClient, retryPolicy, request)) { + OkHttpUtils.sendWithRetries(httpClient, retryPolicyFactory, request)) { if (response.isSuccessful()) { countAndLogSuccessfulSend(payload.traceCount(), sizeInBytes); return Response.success(response.code()); diff --git a/dd-trace-core/src/main/java/datadog/trace/common/writer/ddintake/DDIntakeApi.java b/dd-trace-core/src/main/java/datadog/trace/common/writer/ddintake/DDIntakeApi.java index 2f7b2f44268..165d1a951ab 100644 --- a/dd-trace-core/src/main/java/datadog/trace/common/writer/ddintake/DDIntakeApi.java +++ b/dd-trace-core/src/main/java/datadog/trace/common/writer/ddintake/DDIntakeApi.java @@ -43,7 +43,7 @@ public static class DDIntakeApiBuilder { HttpUrl hostUrl = null; OkHttpClient httpClient = null; - HttpRetryPolicy.Factory retryPolicyFactory = new HttpRetryPolicy.Factory(5, 100, 2.0); + HttpRetryPolicy.Factory retryPolicyFactory = new HttpRetryPolicy.Factory(5, 100, 2.0, true); private String apiKey; @@ -134,9 +134,8 @@ public Response sendSerializedTraces(Payload payload) { totalTraces += payload.traceCount(); receivedTraces += payload.traceCount(); - HttpRetryPolicy retryPolicy = retryPolicyFactory.create(); try (okhttp3.Response response = - OkHttpUtils.sendWithRetries(httpClient, retryPolicy, request)) { + OkHttpUtils.sendWithRetries(httpClient, retryPolicyFactory, request)) { if (response.isSuccessful()) { countAndLogSuccessfulSend(payload.traceCount(), sizeInBytes); return Response.success(response.code());