diff --git a/core/trino-main/src/main/java/io/trino/execution/scheduler/EventDrivenFaultTolerantQueryScheduler.java b/core/trino-main/src/main/java/io/trino/execution/scheduler/EventDrivenFaultTolerantQueryScheduler.java index 4a69b8f26e54..abd1774c5da3 100644 --- a/core/trino-main/src/main/java/io/trino/execution/scheduler/EventDrivenFaultTolerantQueryScheduler.java +++ b/core/trino-main/src/main/java/io/trino/execution/scheduler/EventDrivenFaultTolerantQueryScheduler.java @@ -37,9 +37,6 @@ import io.airlift.units.Duration; import io.opentelemetry.api.trace.Tracer; import io.trino.Session; -import io.trino.connector.informationschema.InformationSchemaTableHandle; -import io.trino.connector.system.GlobalSystemConnector; -import io.trino.connector.system.SystemTableHandle; import io.trino.exchange.SpoolingExchangeInput; import io.trino.execution.BasicStageStats; import io.trino.execution.ExecutionFailureInfo; @@ -94,9 +91,7 @@ import io.trino.sql.planner.plan.PlanFragmentId; import io.trino.sql.planner.plan.PlanNode; import io.trino.sql.planner.plan.PlanNodeId; -import io.trino.sql.planner.plan.RefreshMaterializedViewNode; import io.trino.sql.planner.plan.RemoteSourceNode; -import io.trino.sql.planner.plan.TableScanNode; import it.unimi.dsi.fastutil.ints.Int2ObjectMap; import it.unimi.dsi.fastutil.ints.Int2ObjectOpenHashMap; import it.unimi.dsi.fastutil.ints.IntOpenHashSet; @@ -1194,7 +1189,6 @@ private void createStageExecution(SubPlan subPlan, boolean rootFragment, Map sourceExchange.setSourceHandlesDeliveryMode(EAGER)); } @@ -1211,8 +1205,7 @@ private void createStageExecution(SubPlan subPlan, boolean rootFragment, Map planNode instanceof RefreshMaterializedViewNode)) { - // REFRESH MATERIALIZED VIEW will issue other SQL commands under the hood. If its task memory is - // non-zero, then a deadlock scenario is possible if we only have a single node in the cluster. - return true; - } - - // If source fragments are not tagged as "no-memory" assume that they may produce significant amount of data. - // We stay on the safe side an assume that we should use standard memory estimation for this fragment - if (!fragment.getRemoteSourceNodes().stream().flatMap(node -> node.getSourceFragmentIds().stream()) - .allMatch(sourceFragmentId -> stageExecutions.get(getStageId(sourceFragmentId)).isNoMemoryFragment())) { - return false; - } - - // If fragment source is not reading any external tables or only accesses information_schema assume it does not need significant amount of memory. - // Allow scheduling even if whole server memory is pre allocated. - List tableScanNodes = PlanNodeSearcher.searchFrom(fragment.getRoot()).whereIsInstanceOfAny(TableScanNode.class).findAll(); - return tableScanNodes.stream().allMatch(node -> isMetadataTableScan((TableScanNode) node)); - } - - private static boolean isMetadataTableScan(TableScanNode tableScanNode) - { - return (tableScanNode.getTable().getConnectorHandle() instanceof InformationSchemaTableHandle) || - (tableScanNode.getTable().getCatalogHandle().getCatalogName().equals(GlobalSystemConnector.NAME) && - (tableScanNode.getTable().getConnectorHandle() instanceof SystemTableHandle systemHandle) && - systemHandle.getSchemaName().equals("jdbc")); - } - private StageId getStageId(PlanFragmentId fragmentId) { return StageId.create(queryStateMachine.getQueryId(), fragmentId); @@ -1634,7 +1597,6 @@ private static class StageExecution private final EventDrivenTaskSource taskSource; private final FaultTolerantPartitioningScheme sinkPartitioningScheme; private final Exchange exchange; - private final boolean noMemoryFragment; private final PartitionMemoryEstimator partitionMemoryEstimator; private final int maxTaskExecutionAttempts; private final int schedulingPriority; @@ -1674,7 +1636,6 @@ private StageExecution( EventDrivenTaskSource taskSource, FaultTolerantPartitioningScheme sinkPartitioningScheme, Exchange exchange, - boolean noMemoryFragment, PartitionMemoryEstimator partitionMemoryEstimator, int maxTaskExecutionAttempts, int schedulingPriority, @@ -1693,7 +1654,6 @@ private StageExecution( this.taskSource = requireNonNull(taskSource, "taskSource is null"); this.sinkPartitioningScheme = requireNonNull(sinkPartitioningScheme, "sinkPartitioningScheme is null"); this.exchange = requireNonNull(exchange, "exchange is null"); - this.noMemoryFragment = noMemoryFragment; this.partitionMemoryEstimator = requireNonNull(partitionMemoryEstimator, "partitionMemoryEstimator is null"); this.maxTaskExecutionAttempts = maxTaskExecutionAttempts; this.schedulingPriority = schedulingPriority; @@ -1771,11 +1731,6 @@ public boolean isExchangeClosed() return exchangeClosed; } - public boolean isNoMemoryFragment() - { - return noMemoryFragment; - } - public void addPartition(int partitionId, NodeRequirements nodeRequirements) { if (getState().isDone()) { diff --git a/core/trino-main/src/main/java/io/trino/execution/scheduler/NoMemoryAwarePartitionMemoryEstimator.java b/core/trino-main/src/main/java/io/trino/execution/scheduler/NoMemoryAwarePartitionMemoryEstimator.java new file mode 100644 index 000000000000..0f0e328b03d9 --- /dev/null +++ b/core/trino-main/src/main/java/io/trino/execution/scheduler/NoMemoryAwarePartitionMemoryEstimator.java @@ -0,0 +1,103 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.trino.execution.scheduler; + +import com.google.inject.BindingAnnotation; +import com.google.inject.Inject; +import io.trino.Session; +import io.trino.connector.informationschema.InformationSchemaTableHandle; +import io.trino.connector.system.GlobalSystemConnector; +import io.trino.connector.system.SystemTableHandle; +import io.trino.sql.planner.PlanFragment; +import io.trino.sql.planner.optimizations.PlanNodeSearcher; +import io.trino.sql.planner.plan.PlanFragmentId; +import io.trino.sql.planner.plan.PlanNode; +import io.trino.sql.planner.plan.RefreshMaterializedViewNode; +import io.trino.sql.planner.plan.TableScanNode; + +import java.lang.annotation.Retention; +import java.lang.annotation.Target; +import java.util.List; +import java.util.function.Function; + +import static java.lang.annotation.ElementType.FIELD; +import static java.lang.annotation.ElementType.METHOD; +import static java.lang.annotation.ElementType.PARAMETER; +import static java.lang.annotation.RetentionPolicy.RUNTIME; +import static java.util.Objects.requireNonNull; + +public class NoMemoryAwarePartitionMemoryEstimator +{ + @Retention(RUNTIME) + @Target({FIELD, PARAMETER, METHOD}) + @BindingAnnotation + public @interface ForNoMemoryAwarePartitionMemoryEstimator {} + + public static class Factory + implements PartitionMemoryEstimatorFactory + { + private final PartitionMemoryEstimatorFactory delegateFactory; + + @Inject + public Factory(@ForNoMemoryAwarePartitionMemoryEstimator PartitionMemoryEstimatorFactory delegateFactory) + { + this.delegateFactory = requireNonNull(delegateFactory, "delegateFactory is null"); + } + + @Override + public PartitionMemoryEstimator createPartitionMemoryEstimator( + Session session, + PlanFragment planFragment, + Function sourceFragmentLookup) + { + if (isNoMemoryFragment(planFragment, sourceFragmentLookup)) { + return NoMemoryPartitionMemoryEstimator.INSTANCE; + } + return delegateFactory.createPartitionMemoryEstimator(session, planFragment, sourceFragmentLookup); + } + + private boolean isNoMemoryFragment(PlanFragment fragment, Function childFragmentLookup) + { + if (fragment.getRoot().getSources().stream() + .anyMatch(planNode -> planNode instanceof RefreshMaterializedViewNode)) { + // REFRESH MATERIALIZED VIEW will issue other SQL commands under the hood. If its task memory is + // non-zero, then a deadlock scenario is possible if we only have a single node in the cluster. + return true; + } + + // If source fragments are not tagged as "no-memory" assume that they may produce significant amount of data. + // We stay on the safe side an assume that we should use standard memory estimation for this fragment + if (!fragment.getRemoteSourceNodes().stream().flatMap(node -> node.getSourceFragmentIds().stream()) + // TODO: childFragmentLookup will be executed for subtree of every fragment in query plan. That means fragment will be + // analyzed multiple time. Given fact that logic here is not extremely expensive and plans are not gigantic (up to ~200 fragments) + // we can keep it as a first approach. Ultimately we should profile execution and possibly put in place some mechanisms to avoid repeated work. + .allMatch(sourceFragmentId -> isNoMemoryFragment(childFragmentLookup.apply(sourceFragmentId), childFragmentLookup))) { + return false; + } + + // If fragment source is not reading any external tables or only accesses information_schema assume it does not need significant amount of memory. + // Allow scheduling even if whole server memory is pre allocated. + List tableScanNodes = PlanNodeSearcher.searchFrom(fragment.getRoot()).whereIsInstanceOfAny(TableScanNode.class).findAll(); + return tableScanNodes.stream().allMatch(node -> isMetadataTableScan((TableScanNode) node)); + } + + private static boolean isMetadataTableScan(TableScanNode tableScanNode) + { + return (tableScanNode.getTable().getConnectorHandle() instanceof InformationSchemaTableHandle) || + (tableScanNode.getTable().getCatalogHandle().getCatalogName().equals(GlobalSystemConnector.NAME) && + (tableScanNode.getTable().getConnectorHandle() instanceof SystemTableHandle systemHandle) && + systemHandle.getSchemaName().equals("jdbc")); + } + } +} diff --git a/core/trino-main/src/main/java/io/trino/execution/scheduler/NoMemoryPartitionMemoryEstimator.java b/core/trino-main/src/main/java/io/trino/execution/scheduler/NoMemoryPartitionMemoryEstimator.java index 7ff39b2d8426..70b34057af4a 100644 --- a/core/trino-main/src/main/java/io/trino/execution/scheduler/NoMemoryPartitionMemoryEstimator.java +++ b/core/trino-main/src/main/java/io/trino/execution/scheduler/NoMemoryPartitionMemoryEstimator.java @@ -21,6 +21,10 @@ public class NoMemoryPartitionMemoryEstimator implements PartitionMemoryEstimator { + public static final NoMemoryPartitionMemoryEstimator INSTANCE = new NoMemoryPartitionMemoryEstimator(); + + private NoMemoryPartitionMemoryEstimator() {} + @Override public MemoryRequirements getInitialMemoryRequirements() { diff --git a/core/trino-main/src/main/java/io/trino/server/CoordinatorModule.java b/core/trino-main/src/main/java/io/trino/server/CoordinatorModule.java index fc9c5848da5e..ad9f98364567 100644 --- a/core/trino-main/src/main/java/io/trino/server/CoordinatorModule.java +++ b/core/trino-main/src/main/java/io/trino/server/CoordinatorModule.java @@ -66,6 +66,8 @@ import io.trino.execution.scheduler.BinPackingNodeAllocatorService; import io.trino.execution.scheduler.EventDrivenTaskSourceFactory; import io.trino.execution.scheduler.ExponentialGrowthPartitionMemoryEstimator; +import io.trino.execution.scheduler.NoMemoryAwarePartitionMemoryEstimator; +import io.trino.execution.scheduler.NoMemoryAwarePartitionMemoryEstimator.ForNoMemoryAwarePartitionMemoryEstimator; import io.trino.execution.scheduler.NodeAllocatorService; import io.trino.execution.scheduler.PartitionMemoryEstimatorFactory; import io.trino.execution.scheduler.SplitSchedulerStats; @@ -221,7 +223,10 @@ protected void setup(Binder binder) // node allocator binder.bind(BinPackingNodeAllocatorService.class).in(Scopes.SINGLETON); binder.bind(NodeAllocatorService.class).to(BinPackingNodeAllocatorService.class); - binder.bind(PartitionMemoryEstimatorFactory.class).to(ExponentialGrowthPartitionMemoryEstimator.Factory.class).in(Scopes.SINGLETON); + binder.bind(PartitionMemoryEstimatorFactory.class).to(NoMemoryAwarePartitionMemoryEstimator.Factory.class).in(Scopes.SINGLETON); + binder.bind(PartitionMemoryEstimatorFactory.class) + .annotatedWith(ForNoMemoryAwarePartitionMemoryEstimator.class) + .to(ExponentialGrowthPartitionMemoryEstimator.Factory.class).in(Scopes.SINGLETON); // node monitor binder.bind(ClusterSizeMonitor.class).in(Scopes.SINGLETON); diff --git a/core/trino-main/src/test/java/io/trino/execution/scheduler/TestNoMemoryAwarePartitionMemoryEstimator.java b/core/trino-main/src/test/java/io/trino/execution/scheduler/TestNoMemoryAwarePartitionMemoryEstimator.java new file mode 100644 index 000000000000..f341bca7e21c --- /dev/null +++ b/core/trino-main/src/test/java/io/trino/execution/scheduler/TestNoMemoryAwarePartitionMemoryEstimator.java @@ -0,0 +1,222 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.trino.execution.scheduler; + +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.ImmutableSet; +import com.google.common.collect.Maps; +import io.airlift.units.DataSize; +import io.trino.Session; +import io.trino.connector.informationschema.InformationSchemaTable; +import io.trino.connector.informationschema.InformationSchemaTableHandle; +import io.trino.cost.StatsAndCosts; +import io.trino.metadata.TableHandle; +import io.trino.operator.RetryPolicy; +import io.trino.plugin.tpch.TpchTableHandle; +import io.trino.spi.ErrorCode; +import io.trino.spi.StandardErrorCode; +import io.trino.spi.connector.ConnectorTableHandle; +import io.trino.spi.predicate.TupleDomain; +import io.trino.sql.planner.Partitioning; +import io.trino.sql.planner.PartitioningScheme; +import io.trino.sql.planner.PlanFragment; +import io.trino.sql.planner.plan.ExchangeNode; +import io.trino.sql.planner.plan.PlanFragmentId; +import io.trino.sql.planner.plan.PlanNodeId; +import io.trino.sql.planner.plan.RemoteSourceNode; +import io.trino.sql.planner.plan.TableScanNode; +import io.trino.testing.TestingSession; +import io.trino.testing.TestingTransactionHandle; +import org.junit.jupiter.api.Test; + +import java.util.Arrays; +import java.util.Optional; +import java.util.OptionalLong; +import java.util.function.Function; +import java.util.stream.Stream; + +import static com.google.common.collect.ImmutableList.toImmutableList; +import static io.airlift.units.DataSize.Unit.BYTE; +import static io.trino.sql.planner.SystemPartitioningHandle.SINGLE_DISTRIBUTION; +import static io.trino.sql.planner.SystemPartitioningHandle.SOURCE_DISTRIBUTION; +import static io.trino.testing.TestingHandles.TEST_CATALOG_HANDLE; +import static io.trino.testing.TestingHandles.TEST_CATALOG_NAME; +import static org.assertj.core.api.Assertions.assertThat; + +public class TestNoMemoryAwarePartitionMemoryEstimator +{ + @Test + public void testInformationSchemaScan() + { + PlanFragment planFragment = tableScanPlanFragment("ts", new InformationSchemaTableHandle(TEST_CATALOG_NAME, InformationSchemaTable.VIEWS, ImmutableSet.of(), OptionalLong.empty())); + + PartitionMemoryEstimator estimator = createEstimator(planFragment); + assertThat(estimator).isInstanceOf(NoMemoryPartitionMemoryEstimator.class); + + // test if NoMemoryPartitionMemoryEstimator returns 0 for initial and retry estimates + PartitionMemoryEstimator.MemoryRequirements noMemoryRequirements = new PartitionMemoryEstimator.MemoryRequirements(DataSize.ofBytes(0)); + assertThat(estimator.getInitialMemoryRequirements()).isEqualTo(noMemoryRequirements); + assertThat(estimator.getNextRetryMemoryRequirements( + new PartitionMemoryEstimator.MemoryRequirements(DataSize.ofBytes(1)), + DataSize.of(5, BYTE), + StandardErrorCode.NOT_SUPPORTED.toErrorCode())) + .isEqualTo(noMemoryRequirements); + assertThat(estimator.getNextRetryMemoryRequirements( + new PartitionMemoryEstimator.MemoryRequirements(DataSize.ofBytes(1)), + DataSize.of(5, BYTE), + StandardErrorCode.GENERIC_INSUFFICIENT_RESOURCES.toErrorCode())) + .isEqualTo(noMemoryRequirements); + } + + @Test + public void testTpchTableScan() + { + PlanFragment planFragment = tableScanPlanFragment("ts", new TpchTableHandle(TEST_CATALOG_NAME, "nation", 1.0)); + PartitionMemoryEstimator estimator = createEstimator(planFragment); + assertThat(estimator).isInstanceOf(MockDelegatePatitionMemoryEstimator.class); + } + + @Test + public void testRemoteFromInformationSchemaScan() + { + PlanFragment tableScanPlanFragment = tableScanPlanFragment("ts", new InformationSchemaTableHandle(TEST_CATALOG_NAME, InformationSchemaTable.VIEWS, ImmutableSet.of(), OptionalLong.empty())); + PlanFragment parentFragment = getParentFragment(tableScanPlanFragment); + + PartitionMemoryEstimator estimator = createEstimator(parentFragment, tableScanPlanFragment); + assertThat(estimator).isInstanceOf(NoMemoryPartitionMemoryEstimator.class); + } + + @Test + public void testRemoteFromTpchScan() + { + PlanFragment tableScanPlanFragment = tableScanPlanFragment("ts", new TpchTableHandle(TEST_CATALOG_NAME, "nation", 1.0)); + PlanFragment parentFragment = getParentFragment(tableScanPlanFragment); + + PartitionMemoryEstimator estimator = createEstimator(parentFragment, tableScanPlanFragment); + assertThat(estimator).isInstanceOf(MockDelegatePatitionMemoryEstimator.class); + } + + @Test + public void testRemoteFromTwoInformationSchemaScans() + { + PlanFragment tableScanPlanFragment1 = tableScanPlanFragment("ts1", new InformationSchemaTableHandle(TEST_CATALOG_NAME, InformationSchemaTable.VIEWS, ImmutableSet.of(), OptionalLong.empty())); + PlanFragment tableScanPlanFragment2 = tableScanPlanFragment("ts2", new InformationSchemaTableHandle(TEST_CATALOG_NAME, InformationSchemaTable.COLUMNS, ImmutableSet.of(), OptionalLong.empty())); + PlanFragment parentFragment = getParentFragment(tableScanPlanFragment1, tableScanPlanFragment2); + + PartitionMemoryEstimator estimator = createEstimator(parentFragment, tableScanPlanFragment1, tableScanPlanFragment2); + assertThat(estimator).isInstanceOf(NoMemoryPartitionMemoryEstimator.class); + } + + @Test + public void testRemoteFromInformationSchemaAndTpchTableScans() + { + PlanFragment tableScanPlanFragment1 = tableScanPlanFragment("ts1", new InformationSchemaTableHandle(TEST_CATALOG_NAME, InformationSchemaTable.VIEWS, ImmutableSet.of(), OptionalLong.empty())); + PlanFragment tableScanPlanFragment2 = tableScanPlanFragment("ts", new TpchTableHandle(TEST_CATALOG_NAME, "nation", 1.0)); + PlanFragment parentFragment = getParentFragment(tableScanPlanFragment1, tableScanPlanFragment2); + + PartitionMemoryEstimator estimator = createEstimator(parentFragment, tableScanPlanFragment1, tableScanPlanFragment2); + assertThat(estimator).isInstanceOf(MockDelegatePatitionMemoryEstimator.class); + } + + private static PlanFragment getParentFragment(PlanFragment... childFragments) + { + ImmutableList childFragmentIds = Stream.of(childFragments) + .map(PlanFragment::getId) + .collect(toImmutableList()); + return new PlanFragment( + new PlanFragmentId("parent"), + new RemoteSourceNode(new PlanNodeId("rsn"), childFragmentIds, ImmutableList.of(), Optional.empty(), ExchangeNode.Type.GATHER, RetryPolicy.TASK), + ImmutableMap.of(), + SOURCE_DISTRIBUTION, + Optional.empty(), + ImmutableList.of(), + new PartitioningScheme(Partitioning.create(SINGLE_DISTRIBUTION, ImmutableList.of()), ImmutableList.of()), + StatsAndCosts.empty(), + ImmutableList.of(), + Optional.empty()); + } + + private PartitionMemoryEstimator createEstimator(PlanFragment planFragment, PlanFragment... sourceFragments) + { + NoMemoryAwarePartitionMemoryEstimator.Factory noMemoryAwareEstimatorFactory = new NoMemoryAwarePartitionMemoryEstimator.Factory(new MockDelgatePartitionMemoryEstimatorFactory()); + Session session = TestingSession.testSessionBuilder().build(); + + Function sourceFragmentsLookup = Maps.uniqueIndex(Arrays.asList(sourceFragments), PlanFragment::getId)::get; + return noMemoryAwareEstimatorFactory.createPartitionMemoryEstimator( + session, + planFragment, + sourceFragmentsLookup); + } + + private static PlanFragment tableScanPlanFragment(String fragmentId, ConnectorTableHandle tableHandle) + { + TableScanNode informationSchemaViewsTableScan = new TableScanNode( + new PlanNodeId("tableScan"), + new TableHandle( + TEST_CATALOG_HANDLE, + tableHandle, + TestingTransactionHandle.create()), + ImmutableList.of(), + ImmutableMap.of(), + TupleDomain.all(), + Optional.empty(), + false, + Optional.empty()); + + return new PlanFragment( + new PlanFragmentId(fragmentId), + informationSchemaViewsTableScan, + ImmutableMap.of(), + SOURCE_DISTRIBUTION, + Optional.empty(), + ImmutableList.of(), + new PartitioningScheme(Partitioning.create(SINGLE_DISTRIBUTION, ImmutableList.of()), ImmutableList.of()), + StatsAndCosts.empty(), + ImmutableList.of(), + Optional.empty()); + } + + private static class MockDelgatePartitionMemoryEstimatorFactory + implements PartitionMemoryEstimatorFactory + { + @Override + public PartitionMemoryEstimator createPartitionMemoryEstimator(Session session, PlanFragment planFragment, Function sourceFragmentLookup) + { + return new MockDelegatePatitionMemoryEstimator(); + } + } + + private static class MockDelegatePatitionMemoryEstimator + implements PartitionMemoryEstimator + { + @Override + public MemoryRequirements getInitialMemoryRequirements() + { + throw new RuntimeException("not implemented"); + } + + @Override + public MemoryRequirements getNextRetryMemoryRequirements(MemoryRequirements previousMemoryRequirements, DataSize peakMemoryUsage, ErrorCode errorCode) + { + throw new RuntimeException("not implemented"); + } + + @Override + public void registerPartitionFinished(MemoryRequirements previousMemoryRequirements, DataSize peakMemoryUsage, boolean success, Optional errorCode) + { + throw new RuntimeException("not implemented"); + } + } +}