Skip to content

Commit

Permalink
Add source fragments lookup to PartitionMemoryEstimatorFactory
Browse files Browse the repository at this point in the history
  • Loading branch information
losipiuk committed Oct 6, 2023
1 parent fc167ae commit 902d8f6
Show file tree
Hide file tree
Showing 4 changed files with 29 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1198,6 +1198,12 @@ private void createStageExecution(SubPlan subPlan, boolean rootFragment, Map<Sta
if (eager) {
sourceExchanges.values().forEach(sourceExchange -> sourceExchange.setSourceHandlesDeliveryMode(EAGER));
}

Function<PlanFragmentId, PlanFragment> planFragmentLookup = planFragmentId -> {
StageExecution stageExecution = stageExecutions.get(getStageId(planFragmentId));
checkArgument(stageExecution != null, "stage for fragment %s not started yet", planFragmentId);
return stageExecution.getStageInfo().getPlan();
};
StageExecution execution = new StageExecution(
queryStateMachine,
taskDescriptorStorage,
Expand All @@ -1206,7 +1212,7 @@ private void createStageExecution(SubPlan subPlan, boolean rootFragment, Map<Sta
sinkPartitioningScheme,
exchange,
noMemoryFragment,
noMemoryFragment ? new NoMemoryPartitionMemoryEstimator() : memoryEstimatorFactory.createPartitionMemoryEstimator(session, fragment),
noMemoryFragment ? new NoMemoryPartitionMemoryEstimator() : memoryEstimatorFactory.createPartitionMemoryEstimator(session, fragment, planFragmentLookup),
// do not retry coordinator only tasks
coordinatorStage ? 1 : maxTaskExecutionAttempts,
schedulingPriority,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import io.trino.spi.ErrorCode;
import io.trino.spi.memory.MemoryPoolInfo;
import io.trino.sql.planner.PlanFragment;
import io.trino.sql.planner.plan.PlanFragmentId;
import jakarta.annotation.PostConstruct;
import jakarta.annotation.PreDestroy;
import org.assertj.core.util.VisibleForTesting;
Expand All @@ -34,6 +35,7 @@
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
Expand Down Expand Up @@ -117,7 +119,10 @@ void refreshNodePoolMemoryInfos()
}

@Override
public PartitionMemoryEstimator createPartitionMemoryEstimator(Session session, PlanFragment planFragment)
public PartitionMemoryEstimator createPartitionMemoryEstimator(
Session session,
PlanFragment planFragment,
Function<PlanFragmentId, PlanFragment> sourceFragmentLookup)
{
DataSize defaultInitialMemoryLimit = planFragment.getPartitioning().equals(COORDINATOR_DISTRIBUTION) ?
getFaultTolerantExecutionDefaultCoordinatorTaskMemory(session) :
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,15 @@

import io.trino.Session;
import io.trino.sql.planner.PlanFragment;
import io.trino.sql.planner.plan.PlanFragmentId;

import java.util.function.Function;

@FunctionalInterface
public interface PartitionMemoryEstimatorFactory
{
PartitionMemoryEstimator createPartitionMemoryEstimator(Session session, PlanFragment planFragment);
PartitionMemoryEstimator createPartitionMemoryEstimator(
Session session,
PlanFragment planFragment,
Function<PlanFragmentId, PlanFragment> sourceFragmentLookup);
}
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@

import java.net.URI;
import java.util.Optional;
import java.util.function.Function;

import static io.airlift.units.DataSize.Unit.GIGABYTE;
import static io.airlift.units.DataSize.Unit.MEGABYTE;
Expand All @@ -50,6 +51,10 @@

public class TestExponentialGrowthPartitionMemoryEstimator
{
private static final Function<PlanFragmentId, PlanFragment> THROWING_PLAN_FRAGMENT_LOOKUP = planFragmentId -> {
throw new RuntimeException("should not be used");
};

@Test
public void testDefaultInitialEstimation()
{
Expand All @@ -63,10 +68,10 @@ public void testDefaultInitialEstimation()
.setSystemProperty(FAULT_TOLERANT_EXECUTION_TASK_MEMORY, "113MB")
.build();

assertThat(estimatorFactory.createPartitionMemoryEstimator(session, getPlanFragment(COORDINATOR_DISTRIBUTION)).getInitialMemoryRequirements())
assertThat(estimatorFactory.createPartitionMemoryEstimator(session, getPlanFragment(COORDINATOR_DISTRIBUTION), THROWING_PLAN_FRAGMENT_LOOKUP).getInitialMemoryRequirements())
.isEqualTo(new MemoryRequirements(DataSize.of(107, MEGABYTE)));

assertThat(estimatorFactory.createPartitionMemoryEstimator(session, getPlanFragment(SINGLE_DISTRIBUTION)).getInitialMemoryRequirements())
assertThat(estimatorFactory.createPartitionMemoryEstimator(session, getPlanFragment(SINGLE_DISTRIBUTION), THROWING_PLAN_FRAGMENT_LOOKUP).getInitialMemoryRequirements())
.isEqualTo(new MemoryRequirements(DataSize.of(113, MEGABYTE)));
}

Expand All @@ -82,7 +87,7 @@ public void testEstimator()
.setSystemProperty(FAULT_TOLERANT_EXECUTION_TASK_MEMORY, "107MB")
.build();

PartitionMemoryEstimator estimator = estimatorFactory.createPartitionMemoryEstimator(session, getPlanFragment(SINGLE_DISTRIBUTION));
PartitionMemoryEstimator estimator = estimatorFactory.createPartitionMemoryEstimator(session, getPlanFragment(SINGLE_DISTRIBUTION), THROWING_PLAN_FRAGMENT_LOOKUP);

assertThat(estimator.getInitialMemoryRequirements())
.isEqualTo(new MemoryRequirements(DataSize.of(107, MEGABYTE)));
Expand Down Expand Up @@ -207,7 +212,7 @@ private static void testInitialEstimationWithFinishedPartitions(
.setSystemProperty(FAULT_TOLERANT_EXECUTION_TASK_MEMORY, defaultInitialTaskMemory.toString())
.build();

PartitionMemoryEstimator estimator = estimatorFactory.createPartitionMemoryEstimator(session, getPlanFragment(SINGLE_DISTRIBUTION));
PartitionMemoryEstimator estimator = estimatorFactory.createPartitionMemoryEstimator(session, getPlanFragment(SINGLE_DISTRIBUTION), THROWING_PLAN_FRAGMENT_LOOKUP);

for (int i = 0; i < recordedPartitionsCount; i++) {
estimator.registerPartitionFinished(new MemoryRequirements(recordedMemoryUsage), recordedMemoryUsage, true, Optional.empty());
Expand Down

0 comments on commit 902d8f6

Please sign in to comment.