Skip to content

Commit

Permalink
Reduce pressure on HttpRemoteTask#executor
Browse files Browse the repository at this point in the history
Do not schedule new update tasks in HttpRemoteTask#executor
if there is an update in progress.
  • Loading branch information
sopel39 committed Aug 31, 2022
1 parent deb7013 commit 1606242
Showing 1 changed file with 17 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Stream;
Expand Down Expand Up @@ -166,7 +167,7 @@ public final class HttpRemoteTask

private final RequestErrorTracker updateErrorTracker;

private final AtomicBoolean needsUpdate = new AtomicBoolean(true);
private final AtomicInteger pendingRequestsCounter = new AtomicInteger(1);
private final AtomicBoolean sendPlan = new AtomicBoolean(true);

private final PartitionedSplitCountTracker partitionedSplitCountTracker;
Expand Down Expand Up @@ -570,18 +571,24 @@ private void scheduleUpdate()
executor.execute(this::sendUpdate);
}

private synchronized void triggerUpdate()
private void triggerUpdate()
{
// synchronized so that needsUpdate is not cleared in sendUpdate before actual request is sent
needsUpdate.set(true);
scheduleUpdate();
if (pendingRequestsCounter.getAndIncrement() == 0) {
// schedule update if this is the first update requested
scheduleUpdate();
}
}

private synchronized void sendUpdate()
{
TaskStatus taskStatus = getTaskStatus();
// don't update if the task hasn't been started yet or if it is already finished
if (!started.get() || !needsUpdate.get() || taskStatus.getState().isDone()) {
if (!started.get() || taskStatus.getState().isDone()) {
return;
}

int currentPendingRequestsCounter = pendingRequestsCounter.get();
if (currentPendingRequestsCounter == 0) {
return;
}

Expand Down Expand Up @@ -631,9 +638,9 @@ private synchronized void sendUpdate()
currentRequest = future;
currentRequestStartNanos = System.nanoTime();

// The needsUpdate flag needs to be set to false BEFORE adding the Future callback since callback might change the flag value
// and does so without grabbing the instance lock.
needsUpdate.set(false);
// if pendingRequestsCounter is still non-zero (e.g. because triggerUpdate was called in the meantime)
// then the request Future callback will send a new update via sendUpdate method call
pendingRequestsCounter.addAndGet(-currentPendingRequestsCounter);

Futures.addCallback(
future,
Expand Down Expand Up @@ -941,7 +948,7 @@ public void failed(Throwable cause)
updateStats(currentRequestStartNanos);

// on failure assume we need to update again
needsUpdate.set(true);
pendingRequestsCounter.incrementAndGet();

// if task not already done, record error
TaskStatus taskStatus = getTaskStatus();
Expand Down

0 comments on commit 1606242

Please sign in to comment.