From 20f2d448b568d238d00688d27292d50f290af24e Mon Sep 17 00:00:00 2001 From: Zebing Lin Date: Tue, 14 Mar 2023 15:32:51 -0400 Subject: [PATCH] Close its source exchanges once a stage finishes execution --- ...ventDrivenFaultTolerantQueryScheduler.java | 35 +++++++++++++++++-- 1 file changed, 33 insertions(+), 2 deletions(-) diff --git a/core/trino-main/src/main/java/io/trino/execution/scheduler/EventDrivenFaultTolerantQueryScheduler.java b/core/trino-main/src/main/java/io/trino/execution/scheduler/EventDrivenFaultTolerantQueryScheduler.java index 7b3a15ee5079..ce9fff59aec2 100644 --- a/core/trino-main/src/main/java/io/trino/execution/scheduler/EventDrivenFaultTolerantQueryScheduler.java +++ b/core/trino-main/src/main/java/io/trino/execution/scheduler/EventDrivenFaultTolerantQueryScheduler.java @@ -722,9 +722,14 @@ private void updateStageExecutions() PlanFragmentId fragmentId = subPlan.getFragment().getId(); StageId stageId = getStageId(fragmentId); currentPlanStages.add(stageId); - if (isReadyForExecution(subPlan) && !stageExecutions.containsKey(stageId)) { + StageExecution stageExecution = stageExecutions.get(stageId); + if (isReadyForExecution(subPlan) && stageExecution != null) { createStageExecution(subPlan, fragmentId.equals(rootFragmentId), nextSchedulingPriority++); } + if (stageExecution != null && stageExecution.getState().equals(StageState.FINISHED) && !stageExecution.isExchangeClosed()) { + // we are ready to close its source exchanges + closeSourceExchanges(subPlan); + } } stageExecutions.forEach((stageId, stageExecution) -> { if (!currentPlanStages.contains(stageId)) { @@ -749,6 +754,16 @@ private boolean isReadyForExecution(SubPlan subPlan) return true; } + private void closeSourceExchanges(SubPlan subPlan) + { + for (SubPlan child : subPlan.getChildren()) { + StageExecution childExecution = stageExecutions.get(getStageId(child.getFragment().getId())); + if (childExecution != null) { + childExecution.closeExchange(); + } + } + } + private void createStageExecution(SubPlan subPlan, boolean rootFragment, int schedulingPriority) { Closer closer = Closer.create(); @@ -1159,6 +1174,7 @@ private static class StageExecution private final Map sourceOutputSelectors = new HashMap<>(); private boolean taskDescriptorLoadingActive; + private boolean exchangeClosed; private StageExecution( QueryStateMachine queryStateMachine, @@ -1219,6 +1235,11 @@ public Exchange getExchange() return exchange; } + public boolean isExchangeClosed() + { + return exchangeClosed; + } + public Optional addPartition(int partitionId, NodeRequirements nodeRequirements) { if (getState().isDone()) { @@ -1289,6 +1310,16 @@ public void noMorePartitions() } } + public void closeExchange() + { + if (exchangeClosed) { + return; + } + + exchange.close(); + exchangeClosed = true; + } + public Optional getExchangeSinkInstanceHandle(int partitionId) { if (getState().isDone()) { @@ -1638,7 +1669,7 @@ private Closer createStageExecutionCloser() { Closer closer = Closer.create(); closer.register(taskSource); - closer.register(exchange); + closer.register(this::closeExchange); return closer; }