Skip to content

Commit

Permalink
Use MarkDistinct only in limited cases
Browse files Browse the repository at this point in the history
MarkDistinct is beneficial only when there is
limited parallelism in the query (e.g. a global distinct
aggregation is handled by a single thread).
For cases where there is enough parallelism, MarkDistinct
hurts performance because it causes excessive data shuffling
and introduces multiple stages when there are
multiple distinct aggregations.
  • Loading branch information
lukasz-stec authored and raunaqmorarka committed Mar 2, 2023
1 parent 20a2a44 commit 0372246
Show file tree
Hide file tree
Showing 9 changed files with 336 additions and 18 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import io.trino.sql.planner.OptimizerConfig;
import io.trino.sql.planner.OptimizerConfig.JoinDistributionType;
import io.trino.sql.planner.OptimizerConfig.JoinReorderingStrategy;
import io.trino.sql.planner.OptimizerConfig.MarkDistinctStrategy;

import javax.inject.Inject;

Expand Down Expand Up @@ -115,6 +116,7 @@ public final class SystemSessionProperties
public static final String USE_PARTIAL_DISTINCT_LIMIT = "use_partial_distinct_limit";
public static final String MAX_RECURSION_DEPTH = "max_recursion_depth";
public static final String USE_MARK_DISTINCT = "use_mark_distinct";
public static final String MARK_DISTINCT_STRATEGY = "mark_distinct_strategy";
public static final String PREFER_PARTIAL_AGGREGATION = "prefer_partial_aggregation";
public static final String OPTIMIZE_TOP_N_RANKING = "optimize_top_n_ranking";
public static final String MAX_GROUPING_SETS = "max_grouping_sets";
Expand Down Expand Up @@ -540,6 +542,12 @@ public SystemSessionProperties(
"Implement DISTINCT aggregations using MarkDistinct",
optimizerConfig.isUseMarkDistinct(),
false),
// Session-level counterpart of the optimizer.mark-distinct-strategy config property.
// The default is taken from OptimizerConfig and may be null; markDistinctStrategy(Session)
// then falls back to the legacy use_mark_distinct property.
enumProperty(
        MARK_DISTINCT_STRATEGY,
        "Strategy to use for distinct aggregations",
        MarkDistinctStrategy.class,
        optimizerConfig.getMarkDistinctStrategy(),
        false),
booleanProperty(
PREFER_PARTIAL_AGGREGATION,
"Prefer splitting aggregations into partial and final stages",
Expand Down Expand Up @@ -1193,9 +1201,21 @@ public static int getFilterAndProjectMinOutputPageRowCount(Session session)
return session.getSystemProperty(FILTER_AND_PROJECT_MIN_OUTPUT_PAGE_ROW_COUNT, Integer.class);
}

public static boolean useMarkDistinct(Session session)
/**
 * Resolves the MarkDistinct strategy in effect for the given session.
 * <p>
 * Resolution order:
 * <ol>
 * <li>a non-null {@code mark_distinct_strategy} session property is returned as is;</li>
 * <li>otherwise, if {@code use_mark_distinct} is also null, {@code AUTOMATIC} is returned;</li>
 * <li>otherwise {@code use_mark_distinct} is mapped: {@code true} to {@code AUTOMATIC}, {@code false} to {@code NONE}.</li>
 * </ol>
 * The legacy boolean is never mapped to {@code ALWAYS}.
 *
 * @param session session whose system properties are consulted
 * @return the resolved strategy; every path returns a non-null value
 */
public static MarkDistinctStrategy markDistinctStrategy(Session session)
{
// NOTE(review): the next line looks like the pre-change body of useMarkDistinct(Session) that the diff view
// interleaves with the new code (no +/- markers survive in this listing) — confirm against the actual file.
return session.getSystemProperty(USE_MARK_DISTINCT, Boolean.class);
MarkDistinctStrategy markDistinctStrategy = session.getSystemProperty(MARK_DISTINCT_STRATEGY, MarkDistinctStrategy.class);
if (markDistinctStrategy != null) {
// mark_distinct_strategy is set, so it takes precedence over use_mark_distinct
return markDistinctStrategy;
}

// Boxed on purpose: null means the legacy property was not set at all
Boolean useMarkDistinct = session.getSystemProperty(USE_MARK_DISTINCT, Boolean.class);
if (useMarkDistinct == null) {
// both mark_distinct_strategy and use_mark_distinct have default null values, use AUTOMATIC
return MarkDistinctStrategy.AUTOMATIC;
}
// use_mark_distinct is set but mark_distinct_strategy is not, map use_mark_distinct to mark_distinct_strategy
return useMarkDistinct ? MarkDistinctStrategy.AUTOMATIC : MarkDistinctStrategy.NONE;
}

