Skip to content

Commit

Permalink
Add allRequiredSinksFinished method to Exchange interface
Browse files Browse the repository at this point in the history
The engine may decide that not all of the sinks are necessary. It should be
up to the engine to signal the exchange once all useful data has been
written.
  • Loading branch information
arhimondr committed Oct 7, 2022
1 parent 12b27de commit 5f7f648
Show file tree
Hide file tree
Showing 7 changed files with 50 additions and 15 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -163,6 +163,8 @@ public class FaultTolerantStageScheduler
@GuardedBy("this")
private final Set<Integer> allPartitions = new HashSet<>();
@GuardedBy("this")
private boolean noMorePartitions;
@GuardedBy("this")
private final Queue<Integer> queuedPartitions = new ArrayDeque<>();
@GuardedBy("this")
private final Queue<PendingPartition> pendingPartitions = new ArrayDeque<>();
Expand Down Expand Up @@ -312,6 +314,10 @@ public synchronized void schedule()
if (taskSource.isFinished()) {
dynamicFilterService.stageCannotScheduleMoreTasks(stage.getStageId(), 0, allPartitions.size());
sinkExchange.noMoreSinks();
noMorePartitions = true;
}
if (noMorePartitions && finishedPartitions.keySet().containsAll(allPartitions)) {
sinkExchange.allRequiredSinksFinished();
}
return null;
}
Expand Down Expand Up @@ -617,6 +623,9 @@ private void updateTaskStatus(TaskStatus taskStatus, ExchangeSinkHandle exchange
case FINISHED:
finishedPartitions.put(partitionId, taskId.getAttemptId());
sinkExchange.sinkFinished(exchangeSinkHandle, taskId.getAttemptId());
if (noMorePartitions && finishedPartitions.keySet().containsAll(allPartitions)) {
sinkExchange.allRequiredSinksFinished();
}
partitionToRemoteTaskMap.get(partitionId).forEach(RemoteTask::abort);
partitionMemoryEstimator.registerPartitionFinished(session, memoryLimits, taskStatus.getPeakMemoryReservation(), true, Optional.empty());

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -666,6 +666,7 @@ public synchronized OutputSource createOutputSource(Set<TaskId> selectedTasks)
ListenableFuture<ExchangeSource> exchangeSourceFuture = FluentFuture.from(toListenableFuture(exchangeSink.finish()))
.transformAsync(ignored -> {
exchange.sinkFinished(sinkHandle, 0);
exchange.allRequiredSinksFinished();
synchronized (this) {
exchangeSink = null;
sinkHandle = null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -278,6 +278,7 @@ public void testHappyPath()
tasks = remoteTaskFactory.getTasks();
assertThat(tasks).hasSize(6);
assertThat(tasks).containsKey(getTaskId(4, 0));
assertTrue(sinkExchange.isNoMoreSinks());

// not finished yet, will be finished when all tasks succeed
assertFalse(scheduler.isFinished());
Expand All @@ -304,6 +305,8 @@ public void testHappyPath()
new TestingExchangeSinkHandle(3),
new TestingExchangeSinkHandle(4));

assertTrue(sinkExchange.isAllRequiredSinksFinished());

assertTrue(scheduler.isFinished());
}
}
Expand Down Expand Up @@ -864,14 +867,15 @@ public void close() {}
sourceExchange1.setSourceHandles(ImmutableList.of());
TestingExchange sourceExchange2 = new TestingExchange();
sourceExchange2.setSourceHandles(ImmutableList.of());
TestingExchange sinkExchange = new TestingExchange();
FaultTolerantStageScheduler scheduler = createFaultTolerantTaskScheduler(
remoteTaskFactory,
(session, fragment, exchangeSourceHandles, getSplitTimeRecorder, bucketToPartition) -> {
taskSourceCreated.set(true);
return taskSource;
},
nodeAllocator,
new TestingExchange(),
sinkExchange,
ImmutableMap.of(
SOURCE_FRAGMENT_ID_1, sourceExchange1,
SOURCE_FRAGMENT_ID_2, sourceExchange2),
Expand All @@ -895,6 +899,8 @@ public void close() {}

future.set(ImmutableList.of());
assertTrue(scheduler.isFinished());
assertTrue(sinkExchange.isNoMoreSinks());
assertTrue(sinkExchange.isAllRequiredSinksFinished());
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ public class TestingExchange
private final Set<TestingExchangeSinkHandle> allSinks = newConcurrentHashSet();
private final AtomicBoolean noMoreSinks = new AtomicBoolean();
private final CompletableFuture<List<ExchangeSourceHandle>> sourceHandles = new CompletableFuture<>();
private final AtomicBoolean allRequiredSinksFinished = new AtomicBoolean();

@Override
public ExchangeId getId()
Expand Down Expand Up @@ -87,6 +88,17 @@ public void sinkFinished(ExchangeSinkHandle sinkHandle, int taskAttemptId)
finishedSinks.add((TestingExchangeSinkHandle) sinkHandle);
}

/**
 * Records that the "all required sinks finished" signal was delivered to this testing exchange.
 * Sets the {@code allRequiredSinksFinished} flag to {@code true}; tests read it back through
 * {@link #isAllRequiredSinksFinished()}.
 */
@Override
public void allRequiredSinksFinished()
{
allRequiredSinksFinished.set(true);
}

/**
 * Returns the current value of the flag set by {@link #allRequiredSinksFinished()}.
 *
 * @return {@code true} once {@link #allRequiredSinksFinished()} has set the flag
 */
public boolean isAllRequiredSinksFinished()
{
return allRequiredSinksFinished.get();
}

public Set<TestingExchangeSinkHandle> getFinishedSinkHandles()
{
return ImmutableSet.copyOf(finishedSinks);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,12 @@ public interface Exchange
*/
void sinkFinished(ExchangeSinkHandle sinkHandle, int taskAttemptId);

/**
 * Called by the engine when all required sinks have finished successfully.
 * <p>
 * Some source tasks may still be running and writing to their sinks; any data written to those sinks
 * can be safely ignored after this method is invoked.
 * <p>
 * NOTE(review): callers appear able to invoke this more than once for the same exchange;
 * confirm that implementations tolerate repeated invocations.
 */
void allRequiredSinksFinished();

/**
* Returns an {@link ExchangeSourceHandleSource} instance to be used to enumerate {@link ExchangeSourceHandle}s.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,6 @@ public void noMoreSinks()
synchronized (this) {
noMoreSinks = true;
}
checkInputReady();
}

@Override
Expand Down Expand Up @@ -178,27 +177,27 @@ public void sinkFinished(ExchangeSinkHandle handle, int taskAttemptId)
FileSystemExchangeSinkHandle sinkHandle = (FileSystemExchangeSinkHandle) handle;
finishedSinks.putIfAbsent(sinkHandle.getPartitionId(), taskAttemptId);
}
checkInputReady();
}

private void checkInputReady()
@Override
public void allRequiredSinksFinished()
{
verify(!Thread.holdsLock(this));
ListenableFuture<List<ExchangeSourceHandle>> exchangeSourceHandlesCreationFuture = null;
ListenableFuture<List<ExchangeSourceHandle>> exchangeSourceHandlesCreationFuture;
synchronized (this) {
if (exchangeSourceHandlesCreationStarted) {
return;
}
if (noMoreSinks && finishedSinks.keySet().containsAll(allSinks)) {
// input is ready, create exchange source handles
exchangeSourceHandlesCreationStarted = true;
exchangeSourceHandlesCreationFuture = stats.getCreateExchangeSourceHandles().record(this::createExchangeSourceHandles);
exchangeSourceHandlesFuture.whenComplete((value, failure) -> {
if (exchangeSourceHandlesFuture.isCancelled()) {
exchangeSourceHandlesFuture.cancel(true);
}
});
}
verify(noMoreSinks, "noMoreSinks is expected to be set");
verify(finishedSinks.keySet().containsAll(allSinks), "all sinks are expected to be finished");
// input is ready, create exchange source handles
exchangeSourceHandlesCreationStarted = true;
exchangeSourceHandlesCreationFuture = stats.getCreateExchangeSourceHandles().record(this::createExchangeSourceHandles);
exchangeSourceHandlesFuture.whenComplete((value, failure) -> {
if (exchangeSourceHandlesFuture.isCancelled()) {
exchangeSourceHandlesFuture.cancel(true);
}
});
}
if (exchangeSourceHandlesCreationFuture != null) {
Futures.addCallback(exchangeSourceHandlesCreationFuture, new FutureCallback<>() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,7 @@ public void testHappyPath()
1, "2-1-0"),
true);
exchange.sinkFinished(sinkHandle2, 2);
exchange.allRequiredSinksFinished();

ExchangeSourceHandleBatch sourceHandleBatch = exchange.getSourceHandles().getNextBatch().get();
assertTrue(sourceHandleBatch.lastBatch());
Expand Down Expand Up @@ -228,6 +229,7 @@ public void testLargePages()
.build(),
true);
exchange.sinkFinished(sinkHandle2, 0);
exchange.allRequiredSinksFinished();

ExchangeSourceHandleBatch sourceHandleBatch = exchange.getSourceHandles().getNextBatch().get();
assertTrue(sourceHandleBatch.lastBatch());
Expand Down

0 comments on commit 5f7f648

Please sign in to comment.