Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use MarkDistinct only in limited cases #15927

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import io.trino.sql.planner.OptimizerConfig;
import io.trino.sql.planner.OptimizerConfig.JoinDistributionType;
import io.trino.sql.planner.OptimizerConfig.JoinReorderingStrategy;
import io.trino.sql.planner.OptimizerConfig.MarkDistinctStrategy;

import javax.inject.Inject;

Expand Down Expand Up @@ -115,6 +116,7 @@ public final class SystemSessionProperties
public static final String USE_PARTIAL_DISTINCT_LIMIT = "use_partial_distinct_limit";
public static final String MAX_RECURSION_DEPTH = "max_recursion_depth";
public static final String USE_MARK_DISTINCT = "use_mark_distinct";
public static final String MARK_DISTINCT_STRATEGY = "mark_distinct_strategy";
public static final String PREFER_PARTIAL_AGGREGATION = "prefer_partial_aggregation";
public static final String OPTIMIZE_TOP_N_RANKING = "optimize_top_n_ranking";
public static final String MAX_GROUPING_SETS = "max_grouping_sets";
Expand Down Expand Up @@ -540,6 +542,12 @@ public SystemSessionProperties(
"Implement DISTINCT aggregations using MarkDistinct",
optimizerConfig.isUseMarkDistinct(),
false),
enumProperty(
MARK_DISTINCT_STRATEGY,
"",
MarkDistinctStrategy.class,
optimizerConfig.getMarkDistinctStrategy(),
false),
booleanProperty(
PREFER_PARTIAL_AGGREGATION,
"Prefer splitting aggregations into partial and final stages",
Expand Down Expand Up @@ -1193,9 +1201,21 @@ public static int getFilterAndProjectMinOutputPageRowCount(Session session)
return session.getSystemProperty(FILTER_AND_PROJECT_MIN_OUTPUT_PAGE_ROW_COUNT, Integer.class);
}

public static boolean useMarkDistinct(Session session)
public static MarkDistinctStrategy markDistinctStrategy(Session session)
{
return session.getSystemProperty(USE_MARK_DISTINCT, Boolean.class);
MarkDistinctStrategy markDistinctStrategy = session.getSystemProperty(MARK_DISTINCT_STRATEGY, MarkDistinctStrategy.class);
if (markDistinctStrategy != null) {
// mark_distinct_strategy is set, so it takes precedence over use_mark_distinct
return markDistinctStrategy;
}

Boolean useMarkDistinct = session.getSystemProperty(USE_MARK_DISTINCT, Boolean.class);
if (useMarkDistinct == null) {
// both mark_distinct_strategy and use_mark_distinct have default null values, use AUTOMATIC
return MarkDistinctStrategy.AUTOMATIC;
}
// use_mark_distinct is set but mark_distinct_strategy is not, map use_mark_distinct to mark_distinct_strategy
return useMarkDistinct ? MarkDistinctStrategy.AUTOMATIC : MarkDistinctStrategy.NONE;
}

public static boolean preferPartialAggregation(Session session)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import io.airlift.units.DataSize;
import io.airlift.units.Duration;

import javax.annotation.Nullable;
import javax.validation.constraints.Max;
import javax.validation.constraints.Min;
import javax.validation.constraints.NotNull;
Expand Down Expand Up @@ -62,7 +63,10 @@ public class OptimizerConfig
private boolean optimizeHashGeneration = true;
private boolean pushTableWriteThroughUnion = true;
private boolean dictionaryAggregation;
private boolean useMarkDistinct = true;
@Nullable
private Boolean useMarkDistinct;
@Nullable
private MarkDistinctStrategy markDistinctStrategy;
private boolean preferPartialAggregation = true;
private boolean pushAggregationThroughOuterJoin = true;
private boolean enableIntermediateAggregations;
Expand Down Expand Up @@ -115,6 +119,13 @@ public boolean canReplicate()
}
}

