diff --git a/core/trino-main/src/main/java/io/trino/SystemSessionProperties.java b/core/trino-main/src/main/java/io/trino/SystemSessionProperties.java index ac882e4f99bb..a6418921f141 100644 --- a/core/trino-main/src/main/java/io/trino/SystemSessionProperties.java +++ b/core/trino-main/src/main/java/io/trino/SystemSessionProperties.java @@ -28,6 +28,7 @@ import io.trino.sql.planner.OptimizerConfig; import io.trino.sql.planner.OptimizerConfig.JoinDistributionType; import io.trino.sql.planner.OptimizerConfig.JoinReorderingStrategy; +import io.trino.sql.planner.OptimizerConfig.MarkDistinctStrategy; import javax.inject.Inject; @@ -115,6 +116,7 @@ public final class SystemSessionProperties public static final String USE_PARTIAL_DISTINCT_LIMIT = "use_partial_distinct_limit"; public static final String MAX_RECURSION_DEPTH = "max_recursion_depth"; public static final String USE_MARK_DISTINCT = "use_mark_distinct"; + public static final String MARK_DISTINCT_STRATEGY = "mark_distinct_strategy"; public static final String PREFER_PARTIAL_AGGREGATION = "prefer_partial_aggregation"; public static final String OPTIMIZE_TOP_N_RANKING = "optimize_top_n_ranking"; public static final String MAX_GROUPING_SETS = "max_grouping_sets"; @@ -540,6 +542,12 @@ public SystemSessionProperties( "Implement DISTINCT aggregations using MarkDistinct", optimizerConfig.isUseMarkDistinct(), false), + enumProperty( + MARK_DISTINCT_STRATEGY, + "", + MarkDistinctStrategy.class, + optimizerConfig.getMarkDistinctStrategy(), + false), booleanProperty( PREFER_PARTIAL_AGGREGATION, "Prefer splitting aggregations into partial and final stages", @@ -1193,9 +1201,21 @@ public static int getFilterAndProjectMinOutputPageRowCount(Session session) return session.getSystemProperty(FILTER_AND_PROJECT_MIN_OUTPUT_PAGE_ROW_COUNT, Integer.class); } - public static boolean useMarkDistinct(Session session) + public static MarkDistinctStrategy markDistinctStrategy(Session session) { - return session.getSystemProperty(USE_MARK_DISTINCT, Boolean.class); + MarkDistinctStrategy markDistinctStrategy = session.getSystemProperty(MARK_DISTINCT_STRATEGY, MarkDistinctStrategy.class); + if (markDistinctStrategy != null) { + // mark_distinct_strategy is set, so it takes precedence over use_mark_distinct + return markDistinctStrategy; + } + + Boolean useMarkDistinct = session.getSystemProperty(USE_MARK_DISTINCT, Boolean.class); + if (useMarkDistinct == null) { + // both mark_distinct_strategy and use_mark_distinct have default null values, use AUTOMATIC + return MarkDistinctStrategy.AUTOMATIC; + } + // use_mark_distinct is set but mark_distinct_strategy is not, map use_mark_distinct to mark_distinct_strategy + return useMarkDistinct ? MarkDistinctStrategy.AUTOMATIC : MarkDistinctStrategy.NONE; } public static boolean preferPartialAggregation(Session session) diff --git a/core/trino-main/src/main/java/io/trino/sql/planner/OptimizerConfig.java b/core/trino-main/src/main/java/io/trino/sql/planner/OptimizerConfig.java index 029c8dc008eb..4c1274cc684a 100644 --- a/core/trino-main/src/main/java/io/trino/sql/planner/OptimizerConfig.java +++ b/core/trino-main/src/main/java/io/trino/sql/planner/OptimizerConfig.java @@ -19,6 +19,7 @@ import io.airlift.units.DataSize; import io.airlift.units.Duration; +import javax.annotation.Nullable; import javax.validation.constraints.Max; import javax.validation.constraints.Min; import javax.validation.constraints.NotNull; @@ -62,7 +63,10 @@ public class OptimizerConfig private boolean optimizeHashGeneration = true; private boolean pushTableWriteThroughUnion = true; private boolean dictionaryAggregation; - private boolean useMarkDistinct = true; + @Nullable + private Boolean useMarkDistinct; + @Nullable + private MarkDistinctStrategy markDistinctStrategy; private boolean preferPartialAggregation = true; private boolean pushAggregationThroughOuterJoin = true; private boolean enableIntermediateAggregations; @@ -115,6 +119,13 @@ public boolean canReplicate() } } + public enum MarkDistinctStrategy + { + NONE, + ALWAYS, + AUTOMATIC, + } + public double getCpuCostWeight() { return cpuCostWeight; @@ -461,18 +472,35 @@ public OptimizerConfig setOptimizeMetadataQueries(boolean optimizeMetadataQuerie return this; } - public boolean isUseMarkDistinct() + @Deprecated + @Nullable + public Boolean isUseMarkDistinct() { return useMarkDistinct; } - @Config("optimizer.use-mark-distinct") - public OptimizerConfig setUseMarkDistinct(boolean value) + @Deprecated + @LegacyConfig(value = "optimizer.use-mark-distinct", replacedBy = "optimizer.mark-distinct-strategy") + public OptimizerConfig setUseMarkDistinct(Boolean value) { this.useMarkDistinct = value; return this; } + @Nullable + public MarkDistinctStrategy getMarkDistinctStrategy() + { + return markDistinctStrategy; + } + + @Config("optimizer.mark-distinct-strategy") + @ConfigDescription("Strategy to use for distinct aggregations") + public OptimizerConfig setMarkDistinctStrategy(MarkDistinctStrategy markDistinctStrategy) + { + this.markDistinctStrategy = markDistinctStrategy; + return this; + } + public boolean isPreferPartialAggregation() { return preferPartialAggregation; diff --git a/core/trino-main/src/main/java/io/trino/sql/planner/PlanOptimizers.java b/core/trino-main/src/main/java/io/trino/sql/planner/PlanOptimizers.java index ac7938cff741..641e676516d2 100644 --- a/core/trino-main/src/main/java/io/trino/sql/planner/PlanOptimizers.java +++ b/core/trino-main/src/main/java/io/trino/sql/planner/PlanOptimizers.java @@ -648,7 +648,7 @@ public PlanOptimizers( new RemoveRedundantIdentityProjections(), new PushAggregationThroughOuterJoin(), new ReplaceRedundantJoinWithSource(), // Run this after PredicatePushDown optimizer as it inlines filter constants - new MultipleDistinctAggregationToMarkDistinct())), // Run this after aggregation pushdown so that multiple distinct aggregations can be pushed into a connector + new MultipleDistinctAggregationToMarkDistinct(taskCountEstimator))), // Run this after aggregation pushdown so that multiple distinct aggregations can be pushed into a connector inlineProjections, simplifyOptimizer, // Re-run the SimplifyExpressions to simplify any recomposed expressions from other optimizations pushProjectionIntoTableScanOptimizer, diff --git a/core/trino-main/src/main/java/io/trino/sql/planner/iterative/rule/MultipleDistinctAggregationToMarkDistinct.java b/core/trino-main/src/main/java/io/trino/sql/planner/iterative/rule/MultipleDistinctAggregationToMarkDistinct.java index d132adf1741c..80c79d590d54 100644 --- a/core/trino-main/src/main/java/io/trino/sql/planner/iterative/rule/MultipleDistinctAggregationToMarkDistinct.java +++ b/core/trino-main/src/main/java/io/trino/sql/planner/iterative/rule/MultipleDistinctAggregationToMarkDistinct.java @@ -17,9 +17,10 @@ import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableSet; import com.google.common.collect.Iterables; -import io.trino.SystemSessionProperties; +import io.trino.cost.TaskCountEstimator; import io.trino.matching.Captures; import io.trino.matching.Pattern; +import io.trino.sql.planner.OptimizerConfig.MarkDistinctStrategy; import io.trino.sql.planner.Symbol; import io.trino.sql.planner.iterative.Rule; import io.trino.sql.planner.plan.AggregationNode; @@ -33,8 +34,14 @@ import java.util.Optional; import java.util.Set; +import static io.trino.SystemSessionProperties.getTaskConcurrency; +import static io.trino.SystemSessionProperties.isOptimizeDistinctAggregationEnabled; +import static io.trino.SystemSessionProperties.markDistinctStrategy; import static io.trino.spi.type.BooleanType.BOOLEAN; +import static io.trino.sql.planner.OptimizerConfig.MarkDistinctStrategy.AUTOMATIC; +import static io.trino.sql.planner.OptimizerConfig.MarkDistinctStrategy.NONE; import static io.trino.sql.planner.plan.Patterns.aggregation; +import static java.util.Objects.requireNonNull; import static java.util.stream.Collectors.toSet; /** @@ -62,6 +69,9 @@ public class MultipleDistinctAggregationToMarkDistinct implements Rule { + private static final int MARK_DISTINCT_MAX_OUTPUT_ROW_COUNT_MULTIPLIER = 8; + private static final int OPTIMIZED_DISTINCT_MAX_OUTPUT_ROW_COUNT_MULTIPLIER = MARK_DISTINCT_MAX_OUTPUT_ROW_COUNT_MULTIPLIER * 8; + private static final Pattern PATTERN = aggregation() .matching( Predicates.and( @@ -98,6 +108,13 @@ private static boolean hasMixedDistinctAndNonDistincts(AggregationNode aggregati return distincts > 0 && distincts < aggregationNode.getAggregations().size(); } + private final TaskCountEstimator taskCountEstimator; + + public MultipleDistinctAggregationToMarkDistinct(TaskCountEstimator taskCountEstimator) + { + this.taskCountEstimator = requireNonNull(taskCountEstimator, "taskCountEstimator is null"); + } + @Override public Pattern getPattern() { @@ -107,7 +124,12 @@ public Pattern getPattern() @Override public Result apply(AggregationNode parent, Captures captures, Context context) { - if (!SystemSessionProperties.useMarkDistinct(context.getSession())) { + MarkDistinctStrategy markDistinctStrategy = markDistinctStrategy(context.getSession()); + if (markDistinctStrategy.equals(NONE)) { + return Result.empty(); + } + + if (markDistinctStrategy.equals(AUTOMATIC) && !shouldAddMarkDistinct(parent, context)) { return Result.empty(); } @@ -165,4 +187,51 @@ public Result apply(AggregationNode parent, Captures captures, Context context) .setPreGroupedSymbols(ImmutableList.of()) .build()); } + + private boolean shouldAddMarkDistinct(AggregationNode aggregationNode, Context context) + { + if (aggregationNode.getGroupingKeys().isEmpty()) { + // global distinct aggregation is computed using a single thread. MarkDistinct will help parallelize the execution. + return true; + } + if (aggregationNode.getGroupingKeys().size() > 1) { + // NDV stats for multiple grouping keys are unreliable, let's keep MarkDistinct for this case to avoid significant slowdown or OOM/too big hash table issues in case of + // overestimation of very small NDV with big number of distinct values inside the groups. + return true; + } + double numberOfDistinctValues = context.getStatsProvider().getStats(aggregationNode).getOutputRowCount(); + if (Double.isNaN(numberOfDistinctValues)) { + // if the estimate is unknown, use MarkDistinct to avoid query failure + return true; + } + int maxNumberOfConcurrentThreadsForAggregation = taskCountEstimator.estimateHashedTaskCount(context.getSession()) * getTaskConcurrency(context.getSession()); + + if (numberOfDistinctValues <= MARK_DISTINCT_MAX_OUTPUT_ROW_COUNT_MULTIPLIER * maxNumberOfConcurrentThreadsForAggregation) { + // small numberOfDistinctValues reduces the distinct aggregation parallelism, also because the partitioning may be skewed. + // This makes query to underutilize the cluster CPU but also to possibly concentrate memory on few nodes. + // MarkDistinct should increase the parallelism at a cost of CPU. + return true; + } + + if (isOptimizeDistinctAggregationEnabled(context.getSession()) && + numberOfDistinctValues <= OPTIMIZED_DISTINCT_MAX_OUTPUT_ROW_COUNT_MULTIPLIER * maxNumberOfConcurrentThreadsForAggregation && + hasSingleDistinctAndNonDistincts(aggregationNode)) { + // with medium number of numberOfDistinctValues, OptimizeMixedDistinctAggregations + // will be beneficial for query latency (duration) over distinct aggregation at a cost of increased CPU, + // but it relies on existence of MarkDistinct nodes. + return true; + } + + return false; + } + + private static boolean hasSingleDistinctAndNonDistincts(AggregationNode aggregationNode) + { + long distincts = aggregationNode.getAggregations() + .values().stream() + .filter(Aggregation::isDistinct) + .count(); + + return distincts == 1 && distincts < aggregationNode.getAggregations().size(); + } } diff --git a/core/trino-main/src/test/java/io/trino/cost/TestOptimizerConfig.java b/core/trino-main/src/test/java/io/trino/cost/TestOptimizerConfig.java index e1b17bf866b8..4576695ba9e5 100644 --- a/core/trino-main/src/test/java/io/trino/cost/TestOptimizerConfig.java +++ b/core/trino-main/src/test/java/io/trino/cost/TestOptimizerConfig.java @@ -30,6 +30,7 @@ import static io.airlift.units.DataSize.Unit.MEGABYTE; import static io.trino.sql.planner.OptimizerConfig.JoinDistributionType.BROADCAST; import static io.trino.sql.planner.OptimizerConfig.JoinReorderingStrategy.NONE; +import static io.trino.sql.planner.OptimizerConfig.MarkDistinctStrategy.ALWAYS; import static java.util.concurrent.TimeUnit.MINUTES; import static java.util.concurrent.TimeUnit.SECONDS; @@ -69,7 +70,7 @@ public void testDefaults() .setPushAggregationThroughOuterJoin(true) .setPushPartialAggregationThoughJoin(false) .setPreAggregateCaseAggregationsEnabled(true) - .setUseMarkDistinct(true) + .setMarkDistinctStrategy(null) .setPreferPartialAggregation(true) .setOptimizeTopNRanking(true) .setDistributedSortEnabled(true) @@ -129,7 +130,7 @@ public void testExplicitPropertyMappings() .put("optimizer.pre-aggregate-case-aggregations.enabled", "false") .put("optimizer.enable-intermediate-aggregations", "true") .put("optimizer.force-single-node-output", "true") - .put("optimizer.use-mark-distinct", "false") + .put("optimizer.mark-distinct-strategy", "always") .put("optimizer.prefer-partial-aggregation", "false") .put("optimizer.optimize-top-n-ranking", "false") .put("optimizer.skip-redundant-sort", "false") @@ -182,7 +183,7 @@ public void testExplicitPropertyMappings() .setPushPartialAggregationThoughJoin(true) .setPreAggregateCaseAggregationsEnabled(false) .setEnableIntermediateAggregations(true) - .setUseMarkDistinct(false) + .setMarkDistinctStrategy(ALWAYS) .setPreferPartialAggregation(false) .setOptimizeTopNRanking(false) .setDistributedSortEnabled(false) diff --git a/core/trino-main/src/test/java/io/trino/sql/planner/iterative/rule/TestMultipleDistinctAggregationToMarkDistinct.java b/core/trino-main/src/test/java/io/trino/sql/planner/iterative/rule/TestMultipleDistinctAggregationToMarkDistinct.java index fff5f12371c1..767f831cfb50 100644 --- a/core/trino-main/src/test/java/io/trino/sql/planner/iterative/rule/TestMultipleDistinctAggregationToMarkDistinct.java +++ b/core/trino-main/src/test/java/io/trino/sql/planner/iterative/rule/TestMultipleDistinctAggregationToMarkDistinct.java @@ -14,16 +14,40 @@ package io.trino.sql.planner.iterative.rule; import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; +import io.trino.cost.PlanNodeStatsEstimate; +import io.trino.cost.TaskCountEstimator; +import io.trino.sql.planner.assertions.PlanMatchPattern; import io.trino.sql.planner.iterative.rule.test.BaseRuleTest; +import io.trino.sql.planner.iterative.rule.test.PlanBuilder; import io.trino.sql.planner.plan.Assignments; +import io.trino.sql.planner.plan.PlanNode; +import io.trino.sql.planner.plan.PlanNodeId; import org.testng.annotations.Test; +import java.util.Optional; +import java.util.function.Function; + +import static io.trino.SystemSessionProperties.MARK_DISTINCT_STRATEGY; +import static io.trino.SystemSessionProperties.OPTIMIZE_DISTINCT_AGGREGATIONS; +import static io.trino.SystemSessionProperties.TASK_CONCURRENCY; +import static io.trino.SystemSessionProperties.USE_MARK_DISTINCT; import static io.trino.spi.type.BigintType.BIGINT; +import static io.trino.sql.planner.assertions.PlanMatchPattern.aggregation; +import static io.trino.sql.planner.assertions.PlanMatchPattern.functionCall; +import static io.trino.sql.planner.assertions.PlanMatchPattern.globalAggregation; +import static io.trino.sql.planner.assertions.PlanMatchPattern.markDistinct; +import static io.trino.sql.planner.assertions.PlanMatchPattern.singleGroupingSet; +import static io.trino.sql.planner.assertions.PlanMatchPattern.values; import static io.trino.sql.planner.iterative.rule.test.PlanBuilder.expression; +import static io.trino.sql.planner.plan.AggregationNode.Step.SINGLE; public class TestMultipleDistinctAggregationToMarkDistinct extends BaseRuleTest { + private static final int NODES_COUNT = 4; + private static final TaskCountEstimator TASK_COUNT_ESTIMATOR = new TaskCountEstimator(() -> NODES_COUNT); + @Test public void testNoDistinct() { @@ -42,7 +66,7 @@ public void testNoDistinct() @Test public void testSingleDistinct() { - tester().assertThat(new MultipleDistinctAggregationToMarkDistinct()) + tester().assertThat(new MultipleDistinctAggregationToMarkDistinct(TASK_COUNT_ESTIMATOR)) .on(p -> p.aggregation(builder -> builder .globalGrouping() .addAggregation(p.symbol("output1"), expression("count(DISTINCT input1)"), ImmutableList.of(BIGINT)) @@ -56,7 +80,7 @@ public void testSingleDistinct() @Test public void testMultipleAggregations() { - tester().assertThat(new MultipleDistinctAggregationToMarkDistinct()) + tester().assertThat(new MultipleDistinctAggregationToMarkDistinct(TASK_COUNT_ESTIMATOR)) .on(p -> p.aggregation(builder -> builder .globalGrouping() .addAggregation(p.symbol("output1"), expression("count(DISTINCT input)"), ImmutableList.of(BIGINT)) @@ -69,7 +93,7 @@ public void testMultipleAggregations() @Test public void testDistinctWithFilter() { - tester().assertThat(new MultipleDistinctAggregationToMarkDistinct()) + tester().assertThat(new MultipleDistinctAggregationToMarkDistinct(TASK_COUNT_ESTIMATOR)) .on(p -> p.aggregation(builder -> builder .globalGrouping() .addAggregation(p.symbol("output1"), expression("count(DISTINCT input1) filter (where filter1)"), ImmutableList.of(BIGINT)) @@ -87,7 +111,7 @@ public void testDistinctWithFilter() p.symbol("input2")))))) .doesNotFire(); - tester().assertThat(new MultipleDistinctAggregationToMarkDistinct()) + tester().assertThat(new MultipleDistinctAggregationToMarkDistinct(TASK_COUNT_ESTIMATOR)) .on(p -> p.aggregation(builder -> builder .globalGrouping() .addAggregation(p.symbol("output1"), expression("count(DISTINCT input1) filter (where filter1)"), ImmutableList.of(BIGINT)) @@ -105,4 +129,154 @@ public void testDistinctWithFilter() p.symbol("input2")))))) .doesNotFire(); } + + @Test + public void testGlobalAggregation() + { + tester().assertThat(new MultipleDistinctAggregationToMarkDistinct(TASK_COUNT_ESTIMATOR)) + .on(p -> p.aggregation(builder -> builder + .globalGrouping() + .addAggregation(p.symbol("output1"), expression("count(DISTINCT input1)"), ImmutableList.of(BIGINT)) + .addAggregation(p.symbol("output2"), expression("count(DISTINCT input2)"), ImmutableList.of(BIGINT)) + .source( + p.values(p.symbol("input1"), p.symbol("input2"))))) + .matches(aggregation( + globalAggregation(), + ImmutableMap.of( + Optional.of("output1"), functionCall("count", ImmutableList.of("input1")), + Optional.of("output2"), functionCall("count", ImmutableList.of("input2"))), + ImmutableList.of(), + ImmutableList.of("mark_input1", "mark_input2"), + Optional.empty(), + SINGLE, + markDistinct( + "mark_input2", + ImmutableList.of("input2"), + markDistinct( + "mark_input1", + ImmutableList.of("input1"), + values(ImmutableMap.of("input1", 0, "input2", 1)))))); + } + + @Test + public void testAggregationNDV() + { + PlanNodeId aggregationNodeId = new PlanNodeId("aggregationNodeId"); + Function plan = p -> p.aggregation(builder -> builder + .nodeId(aggregationNodeId) + .singleGroupingSet(p.symbol("key")) + .addAggregation(p.symbol("output1"), expression("count(DISTINCT input)"), ImmutableList.of(BIGINT)) + .addAggregation(p.symbol("output2"), expression("sum(input)"), ImmutableList.of(BIGINT)) + .source( + p.values(p.symbol("input"), p.symbol("key")))); + PlanMatchPattern expectedMarkDistinct = aggregation( + singleGroupingSet("key"), + ImmutableMap.of( + Optional.of("output1"), functionCall("count", ImmutableList.of("input")), + Optional.of("output2"), functionCall("sum", ImmutableList.of("input"))), + ImmutableList.of(), + ImmutableList.of("mark_input"), + Optional.empty(), + SINGLE, + markDistinct( + "mark_input", + ImmutableList.of("input", "key"), + values(ImmutableMap.of("input", 0, "key", 1)))); + + int clusterThreadCount = NODES_COUNT * tester().getSession().getSystemProperty(TASK_CONCURRENCY, Integer.class); + + // small NDV + tester().assertThat(new MultipleDistinctAggregationToMarkDistinct(TASK_COUNT_ESTIMATOR)) + .on(plan) + .overrideStats(aggregationNodeId.toString(), PlanNodeStatsEstimate.builder().setOutputRowCount(2 * clusterThreadCount).build()) + .matches(expectedMarkDistinct); + + // unknown estimate + tester().assertThat(new MultipleDistinctAggregationToMarkDistinct(TASK_COUNT_ESTIMATOR)) + .on(plan) + .overrideStats(aggregationNodeId.toString(), PlanNodeStatsEstimate.builder().setOutputRowCount(Double.NaN).build()) + .matches(expectedMarkDistinct); + + // medium NDV, optimize_mixed_distinct_aggregations enabled + tester().assertThat(new MultipleDistinctAggregationToMarkDistinct(TASK_COUNT_ESTIMATOR)) + .on(plan) + .setSystemProperty(OPTIMIZE_DISTINCT_AGGREGATIONS, "true") + .overrideStats(aggregationNodeId.toString(), PlanNodeStatsEstimate.builder().setOutputRowCount(50 * clusterThreadCount).build()) + .matches(expectedMarkDistinct); + + // medium NDV, optimize_mixed_distinct_aggregations disabled + tester().assertThat(new MultipleDistinctAggregationToMarkDistinct(TASK_COUNT_ESTIMATOR)) + .on(plan) + .setSystemProperty(OPTIMIZE_DISTINCT_AGGREGATIONS, "false") + .overrideStats(aggregationNodeId.toString(), PlanNodeStatsEstimate.builder().setOutputRowCount(50 * clusterThreadCount).build()) + .doesNotFire(); + + // medium NDV, optimize_mixed_distinct_aggregations enabled but plan has multiple distinct aggregations + tester().assertThat(new MultipleDistinctAggregationToMarkDistinct(TASK_COUNT_ESTIMATOR)) + .on(p -> p.aggregation(builder -> builder + .nodeId(aggregationNodeId) + .singleGroupingSet(p.symbol("key")) + .addAggregation(p.symbol("output1"), expression("count(DISTINCT input1)"), ImmutableList.of(BIGINT)) + .addAggregation(p.symbol("output2"), expression("count(DISTINCT input2)"), ImmutableList.of(BIGINT)) + .source( + p.values(p.symbol("input1"), p.symbol("input2"), p.symbol("key"))))) + .setSystemProperty(OPTIMIZE_DISTINCT_AGGREGATIONS, "true") + .overrideStats(aggregationNodeId.toString(), PlanNodeStatsEstimate.builder().setOutputRowCount(50 * clusterThreadCount).build()) + .doesNotFire(); + + // big NDV + tester().assertThat(new MultipleDistinctAggregationToMarkDistinct(TASK_COUNT_ESTIMATOR)) + .on(plan) + .overrideStats(aggregationNodeId.toString(), PlanNodeStatsEstimate.builder().setOutputRowCount(1000 * clusterThreadCount).build()) + .doesNotFire(); + + // big NDV, mark_distinct_strategy = always, use_mark_distinct = null + tester().assertThat(new MultipleDistinctAggregationToMarkDistinct(TASK_COUNT_ESTIMATOR)) + .on(plan) + .setSystemProperty(MARK_DISTINCT_STRATEGY, "always") + .overrideStats(aggregationNodeId.toString(), PlanNodeStatsEstimate.builder().setOutputRowCount(1000 * clusterThreadCount).build()) + .matches(expectedMarkDistinct); + // big NDV, mark_distinct_strategy = null, use_mark_distinct = true + tester().assertThat(new MultipleDistinctAggregationToMarkDistinct(TASK_COUNT_ESTIMATOR)) + .on(plan) + .setSystemProperty(USE_MARK_DISTINCT, "true") + .overrideStats(aggregationNodeId.toString(), PlanNodeStatsEstimate.builder().setOutputRowCount(1000 * clusterThreadCount).build()) + .doesNotFire(); + // small NDV, mark_distinct_strategy = none, use_mark_distinct = null + tester().assertThat(new MultipleDistinctAggregationToMarkDistinct(TASK_COUNT_ESTIMATOR)) + .on(plan) + .setSystemProperty(MARK_DISTINCT_STRATEGY, "none") + .overrideStats(aggregationNodeId.toString(), PlanNodeStatsEstimate.builder().setOutputRowCount(2 * clusterThreadCount).build()) + .doesNotFire(); + // small NDV, mark_distinct_strategy = null, use_mark_distinct = false + tester().assertThat(new MultipleDistinctAggregationToMarkDistinct(TASK_COUNT_ESTIMATOR)) + .on(plan) + .setSystemProperty(USE_MARK_DISTINCT, "false") + .overrideStats(aggregationNodeId.toString(), PlanNodeStatsEstimate.builder().setOutputRowCount(2 * clusterThreadCount).build()) + .doesNotFire(); + + // big NDV but on multiple grouping keys + tester().assertThat(new MultipleDistinctAggregationToMarkDistinct(TASK_COUNT_ESTIMATOR)) + .on(p -> p.aggregation(builder -> builder + .nodeId(aggregationNodeId) + .singleGroupingSet(p.symbol("key1"), p.symbol("key2")) + .addAggregation(p.symbol("output1"), expression("count(DISTINCT input)"), ImmutableList.of(BIGINT)) + .addAggregation(p.symbol("output2"), expression("sum(input)"), ImmutableList.of(BIGINT)) + .source( + p.values(p.symbol("input"), p.symbol("key1"), p.symbol("key2"))))) + .overrideStats(aggregationNodeId.toString(), PlanNodeStatsEstimate.builder().setOutputRowCount(1000 * clusterThreadCount).build()) + .matches(aggregation( + singleGroupingSet("key1", "key2"), + ImmutableMap.of( + Optional.of("output1"), functionCall("count", ImmutableList.of("input")), + Optional.of("output2"), functionCall("sum", ImmutableList.of("input"))), + ImmutableList.of(), + ImmutableList.of("mark_input"), + Optional.empty(), + SINGLE, + markDistinct( + "mark_input", + ImmutableList.of("input", "key1", "key2"), + values(ImmutableMap.of("input", 0, "key1", 1, "key2", 2))))); + } } diff --git a/core/trino-main/src/test/java/io/trino/sql/planner/optimizations/TestAddExchangesPlans.java b/core/trino-main/src/test/java/io/trino/sql/planner/optimizations/TestAddExchangesPlans.java index 15915e786898..f18fd5f4156b 100644 --- a/core/trino-main/src/test/java/io/trino/sql/planner/optimizations/TestAddExchangesPlans.java +++ b/core/trino-main/src/test/java/io/trino/sql/planner/optimizations/TestAddExchangesPlans.java @@ -42,6 +42,7 @@ import static io.trino.SystemSessionProperties.JOIN_DISTRIBUTION_TYPE; import static io.trino.SystemSessionProperties.JOIN_PARTITIONED_BUILD_MIN_ROW_COUNT; import static io.trino.SystemSessionProperties.JOIN_REORDERING_STRATEGY; +import static io.trino.SystemSessionProperties.MARK_DISTINCT_STRATEGY; import static io.trino.SystemSessionProperties.SPILL_ENABLED; import static io.trino.SystemSessionProperties.TASK_CONCURRENCY; import static io.trino.SystemSessionProperties.USE_EXACT_PARTITIONING; @@ -224,6 +225,7 @@ public void testForcePartitioningMarkDistinctInput() query, Session.builder(getQueryRunner().getDefaultSession()) .setSystemProperty(IGNORE_DOWNSTREAM_PREFERENCES, "true") + .setSystemProperty(MARK_DISTINCT_STRATEGY, "always") .build(), anyTree( node(MarkDistinctNode.class, @@ -243,6 +245,7 @@ public void testForcePartitioningMarkDistinctInput() query, Session.builder(getQueryRunner().getDefaultSession()) .setSystemProperty(IGNORE_DOWNSTREAM_PREFERENCES, "false") + .setSystemProperty(MARK_DISTINCT_STRATEGY, "always") .build(), anyTree( node(MarkDistinctNode.class, diff --git a/core/trino-main/src/test/java/io/trino/sql/planner/optimizations/TestOptimizeMixedDistinctAggregations.java b/core/trino-main/src/test/java/io/trino/sql/planner/optimizations/TestOptimizeMixedDistinctAggregations.java index 1aea7d48fcb5..35a29c72cc88 100644 --- a/core/trino-main/src/test/java/io/trino/sql/planner/optimizations/TestOptimizeMixedDistinctAggregations.java +++ b/core/trino-main/src/test/java/io/trino/sql/planner/optimizations/TestOptimizeMixedDistinctAggregations.java @@ -17,6 +17,7 @@ import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableSet; import io.trino.SystemSessionProperties; +import io.trino.cost.TaskCountEstimator; import io.trino.sql.planner.RuleStatsRecorder; import io.trino.sql.planner.assertions.BasePlanTest; import io.trino.sql.planner.assertions.ExpectedValueProvider; @@ -51,7 +52,9 @@ public class TestOptimizeMixedDistinctAggregations { public TestOptimizeMixedDistinctAggregations() { - super(ImmutableMap.of(SystemSessionProperties.OPTIMIZE_DISTINCT_AGGREGATIONS, "true")); + super(ImmutableMap.of( + SystemSessionProperties.OPTIMIZE_DISTINCT_AGGREGATIONS, "true", + SystemSessionProperties.MARK_DISTINCT_STRATEGY, "always")); } @Test @@ -125,7 +128,7 @@ private void assertUnitPlan(String sql, PlanMatchPattern pattern) ImmutableSet.of( new RemoveRedundantIdentityProjections(), new SingleDistinctAggregationToGroupBy(), - new MultipleDistinctAggregationToMarkDistinct())), + new MultipleDistinctAggregationToMarkDistinct(new TaskCountEstimator(() -> 4)))), new OptimizeMixedDistinctAggregations(getQueryRunner().getMetadata()), new IterativeOptimizer( getQueryRunner().getPlannerContext(), diff --git a/docs/src/main/sphinx/admin/properties-optimizer.rst b/docs/src/main/sphinx/admin/properties-optimizer.rst index 153cfa77a8ea..4ba875985f01 100644 --- a/docs/src/main/sphinx/admin/properties-optimizer.rst +++ b/docs/src/main/sphinx/admin/properties-optimizer.rst @@ -44,6 +44,26 @@ partition keys for partitions that have no rows. In particular, the Hive connect can return empty partitions, if they were created by other systems. Trino cannot create them. +``optimizer.mark-distinct-strategy`` +^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ + +* **Type:** :ref:`prop-type-string` +* **Allowed values:** ``AUTOMATIC``, ``ALWAYS``, ``NONE`` +* **Default value:** ``AUTOMATIC`` + +The mark distinct strategy to use for distinct aggregations. ``NONE`` does not use +``MarkDistinct`` operator. ``ALWAYS`` uses ``MarkDistinct`` for multiple distinct +aggregations or for mix of distinct and non-distinct aggregations. +``AUTOMATIC`` limits the use of ``MarkDistinct`` only for cases with limited +concurrency (global or small cardinality aggregations), where direct distinct +aggregation implementation cannot utilize CPU efficiently. +``optimizer.mark-distinct-strategy`` overrides, if set, the deprecated +``optimizer.use-mark-distinct``. If ``optimizer.mark-distinct-strategy`` is not +set, but ``optimizer.use-mark-distinct`` is then ``optimizer.use-mark-distinct`` +is mapped to ``optimizer.mark-distinct-strategy`` with value ``true`` mapped to +``AUTOMATIC`` and value ``false`` mapped to ``NONE``.The strategy can be specified +on a per-query basis using the ``mark_distinct_strategy`` session property. + ``optimizer.push-aggregation-through-outer-join`` ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^