Skip to content

Commit

Permalink
Merge pull request #100 from IABTechLab/mkc-UID2-3072-optout-fixes
Browse files Browse the repository at this point in the history
Optout delta sending fixes
  • Loading branch information
mcollins-ttd authored Apr 9, 2024
2 parents 98feb9d + b033db1 commit f5c0715
Show file tree
Hide file tree
Showing 4 changed files with 50 additions and 40 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import java.net.URI;
import java.net.URISyntaxException;
import java.net.http.HttpRequest;
import java.time.Duration;
import java.util.Set;
import java.util.regex.Pattern;

Expand Down Expand Up @@ -77,7 +78,7 @@ public Future<Void> send(OptOutEntry entry) {

LOGGER.info("replaying optout " + config.url() + " - advertising_id: " + Utils.maskPii(entry.advertisingId) + ", epoch: " + entry.timestamp);

return builder.build();
return builder.timeout(Duration.ofSeconds(30)).build();
},
resp -> {
if (resp == null) {
Expand Down
16 changes: 14 additions & 2 deletions src/main/java/com/uid2/optout/vertx/OptOutSender.java
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import java.time.temporal.ChronoUnit;
import java.util.*;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
Expand Down Expand Up @@ -62,6 +63,14 @@ public void error(String message, Object... args) {
}
}

// When the partner config changes, Verticles are undeployed and new ones
// are created. These newly created Verticles register Micrometer gauges.
// However, you can't "re-register" a gauge with a new number. Therefore,
// we need to re-use the numbers that the gauges track across different
// Verticle instances.
private static final ConcurrentHashMap<String, AtomicLong> lastEntrySentMap = new ConcurrentHashMap<>();
private static final ConcurrentHashMap<String, AtomicInteger> pendingFilesCountMap = new ConcurrentHashMap<>();

private final OptOutSenderLogger logger;
private final HealthComponent healthComponent;
private final String deltaConsumerDir;
Expand All @@ -72,8 +81,8 @@ public void error(String message, Object... args) {
private final IOptOutPartnerEndpoint remotePartner;
private final String eventCloudSyncDownloaded;
private final Map<Tuple.Tuple2<String, String>, Counter> entryReplayStatusCounters = new HashMap<>();
private final AtomicInteger pendingFilesCount = new AtomicInteger(0);
private final AtomicLong lastEntrySent = new AtomicLong(0);
private final AtomicInteger pendingFilesCount;
private final AtomicLong lastEntrySent;
private LinkedList<String> pendingFiles = new LinkedList<>();
private AtomicBoolean isReplaying = new AtomicBoolean(false);
private CompletableFuture pendingAsyncOp = null;
Expand Down Expand Up @@ -107,6 +116,9 @@ public OptOutSender(JsonObject jsonConfig, IOptOutPartnerEndpoint optOutPartner,
this.timestampFile = Paths.get(jsonConfig.getString(Const.Config.OptOutDataDirProp), "remote_replicate", this.remotePartner.name() + "_timestamp.txt");
this.processedDeltasFile = Paths.get(jsonConfig.getString(Const.Config.OptOutDataDirProp), "remote_replicate", this.remotePartner.name() + "_processed.txt");

this.pendingFilesCount = pendingFilesCountMap.computeIfAbsent(remotePartner.name(), s -> new AtomicInteger(0));
this.lastEntrySent = lastEntrySentMap.computeIfAbsent(remotePartner.name(), s -> new AtomicLong(0));

Gauge.builder("uid2.optout.last_entry_sent", () -> this.lastEntrySent.get())
.description("gauge for last entry send epoch seconds, per each remote partner")
.tag("remote_partner", remotePartner.name())
Expand Down
71 changes: 34 additions & 37 deletions src/main/java/com/uid2/optout/web/RetryingWebClient.java
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
package com.uid2.optout.web;

import com.google.common.base.Stopwatch;
import io.netty.handler.codec.http.HttpMethod;
import io.vertx.core.Future;
import io.vertx.core.Promise;
import io.vertx.core.Vertx;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -11,7 +11,9 @@
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import java.util.function.BiFunction;

Expand All @@ -35,51 +37,46 @@ public RetryingWebClient(Vertx vertx, String uri, HttpMethod method, int retryCo
}

public Future<Void> send(BiFunction<URI, HttpMethod, HttpRequest> requestCreator, Function<HttpResponse, Boolean> responseValidator) {
return this.send(requestCreator, responseValidator, 0);
final UUID requestId = UUID.randomUUID();
return this.send(requestCreator, responseValidator, 0, requestId)
.onFailure(ex -> LOGGER.error("requestId={} Request to {} failed", requestId, uri, ex));
}

public Future<Void> send(BiFunction<URI, HttpMethod, HttpRequest> requestCreator, Function<HttpResponse, Boolean> responseValidator, int currentRetries) {
Promise<Void> promise = Promise.promise();

public Future<Void> send(BiFunction<URI, HttpMethod, HttpRequest> requestCreator, Function<HttpResponse, Boolean> responseValidator, int currentRetries, UUID requestId) {
HttpRequest request = requestCreator.apply(this.uri, this.method);
CompletableFuture<HttpResponse<String>> asyncResponse = this.httpClient.sendAsync(request, HttpResponse.BodyHandlers.ofString());

asyncResponse.thenAccept(response -> {
try {
Boolean responseOK = responseValidator.apply(response);
if (responseOK == null) {
throw new RuntimeException("Response validator returned null");
}
LOGGER.info("requestId={} Sending request to {}, currentRetries={}", requestId, uri, currentRetries);

final Stopwatch sw = Stopwatch.createStarted();

final CompletableFuture<HttpResponse<Void>> asyncResponse = this.httpClient.sendAsync(request, HttpResponse.BodyHandlers.discarding());

return Future.fromCompletionStage(asyncResponse, vertx.getOrCreateContext()).compose(response -> {
sw.stop();

LOGGER.info("requestId={} Request to {} completed in {}ms, currentRetries={}, status={}, version={}", requestId, uri, sw.elapsed(TimeUnit.MILLISECONDS), currentRetries, response.statusCode(), response.version());

Boolean responseOK = responseValidator.apply(response);
if (responseOK == null) {
return Future.failedFuture(new RuntimeException("Response validator returned null"));
}

if (responseOK) {
return Future.succeededFuture();
}

if (responseOK) {
promise.complete();
} else if (currentRetries < this.retryCount) {
LOGGER.error("failed sending to " + uri + ", currentRetries: " + currentRetries + ", backing off before retrying");
if (this.retryBackoffMs > 0) {
vertx.setTimer(this.retryBackoffMs, i -> {
send(requestCreator, responseValidator, currentRetries + 1)
.onComplete(ar2 -> promise.handle(ar2));
});
} else {
send(requestCreator, responseValidator, currentRetries + 1)
.onComplete(ar2 -> promise.handle(ar2));
}
if (currentRetries < this.retryCount) {
LOGGER.error("requestId={} failed sending to {}, currentRetries={}, backing off for {}ms before retrying", requestId, uri, currentRetries, this.retryBackoffMs);
if (this.retryBackoffMs > 0) {
return vertx.timer(this.retryBackoffMs)
.compose(v -> send(requestCreator, responseValidator, currentRetries + 1, requestId));
} else {
LOGGER.error("retry count exceeded for sending to " + this.uri);
throw new TooManyRetriesException(currentRetries);
return send(requestCreator, responseValidator, currentRetries + 1, requestId);
}
}
catch (Throwable ex) {
promise.fail(ex);
}
});

asyncResponse.exceptionally(ex -> {
promise.fail(ex);
return null;
LOGGER.error("requestId={} retry count exceeded for sending to {}", requestId, this.uri);
return Future.failedFuture(new TooManyRetriesException(currentRetries));
});


return promise.future();
}
}

0 comments on commit f5c0715

Please sign in to comment.