/**
 * Strategy controlling whether distinct aggregations are planned with MarkDistinct.
 * Bound to the {@code optimizer.mark-distinct-strategy} config property.
 */
public enum MarkDistinctStrategy
{
// MultipleDistinctAggregationToMarkDistinct returns an empty result, leaving the aggregation untouched
NONE,
// neither early return in MultipleDistinctAggregationToMarkDistinct#apply fires; no stats-based check is made
ALWAYS,
// the rule proceeds only when its shouldAddMarkDistinct heuristic returns true
AUTOMATIC,
}

public double getCpuCostWeight()
{
return cpuCostWeight;
Expand Down Expand Up @@ -461,18 +472,35 @@ public OptimizerConfig setOptimizeMetadataQueries(boolean optimizeMetadataQuerie
return this;
}

public boolean isUseMarkDistinct()
@Deprecated
@Nullable
public Boolean isUseMarkDistinct()
{
return useMarkDistinct;
}

@Config("optimizer.use-mark-distinct")
public OptimizerConfig setUseMarkDistinct(boolean value)
@Deprecated
@LegacyConfig(value = "optimizer.use-mark-distinct", replacedBy = "optimizer.mark-distinct-strategy")
raunaqmorarka marked this conversation as resolved.
Show resolved Hide resolved
public OptimizerConfig setUseMarkDistinct(Boolean value)
{
this.useMarkDistinct = value;
return this;
}

/**
 * Returns the configured MarkDistinct strategy.
 *
 * @return the strategy, or {@code null} when it has not been set (the field has no initializer)
 */
@Nullable
public MarkDistinctStrategy getMarkDistinctStrategy()
{
return markDistinctStrategy;
}

/**
 * Sets the MarkDistinct strategy; bound to the {@code optimizer.mark-distinct-strategy} property.
 *
 * @param markDistinctStrategy the strategy to use; stored as given, so {@code null} leaves it unset
 * @return this config instance, for call chaining
 */
@Config("optimizer.mark-distinct-strategy")
@ConfigDescription("Strategy to use for distinct aggregations")
public OptimizerConfig setMarkDistinctStrategy(MarkDistinctStrategy markDistinctStrategy)
{
this.markDistinctStrategy = markDistinctStrategy;
return this;
}

public boolean isPreferPartialAggregation()
{
return preferPartialAggregation;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -648,7 +648,7 @@ public PlanOptimizers(
new RemoveRedundantIdentityProjections(),
new PushAggregationThroughOuterJoin(),
new ReplaceRedundantJoinWithSource(), // Run this after PredicatePushDown optimizer as it inlines filter constants
new MultipleDistinctAggregationToMarkDistinct())), // Run this after aggregation pushdown so that multiple distinct aggregations can be pushed into a connector
new MultipleDistinctAggregationToMarkDistinct(taskCountEstimator))), // Run this after aggregation pushdown so that multiple distinct aggregations can be pushed into a connector
inlineProjections,
simplifyOptimizer, // Re-run the SimplifyExpressions to simplify any recomposed expressions from other optimizations
pushProjectionIntoTableScanOptimizer,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,10 @@
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Iterables;
import io.trino.SystemSessionProperties;
import io.trino.cost.TaskCountEstimator;
import io.trino.matching.Captures;
import io.trino.matching.Pattern;
import io.trino.sql.planner.OptimizerConfig.MarkDistinctStrategy;
import io.trino.sql.planner.Symbol;
import io.trino.sql.planner.iterative.Rule;
import io.trino.sql.planner.plan.AggregationNode;
Expand All @@ -33,8 +34,14 @@
import java.util.Optional;
import java.util.Set;

import static io.trino.SystemSessionProperties.getTaskConcurrency;
import static io.trino.SystemSessionProperties.isOptimizeDistinctAggregationEnabled;
import static io.trino.SystemSessionProperties.markDistinctStrategy;
import static io.trino.spi.type.BooleanType.BOOLEAN;
import static io.trino.sql.planner.OptimizerConfig.MarkDistinctStrategy.AUTOMATIC;
import static io.trino.sql.planner.OptimizerConfig.MarkDistinctStrategy.NONE;
import static io.trino.sql.planner.plan.Patterns.aggregation;
import static java.util.Objects.requireNonNull;
import static java.util.stream.Collectors.toSet;

