Commit fa01166
Model no memory fragment handling as wrapper PartitionMemoryEstimator
Also add some test coverage
losipiuk committed Oct 13, 2023
1 parent c0a78a7 commit fa01166
Showing 5 changed files with 336 additions and 47 deletions.
File 1 of 5
@@ -37,9 +37,6 @@
import io.airlift.units.Duration;
import io.opentelemetry.api.trace.Tracer;
import io.trino.Session;
import io.trino.connector.informationschema.InformationSchemaTableHandle;
import io.trino.connector.system.GlobalSystemConnector;
import io.trino.connector.system.SystemTableHandle;
import io.trino.exchange.SpoolingExchangeInput;
import io.trino.execution.BasicStageStats;
import io.trino.execution.ExecutionFailureInfo;
@@ -94,9 +91,7 @@
import io.trino.sql.planner.plan.PlanFragmentId;
import io.trino.sql.planner.plan.PlanNode;
import io.trino.sql.planner.plan.PlanNodeId;
import io.trino.sql.planner.plan.RefreshMaterializedViewNode;
import io.trino.sql.planner.plan.RemoteSourceNode;
import io.trino.sql.planner.plan.TableScanNode;
import it.unimi.dsi.fastutil.ints.Int2ObjectMap;
import it.unimi.dsi.fastutil.ints.Int2ObjectOpenHashMap;
import it.unimi.dsi.fastutil.ints.IntOpenHashSet;
@@ -1194,7 +1189,6 @@ private void createStageExecution(SubPlan subPlan, boolean rootFragment, Map<Sta

boolean coordinatorStage = stage.getFragment().getPartitioning().equals(COORDINATOR_DISTRIBUTION);

boolean noMemoryFragment = isNoMemoryFragment(fragment);
if (eager) {
sourceExchanges.values().forEach(sourceExchange -> sourceExchange.setSourceHandlesDeliveryMode(EAGER));
}
@@ -1211,8 +1205,7 @@ private void createStageExecution(SubPlan subPlan, boolean rootFragment, Map<Sta
taskSource,
sinkPartitioningScheme,
exchange,
noMemoryFragment,
noMemoryFragment ? new NoMemoryPartitionMemoryEstimator() : memoryEstimatorFactory.createPartitionMemoryEstimator(session, fragment, planFragmentLookup),
memoryEstimatorFactory.createPartitionMemoryEstimator(session, fragment, planFragmentLookup),
// do not retry coordinator only tasks
coordinatorStage ? 1 : maxTaskExecutionAttempts,
schedulingPriority,
@@ -1246,36 +1239,6 @@ private void createStageExecution(SubPlan subPlan, boolean rootFragment, Map<Sta
}
}

private boolean isNoMemoryFragment(PlanFragment fragment)
{
if (fragment.getRoot().getSources().stream()
.anyMatch(planNode -> planNode instanceof RefreshMaterializedViewNode)) {
// REFRESH MATERIALIZED VIEW will issue other SQL commands under the hood. If its task memory is
// non-zero, then a deadlock scenario is possible if we only have a single node in the cluster.
return true;
}

// If the source fragments are not tagged as "no-memory", assume that they may produce a significant amount of data.
// We stay on the safe side and assume that we should use standard memory estimation for this fragment.
if (!fragment.getRemoteSourceNodes().stream().flatMap(node -> node.getSourceFragmentIds().stream())
.allMatch(sourceFragmentId -> stageExecutions.get(getStageId(sourceFragmentId)).isNoMemoryFragment())) {
return false;
}

// If the fragment source is not reading any external tables, or only accesses information_schema, assume it does not need a significant amount of memory.
// Allow scheduling even if the whole server memory is pre-allocated.
List<PlanNode> tableScanNodes = PlanNodeSearcher.searchFrom(fragment.getRoot()).whereIsInstanceOfAny(TableScanNode.class).findAll();
return tableScanNodes.stream().allMatch(node -> isMetadataTableScan((TableScanNode) node));
}

private static boolean isMetadataTableScan(TableScanNode tableScanNode)
{
return (tableScanNode.getTable().getConnectorHandle() instanceof InformationSchemaTableHandle) ||
(tableScanNode.getTable().getCatalogHandle().getCatalogName().equals(GlobalSystemConnector.NAME) &&
(tableScanNode.getTable().getConnectorHandle() instanceof SystemTableHandle systemHandle) &&
systemHandle.getSchemaName().equals("jdbc"));
}

private StageId getStageId(PlanFragmentId fragmentId)
{
return StageId.create(queryStateMachine.getQueryId(), fragmentId);
@@ -1634,7 +1597,6 @@ private static class StageExecution
private final EventDrivenTaskSource taskSource;
private final FaultTolerantPartitioningScheme sinkPartitioningScheme;
private final Exchange exchange;
private final boolean noMemoryFragment;
private final PartitionMemoryEstimator partitionMemoryEstimator;
private final int maxTaskExecutionAttempts;
private final int schedulingPriority;
@@ -1674,7 +1636,6 @@ private StageExecution(
EventDrivenTaskSource taskSource,
FaultTolerantPartitioningScheme sinkPartitioningScheme,
Exchange exchange,
boolean noMemoryFragment,
PartitionMemoryEstimator partitionMemoryEstimator,
int maxTaskExecutionAttempts,
int schedulingPriority,
@@ -1693,7 +1654,6 @@ private StageExecution(
this.taskSource = requireNonNull(taskSource, "taskSource is null");
this.sinkPartitioningScheme = requireNonNull(sinkPartitioningScheme, "sinkPartitioningScheme is null");
this.exchange = requireNonNull(exchange, "exchange is null");
this.noMemoryFragment = noMemoryFragment;
this.partitionMemoryEstimator = requireNonNull(partitionMemoryEstimator, "partitionMemoryEstimator is null");
this.maxTaskExecutionAttempts = maxTaskExecutionAttempts;
this.schedulingPriority = schedulingPriority;
@@ -1771,11 +1731,6 @@ public boolean isExchangeClosed()
return exchangeClosed;
}

public boolean isNoMemoryFragment()
{
return noMemoryFragment;
}

public void addPartition(int partitionId, NodeRequirements nodeRequirements)
{
if (getState().isDone()) {
File 2 of 5
@@ -0,0 +1,103 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.trino.execution.scheduler;

import com.google.inject.BindingAnnotation;
import com.google.inject.Inject;
import io.trino.Session;
import io.trino.connector.informationschema.InformationSchemaTableHandle;
import io.trino.connector.system.GlobalSystemConnector;
import io.trino.connector.system.SystemTableHandle;
import io.trino.sql.planner.PlanFragment;
import io.trino.sql.planner.optimizations.PlanNodeSearcher;
import io.trino.sql.planner.plan.PlanFragmentId;
import io.trino.sql.planner.plan.PlanNode;
import io.trino.sql.planner.plan.RefreshMaterializedViewNode;
import io.trino.sql.planner.plan.TableScanNode;

import java.lang.annotation.Retention;
import java.lang.annotation.Target;
import java.util.List;
import java.util.function.Function;

import static java.lang.annotation.ElementType.FIELD;
import static java.lang.annotation.ElementType.METHOD;
import static java.lang.annotation.ElementType.PARAMETER;
import static java.lang.annotation.RetentionPolicy.RUNTIME;
import static java.util.Objects.requireNonNull;

public class NoMemoryAwarePartitionMemoryEstimator
{
@Retention(RUNTIME)
@Target({FIELD, PARAMETER, METHOD})
@BindingAnnotation
public @interface ForNoMemoryAwarePartitionMemoryEstimator {}

public static class Factory
implements PartitionMemoryEstimatorFactory
{
private final PartitionMemoryEstimatorFactory delegateFactory;

@Inject
public Factory(@ForNoMemoryAwarePartitionMemoryEstimator PartitionMemoryEstimatorFactory delegateFactory)
{
this.delegateFactory = requireNonNull(delegateFactory, "delegateFactory is null");
}

@Override
public PartitionMemoryEstimator createPartitionMemoryEstimator(
Session session,
PlanFragment planFragment,
Function<PlanFragmentId, PlanFragment> sourceFragmentLookup)
{
if (isNoMemoryFragment(planFragment, sourceFragmentLookup)) {
return NoMemoryPartitionMemoryEstimator.INSTANCE;
}
return delegateFactory.createPartitionMemoryEstimator(session, planFragment, sourceFragmentLookup);
}

private boolean isNoMemoryFragment(PlanFragment fragment, Function<PlanFragmentId, PlanFragment> childFragmentLookup)
{
if (fragment.getRoot().getSources().stream()
.anyMatch(planNode -> planNode instanceof RefreshMaterializedViewNode)) {
// REFRESH MATERIALIZED VIEW will issue other SQL commands under the hood. If its task memory is
// non-zero, then a deadlock scenario is possible if we only have a single node in the cluster.
return true;
}

// If the source fragments are not tagged as "no-memory", assume that they may produce a significant amount of data.
// We stay on the safe side and assume that we should use standard memory estimation for this fragment.
if (!fragment.getRemoteSourceNodes().stream().flatMap(node -> node.getSourceFragmentIds().stream())
// TODO: childFragmentLookup will be executed for the subtree of every fragment in the query plan, which means each fragment will be
// analyzed multiple times. Given that the logic here is not extremely expensive and plans are not gigantic (up to ~200 fragments),
// we can keep this as a first approach. Ultimately we should profile execution and possibly add a mechanism to avoid the repeated work.
.allMatch(sourceFragmentId -> isNoMemoryFragment(childFragmentLookup.apply(sourceFragmentId), childFragmentLookup))) {
return false;
}

// If the fragment source is not reading any external tables, or only accesses information_schema, assume it does not need a significant amount of memory.
// Allow scheduling even if the whole server memory is pre-allocated.
List<PlanNode> tableScanNodes = PlanNodeSearcher.searchFrom(fragment.getRoot()).whereIsInstanceOfAny(TableScanNode.class).findAll();
return tableScanNodes.stream().allMatch(node -> isMetadataTableScan((TableScanNode) node));
}

private static boolean isMetadataTableScan(TableScanNode tableScanNode)
{
return (tableScanNode.getTable().getConnectorHandle() instanceof InformationSchemaTableHandle) ||
(tableScanNode.getTable().getCatalogHandle().getCatalogName().equals(GlobalSystemConnector.NAME) &&
(tableScanNode.getTable().getConnectorHandle() instanceof SystemTableHandle systemHandle) &&
systemHandle.getSchemaName().equals("jdbc"));
}
}
}
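The TODO above notes that each fragment's subtree is re-analyzed once per downstream fragment. A minimal sketch of how the repeated work could be avoided with per-query memoization follows. It uses a simplified, hypothetical Fragment record rather than Trino's PlanFragment, and it ignores the RefreshMaterializedViewNode special case, so it illustrates the idea mentioned in the TODO rather than code from this commit:

import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class MemoizedNoMemoryClassifier
{
    // Simplified stand-in for a plan fragment: an id, the ids of its source fragments,
    // and whether its own table scans touch only metadata tables.
    record Fragment(int id, List<Integer> sourceIds, boolean scansOnlyMetadata) {}

    private final Map<Integer, Fragment> fragmentsById;
    private final Map<Integer, Boolean> cache = new HashMap<>();

    public MemoizedNoMemoryClassifier(Map<Integer, Fragment> fragmentsById)
    {
        // The map is expected to contain every fragment of the query plan, keyed by id
        this.fragmentsById = fragmentsById;
    }

    // A fragment is "no-memory" if it scans only metadata and all of its source fragments
    // are themselves "no-memory"; each fragment is evaluated at most once per query.
    public boolean isNoMemoryFragment(int fragmentId)
    {
        Boolean cached = cache.get(fragmentId);
        if (cached != null) {
            return cached;
        }
        Fragment fragment = fragmentsById.get(fragmentId);
        boolean result = fragment.scansOnlyMetadata()
                && fragment.sourceIds().stream().allMatch(this::isNoMemoryFragment);
        cache.put(fragmentId, result);
        return result;
    }
}

Because plan fragments form a tree, the recursion terminates and the cache simply caps the work at one evaluation per fragment.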
File 3 of 5
@@ -21,6 +21,10 @@
public class NoMemoryPartitionMemoryEstimator
implements PartitionMemoryEstimator
{
public static final NoMemoryPartitionMemoryEstimator INSTANCE = new NoMemoryPartitionMemoryEstimator();

private NoMemoryPartitionMemoryEstimator() {}

@Override
public MemoryRequirements getInitialMemoryRequirements()
{
File 4 of 5
@@ -66,6 +66,8 @@
import io.trino.execution.scheduler.BinPackingNodeAllocatorService;
import io.trino.execution.scheduler.EventDrivenTaskSourceFactory;
import io.trino.execution.scheduler.ExponentialGrowthPartitionMemoryEstimator;
import io.trino.execution.scheduler.NoMemoryAwarePartitionMemoryEstimator;
import io.trino.execution.scheduler.NoMemoryAwarePartitionMemoryEstimator.ForNoMemoryAwarePartitionMemoryEstimator;
import io.trino.execution.scheduler.NodeAllocatorService;
import io.trino.execution.scheduler.PartitionMemoryEstimatorFactory;
import io.trino.execution.scheduler.SplitSchedulerStats;
@@ -221,7 +223,10 @@ protected void setup(Binder binder)
// node allocator
binder.bind(BinPackingNodeAllocatorService.class).in(Scopes.SINGLETON);
binder.bind(NodeAllocatorService.class).to(BinPackingNodeAllocatorService.class);
binder.bind(PartitionMemoryEstimatorFactory.class).to(ExponentialGrowthPartitionMemoryEstimator.Factory.class).in(Scopes.SINGLETON);
binder.bind(PartitionMemoryEstimatorFactory.class).to(NoMemoryAwarePartitionMemoryEstimator.Factory.class).in(Scopes.SINGLETON);
binder.bind(PartitionMemoryEstimatorFactory.class)
.annotatedWith(ForNoMemoryAwarePartitionMemoryEstimator.class)
.to(ExponentialGrowthPartitionMemoryEstimator.Factory.class).in(Scopes.SINGLETON);

// node monitor
binder.bind(ClusterSizeMonitor.class).in(Scopes.SINGLETON);
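For readers unfamiliar with the binding pattern used above: the wrapper factory is bound as the default PartitionMemoryEstimatorFactory, while the real factory is bound under the @ForNoMemoryAwarePartitionMemoryEstimator annotation that the wrapper's constructor injects, so there is no circular binding. A self-contained Guice sketch of the same wiring, with hypothetical Greeter types standing in for the Trino classes:

import com.google.inject.AbstractModule;
import com.google.inject.BindingAnnotation;
import com.google.inject.Guice;
import com.google.inject.Inject;
import com.google.inject.Scopes;

import java.lang.annotation.Retention;

import static java.lang.annotation.RetentionPolicy.RUNTIME;

public class DecoratorBindingExample
{
    // Binding annotation used to request the undecorated delegate,
    // analogous to @ForNoMemoryAwarePartitionMemoryEstimator
    @Retention(RUNTIME)
    @BindingAnnotation
    public @interface ForDelegate {}

    public interface Greeter
    {
        String greet();
    }

    // Plays the role of ExponentialGrowthPartitionMemoryEstimator.Factory: the plain implementation
    public static class BasicGreeter
            implements Greeter
    {
        @Override
        public String greet()
        {
            return "hello";
        }
    }

    // Plays the role of NoMemoryAwarePartitionMemoryEstimator.Factory: wraps the annotated delegate
    public static class ShoutingGreeter
            implements Greeter
    {
        private final Greeter delegate;

        @Inject
        public ShoutingGreeter(@ForDelegate Greeter delegate)
        {
            this.delegate = delegate;
        }

        @Override
        public String greet()
        {
            return delegate.greet().toUpperCase() + "!";
        }
    }

    public static void main(String[] args)
    {
        Greeter greeter = Guice.createInjector(new AbstractModule()
        {
            @Override
            protected void configure()
            {
                // Unannotated requests get the wrapper; the wrapper itself asks for the annotated binding
                bind(Greeter.class).to(ShoutingGreeter.class).in(Scopes.SINGLETON);
                bind(Greeter.class).annotatedWith(ForDelegate.class).to(BasicGreeter.class).in(Scopes.SINGLETON);
            }
        }).getInstance(Greeter.class);
        System.out.println(greeter.greet()); // prints HELLO!
    }
}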
File 5 of 5 (diff not shown)

0 comments on commit fa01166