public static boolean preferPartialAggregation(Session session)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import io.airlift.units.DataSize;
import io.airlift.units.Duration;

import javax.annotation.Nullable;
import javax.validation.constraints.Max;
import javax.validation.constraints.Min;
import javax.validation.constraints.NotNull;
Expand Down Expand Up @@ -62,7 +63,10 @@ public class OptimizerConfig
private boolean optimizeHashGeneration = true;
private boolean pushTableWriteThroughUnion = true;
private boolean dictionaryAggregation;
private boolean useMarkDistinct = true;
@Nullable
private Boolean useMarkDistinct;
@Nullable
private MarkDistinctStrategy markDistinctStrategy;
private boolean preferPartialAggregation = true;
private boolean pushAggregationThroughOuterJoin = true;
private boolean enableIntermediateAggregations;
Expand Down Expand Up @@ -115,6 +119,13 @@ public boolean canReplicate()
}
}

/**
 * Selects when distinct aggregations are planned with MarkDistinct.
 * The behavior described per constant is the one implemented by the
 * MultipleDistinctAggregationToMarkDistinct rule.
 */
public enum MarkDistinctStrategy
{
/** The rule returns an empty result immediately, so it never adds MarkDistinct. */
NONE,
/** The rule skips the shouldAddMarkDistinct heuristic and proceeds with the rewrite. */
ALWAYS,
/**
 * The rule proceeds only when its shouldAddMarkDistinct heuristic returns true.
 * Also the value used when neither mark_distinct_strategy nor use_mark_distinct is set.
 */
AUTOMATIC,
}

public double getCpuCostWeight()
{
return cpuCostWeight;
Expand Down Expand Up @@ -461,18 +472,35 @@ public OptimizerConfig setOptimizeMetadataQueries(boolean optimizeMetadataQuerie
return this;
}

public boolean isUseMarkDistinct()
/**
 * Returns the legacy MarkDistinct toggle.
 *
 * @return the configured value, or {@code null} when the property was never set
 * (the backing field has no initializer)
 * @deprecated the {@code optimizer.use-mark-distinct} property is replaced by
 * {@code optimizer.mark-distinct-strategy}; see {@link #getMarkDistinctStrategy()}
 */
@Deprecated
@Nullable
public Boolean isUseMarkDistinct()
{
return useMarkDistinct;
}

@Config("optimizer.use-mark-distinct")
public OptimizerConfig setUseMarkDistinct(boolean value)
/**
 * Sets the legacy MarkDistinct toggle, bound to the legacy property
 * {@code optimizer.use-mark-distinct}.
 *
 * @param value the legacy flag; stored as given (the field is nullable)
 * @return this config instance, for call chaining
 * @deprecated replaced by {@code optimizer.mark-distinct-strategy};
 * see {@link #setMarkDistinctStrategy(MarkDistinctStrategy)}
 */
@Deprecated
@LegacyConfig(value = "optimizer.use-mark-distinct", replacedBy = "optimizer.mark-distinct-strategy")
public OptimizerConfig setUseMarkDistinct(Boolean value)
{
this.useMarkDistinct = value;
return this;
}

/**
 * Returns the configured MarkDistinct strategy.
 *
 * @return the strategy, or {@code null} when {@code optimizer.mark-distinct-strategy}
 * was never set (the backing field has no initializer)
 */
@Nullable
public MarkDistinctStrategy getMarkDistinctStrategy()
{
return markDistinctStrategy;
}

/**
 * Sets the MarkDistinct strategy, bound to {@code optimizer.mark-distinct-strategy}.
 *
 * @param markDistinctStrategy the strategy to store; the backing field is nullable
 * @return this config instance, for call chaining
 */
@Config("optimizer.mark-distinct-strategy")
@ConfigDescription("Strategy to use for distinct aggregations")
public OptimizerConfig setMarkDistinctStrategy(MarkDistinctStrategy markDistinctStrategy)
{
this.markDistinctStrategy = markDistinctStrategy;
return this;
}

public boolean isPreferPartialAggregation()
{
return preferPartialAggregation;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -648,7 +648,7 @@ public PlanOptimizers(
new RemoveRedundantIdentityProjections(),
new PushAggregationThroughOuterJoin(),
new ReplaceRedundantJoinWithSource(), // Run this after PredicatePushDown optimizer as it inlines filter constants
new MultipleDistinctAggregationToMarkDistinct())), // Run this after aggregation pushdown so that multiple distinct aggregations can be pushed into a connector
new MultipleDistinctAggregationToMarkDistinct(taskCountEstimator))), // Run this after aggregation pushdown so that multiple distinct aggregations can be pushed into a connector
inlineProjections,
simplifyOptimizer, // Re-run the SimplifyExpressions to simplify any recomposed expressions from other optimizations
pushProjectionIntoTableScanOptimizer,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,10 @@
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Iterables;
import io.trino.SystemSessionProperties;
import io.trino.cost.TaskCountEstimator;
import io.trino.matching.Captures;
import io.trino.matching.Pattern;
import io.trino.sql.planner.OptimizerConfig.MarkDistinctStrategy;
import io.trino.sql.planner.Symbol;
import io.trino.sql.planner.iterative.Rule;
import io.trino.sql.planner.plan.AggregationNode;
Expand All @@ -33,8 +34,14 @@
import java.util.Optional;
import java.util.Set;

