Skip to content

Commit

Permalink
Reduce lock contention in HttpRemoteTask
Browse files Browse the repository at this point in the history
Use separate locks for sending a new request
(which is potentially expensive) and modifying
HttpRemoteTask state.
  • Loading branch information
sopel39 committed Sep 2, 2022
1 parent 875cf14 commit 12b404e
Showing 1 changed file with 21 additions and 17 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -129,9 +129,9 @@ public final class HttpRemoteTask
// The version of dynamic filters that has been successfully sent to the worker
private final AtomicLong sentDynamicFiltersVersion = new AtomicLong(INITIAL_DYNAMIC_FILTERS_VERSION);

@GuardedBy("this")
@GuardedBy("httpClient")
private Future<?> currentRequest;
@GuardedBy("this")
@GuardedBy("httpClient")
private long currentRequestStartNanos;

@GuardedBy("this")
Expand Down Expand Up @@ -584,9 +584,9 @@ private void triggerUpdate()
}
}

private synchronized void sendUpdate()
private void sendUpdate()
{
synchronized (this) {
synchronized (httpClient) {
TaskStatus taskStatus = getTaskStatus();
// don't update if the task hasn't been started yet or if it is already finished
if (!started.get() || taskStatus.getState().isDone()) {
Expand Down Expand Up @@ -692,26 +692,30 @@ public synchronized void cancel()
}
}

private synchronized void cleanUpTask()
private void cleanUpTask()
{
checkState(getTaskStatus().getState().isDone(), "attempt to clean up a task that is not done yet");

// clear pending splits to free memory
pendingSplits.clear();
pendingSourceSplitCount = 0;
pendingSourceSplitsWeight = 0;
partitionedSplitCountTracker.setPartitionedSplits(PartitionedSplitsInfo.forZeroSplits());
splitQueueHasSpace = true;
whenSplitQueueHasSpace.complete(null, executor);
synchronized (this) {
pendingSplits.clear();
pendingSourceSplitCount = 0;
pendingSourceSplitsWeight = 0;
partitionedSplitCountTracker.setPartitionedSplits(PartitionedSplitsInfo.forZeroSplits());
splitQueueHasSpace = true;
whenSplitQueueHasSpace.complete(null, executor);
}

// clear pending outbound dynamic filters to free memory
outboundDynamicFiltersCollector.acknowledge(Long.MAX_VALUE);

// cancel pending request
if (currentRequest != null) {
currentRequest.cancel(true);
currentRequest = null;
currentRequestStartNanos = 0;
synchronized (httpClient) {
if (currentRequest != null) {
currentRequest.cancel(true);
currentRequest = null;
currentRequestStartNanos = 0;
}
}

taskStatusFetcher.stop();
Expand Down Expand Up @@ -928,7 +932,7 @@ public void success(TaskInfo value)
outboundDynamicFiltersCollector.acknowledge(currentRequestDynamicFiltersVersion);
sendPlan.set(value.isNeedsPlan());
long currentRequestStartNanos;
synchronized (HttpRemoteTask.this) {
synchronized (httpClient) {
currentRequest = null;
currentRequestStartNanos = HttpRemoteTask.this.currentRequestStartNanos;
}
Expand All @@ -948,7 +952,7 @@ public void failed(Throwable cause)
try (SetThreadName ignored = new SetThreadName("UpdateResponseHandler-%s", taskId)) {
try {
long currentRequestStartNanos;
synchronized (HttpRemoteTask.this) {
synchronized (httpClient) {
currentRequest = null;
currentRequestStartNanos = HttpRemoteTask.this.currentRequestStartNanos;
}
Expand Down

0 comments on commit 12b404e

Please sign in to comment.