Skip to content

Commit

Permalink
Close its source exchanges once a stage finishes execution
Browse files Browse the repository at this point in the history
  • Loading branch information
linzebing authored and arhimondr committed Mar 15, 2023
1 parent 4d66f67 commit 20f2d44
Showing 1 changed file with 33 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -722,9 +722,14 @@ private void updateStageExecutions()
PlanFragmentId fragmentId = subPlan.getFragment().getId();
StageId stageId = getStageId(fragmentId);
currentPlanStages.add(stageId);
if (isReadyForExecution(subPlan) && !stageExecutions.containsKey(stageId)) {
StageExecution stageExecution = stageExecutions.get(stageId);
if (isReadyForExecution(subPlan) && stageExecution != null) {
createStageExecution(subPlan, fragmentId.equals(rootFragmentId), nextSchedulingPriority++);
}
if (stageExecution != null && stageExecution.getState().equals(StageState.FINISHED) && !stageExecution.isExchangeClosed()) {
// we are ready to close its source exchanges
closeSourceExchanges(subPlan);
}
}
stageExecutions.forEach((stageId, stageExecution) -> {
if (!currentPlanStages.contains(stageId)) {
Expand All @@ -749,6 +754,16 @@ private boolean isReadyForExecution(SubPlan subPlan)
return true;
}

private void closeSourceExchanges(SubPlan subPlan)
{
for (SubPlan child : subPlan.getChildren()) {
StageExecution childExecution = stageExecutions.get(getStageId(child.getFragment().getId()));
if (childExecution != null) {
childExecution.closeExchange();
}
}
}

private void createStageExecution(SubPlan subPlan, boolean rootFragment, int schedulingPriority)
{
Closer closer = Closer.create();
Expand Down Expand Up @@ -1159,6 +1174,7 @@ private static class StageExecution
private final Map<PlanFragmentId, ExchangeSourceOutputSelector> sourceOutputSelectors = new HashMap<>();

private boolean taskDescriptorLoadingActive;
private boolean exchangeClosed;

private StageExecution(
QueryStateMachine queryStateMachine,
Expand Down Expand Up @@ -1219,6 +1235,11 @@ public Exchange getExchange()
return exchange;
}

public boolean isExchangeClosed()
{
return exchangeClosed;
}

public Optional<PrioritizedScheduledTask> addPartition(int partitionId, NodeRequirements nodeRequirements)
{
if (getState().isDone()) {
Expand Down Expand Up @@ -1289,6 +1310,16 @@ public void noMorePartitions()
}
}

public void closeExchange()
{
if (exchangeClosed) {
return;
}

exchange.close();
exchangeClosed = true;
}

public Optional<EventDrivenFaultTolerantQueryScheduler.GetExchangeSinkInstanceHandleResult> getExchangeSinkInstanceHandle(int partitionId)
{
if (getState().isDone()) {
Expand Down Expand Up @@ -1638,7 +1669,7 @@ private Closer createStageExecutionCloser()
{
Closer closer = Closer.create();
closer.register(taskSource);
closer.register(exchange);
closer.register(this::closeExchange);
return closer;
}

Expand Down

0 comments on commit 20f2d44

Please sign in to comment.