import static io.trino.SystemSessionProperties.getTaskConcurrency;
import static io.trino.SystemSessionProperties.isOptimizeDistinctAggregationEnabled;
import static io.trino.SystemSessionProperties.markDistinctStrategy;
import static io.trino.spi.type.BooleanType.BOOLEAN;
import static io.trino.sql.planner.OptimizerConfig.MarkDistinctStrategy.AUTOMATIC;
import static io.trino.sql.planner.OptimizerConfig.MarkDistinctStrategy.NONE;
import static io.trino.sql.planner.plan.Patterns.aggregation;
import static java.util.Objects.requireNonNull;
import static java.util.stream.Collectors.toSet;

/**
Expand Down Expand Up @@ -62,6 +69,9 @@
public class MultipleDistinctAggregationToMarkDistinct
implements Rule<AggregationNode>
{
// Used by shouldAddMarkDistinct: MarkDistinct is kept when the estimated output row count of the
// aggregation is at most this multiplier times the estimated number of concurrent aggregation threads.
private static final int MARK_DISTINCT_MAX_OUTPUT_ROW_COUNT_MULTIPLIER = 8;
// Wider threshold (8 * 8 = 64 rows per thread) used by shouldAddMarkDistinct for aggregations with a single
// distinct plus non-distinct aggregations, when the optimize-distinct-aggregation session setting is enabled.
private static final int OPTIMIZED_DISTINCT_MAX_OUTPUT_ROW_COUNT_MULTIPLIER = MARK_DISTINCT_MAX_OUTPUT_ROW_COUNT_MULTIPLIER * 8;

private static final Pattern<AggregationNode> PATTERN = aggregation()
.matching(
Predicates.and(
Expand Down Expand Up @@ -98,6 +108,13 @@ private static boolean hasMixedDistinctAndNonDistincts(AggregationNode aggregati
return distincts > 0 && distincts < aggregationNode.getAggregations().size();
}

private final TaskCountEstimator taskCountEstimator;

/**
 * Creates the rule.
 *
 * @param taskCountEstimator estimator that shouldAddMarkDistinct queries for the hashed task count
 * @throws NullPointerException if {@code taskCountEstimator} is null
 */
public MultipleDistinctAggregationToMarkDistinct(TaskCountEstimator taskCountEstimator)
{
this.taskCountEstimator = requireNonNull(taskCountEstimator, "taskCountEstimator is null");
}

