Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Do not abort CI Visibility spans dispatch on interrupt #6926

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
package datadog.communication.http;

import java.io.IOException;
import java.io.InterruptedIOException;
import java.net.ConnectException;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import javax.annotation.Nullable;
Expand Down Expand Up @@ -34,7 +37,7 @@
* instance.
*/
@NotThreadSafe
public class HttpRetryPolicy {
public class HttpRetryPolicy implements AutoCloseable {

private static final Logger log = LoggerFactory.getLogger(HttpRetryPolicy.class);

Expand All @@ -47,12 +50,35 @@ public class HttpRetryPolicy {

private int retriesLeft;
private long delay;
private boolean interrupted;
private final double delayFactor;
private final boolean suppressInterrupts;

private HttpRetryPolicy(int retriesLeft, long delay, double delayFactor) {
private HttpRetryPolicy(
int retriesLeft, long delay, double delayFactor, boolean suppressInterrupts) {
this.retriesLeft = retriesLeft;
this.delay = delay;
this.delayFactor = delayFactor;
this.suppressInterrupts = suppressInterrupts;
}

public boolean shouldRetry(Exception e) {
if (e instanceof ConnectException) {
return shouldRetry((okhttp3.Response) null);
}
if (e instanceof InterruptedIOException) {
if (suppressInterrupts) {
return shouldRetry((okhttp3.Response) null);
}
}
if (e instanceof InterruptedException) {
if (suppressInterrupts) {
// remember interrupted status to restore the thread's interrupted flag later
interrupted = true;
return shouldRetry((okhttp3.Response) null);
}
}
return false;
}

public boolean shouldRetry(@Nullable okhttp3.Response response) {
Expand Down Expand Up @@ -106,25 +132,52 @@ private long getRateLimitResetTime(okhttp3.Response response) {
}
}

public long backoff() {
long getBackoffDelay() {
long currentDelay = delay;
delay = (long) (delay * delayFactor);
return currentDelay;
}

public void backoff() throws IOException {
try {
Thread.sleep(getBackoffDelay());
} catch (InterruptedException e) {
if (suppressInterrupts) {
// remember interrupted status to restore the thread's interrupted flag later
interrupted = true;
} else {
Thread.currentThread().interrupt();
throw new InterruptedIOException("thread interrupted");
}
}
}

@Override
public void close() {
if (interrupted) {
Thread.currentThread().interrupt();
}
}

public static class Factory {
private final int maxRetries;
private final long initialDelay;
private final double delayFactor;
private final boolean retryInterrupts;

public Factory(int maxRetries, int initialDelay, double delayFactor) {
this(maxRetries, initialDelay, delayFactor, false);
}

public Factory(int maxRetries, int initialDelay, double delayFactor, boolean retryInterrupts) {
this.maxRetries = maxRetries;
this.initialDelay = initialDelay;
this.delayFactor = delayFactor;
this.retryInterrupts = retryInterrupts;
}

public HttpRetryPolicy create() {
return new HttpRetryPolicy(maxRetries, initialDelay, delayFactor);
return new HttpRetryPolicy(maxRetries, initialDelay, delayFactor, retryInterrupts);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@
import datadog.trace.util.AgentProxySelector;
import java.io.File;
import java.io.IOException;
import java.net.ConnectException;
import java.net.InetSocketAddress;
import java.net.Proxy;
import java.nio.ByteBuffer;
Expand Down Expand Up @@ -357,30 +356,27 @@ public void writeTo(BufferedSink sink) throws IOException {
}

public static Response sendWithRetries(
OkHttpClient httpClient, HttpRetryPolicy retryPolicy, Request request) throws IOException {
while (true) {
try {
okhttp3.Response response = httpClient.newCall(request).execute();
if (response.isSuccessful()) {
return response;
OkHttpClient httpClient, HttpRetryPolicy.Factory retryPolicyFactory, Request request)
throws IOException {
try (HttpRetryPolicy retryPolicy = retryPolicyFactory.create()) {
while (true) {
try {
Response response = httpClient.newCall(request).execute();
if (response.isSuccessful()) {
return response;
}
if (!retryPolicy.shouldRetry(response)) {
return response;
} else {
closeQuietly(response);
}
} catch (Exception ex) {
if (!retryPolicy.shouldRetry(ex)) {
throw ex;
}
}
if (!retryPolicy.shouldRetry(response)) {
return response;
} else {
closeQuietly(response);
}
} catch (ConnectException ex) {
if (!retryPolicy.shouldRetry(null)) {
throw ex;
}
}
// If we get here, there has been an error, and we still have retries left
long backoffMs = retryPolicy.backoff();
try {
Thread.sleep(backoffMs);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new IOException(e);
// If we get here, there has been an error, and we still have retries left
retryPolicy.backoff();
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,10 @@ class HttpRetryPolicyTest extends Specification {

when:
while (retry <= maxRetries) {
def shouldRetry = retryPolicy.shouldRetry()
def shouldRetry = retryPolicy.shouldRetry((Response) null)
shouldRetries << shouldRetry
if (shouldRetry) {
backoffs << retryPolicy.backoff()
backoffs << retryPolicy.getBackoffDelay()
}
retry += 1
}
Expand All @@ -44,10 +44,10 @@ class HttpRetryPolicyTest extends Specification {
def retryPolicy = new HttpRetryPolicy.Factory(5, 100, 2.0).create()

def responseBuilder = new Response.Builder()
.code(responseCode)
.request(GroovyMock(Request))
.protocol(Protocol.HTTP_1_1)
.message("")
.code(responseCode)
.request(GroovyMock(Request))
.protocol(Protocol.HTTP_1_1)
.message("")
if (rateLimitHeader != null) {
responseBuilder.header("x-ratelimit-reset", rateLimitHeader)
}
Expand All @@ -73,4 +73,58 @@ class HttpRetryPolicyTest extends Specification {
500 | null | 5
501 | null | 5
}

def "test exceptions are retried: #exception with suppress interrupts #suppressInterrupts"() {
setup:
def retryPolicy = new HttpRetryPolicy.Factory(5, 100, 2.0, suppressInterrupts).create()

expect:
retryPolicy.shouldRetry(exception) == shouldRetry

where:
exception | suppressInterrupts | shouldRetry
new NullPointerException() | false | false
new IllegalArgumentException() | false | false
new ConnectException() | false | true
new InterruptedIOException() | false | false
new InterruptedIOException() | true | true
new InterruptedException() | false | false
new InterruptedException() | true | true
}

def "test interrupt flag is preserved when suppressing interrupts"() {
setup:
def retryPolicy = new HttpRetryPolicy.Factory(5, 100, 2.0, true).create()

when:
retryPolicy.shouldRetry(new InterruptedException())
retryPolicy.close()

then:
Thread.interrupted()
}

def "test interrupt flag is preserved if interrupted while backing off"() {
setup:
boolean[] b = new boolean[2]

Runnable r = () -> {
def retryPolicy = new HttpRetryPolicy.Factory(5, 1000, 2.0, true).create()
retryPolicy.backoff()

b[0] = Thread.currentThread().isInterrupted()
retryPolicy.close()
b[1] = Thread.interrupted()
}
Thread t = new Thread(r, "test-http-retry-policy-interrupts")

when:
t.start()
t.interrupt()
t.join()

then:
!b[0] // before retry policy is closed, the thread should not be interrupted: interrupts are suppressed
b[1] // after retry policy is closed, the thread should be interrupted: interrupt flag should be restored
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ public BackendApiFactory(Config config, SharedCommunicationObjects sharedCommuni
}

public @Nullable BackendApi createBackendApi() {
HttpRetryPolicy.Factory retryPolicyFactory = new HttpRetryPolicy.Factory(5, 100, 2.0);
HttpRetryPolicy.Factory retryPolicyFactory = new HttpRetryPolicy.Factory(5, 100, 2.0, true);

if (config.isCiVisibilityAgentlessEnabled()) {
HttpUrl agentlessUrl = getAgentlessUrl();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,9 +81,8 @@ public <T> T post(

final Request request = requestBuilder.post(requestBody).build();

HttpRetryPolicy retryPolicy = retryPolicyFactory.create();
try (okhttp3.Response response =
OkHttpUtils.sendWithRetries(httpClient, retryPolicy, request)) {
OkHttpUtils.sendWithRetries(httpClient, retryPolicyFactory, request)) {
if (response.isSuccessful()) {
log.debug("Request to {} returned successful response: {}", uri, response.code());

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,9 +85,8 @@ public <T> T post(
}

Request request = requestBuilder.build();
HttpRetryPolicy retryPolicy = retryPolicyFactory.create();
try (okhttp3.Response response =
OkHttpUtils.sendWithRetries(httpClient, retryPolicy, request)) {
OkHttpUtils.sendWithRetries(httpClient, retryPolicyFactory, request)) {
if (response.isSuccessful()) {
log.debug("Request to {} returned successful response: {}", uri, response.code());
InputStream responseBodyStream = response.body().byteStream();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,8 @@ public DDEvpProxyApi build() {
? httpClient
: OkHttpUtils.buildHttpClient(proxiedApiUrl, timeoutMillis);

final HttpRetryPolicy.Factory retryPolicyFactory = new HttpRetryPolicy.Factory(5, 100, 2.0);
final HttpRetryPolicy.Factory retryPolicyFactory =
new HttpRetryPolicy.Factory(5, 100, 2.0, true);

log.debug("proxiedApiUrl: {}", proxiedApiUrl);
return new DDEvpProxyApi(
Expand Down Expand Up @@ -141,9 +142,8 @@ public Response sendSerializedTraces(Payload payload) {
totalTraces += payload.traceCount();
receivedTraces += payload.traceCount();

HttpRetryPolicy retryPolicy = retryPolicyFactory.create();
try (okhttp3.Response response =
OkHttpUtils.sendWithRetries(httpClient, retryPolicy, request)) {
OkHttpUtils.sendWithRetries(httpClient, retryPolicyFactory, request)) {
if (response.isSuccessful()) {
countAndLogSuccessfulSend(payload.traceCount(), sizeInBytes);
return Response.success(response.code());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ public static class DDIntakeApiBuilder {

HttpUrl hostUrl = null;
OkHttpClient httpClient = null;
HttpRetryPolicy.Factory retryPolicyFactory = new HttpRetryPolicy.Factory(5, 100, 2.0);
HttpRetryPolicy.Factory retryPolicyFactory = new HttpRetryPolicy.Factory(5, 100, 2.0, true);

private String apiKey;

Expand Down Expand Up @@ -134,9 +134,8 @@ public Response sendSerializedTraces(Payload payload) {
totalTraces += payload.traceCount();
receivedTraces += payload.traceCount();

HttpRetryPolicy retryPolicy = retryPolicyFactory.create();
try (okhttp3.Response response =
OkHttpUtils.sendWithRetries(httpClient, retryPolicy, request)) {
OkHttpUtils.sendWithRetries(httpClient, retryPolicyFactory, request)) {
if (response.isSuccessful()) {
countAndLogSuccessfulSend(payload.traceCount(), sizeInBytes);
return Response.success(response.code());
Expand Down
Loading