Skip to content

Commit

Permalink
Add explicit synchronization section
Browse files Browse the repository at this point in the history
  • Loading branch information
sopel39 committed Sep 2, 2022
1 parent 31e7bd6 commit 0ae33b5
Showing 1 changed file with 58 additions and 56 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -581,71 +581,73 @@ private void triggerUpdate()

private synchronized void sendUpdate()
{
TaskStatus taskStatus = getTaskStatus();
// don't update if the task hasn't been started yet or if it is already finished
if (!started.get() || taskStatus.getState().isDone()) {
return;
}

int currentPendingRequestsCounter = pendingRequestsCounter.get();
if (currentPendingRequestsCounter == 0) {
return;
}
synchronized (this) {
TaskStatus taskStatus = getTaskStatus();
// don't update if the task hasn't been started yet or if it is already finished
if (!started.get() || taskStatus.getState().isDone()) {
return;
}

// if there is a request already running, wait for it to complete
// currentRequest is always cleared when request is complete
if (currentRequest != null) {
return;
}
int currentPendingRequestsCounter = pendingRequestsCounter.get();
if (currentPendingRequestsCounter == 0) {
return;
}

// if throttled due to error, asynchronously wait for timeout and try again
ListenableFuture<Void> errorRateLimit = updateErrorTracker.acquireRequestPermit();
if (!errorRateLimit.isDone()) {
errorRateLimit.addListener(this::sendUpdate, executor);
return;
}
// if there is a request already running, wait for it to complete
// currentRequest is always cleared when request is complete
if (currentRequest != null) {
return;
}

List<SplitAssignment> splitAssignments = getSplitAssignments();
VersionedDynamicFilterDomains dynamicFilterDomains = outboundDynamicFiltersCollector.acknowledgeAndGetNewDomains(sentDynamicFiltersVersion);
// if throttled due to error, asynchronously wait for timeout and try again
ListenableFuture<Void> errorRateLimit = updateErrorTracker.acquireRequestPermit();
if (!errorRateLimit.isDone()) {
errorRateLimit.addListener(this::sendUpdate, executor);
return;
}

// Workers don't need the embedded JSON representation when the fragment is sent
Optional<PlanFragment> fragment = sendPlan.get() ? Optional.of(planFragment.withoutEmbeddedJsonRepresentation()) : Optional.empty();
TaskUpdateRequest updateRequest = new TaskUpdateRequest(
session.toSessionRepresentation(),
session.getIdentity().getExtraCredentials(),
fragment,
splitAssignments,
outputBuffers.get(),
dynamicFilterDomains.getDynamicFilterDomains());
byte[] taskUpdateRequestJson = taskUpdateRequestCodec.toJsonBytes(updateRequest);
if (fragment.isPresent()) {
stats.updateWithPlanBytes(taskUpdateRequestJson.length);
}
if (!dynamicFilterDomains.getDynamicFilterDomains().isEmpty()) {
stats.updateWithDynamicFilterBytes(taskUpdateRequestJson.length);
}
List<SplitAssignment> splitAssignments = getSplitAssignments();
VersionedDynamicFilterDomains dynamicFilterDomains = outboundDynamicFiltersCollector.acknowledgeAndGetNewDomains(sentDynamicFiltersVersion);

// Workers don't need the embedded JSON representation when the fragment is sent
Optional<PlanFragment> fragment = sendPlan.get() ? Optional.of(planFragment.withoutEmbeddedJsonRepresentation()) : Optional.empty();
TaskUpdateRequest updateRequest = new TaskUpdateRequest(
session.toSessionRepresentation(),
session.getIdentity().getExtraCredentials(),
fragment,
splitAssignments,
outputBuffers.get(),
dynamicFilterDomains.getDynamicFilterDomains());
byte[] taskUpdateRequestJson = taskUpdateRequestCodec.toJsonBytes(updateRequest);
if (fragment.isPresent()) {
stats.updateWithPlanBytes(taskUpdateRequestJson.length);
}
if (!dynamicFilterDomains.getDynamicFilterDomains().isEmpty()) {
stats.updateWithDynamicFilterBytes(taskUpdateRequestJson.length);
}

HttpUriBuilder uriBuilder = getHttpUriBuilder(taskStatus);
Request request = preparePost()
.setUri(uriBuilder.build())
.setHeader(HttpHeaders.CONTENT_TYPE, MediaType.JSON_UTF_8.toString())
.setBodyGenerator(createStaticBodyGenerator(taskUpdateRequestJson))
.build();
HttpUriBuilder uriBuilder = getHttpUriBuilder(taskStatus);
Request request = preparePost()
.setUri(uriBuilder.build())
.setHeader(HttpHeaders.CONTENT_TYPE, MediaType.JSON_UTF_8.toString())
.setBodyGenerator(createStaticBodyGenerator(taskUpdateRequestJson))
.build();

updateErrorTracker.startRequest();
updateErrorTracker.startRequest();

ListenableFuture<JsonResponse<TaskInfo>> future = httpClient.executeAsync(request, createFullJsonResponseHandler(taskInfoCodec));
currentRequest = future;
currentRequestStartNanos = System.nanoTime();
ListenableFuture<JsonResponse<TaskInfo>> future = httpClient.executeAsync(request, createFullJsonResponseHandler(taskInfoCodec));
currentRequest = future;
currentRequestStartNanos = System.nanoTime();

// if pendingRequestsCounter is still non-zero (e.g. because triggerUpdate was called in the meantime)
// then the request Future callback will send a new update via sendUpdate method call
pendingRequestsCounter.addAndGet(-currentPendingRequestsCounter);
// if pendingRequestsCounter is still non-zero (e.g. because triggerUpdate was called in the meantime)
// then the request Future callback will send a new update via sendUpdate method call
pendingRequestsCounter.addAndGet(-currentPendingRequestsCounter);

Futures.addCallback(
future,
new SimpleHttpResponseHandler<>(new UpdateResponseHandler(splitAssignments, dynamicFilterDomains.getVersion()), request.getUri(), stats),
executor);
Futures.addCallback(
future,
new SimpleHttpResponseHandler<>(new UpdateResponseHandler(splitAssignments, dynamicFilterDomains.getVersion()), request.getUri(), stats),
executor);
}
}

private synchronized List<SplitAssignment> getSplitAssignments()
Expand Down

0 comments on commit 0ae33b5

Please sign in to comment.