Skip to content

Commit

Permalink
Move dynamic filter evaluation ahead of static filter
Browse files Browse the repository at this point in the history
Dynamic filter evaluation can turn itself off when it's selectivity is
low. So it is better to evaluate it before the static filter as only
selective dynamic filters will be retained after processing a few input pages
  • Loading branch information
raunaqmorarka committed Sep 9, 2024
1 parent c558172 commit d70159b
Show file tree
Hide file tree
Showing 3 changed files with 18 additions and 18 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -113,16 +113,16 @@ public WorkProcessor<Page> createWorkProcessor(
}

SelectedPositions activePositions = positionsRange(0, page.getPositionCount());
FilterEvaluator.SelectionResult staticFilterResult = new FilterEvaluator.SelectionResult(activePositions, 0);
if (filterEvaluator.isPresent()) {
staticFilterResult = filterEvaluator.get().evaluate(session, activePositions, page);
metrics.recordFilterTime(staticFilterResult.filterTimeNanos());
FilterEvaluator.SelectionResult dynamicFilterResult = new FilterEvaluator.SelectionResult(activePositions, 0);
if (dynamicFilterEvaluator.isPresent()) {
dynamicFilterResult = dynamicFilterEvaluator.get().evaluate(session, activePositions, page);
metrics.recordDynamicFilterMetrics(dynamicFilterResult.filterTimeNanos(), dynamicFilterResult.selectedPositions().size());
}

FilterEvaluator.SelectionResult result = staticFilterResult;
if (dynamicFilterEvaluator.isPresent()) {
result = dynamicFilterEvaluator.get().evaluate(session, staticFilterResult.selectedPositions(), page);
metrics.recordDynamicFilterMetrics(result.filterTimeNanos(), staticFilterResult.selectedPositions().size());
FilterEvaluator.SelectionResult result = dynamicFilterResult;
if (filterEvaluator.isPresent()) {
result = filterEvaluator.get().evaluate(session, dynamicFilterResult.selectedPositions(), page);
metrics.recordFilterTime(result.filterTimeNanos());
}
SelectedPositions selectedPositions = result.selectedPositions();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,25 +27,25 @@ public class PageProcessorMetrics
private static final String FILTER_TIME = "Filter CPU time";
private static final String PROJECTION_TIME = "Projection CPU time";
public static final String DYNAMIC_FILTER_TIME = "Dynamic Filter CPU time";
public static final String DYNAMIC_FILTER_INPUT_POSITIONS = "Dynamic Filter input positions";
public static final String DYNAMIC_FILTER_OUTPUT_POSITIONS = "Dynamic Filter output positions";

private long filterTimeNanos;
private boolean hasFilter;
private long projectionTimeNanos;
private boolean hasProjection;
private long dynamicFilterTimeNanos;
private long dynamicFilterInputPositions;
private long dynamicFilterOutputPositions;

public void recordFilterTime(long filterTimeNanos)
{
this.filterTimeNanos += filterTimeNanos;
hasFilter = true;
}

public void recordDynamicFilterMetrics(long filterTimeNanos, long inputPositions)
public void recordDynamicFilterMetrics(long filterTimeNanos, long outputPositions)
{
dynamicFilterTimeNanos += filterTimeNanos;
dynamicFilterInputPositions += inputPositions;
dynamicFilterOutputPositions += outputPositions;
}

public void recordProjectionTime(long projectionTimeNanos)
Expand All @@ -60,9 +60,9 @@ public Metrics getMetrics()
if (hasFilter) {
builder.put(FILTER_TIME, new DurationTiming(new Duration(filterTimeNanos, NANOSECONDS)));
}
if (dynamicFilterInputPositions > 0) {
if (dynamicFilterOutputPositions > 0) {
builder.put(DYNAMIC_FILTER_TIME, new DurationTiming(new Duration(dynamicFilterTimeNanos, NANOSECONDS)));
builder.put(DYNAMIC_FILTER_INPUT_POSITIONS, new LongCount(dynamicFilterInputPositions));
builder.put(DYNAMIC_FILTER_OUTPUT_POSITIONS, new LongCount(dynamicFilterOutputPositions));
}
if (hasProjection) {
builder.put(PROJECTION_TIME, new DurationTiming(new Duration(projectionTimeNanos, NANOSECONDS)));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@

import static io.trino.SystemSessionProperties.DYNAMIC_ROW_FILTERING_SELECTIVITY_THRESHOLD;
import static io.trino.SystemSessionProperties.ENABLE_DYNAMIC_ROW_FILTERING;
import static io.trino.operator.project.PageProcessorMetrics.DYNAMIC_FILTER_INPUT_POSITIONS;
import static io.trino.operator.project.PageProcessorMetrics.DYNAMIC_FILTER_OUTPUT_POSITIONS;
import static io.trino.operator.project.PageProcessorMetrics.DYNAMIC_FILTER_TIME;
import static io.trino.sql.DynamicFilters.extractDynamicFilters;
import static io.trino.testing.QueryAssertions.assertEqualsIgnoreOrder;
Expand Down Expand Up @@ -159,8 +159,8 @@ protected void assertRowFiltering(@Language("SQL") String sql, JoinDistributionT
.isLessThan(noRowFilteringProbeStats.getOutputPositions());

Map<String, Metric<?>> metrics = rowFilteringProbeStats.getMetrics().getMetrics();
long filterInputPositions = ((Count<?>) metrics.get(DYNAMIC_FILTER_INPUT_POSITIONS)).getTotal();
assertThat(rowFilteringProbeStats.getOutputPositions()).isLessThan(filterInputPositions);
long filterOutputPositions = ((Count<?>) metrics.get(DYNAMIC_FILTER_OUTPUT_POSITIONS)).getTotal();
assertThat(filterOutputPositions).isLessThan(rowFilteringProbeStats.getInputPositions());
assertThat(((DurationTiming) metrics.get(DYNAMIC_FILTER_TIME)).getDuration())
.isGreaterThan(Duration.ZERO);
}
Expand Down Expand Up @@ -188,7 +188,7 @@ protected void assertNoRowFiltering(@Language("SQL") String sql, JoinDistributio
.isEqualTo(rowFilteringProbeStats.getPhysicalInputPositions());

Map<String, Metric<?>> metrics = rowFilteringProbeStats.getMetrics().getMetrics();
long filterInputPositions = ((Count<?>) metrics.get(DYNAMIC_FILTER_INPUT_POSITIONS)).getTotal();
long filterInputPositions = ((Count<?>) metrics.get(DYNAMIC_FILTER_OUTPUT_POSITIONS)).getTotal();
assertThat(rowFilteringProbeStats.getOutputPositions()).isEqualTo(filterInputPositions);
assertThat(((DurationTiming) metrics.get(DYNAMIC_FILTER_TIME)).getDuration())
.isGreaterThan(Duration.ZERO);
Expand Down

0 comments on commit d70159b

Please sign in to comment.