@Override
public Pattern<AggregationNode> getPattern()
{
Expand All @@ -107,7 +124,12 @@ public Pattern<AggregationNode> getPattern()
@Override
public Result apply(AggregationNode parent, Captures captures, Context context)
{
if (!SystemSessionProperties.useMarkDistinct(context.getSession())) {
MarkDistinctStrategy markDistinctStrategy = markDistinctStrategy(context.getSession());
if (markDistinctStrategy.equals(NONE)) {
return Result.empty();
}

if (markDistinctStrategy.equals(AUTOMATIC) && !shouldAddMarkDistinct(parent, context)) {
return Result.empty();
}

Expand Down Expand Up @@ -165,4 +187,51 @@ public Result apply(AggregationNode parent, Captures captures, Context context)
.setPreGroupedSymbols(ImmutableList.of())
.build());
}

/**
 * Heuristic behind the AUTOMATIC strategy: decides whether the given distinct aggregation
 * should be planned with MarkDistinct.
 *
 * @param aggregationNode the aggregation being considered by the rule
 * @param context rule context supplying the session and the stats provider
 * @return {@code true} when MarkDistinct should be added, {@code false} to leave the aggregation as is
 */
private boolean shouldAddMarkDistinct(AggregationNode aggregationNode, Context context)
{
    int groupingKeyCount = aggregationNode.getGroupingKeys().size();
    if (groupingKeyCount != 1) {
        // No grouping keys: a global distinct aggregation runs on a single thread, and MarkDistinct
        // lets the work be parallelized.
        // Several grouping keys: NDV stats are unreliable there, so MarkDistinct is kept to avoid a
        // significant slowdown or OOM / oversized hash tables when a very small NDV is overestimated
        // while the groups hold many distinct values.
        return true;
    }

    double estimatedGroupCount = context.getStatsProvider().getStats(aggregationNode).getOutputRowCount();
    if (Double.isNaN(estimatedGroupCount)) {
        // Unknown estimate: stay with MarkDistinct to avoid query failure.
        return true;
    }

    int hashedTaskCount = taskCountEstimator.estimateHashedTaskCount(context.getSession());
    int aggregationThreads = hashedTaskCount * getTaskConcurrency(context.getSession());

    if (estimatedGroupCount <= MARK_DISTINCT_MAX_OUTPUT_ROW_COUNT_MULTIPLIER * aggregationThreads) {
        // Few groups limit the parallelism of the distinct aggregation (and the partitioning may be
        // skewed), which underutilizes cluster CPU and can concentrate memory on a few nodes.
        // MarkDistinct raises parallelism at the cost of extra CPU.
        return true;
    }

    // With a medium number of groups, OptimizeMixedDistinctAggregations improves query latency over a
    // plain distinct aggregation (again at a CPU cost), but it depends on MarkDistinct nodes being present.
    return isOptimizeDistinctAggregationEnabled(context.getSession())
            && estimatedGroupCount <= OPTIMIZED_DISTINCT_MAX_OUTPUT_ROW_COUNT_MULTIPLIER * aggregationThreads
            && hasSingleDistinctAndNonDistincts(aggregationNode);
}

/**
 * Tells whether the node has exactly one DISTINCT aggregation together with at least one
 * non-DISTINCT aggregation.
 *
 * @param aggregationNode the aggregation node to inspect
 * @return {@code true} only for the "one distinct plus others" shape
 */
private static boolean hasSingleDistinctAndNonDistincts(AggregationNode aggregationNode)
{
    int distinctCount = 0;
    for (Aggregation aggregation : aggregationNode.getAggregations().values()) {
        if (aggregation.isDistinct()) {
            distinctCount++;
        }
    }

    // With exactly one distinct aggregation, "fewer distincts than aggregations" means more than one aggregation.
    return distinctCount == 1 && aggregationNode.getAggregations().size() > 1;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import static io.airlift.units.DataSize.Unit.MEGABYTE;
import static io.trino.sql.planner.OptimizerConfig.JoinDistributionType.BROADCAST;
import static io.trino.sql.planner.OptimizerConfig.JoinReorderingStrategy.NONE;
import static io.trino.sql.planner.OptimizerConfig.MarkDistinctStrategy.ALWAYS;
import static java.util.concurrent.TimeUnit.MINUTES;
import static java.util.concurrent.TimeUnit.SECONDS;

Expand Down Expand Up @@ -69,7 +70,7 @@ public void testDefaults()
.setPushAggregationThroughOuterJoin(true)
.setPushPartialAggregationThoughJoin(false)
.setPreAggregateCaseAggregationsEnabled(true)
.setUseMarkDistinct(true)
.setMarkDistinctStrategy(null)
.setPreferPartialAggregation(true)
.setOptimizeTopNRanking(true)
.setDistributedSortEnabled(true)
Expand Down Expand Up @@ -129,7 +130,7 @@ public void testExplicitPropertyMappings()
.put("optimizer.pre-aggregate-case-aggregations.enabled", "false")
.put("optimizer.enable-intermediate-aggregations", "true")
.put("optimizer.force-single-node-output", "true")
.put("optimizer.use-mark-distinct", "false")
.put("optimizer.mark-distinct-strategy", "always")
.put("optimizer.prefer-partial-aggregation", "false")
.put("optimizer.optimize-top-n-ranking", "false")
.put("optimizer.skip-redundant-sort", "false")
Expand Down Expand Up @@ -182,7 +183,7 @@ public void testExplicitPropertyMappings()
.setPushPartialAggregationThoughJoin(true)
.setPreAggregateCaseAggregationsEnabled(false)
.setEnableIntermediateAggregations(true)
.setUseMarkDistinct(false)
.setMarkDistinctStrategy(ALWAYS)
.setPreferPartialAggregation(false)
.setOptimizeTopNRanking(false)
.setDistributedSortEnabled(false)
Expand Down
Loading

0 comments on commit 0372246

Please sign in to comment.