diff --git a/core/trino-main/src/main/java/io/trino/SystemSessionProperties.java b/core/trino-main/src/main/java/io/trino/SystemSessionProperties.java index e06323d0287d..da9f5ad4a7b4 100644 --- a/core/trino-main/src/main/java/io/trino/SystemSessionProperties.java +++ b/core/trino-main/src/main/java/io/trino/SystemSessionProperties.java @@ -60,8 +60,10 @@ public final class SystemSessionProperties public static final String JOIN_DISTRIBUTION_TYPE = "join_distribution_type"; public static final String JOIN_MAX_BROADCAST_TABLE_SIZE = "join_max_broadcast_table_size"; public static final String JOIN_MULTI_CLAUSE_INDEPENDENCE_FACTOR = "join_multi_clause_independence_factor"; + public static final String DETERMINE_PARTITION_COUNT_FOR_WRITE_ENABLED = "determine_partition_count_for_write_enabled"; public static final String MAX_HASH_PARTITION_COUNT = "max_hash_partition_count"; public static final String MIN_HASH_PARTITION_COUNT = "min_hash_partition_count"; + public static final String MIN_HASH_PARTITION_COUNT_FOR_WRITE = "min_hash_partition_count_for_write"; public static final String PREFER_STREAMING_OPERATORS = "prefer_streaming_operators"; public static final String TASK_WRITER_COUNT = "task_writer_count"; public static final String TASK_PARTITIONED_WRITER_COUNT = "task_partitioned_writer_count"; @@ -180,7 +182,9 @@ public final class SystemSessionProperties public static final String FAULT_TOLERANT_EXECUTION_TASK_MEMORY = "fault_tolerant_execution_task_memory"; public static final String FAULT_TOLERANT_EXECUTION_TASK_MEMORY_GROWTH_FACTOR = "fault_tolerant_execution_task_memory_growth_factor"; public static final String FAULT_TOLERANT_EXECUTION_TASK_MEMORY_ESTIMATION_QUANTILE = "fault_tolerant_execution_task_memory_estimation_quantile"; - public static final String FAULT_TOLERANT_EXECUTION_PARTITION_COUNT = "fault_tolerant_execution_partition_count"; + public static final String FAULT_TOLERANT_EXECUTION_MAX_PARTITION_COUNT = 
"fault_tolerant_execution_max_partition_count"; + public static final String FAULT_TOLERANT_EXECUTION_MIN_PARTITION_COUNT = "fault_tolerant_execution_min_partition_count"; + public static final String FAULT_TOLERANT_EXECUTION_MIN_PARTITION_COUNT_FOR_WRITE = "fault_tolerant_execution_min_partition_count_for_write"; public static final String ADAPTIVE_PARTIAL_AGGREGATION_ENABLED = "adaptive_partial_aggregation_enabled"; public static final String ADAPTIVE_PARTIAL_AGGREGATION_MIN_ROWS = "adaptive_partial_aggregation_min_rows"; public static final String ADAPTIVE_PARTIAL_AGGREGATION_UNIQUE_ROWS_RATIO_THRESHOLD = "adaptive_partial_aggregation_unique_rows_ratio_threshold"; @@ -254,6 +258,11 @@ public SystemSessionProperties( false, value -> validateDoubleRange(value, JOIN_MULTI_CLAUSE_INDEPENDENCE_FACTOR, 0.0, 1.0), value -> value), + booleanProperty( + DETERMINE_PARTITION_COUNT_FOR_WRITE_ENABLED, + "Determine the number of partitions based on amount of data read and processed by the query for write queries", + false, + false), integerProperty( MAX_HASH_PARTITION_COUNT, "Maximum number of partitions for distributed joins and aggregations", @@ -266,6 +275,12 @@ public SystemSessionProperties( queryManagerConfig.getMinHashPartitionCount(), value -> validateIntegerValue(value, MIN_HASH_PARTITION_COUNT, 1, false), false), + integerProperty( + MIN_HASH_PARTITION_COUNT_FOR_WRITE, + "Minimum number of partitions for distributed joins and aggregations in write queries", + queryManagerConfig.getMinHashPartitionCountForWrite(), + value -> validateIntegerValue(value, MIN_HASH_PARTITION_COUNT_FOR_WRITE, 1, false), + false), booleanProperty( PREFER_STREAMING_OPERATORS, "Prefer source table layouts that produce streaming operators", @@ -913,9 +928,22 @@ public SystemSessionProperties( value -> validateDoubleRange(value, FAULT_TOLERANT_EXECUTION_TASK_MEMORY_ESTIMATION_QUANTILE, 0.0, 1.0), false), integerProperty( - FAULT_TOLERANT_EXECUTION_PARTITION_COUNT, - "Number of partitions for 
distributed joins and aggregations executed with fault tolerant execution enabled", - queryManagerConfig.getFaultTolerantExecutionPartitionCount(), + FAULT_TOLERANT_EXECUTION_MAX_PARTITION_COUNT, + "Maximum number of partitions for distributed joins and aggregations executed with fault tolerant execution enabled", + queryManagerConfig.getFaultTolerantExecutionMaxPartitionCount(), + value -> validateIntegerValue(value, FAULT_TOLERANT_EXECUTION_MAX_PARTITION_COUNT, 1, false), + false), + integerProperty( + FAULT_TOLERANT_EXECUTION_MIN_PARTITION_COUNT, + "Minimum number of partitions for distributed joins and aggregations executed with fault tolerant execution enabled", + queryManagerConfig.getFaultTolerantExecutionMinPartitionCount(), + value -> validateIntegerValue(value, FAULT_TOLERANT_EXECUTION_MIN_PARTITION_COUNT, 1, false), + false), + integerProperty( + FAULT_TOLERANT_EXECUTION_MIN_PARTITION_COUNT_FOR_WRITE, + "Minimum number of partitions for distributed joins and aggregations in write queries executed with fault tolerant execution enabled", + queryManagerConfig.getFaultTolerantExecutionMinPartitionCountForWrite(), + value -> validateIntegerValue(value, FAULT_TOLERANT_EXECUTION_MIN_PARTITION_COUNT_FOR_WRITE, 1, false), false), booleanProperty( ADAPTIVE_PARTIAL_AGGREGATION_ENABLED, @@ -1026,6 +1054,11 @@ public static double getJoinMultiClauseIndependenceFactor(Session session) return session.getSystemProperty(JOIN_MULTI_CLAUSE_INDEPENDENCE_FACTOR, Double.class); } + public static boolean isDeterminePartitionCountForWriteEnabled(Session session) + { + return session.getSystemProperty(DETERMINE_PARTITION_COUNT_FOR_WRITE_ENABLED, Boolean.class); + } + public static int getMaxHashPartitionCount(Session session) { return session.getSystemProperty(MAX_HASH_PARTITION_COUNT, Integer.class); @@ -1036,6 +1069,11 @@ public static int getMinHashPartitionCount(Session session) return session.getSystemProperty(MIN_HASH_PARTITION_COUNT, Integer.class); } + public static int 
getMinHashPartitionCountForWrite(Session session) + { + return session.getSystemProperty(MIN_HASH_PARTITION_COUNT_FOR_WRITE, Integer.class); + } + public static boolean preferStreamingOperators(Session session) { return session.getSystemProperty(PREFER_STREAMING_OPERATORS, Boolean.class); @@ -1699,9 +1737,19 @@ public static double getFaultTolerantExecutionTaskMemoryEstimationQuantile(Sessi return session.getSystemProperty(FAULT_TOLERANT_EXECUTION_TASK_MEMORY_ESTIMATION_QUANTILE, Double.class); } - public static int getFaultTolerantExecutionPartitionCount(Session session) + public static int getFaultTolerantExecutionMaxPartitionCount(Session session) + { + return session.getSystemProperty(FAULT_TOLERANT_EXECUTION_MAX_PARTITION_COUNT, Integer.class); + } + + public static int getFaultTolerantExecutionMinPartitionCount(Session session) + { + return session.getSystemProperty(FAULT_TOLERANT_EXECUTION_MIN_PARTITION_COUNT, Integer.class); + } + + public static int getFaultTolerantExecutionMinPartitionCountForWrite(Session session) { - return session.getSystemProperty(FAULT_TOLERANT_EXECUTION_PARTITION_COUNT, Integer.class); + return session.getSystemProperty(FAULT_TOLERANT_EXECUTION_MIN_PARTITION_COUNT_FOR_WRITE, Integer.class); } public static boolean isAdaptivePartialAggregationEnabled(Session session) diff --git a/core/trino-main/src/main/java/io/trino/cost/TaskCountEstimator.java b/core/trino-main/src/main/java/io/trino/cost/TaskCountEstimator.java index 488bb5749281..c9d69f826541 100644 --- a/core/trino-main/src/main/java/io/trino/cost/TaskCountEstimator.java +++ b/core/trino-main/src/main/java/io/trino/cost/TaskCountEstimator.java @@ -25,7 +25,7 @@ import java.util.function.IntSupplier; import static io.trino.SystemSessionProperties.getCostEstimationWorkerCount; -import static io.trino.SystemSessionProperties.getFaultTolerantExecutionPartitionCount; +import static io.trino.SystemSessionProperties.getFaultTolerantExecutionMaxPartitionCount; import static 
io.trino.SystemSessionProperties.getMaxHashPartitionCount; import static io.trino.SystemSessionProperties.getRetryPolicy; import static java.lang.Math.min; @@ -70,7 +70,7 @@ public int estimateHashedTaskCount(Session session) { int partitionCount; if (getRetryPolicy(session) == RetryPolicy.TASK) { - partitionCount = getFaultTolerantExecutionPartitionCount(session); + partitionCount = getFaultTolerantExecutionMaxPartitionCount(session); } else { partitionCount = getMaxHashPartitionCount(session); diff --git a/core/trino-main/src/main/java/io/trino/execution/QueryManagerConfig.java b/core/trino-main/src/main/java/io/trino/execution/QueryManagerConfig.java index 3a8769770348..6806c3f21775 100644 --- a/core/trino-main/src/main/java/io/trino/execution/QueryManagerConfig.java +++ b/core/trino-main/src/main/java/io/trino/execution/QueryManagerConfig.java @@ -56,8 +56,10 @@ public class QueryManagerConfig private int maxConcurrentQueries = 1000; private int maxQueuedQueries = 5000; + private boolean determinePartitionCountForWriteEnabled; private int maxHashPartitionCount = 100; private int minHashPartitionCount = 4; + private int minHashPartitionCountForWrite = 50; private int maxWriterTasksCount = 100; private Duration minQueryExpireAge = new Duration(15, TimeUnit.MINUTES); private int maxQueryHistory = 100; @@ -120,7 +122,9 @@ public class QueryManagerConfig private DataSize faultTolerantExecutionStandardSplitSize = DataSize.of(64, MEGABYTE); private int faultTolerantExecutionMaxTaskSplitCount = 256; private DataSize faultTolerantExecutionTaskDescriptorStorageMaxMemory = DataSize.ofBytes(Math.round(AVAILABLE_HEAP_MEMORY * 0.15)); - private int faultTolerantExecutionPartitionCount = 50; + private int faultTolerantExecutionMaxPartitionCount = 50; + private int faultTolerantExecutionMinPartitionCount = 4; + private int faultTolerantExecutionMinPartitionCountForWrite = 50; private boolean faultTolerantExecutionForcePreferredWritePartitioningEnabled = true; @Min(1) @@ -179,6 
+183,19 @@ public QueryManagerConfig setMaxQueuedQueries(int maxQueuedQueries) return this; } + public boolean isDeterminePartitionCountForWriteEnabled() + { + return determinePartitionCountForWriteEnabled; + } + + @Config("query.determine-partition-count-for-write-enabled") + @ConfigDescription("Determine the number of partitions based on amount of data read and processed by the query for write queries") + public QueryManagerConfig setDeterminePartitionCountForWriteEnabled(boolean determinePartitionCountForWriteEnabled) + { + this.determinePartitionCountForWriteEnabled = determinePartitionCountForWriteEnabled; + return this; + } + @Min(1) public int getMaxHashPartitionCount() { @@ -208,6 +225,20 @@ public QueryManagerConfig setMinHashPartitionCount(int minHashPartitionCount) return this; } + @Min(1) + public int getMinHashPartitionCountForWrite() + { + return minHashPartitionCountForWrite; + } + + @Config("query.min-hash-partition-count-for-write") + @ConfigDescription("Minimum number of partitions for distributed joins and aggregations in write queries") + public QueryManagerConfig setMinHashPartitionCountForWrite(int minHashPartitionCountForWrite) + { + this.minHashPartitionCountForWrite = minHashPartitionCountForWrite; + return this; + } + @Min(1) public int getMaxWriterTasksCount() { @@ -860,16 +891,45 @@ public QueryManagerConfig setFaultTolerantExecutionTaskDescriptorStorageMaxMemor } @Min(1) - public int getFaultTolerantExecutionPartitionCount() + public int getFaultTolerantExecutionMaxPartitionCount() + { + return faultTolerantExecutionMaxPartitionCount; + } + + @Config("fault-tolerant-execution-max-partition-count") + @LegacyConfig("fault-tolerant-execution-partition-count") + @ConfigDescription("Maximum number of partitions for distributed joins and aggregations executed with fault tolerant execution enabled") + public QueryManagerConfig setFaultTolerantExecutionMaxPartitionCount(int faultTolerantExecutionMaxPartitionCount) + { + 
this.faultTolerantExecutionMaxPartitionCount = faultTolerantExecutionMaxPartitionCount; + return this; + } + + @Min(1) + public int getFaultTolerantExecutionMinPartitionCount() + { + return faultTolerantExecutionMinPartitionCount; + } + + @Config("fault-tolerant-execution-min-partition-count") + @ConfigDescription("Minimum number of partitions for distributed joins and aggregations executed with fault tolerant execution enabled") + public QueryManagerConfig setFaultTolerantExecutionMinPartitionCount(int faultTolerantExecutionMinPartitionCount) + { + this.faultTolerantExecutionMinPartitionCount = faultTolerantExecutionMinPartitionCount; + return this; + } + + @Min(1) + public int getFaultTolerantExecutionMinPartitionCountForWrite() { - return faultTolerantExecutionPartitionCount; + return faultTolerantExecutionMinPartitionCountForWrite; } - @Config("fault-tolerant-execution-partition-count") - @ConfigDescription("Number of partitions for distributed joins and aggregations executed with fault tolerant execution enabled") - public QueryManagerConfig setFaultTolerantExecutionPartitionCount(int faultTolerantExecutionPartitionCount) + @Config("fault-tolerant-execution-min-partition-count-for-write") + @ConfigDescription("Minimum number of partitions for distributed joins and aggregations in write queries executed with fault tolerant execution enabled") + public QueryManagerConfig setFaultTolerantExecutionMinPartitionCountForWrite(int faultTolerantExecutionMinPartitionCountForWrite) { - this.faultTolerantExecutionPartitionCount = faultTolerantExecutionPartitionCount; + this.faultTolerantExecutionMinPartitionCountForWrite = faultTolerantExecutionMinPartitionCountForWrite; return this; } diff --git a/core/trino-main/src/main/java/io/trino/execution/scheduler/EventDrivenFaultTolerantQueryScheduler.java b/core/trino-main/src/main/java/io/trino/execution/scheduler/EventDrivenFaultTolerantQueryScheduler.java index 8c941109eb33..7521dd8f294f 100644 --- 
a/core/trino-main/src/main/java/io/trino/execution/scheduler/EventDrivenFaultTolerantQueryScheduler.java +++ b/core/trino-main/src/main/java/io/trino/execution/scheduler/EventDrivenFaultTolerantQueryScheduler.java @@ -123,7 +123,7 @@ import static com.google.common.util.concurrent.Futures.getDone; import static io.trino.SystemSessionProperties.getFaultTolerantExecutionDefaultCoordinatorTaskMemory; import static io.trino.SystemSessionProperties.getFaultTolerantExecutionDefaultTaskMemory; -import static io.trino.SystemSessionProperties.getFaultTolerantExecutionPartitionCount; +import static io.trino.SystemSessionProperties.getFaultTolerantExecutionMaxPartitionCount; import static io.trino.SystemSessionProperties.getMaxTasksWaitingForExecutionPerQuery; import static io.trino.SystemSessionProperties.getMaxTasksWaitingForNodePerStage; import static io.trino.SystemSessionProperties.getRetryDelayScaleFactor; @@ -270,7 +270,7 @@ public synchronized void start() FaultTolerantPartitioningSchemeFactory partitioningSchemeFactory = new FaultTolerantPartitioningSchemeFactory( nodePartitioningManager, session, - getFaultTolerantExecutionPartitionCount(session)); + getFaultTolerantExecutionMaxPartitionCount(session)); Closer closer = Closer.create(); NodeAllocator nodeAllocator = closer.register(nodeAllocatorService.getNodeAllocator(session)); try { @@ -824,11 +824,13 @@ private void createStageExecution(SubPlan subPlan, boolean rootFragment, int sch stage.getStageSpan(), fragment, sourceExchanges.buildOrThrow(), - partitioningSchemeFactory.get(fragment.getPartitioning()), + partitioningSchemeFactory.get(fragment.getPartitioning(), fragment.getPartitionCount()), stage::recordGetSplitTime, outputDataSizeEstimates.buildOrThrow())); - FaultTolerantPartitioningScheme sinkPartitioningScheme = partitioningSchemeFactory.get(fragment.getOutputPartitioningScheme().getPartitioning().getHandle()); + FaultTolerantPartitioningScheme sinkPartitioningScheme = partitioningSchemeFactory.get( + 
fragment.getOutputPartitioningScheme().getPartitioning().getHandle(), + fragment.getOutputPartitioningScheme().getPartitionCount()); ExchangeContext exchangeContext = new ExchangeContext(queryStateMachine.getQueryId(), new ExchangeId("external-exchange-" + stage.getStageId().getId())); boolean preserveOrderWithinPartition = rootFragment && stage.getFragment().getPartitioning().equals(SINGLE_DISTRIBUTION); diff --git a/core/trino-main/src/main/java/io/trino/execution/scheduler/FaultTolerantPartitioningSchemeFactory.java b/core/trino-main/src/main/java/io/trino/execution/scheduler/FaultTolerantPartitioningSchemeFactory.java index 0f6bad247b2b..03f66110a906 100644 --- a/core/trino-main/src/main/java/io/trino/execution/scheduler/FaultTolerantPartitioningSchemeFactory.java +++ b/core/trino-main/src/main/java/io/trino/execution/scheduler/FaultTolerantPartitioningSchemeFactory.java @@ -42,43 +42,43 @@ public class FaultTolerantPartitioningSchemeFactory { private final NodePartitioningManager nodePartitioningManager; private final Session session; - private final int partitionCount; + private final int maxPartitionCount; private final Map<PartitioningHandle, FaultTolerantPartitioningScheme> cache = new HashMap<>(); - public FaultTolerantPartitioningSchemeFactory(NodePartitioningManager nodePartitioningManager, Session session, int partitionCount) + public FaultTolerantPartitioningSchemeFactory(NodePartitioningManager nodePartitioningManager, Session session, int maxPartitionCount) { this.nodePartitioningManager = requireNonNull(nodePartitioningManager, "nodePartitioningManager is null"); this.session = requireNonNull(session, "session is null"); - this.partitionCount = partitionCount; + this.maxPartitionCount = maxPartitionCount; } - public FaultTolerantPartitioningScheme get(PartitioningHandle handle) + public FaultTolerantPartitioningScheme get(PartitioningHandle handle, Optional<Integer> partitionCount) { FaultTolerantPartitioningScheme result = cache.get(handle); if (result == null) { // Avoid using computeIfAbsent as the "get" 
method is called recursively from the "create" method - result = create(handle); + result = create(handle, partitionCount); cache.put(handle, result); } return result; } - private FaultTolerantPartitioningScheme create(PartitioningHandle partitioningHandle) + private FaultTolerantPartitioningScheme create(PartitioningHandle partitioningHandle, Optional<Integer> partitionCount) { if (partitioningHandle.getConnectorHandle() instanceof MergePartitioningHandle mergePartitioningHandle) { - return mergePartitioningHandle.getFaultTolerantPartitioningScheme(this::get); + return mergePartitioningHandle.getFaultTolerantPartitioningScheme(handle -> this.get(handle, partitionCount)); } if (partitioningHandle.equals(FIXED_HASH_DISTRIBUTION) || partitioningHandle.equals(SCALED_WRITER_HASH_DISTRIBUTION)) { - return createSystemSchema(partitionCount); + return createSystemSchema(partitionCount.orElse(maxPartitionCount)); } if (partitioningHandle.getCatalogHandle().isPresent()) { Optional<ConnectorBucketNodeMap> connectorBucketNodeMap = nodePartitioningManager.getConnectorBucketNodeMap(session, partitioningHandle); if (connectorBucketNodeMap.isEmpty()) { - return createSystemSchema(partitionCount); + return createSystemSchema(partitionCount.orElse(maxPartitionCount)); } ToIntFunction<Split> splitToBucket = nodePartitioningManager.getSplitToBucket(session, partitioningHandle); - return createConnectorSpecificSchema(partitionCount, connectorBucketNodeMap.get(), splitToBucket); + return createConnectorSpecificSchema(partitionCount.orElse(maxPartitionCount), connectorBucketNodeMap.get(), splitToBucket); } return new FaultTolerantPartitioningScheme(1, Optional.empty(), Optional.empty(), Optional.empty()); } diff --git a/core/trino-main/src/main/java/io/trino/sql/planner/iterative/rule/RewriteSpatialPartitioningAggregation.java b/core/trino-main/src/main/java/io/trino/sql/planner/iterative/rule/RewriteSpatialPartitioningAggregation.java index de1a3d1b1c73..5fd0ff9b6920 100644 --- 
a/core/trino-main/src/main/java/io/trino/sql/planner/iterative/rule/RewriteSpatialPartitioningAggregation.java +++ b/core/trino-main/src/main/java/io/trino/sql/planner/iterative/rule/RewriteSpatialPartitioningAggregation.java @@ -37,7 +37,7 @@ import java.util.Optional; import static com.google.common.collect.Iterables.getOnlyElement; -import static io.trino.SystemSessionProperties.getFaultTolerantExecutionPartitionCount; +import static io.trino.SystemSessionProperties.getFaultTolerantExecutionMaxPartitionCount; import static io.trino.SystemSessionProperties.getMaxHashPartitionCount; import static io.trino.SystemSessionProperties.getRetryPolicy; import static io.trino.spi.type.IntegerType.INTEGER; @@ -127,7 +127,7 @@ public Result apply(AggregationNode node, Captures captures, Context context) int partitionCount; if (getRetryPolicy(context.getSession()) == RetryPolicy.TASK) { - partitionCount = getFaultTolerantExecutionPartitionCount(context.getSession()); + partitionCount = getFaultTolerantExecutionMaxPartitionCount(context.getSession()); } else { partitionCount = getMaxHashPartitionCount(context.getSession()); diff --git a/core/trino-main/src/main/java/io/trino/sql/planner/optimizations/DeterminePartitionCount.java b/core/trino-main/src/main/java/io/trino/sql/planner/optimizations/DeterminePartitionCount.java index 8814d51917fb..e24c9dac47ce 100644 --- a/core/trino-main/src/main/java/io/trino/sql/planner/optimizations/DeterminePartitionCount.java +++ b/core/trino-main/src/main/java/io/trino/sql/planner/optimizations/DeterminePartitionCount.java @@ -44,13 +44,19 @@ import java.util.Optional; import java.util.function.ToDoubleFunction; +import static com.google.common.base.Verify.verify; import static com.google.common.collect.ImmutableList.toImmutableList; +import static io.trino.SystemSessionProperties.getFaultTolerantExecutionMaxPartitionCount; +import static io.trino.SystemSessionProperties.getFaultTolerantExecutionMinPartitionCount; +import static 
io.trino.SystemSessionProperties.getFaultTolerantExecutionMinPartitionCountForWrite; import static io.trino.SystemSessionProperties.getMaxHashPartitionCount; import static io.trino.SystemSessionProperties.getMinHashPartitionCount; +import static io.trino.SystemSessionProperties.getMinHashPartitionCountForWrite; import static io.trino.SystemSessionProperties.getMinInputRowsPerTask; import static io.trino.SystemSessionProperties.getMinInputSizePerTask; import static io.trino.SystemSessionProperties.getQueryMaxMemoryPerNode; import static io.trino.SystemSessionProperties.getRetryPolicy; +import static io.trino.SystemSessionProperties.isDeterminePartitionCountForWriteEnabled; import static io.trino.sql.planner.optimizations.QueryCardinalityUtil.isAtMostScalar; import static io.trino.sql.planner.plan.ExchangeNode.Scope.REMOTE; import static io.trino.sql.planner.plan.SimplePlanRewriter.rewriteWith; @@ -108,16 +114,15 @@ public PlanNode optimize( requireNonNull(types, "types is null"); requireNonNull(tableStatsProvider, "tableStatsProvider is null"); - // Skip for write nodes since writing partitioned data with small amount of nodes could cause - // memory related issues even when the amount of data is small. Additionally, skip for FTE mode since we - // are not using estimated partitionCount in FTE scheduler. - if (PlanNodeSearcher.searchFrom(plan).whereIsInstanceOfAny(INSERT_NODES).matches() - || getRetryPolicy(session) == RetryPolicy.TASK) { + // Unless enabled, skip for write nodes since writing partitioned data with small amount of nodes could cause + // memory related issues even when the amount of data is small. 
+ boolean isWriteQuery = PlanNodeSearcher.searchFrom(plan).whereIsInstanceOfAny(INSERT_NODES).matches(); + if (isWriteQuery && !isDeterminePartitionCountForWriteEnabled(session)) { return plan; } try { - return determinePartitionCount(plan, session, types, tableStatsProvider) + return determinePartitionCount(plan, session, types, tableStatsProvider, isWriteQuery) .map(partitionCount -> rewriteWith(new Rewriter(partitionCount), plan)) .orElse(plan); } @@ -128,7 +133,12 @@ public PlanNode optimize( return plan; } - private Optional<Integer> determinePartitionCount(PlanNode plan, Session session, TypeProvider types, TableStatsProvider tableStatsProvider) + private Optional<Integer> determinePartitionCount( + PlanNode plan, + Session session, + TypeProvider types, + TableStatsProvider tableStatsProvider, + boolean isWriteQuery) { long minInputSizePerTask = getMinInputSizePerTask(session).toBytes(); long minInputRowsPerTask = getMinInputRowsPerTask(session); @@ -141,6 +151,29 @@ private Optional<Integer> determinePartitionCount(PlanNode plan, Session session return Optional.empty(); } + int minPartitionCount; + int maxPartitionCount; + if (getRetryPolicy(session).equals(RetryPolicy.TASK)) { + if (isWriteQuery) { + minPartitionCount = getFaultTolerantExecutionMinPartitionCountForWrite(session); + } + else { + minPartitionCount = getFaultTolerantExecutionMinPartitionCount(session); + } + maxPartitionCount = getFaultTolerantExecutionMaxPartitionCount(session); + } + else { + if (isWriteQuery) { + minPartitionCount = getMinHashPartitionCountForWrite(session); + } + else { + minPartitionCount = getMinHashPartitionCount(session); + } + maxPartitionCount = getMaxHashPartitionCount(session); + } + verify(minPartitionCount <= maxPartitionCount, "minPartitionCount %s larger than maxPartitionCount %s", + minPartitionCount, maxPartitionCount); + StatsProvider statsProvider = new CachingStatsProvider(statsCalculator, session, types, tableStatsProvider); long queryMaxMemoryPerNode = 
getQueryMaxMemoryPerNode(session).toBytes(); @@ -162,9 +195,9 @@ private Optional determinePartitionCount(PlanNode plan, Session session // because huge number of small size rows can be cpu intensive for some operators. On the other // hand, small number of rows with considerable size in bytes can be memory intensive. max(partitionCountBasedOnOutputSize.get(), partitionCountBasedOnRows.get()), - getMinHashPartitionCount(session)); + minPartitionCount); - if (partitionCount >= getMaxHashPartitionCount(session)) { + if (partitionCount >= maxPartitionCount) { return Optional.empty(); } diff --git a/core/trino-main/src/test/java/io/trino/execution/TestQueryManagerConfig.java b/core/trino-main/src/test/java/io/trino/execution/TestQueryManagerConfig.java index 382b0bda6fd1..7d2b650c5f2c 100644 --- a/core/trino-main/src/test/java/io/trino/execution/TestQueryManagerConfig.java +++ b/core/trino-main/src/test/java/io/trino/execution/TestQueryManagerConfig.java @@ -49,8 +49,10 @@ public void testDefaults() .setMinScheduleSplitBatchSize(100) .setMaxConcurrentQueries(1000) .setMaxQueuedQueries(5000) + .setDeterminePartitionCountForWriteEnabled(false) .setMaxHashPartitionCount(100) .setMinHashPartitionCount(4) + .setMinHashPartitionCountForWrite(50) .setQueryManagerExecutorPoolSize(5) .setQueryExecutorPoolSize(1000) .setMaxStateMachineCallbackThreads(5) @@ -92,7 +94,9 @@ public void testDefaults() .setFaultTolerantExecutionStandardSplitSize(DataSize.of(64, MEGABYTE)) .setFaultTolerantExecutionMaxTaskSplitCount(256) .setFaultTolerantExecutionTaskDescriptorStorageMaxMemory(DataSize.ofBytes(Math.round(AVAILABLE_HEAP_MEMORY * 0.15))) - .setFaultTolerantExecutionPartitionCount(50) + .setFaultTolerantExecutionMaxPartitionCount(50) + .setFaultTolerantExecutionMinPartitionCount(4) + .setFaultTolerantExecutionMinPartitionCountForWrite(50) .setFaultTolerantExecutionForcePreferredWritePartitioningEnabled(true) .setMaxWriterTasksCount(100)); } @@ -111,8 +115,10 @@ public void 
testExplicitPropertyMappings() .put("query.min-schedule-split-batch-size", "9") .put("query.max-concurrent-queries", "10") .put("query.max-queued-queries", "15") + .put("query.determine-partition-count-for-write-enabled", "true") .put("query.max-hash-partition-count", "16") .put("query.min-hash-partition-count", "2") + .put("query.min-hash-partition-count-for-write", "88") .put("query.manager-executor-pool-size", "11") .put("query.executor-pool-size", "111") .put("query.max-state-machine-callback-threads", "112") @@ -154,7 +160,9 @@ public void testExplicitPropertyMappings() .put("fault-tolerant-execution-standard-split-size", "33MB") .put("fault-tolerant-execution-max-task-split-count", "22") .put("fault-tolerant-execution-task-descriptor-storage-max-memory", "3GB") - .put("fault-tolerant-execution-partition-count", "123") + .put("fault-tolerant-execution-max-partition-count", "123") + .put("fault-tolerant-execution-min-partition-count", "12") + .put("fault-tolerant-execution-min-partition-count-for-write", "99") .put("experimental.fault-tolerant-execution-force-preferred-write-partitioning-enabled", "false") .put("query.max-writer-task-count", "101") .buildOrThrow(); @@ -170,8 +178,10 @@ public void testExplicitPropertyMappings() .setMinScheduleSplitBatchSize(9) .setMaxConcurrentQueries(10) .setMaxQueuedQueries(15) + .setDeterminePartitionCountForWriteEnabled(true) .setMaxHashPartitionCount(16) .setMinHashPartitionCount(2) + .setMinHashPartitionCountForWrite(88) .setQueryManagerExecutorPoolSize(11) .setQueryExecutorPoolSize(111) .setMaxStateMachineCallbackThreads(112) @@ -213,7 +223,9 @@ public void testExplicitPropertyMappings() .setFaultTolerantExecutionStandardSplitSize(DataSize.of(33, MEGABYTE)) .setFaultTolerantExecutionMaxTaskSplitCount(22) .setFaultTolerantExecutionTaskDescriptorStorageMaxMemory(DataSize.of(3, GIGABYTE)) - .setFaultTolerantExecutionPartitionCount(123) + .setFaultTolerantExecutionMaxPartitionCount(123) + 
.setFaultTolerantExecutionMinPartitionCount(12) + .setFaultTolerantExecutionMinPartitionCountForWrite(99) .setFaultTolerantExecutionForcePreferredWritePartitioningEnabled(false) .setMaxWriterTasksCount(101); diff --git a/core/trino-main/src/test/java/io/trino/sql/planner/optimizations/TestLimitMaxWriterNodesCount.java b/core/trino-main/src/test/java/io/trino/sql/planner/optimizations/TestLimitMaxWriterNodesCount.java index b71090735b25..90f301f3e8c6 100644 --- a/core/trino-main/src/test/java/io/trino/sql/planner/optimizations/TestLimitMaxWriterNodesCount.java +++ b/core/trino-main/src/test/java/io/trino/sql/planner/optimizations/TestLimitMaxWriterNodesCount.java @@ -291,7 +291,6 @@ public void testPlanWhenRetryPolicyIsTask() node(TableWriterNode.class, project( exchange(LOCAL, - // partitionCount for writing stage is empty because it is FTE mode exchange(REMOTE, SCALED_WRITER_HASH_DISTRIBUTION, Optional.empty(), project( values("column_a", "column_b")))))))); diff --git a/testing/trino-faulttolerant-tests/src/test/java/io/trino/faulttolerant/hive/TestHiveFaultTolerantExecutionConnectorTest.java b/testing/trino-faulttolerant-tests/src/test/java/io/trino/faulttolerant/hive/TestHiveFaultTolerantExecutionConnectorTest.java index db9e24b570d5..a6e8b7b04486 100644 --- a/testing/trino-faulttolerant-tests/src/test/java/io/trino/faulttolerant/hive/TestHiveFaultTolerantExecutionConnectorTest.java +++ b/testing/trino-faulttolerant-tests/src/test/java/io/trino/faulttolerant/hive/TestHiveFaultTolerantExecutionConnectorTest.java @@ -23,7 +23,8 @@ import org.testng.annotations.Test; import static io.airlift.units.DataSize.Unit.GIGABYTE; -import static io.trino.SystemSessionProperties.FAULT_TOLERANT_EXECUTION_PARTITION_COUNT; +import static io.trino.SystemSessionProperties.FAULT_TOLERANT_EXECUTION_MAX_PARTITION_COUNT; +import static io.trino.SystemSessionProperties.FAULT_TOLERANT_EXECUTION_MIN_PARTITION_COUNT; import static 
io.trino.plugin.exchange.filesystem.containers.MinioStorage.getExchangeManagerProperties; import static io.trino.testing.FaultTolerantExecutionConnectorTestHelper.getExtraProperties; import static io.trino.testing.TestingNames.randomNameSuffix; @@ -91,7 +92,8 @@ public void testWritersAcrossMultipleWorkersWhenScaleWritersIsEnabled() public void testMaxOutputPartitionCountCheck() { Session session = Session.builder(getSession()) - .setSystemProperty(FAULT_TOLERANT_EXECUTION_PARTITION_COUNT, "51") + .setSystemProperty(FAULT_TOLERANT_EXECUTION_MAX_PARTITION_COUNT, "51") + .setSystemProperty(FAULT_TOLERANT_EXECUTION_MIN_PARTITION_COUNT, "51") .build(); assertQueryFails(session, "SELECT nationkey, count(*) FROM nation GROUP BY nationkey", "Max number of output partitions exceeded for exchange.*"); } diff --git a/testing/trino-testing/src/main/java/io/trino/testing/BaseFailureRecoveryTest.java b/testing/trino-testing/src/main/java/io/trino/testing/BaseFailureRecoveryTest.java index ad209f4d0152..24fcb8c46c7a 100644 --- a/testing/trino-testing/src/main/java/io/trino/testing/BaseFailureRecoveryTest.java +++ b/testing/trino-testing/src/main/java/io/trino/testing/BaseFailureRecoveryTest.java @@ -117,7 +117,7 @@ protected final QueryRunner createQueryRunner() .put("failure-injection.request-timeout", new Duration(REQUEST_TIMEOUT.toMillis() * 2, MILLISECONDS).toString()) // making http timeouts shorter so tests which simulate communication timeouts finish in reasonable amount of time .put("exchange.http-client.idle-timeout", REQUEST_TIMEOUT.toString()) - .put("fault-tolerant-execution-partition-count", "5") + .put("fault-tolerant-execution-max-partition-count", "5") // to trigger spilling .put("exchange.deduplication-buffer-size", "1kB") .put("fault-tolerant-execution-task-memory", "1GB") diff --git a/testing/trino-testing/src/main/java/io/trino/testing/FaultTolerantExecutionConnectorTestHelper.java 
b/testing/trino-testing/src/main/java/io/trino/testing/FaultTolerantExecutionConnectorTestHelper.java index e1ab7038b761..22997f30babd 100644 --- a/testing/trino-testing/src/main/java/io/trino/testing/FaultTolerantExecutionConnectorTestHelper.java +++ b/testing/trino-testing/src/main/java/io/trino/testing/FaultTolerantExecutionConnectorTestHelper.java @@ -27,7 +27,7 @@ public static Map<String, String> getExtraProperties() .put("retry-policy", "TASK") .put("retry-initial-delay", "50ms") .put("retry-max-delay", "100ms") - .put("fault-tolerant-execution-partition-count", "5") + .put("fault-tolerant-execution-max-partition-count", "5") .put("fault-tolerant-execution-arbitrary-distribution-compute-task-target-size-min", "5MB") .put("fault-tolerant-execution-arbitrary-distribution-compute-task-target-size-max", "10MB") .put("fault-tolerant-execution-arbitrary-distribution-write-task-target-size-min", "10MB")