Skip to content

Commit

Permalink
Make DynamicPageFilter accept DynamicFilter per split
Browse files Browse the repository at this point in the history
Sub-query cache may provide different instance of DynamicFilter per-split
  • Loading branch information
sopel39 committed Jul 29, 2024
1 parent a183680 commit e7aa589
Show file tree
Hide file tree
Showing 13 changed files with 116 additions and 37 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@
import java.io.UncheckedIOException;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;
import java.util.function.Supplier;

import static com.google.common.base.Preconditions.checkState;
Expand Down Expand Up @@ -411,7 +412,7 @@ public static class ScanFilterAndProjectOperatorFactory
private final int operatorId;
private final PlanNodeId planNodeId;
private final Supplier<CursorProcessor> cursorProcessor;
private final Supplier<PageProcessor> pageProcessor;
private final Function<DynamicFilter, PageProcessor> pageProcessor;
private final PlanNodeId sourceId;
private final PageSourceProvider pageSourceProvider;
private final TableHandle table;
Expand All @@ -428,7 +429,7 @@ public ScanFilterAndProjectOperatorFactory(
PlanNodeId sourceId,
PageSourceProviderFactory pageSourceProvider,
Supplier<CursorProcessor> cursorProcessor,
Supplier<PageProcessor> pageProcessor,
Function<DynamicFilter, PageProcessor> pageProcessor,
TableHandle table,
Iterable<ColumnHandle> columns,
DynamicFilter dynamicFilter,
Expand Down Expand Up @@ -496,7 +497,7 @@ public WorkProcessorSourceOperator create(
split,
pageSourceProvider,
cursorProcessor.get(),
pageProcessor.get(),
pageProcessor.apply(dynamicFilter),
table,
columns,
dynamicFilter,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import io.trino.operator.project.PageProcessor;
import io.trino.operator.project.PageProjection;
import io.trino.spi.TrinoException;
import io.trino.spi.connector.DynamicFilter;
import io.trino.sql.gen.columnar.ColumnarFilterCompiler;
import io.trino.sql.gen.columnar.DynamicPageFilter;
import io.trino.sql.gen.columnar.FilterEvaluator;
Expand All @@ -40,6 +41,7 @@
import java.util.Objects;
import java.util.Optional;
import java.util.OptionalInt;
import java.util.function.Function;
import java.util.function.Supplier;

import static com.google.common.base.MoreObjects.toStringHelper;
Expand Down Expand Up @@ -106,7 +108,7 @@ public Supplier<CursorProcessor> compileCursorProcessor(Optional<RowExpression>
};
}

public Supplier<PageProcessor> compilePageProcessor(
public Function<DynamicFilter, PageProcessor> compilePageProcessor(
boolean columnarFilterEvaluationEnabled,
Optional<RowExpression> filter,
Optional<DynamicPageFilter> dynamicPageFilter,
Expand All @@ -125,7 +127,7 @@ public Supplier<PageProcessor> compilePageProcessor(
.collect(toImmutableList());

Optional<Supplier<PageFilter>> finalFilterFunctionSupplier = filterFunctionSupplier;
return () -> {
return (dynamicFilter) -> {
Optional<FilterEvaluator> filterEvaluator = columnarFilterEvaluatorSupplier.map(Supplier::get);
if (filterEvaluator.isEmpty()) {
filterEvaluator = finalFilterFunctionSupplier
Expand All @@ -136,7 +138,7 @@ public Supplier<PageProcessor> compilePageProcessor(
.map(Supplier::get)
.collect(toImmutableList());
Optional<FilterEvaluator> dynamicFilterEvaluator = dynamicPageFilter
.map(dynamicFilter -> dynamicFilter.createDynamicPageFilterEvaluator(columnarFilterCompiler))
.map(pageFilter -> pageFilter.createDynamicPageFilterEvaluator(columnarFilterCompiler, dynamicFilter))
.map(Supplier::get);
return new PageProcessor(filterEvaluator, dynamicFilterEvaluator, pageProjections, initialBatchSize);
};
Expand All @@ -145,13 +147,15 @@ public Supplier<PageProcessor> compilePageProcessor(
@VisibleForTesting
public Supplier<PageProcessor> compilePageProcessor(Optional<RowExpression> filter, List<? extends RowExpression> projections)
{
return compilePageProcessor(true, filter, Optional.empty(), projections, Optional.empty(), OptionalInt.empty());
return () -> compilePageProcessor(true, filter, Optional.empty(), projections, Optional.empty(), OptionalInt.empty())
.apply(DynamicFilter.EMPTY);
}

@VisibleForTesting
public Supplier<PageProcessor> compilePageProcessor(Optional<RowExpression> filter, List<? extends RowExpression> projections, int initialBatchSize)
{
return compilePageProcessor(true, filter, Optional.empty(), projections, Optional.empty(), OptionalInt.of(initialBatchSize));
return () -> compilePageProcessor(true, filter, Optional.empty(), projections, Optional.empty(), OptionalInt.of(initialBatchSize))
.apply(DynamicFilter.EMPTY);
}

private <T> Class<? extends T> compile(Optional<RowExpression> filter, List<RowExpression> projections, BodyCompiler bodyCompiler, Class<? extends T> superType)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,6 @@ public final class DynamicPageFilter
private final Session session;
private final IrExpressionOptimizer irExpressionOptimizer;
private final DomainTranslator domainTranslator;
private final DynamicFilter dynamicFilter;
private final Map<ColumnHandle, Symbol> columnHandles;
private final Map<Symbol, Integer> sourceLayout;
private final double selectivityThreshold;
Expand All @@ -64,11 +63,13 @@ public final class DynamicPageFilter
@Nullable
@GuardedBy("this")
private CompletableFuture<?> isBlocked;
@Nullable
@GuardedBy("this")
private DynamicFilter currentDynamicFilter;

public DynamicPageFilter(
PlannerContext plannerContext,
Session session,
DynamicFilter dynamicFilter,
Map<Symbol, ColumnHandle> columnHandles,
Map<Symbol, Integer> sourceLayout,
double selectivityThreshold)
Expand All @@ -78,20 +79,25 @@ public DynamicPageFilter(
this.session = requireNonNull(session, "session is null");
this.irExpressionOptimizer = newOptimizer(plannerContext);
this.domainTranslator = new DomainTranslator(plannerContext.getMetadata());
this.dynamicFilter = requireNonNull(dynamicFilter, "dynamicFilter is null");
this.columnHandles = columnHandles.entrySet()
.stream()
.collect(toImmutableMap(Map.Entry::getValue, Map.Entry::getKey));
this.sourceLayout = ImmutableMap.copyOf(sourceLayout);
this.selectivityThreshold = selectivityThreshold;
this.isBlocked = dynamicFilter.isBlocked();
}

// Compiled dynamic filter is fixed per-split and generated duration page source creation.
// Page source implementations may subsequently implement blocking on completion of dynamic filters, but since
// that occurs after page source creation, we cannot be guaranteed a completed dynamic filter here for initial splits
public synchronized Supplier<FilterEvaluator> createDynamicPageFilterEvaluator(ColumnarFilterCompiler compiler)
public synchronized Supplier<FilterEvaluator> createDynamicPageFilterEvaluator(ColumnarFilterCompiler compiler, DynamicFilter dynamicFilter)
{
requireNonNull(dynamicFilter, "dynamicFilter is null");
// Sub-query cache may provide different instance of DynamicFilter per-split.
if (!dynamicFilter.equals(currentDynamicFilter)) {
compiledDynamicFilter = null;
currentDynamicFilter = dynamicFilter;
isBlocked = dynamicFilter.isBlocked();
}
if (isBlocked == null) {
return compiledDynamicFilter;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1992,12 +1992,11 @@ else if (sourceNode instanceof SampleNode sampleNode) {
dynamicPageFilterFactory = Optional.of(new DynamicPageFilter(
plannerContext,
session,
dynamicFilter,
((TableScanNode) sourceNode).getAssignments(),
sourceLayout,
getDynamicRowFilterSelectivityThreshold(session)));
}
Supplier<PageProcessor> pageProcessor = expressionCompiler.compilePageProcessor(
Function<DynamicFilter, PageProcessor> pageProcessor = expressionCompiler.compilePageProcessor(
columnarFilterEvaluationEnabled,
translatedFilter,
dynamicPageFilterFactory,
Expand Down Expand Up @@ -2028,7 +2027,7 @@ else if (sourceNode instanceof SampleNode sampleNode) {
OperatorFactory operatorFactory = FilterAndProjectOperator.createOperatorFactory(
context.getNextOperatorId(),
planNodeId,
pageProcessor,
() -> pageProcessor.apply(dynamicFilter),
getTypes(projections),
getFilterAndProjectMinOutputPageSize(session),
getFilterAndProjectMinOutputPageRowCount(session));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -184,7 +184,7 @@ private void createScanFilterAndProjectOperatorFactories(List<Page> inputPages,
new PlanNodeId("test_source"),
(catalog) -> (session, split, table, columns, dynamicFilter) -> new FixedPageSource(inputPages),
() -> cursorProcessor,
() -> pageProcessor,
(_) -> pageProcessor,
TEST_TABLE_HANDLE,
columnHandles,
DynamicFilter.EMPTY,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,7 @@ public void testPageSource()
new PlanNodeId("0"),
(catalog) -> (session, split, table, columns, dynamicFilter) -> new FixedPageSource(ImmutableList.of(input)),
cursorProcessor,
pageProcessor,
(_) -> pageProcessor.get(),
TEST_TABLE_HANDLE,
ImmutableList.of(),
DynamicFilter.EMPTY,
Expand Down Expand Up @@ -175,7 +175,7 @@ public void testPageSourceMergeOutput()
new PlanNodeId("0"),
(catalog) -> (session, split, table, columns, dynamicFilter) -> new FixedPageSource(input),
cursorProcessor,
pageProcessor,
(_) -> pageProcessor.get(),
TEST_TABLE_HANDLE,
ImmutableList.of(),
DynamicFilter.EMPTY,
Expand Down Expand Up @@ -220,7 +220,7 @@ public void testPageSourceLazyLoad()
new PlanNodeId("0"),
(catalog) -> (session, split, table, columns, dynamicFilter) -> new SinglePagePageSource(input),
cursorProcessor,
() -> pageProcessor,
(_) -> pageProcessor,
TEST_TABLE_HANDLE,
ImmutableList.of(),
DynamicFilter.EMPTY,
Expand Down Expand Up @@ -254,7 +254,7 @@ public void testRecordCursorSource()
new PlanNodeId("0"),
(catalog) -> (session, split, table, columns, dynamicFilter) -> new RecordPageSource(new PageRecordSet(ImmutableList.of(VARCHAR), input)),
cursorProcessor,
pageProcessor,
(_) -> pageProcessor.get(),
TEST_TABLE_HANDLE,
ImmutableList.of(),
DynamicFilter.EMPTY,
Expand Down Expand Up @@ -309,7 +309,7 @@ public void testPageYield()
new PlanNodeId("0"),
(catalog) -> (session, split, table, columns, dynamicFilter) -> new FixedPageSource(ImmutableList.of(input)),
cursorProcessor,
pageProcessor,
(_) -> pageProcessor.get(),
TEST_TABLE_HANDLE,
ImmutableList.of(),
DynamicFilter.EMPTY,
Expand Down Expand Up @@ -377,7 +377,7 @@ public void testRecordCursorYield()
new PlanNodeId("0"),
(catalog) -> (session, split, table, columns, dynamicFilter) -> new RecordPageSource(new PageRecordSet(ImmutableList.of(BIGINT), input)),
cursorProcessor,
pageProcessor,
(_) -> pageProcessor.get(),
TEST_TABLE_HANDLE,
ImmutableList.of(),
DynamicFilter.EMPTY,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import io.trino.operator.project.PageProcessor;
import io.trino.spi.Page;
import io.trino.spi.PageBuilder;
import io.trino.spi.connector.DynamicFilter;
import io.trino.sql.relational.CallExpression;
import io.trino.sql.relational.InputReferenceExpression;
import io.trino.sql.relational.RowExpression;
Expand Down Expand Up @@ -96,7 +97,7 @@ public void setup()
projections,
Optional.empty(),
OptionalInt.empty())
.get();
.apply(DynamicFilter.EMPTY);
}

@Benchmark
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import io.trino.spi.block.IntArrayBlock;
import io.trino.spi.block.LongArrayBlock;
import io.trino.spi.block.ShortArrayBlock;
import io.trino.spi.connector.DynamicFilter;
import io.trino.spi.function.OperatorType;
import io.trino.spi.type.StandardTypes;
import io.trino.spi.type.Type;
Expand Down Expand Up @@ -165,7 +166,7 @@ public void setup()
ImmutableList.of(field(0, type)),
Optional.empty(),
OptionalInt.empty())
.get();
.apply(DynamicFilter.EMPTY);
}

@Benchmark
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import io.trino.operator.project.PageProcessor;
import io.trino.spi.Page;
import io.trino.spi.PageBuilder;
import io.trino.spi.connector.DynamicFilter;
import io.trino.spi.function.OperatorType;
import io.trino.spi.type.StandardTypes;
import io.trino.spi.type.Type;
Expand Down Expand Up @@ -196,7 +197,7 @@ public void setup()
ImmutableList.of(project),
Optional.empty(),
OptionalInt.empty())
.get();
.apply(DynamicFilter.EMPTY);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import io.trino.spi.block.LongArrayBlock;
import io.trino.spi.block.VariableWidthBlockBuilder;
import io.trino.spi.connector.ConnectorSession;
import io.trino.spi.connector.DynamicFilter;
import io.trino.spi.function.LiteralParameters;
import io.trino.spi.function.ScalarFunction;
import io.trino.spi.function.SqlNullable;
Expand Down Expand Up @@ -630,7 +631,7 @@ private static List<Page> processFilter(List<Page> inputPages, boolean columnarE
ImmutableList.of(field(ROW_NUM_CHANNEL, BIGINT)),
Optional.empty(),
OptionalInt.empty())
.get();
.apply(DynamicFilter.EMPTY);
LocalMemoryContext context = newSimpleAggregatedMemoryContext().newLocalMemoryContext(PageProcessor.class.getSimpleName());
ImmutableList.Builder<Page> outputPagesBuilder = ImmutableList.builder();
for (Page inputPage : inputPages) {
Expand Down
Loading

0 comments on commit e7aa589

Please sign in to comment.