/**
Expand Down Expand Up @@ -62,6 +69,9 @@
public class MultipleDistinctAggregationToMarkDistinct
implements Rule<AggregationNode>
{
// Threshold factor for the AUTOMATIC strategy: MarkDistinct is used when the estimated output row count of the
// aggregation is at most this many times the estimated number of concurrent aggregation threads.
private static final int MARK_DISTINCT_MAX_OUTPUT_ROW_COUNT_MULTIPLIER = 8;
// Larger threshold factor (8x the one above), applied only when isOptimizeDistinctAggregationEnabled is true
// and the aggregation has exactly one distinct aggregate mixed with non-distinct ones.
private static final int OPTIMIZED_DISTINCT_MAX_OUTPUT_ROW_COUNT_MULTIPLIER = MARK_DISTINCT_MAX_OUTPUT_ROW_COUNT_MULTIPLIER * 8;

private static final Pattern<AggregationNode> PATTERN = aggregation()
.matching(
Predicates.and(
Expand Down Expand Up @@ -98,6 +108,13 @@ private static boolean hasMixedDistinctAndNonDistincts(AggregationNode aggregati
return distincts > 0 && distincts < aggregationNode.getAggregations().size();
}

private final TaskCountEstimator taskCountEstimator;

/**
 * Creates the rule.
 *
 * @param taskCountEstimator estimator consulted when sizing the aggregation's parallelism; must not be null
 * @throws NullPointerException if {@code taskCountEstimator} is null
 */
public MultipleDistinctAggregationToMarkDistinct(TaskCountEstimator taskCountEstimator)
{
    // validate first, then store: the field is only ever assigned a non-null estimator
    requireNonNull(taskCountEstimator, "taskCountEstimator is null");
    this.taskCountEstimator = taskCountEstimator;
}

@Override
public Pattern<AggregationNode> getPattern()
{
Expand All @@ -107,7 +124,12 @@ public Pattern<AggregationNode> getPattern()
@Override
public Result apply(AggregationNode parent, Captures captures, Context context)
{
if (!SystemSessionProperties.useMarkDistinct(context.getSession())) {
MarkDistinctStrategy markDistinctStrategy = markDistinctStrategy(context.getSession());
if (markDistinctStrategy.equals(NONE)) {
return Result.empty();
}

if (markDistinctStrategy.equals(AUTOMATIC) && !shouldAddMarkDistinct(parent, context)) {
return Result.empty();
}

Expand Down Expand Up @@ -165,4 +187,51 @@ public Result apply(AggregationNode parent, Captures captures, Context context)
.setPreGroupedSymbols(ImmutableList.of())
.build());
}

private boolean shouldAddMarkDistinct(AggregationNode aggregationNode, Context context)
{
if (aggregationNode.getGroupingKeys().isEmpty()) {
// global distinct aggregation is computed using a single thread. MarkDistinct will help parallelize the execution.
return true;
}
if (aggregationNode.getGroupingKeys().size() > 1) {
// NDV stats for multiple grouping keys are unreliable, let's keep MarkDistinct for this case to avoid significant slowdown or OOM/too big hash table issues in case of
// overestimation of very small NDV with big number of distinct values inside the groups.
return true;
}
double numberOfDistinctValues = context.getStatsProvider().getStats(aggregationNode).getOutputRowCount();
lukasz-stec marked this conversation as resolved.
Show resolved Hide resolved
if (Double.isNaN(numberOfDistinctValues)) {
// if the estimate is unknown, use MarkDistinct to avoid query failure
return true;
}
int maxNumberOfConcurrentThreadsForAggregation = taskCountEstimator.estimateHashedTaskCount(context.getSession()) * getTaskConcurrency(context.getSession());

if (numberOfDistinctValues <= MARK_DISTINCT_MAX_OUTPUT_ROW_COUNT_MULTIPLIER * maxNumberOfConcurrentThreadsForAggregation) {
// small numberOfDistinctValues reduces the distinct aggregation parallelism, also because the partitioning may be skewed.
// This makes query to underutilize the cluster CPU but also to possibly concentrate memory on few nodes.
// MarkDistinct should increase the parallelism at a cost of CPU.
lukasz-stec marked this conversation as resolved.
Show resolved Hide resolved
return true;
}

if (isOptimizeDistinctAggregationEnabled(context.getSession()) &&
numberOfDistinctValues <= OPTIMIZED_DISTINCT_MAX_OUTPUT_ROW_COUNT_MULTIPLIER * maxNumberOfConcurrentThreadsForAggregation &&
hasSingleDistinctAndNonDistincts(aggregationNode)) {
// with medium number of numberOfDistinctValues, OptimizeMixedDistinctAggregations
// will be beneficial for query latency (duration) over distinct aggregation at a cost of increased CPU,
// but it relies on existence of MarkDistinct nodes.
return true;
}

return false;
}

/**
 * Tells whether the aggregation node holds exactly one distinct aggregate together with
 * at least one non-distinct aggregate.
 *
 * @param aggregationNode the aggregation to inspect
 * @return {@code true} when exactly one aggregate is distinct and the node has more than one aggregate
 */
private static boolean hasSingleDistinctAndNonDistincts(AggregationNode aggregationNode)
{
    long distinctCount = 0;
    for (Aggregation aggregation : aggregationNode.getAggregations().values()) {
        if (aggregation.isDistinct()) {
            distinctCount++;
        }
    }

    if (distinctCount != 1) {
        return false;
    }
    // exactly one distinct aggregate: require at least one other (non-distinct) aggregate next to it
    return distinctCount < aggregationNode.getAggregations().size();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import static io.airlift.units.DataSize.Unit.MEGABYTE;
import static io.trino.sql.planner.OptimizerConfig.JoinDistributionType.BROADCAST;
import static io.trino.sql.planner.OptimizerConfig.JoinReorderingStrategy.NONE;
import static io.trino.sql.planner.OptimizerConfig.MarkDistinctStrategy.ALWAYS;
import static java.util.concurrent.TimeUnit.MINUTES;
import static java.util.concurrent.TimeUnit.SECONDS;

Expand Down Expand Up @@ -69,7 +70,7 @@ public void testDefaults()
.setPushAggregationThroughOuterJoin(true)
.setPushPartialAggregationThoughJoin(false)
.setPreAggregateCaseAggregationsEnabled(true)
.setUseMarkDistinct(true)
.setMarkDistinctStrategy(null)
.setPreferPartialAggregation(true)
.setOptimizeTopNRanking(true)
.setDistributedSortEnabled(true)
Expand Down Expand Up @@ -129,7 +130,7 @@ public void testExplicitPropertyMappings()
.put("optimizer.pre-aggregate-case-aggregations.enabled", "false")
.put("optimizer.enable-intermediate-aggregations", "true")
.put("optimizer.force-single-node-output", "true")
.put("optimizer.use-mark-distinct", "false")
.put("optimizer.mark-distinct-strategy", "always")
.put("optimizer.prefer-partial-aggregation", "false")
.put("optimizer.optimize-top-n-ranking", "false")
.put("optimizer.skip-redundant-sort", "false")
Expand Down Expand Up @@ -182,7 +183,7 @@ public void testExplicitPropertyMappings()
.setPushPartialAggregationThoughJoin(true)
.setPreAggregateCaseAggregationsEnabled(false)
.setEnableIntermediateAggregations(true)
.setUseMarkDistinct(false)
.setMarkDistinctStrategy(ALWAYS)
.setPreferPartialAggregation(false)
.setOptimizeTopNRanking(false)
.setDistributedSortEnabled(false)
Expand Down
Loading