Skip to content

Commit

Permalink
Implement collection of min/max values in DynamicFilterSourceOperator
Browse files Browse the repository at this point in the history
Added a flag dynamic-filtering-range-row-limit-per-driver to allow collection of
min and max values for dynamic filtering up to a given per-driver row count
when the build side exceeds the dynamic-filtering-max-per-driver-row-count or
dynamic-filtering-max-per-driver-size thresholds
  • Loading branch information
raunaqmorarka authored and sopel39 committed Sep 17, 2020
1 parent e0d1846 commit f548a1e
Show file tree
Hide file tree
Showing 11 changed files with 478 additions and 72 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import static io.airlift.testing.Assertions.assertGreaterThan;
import static io.airlift.testing.Assertions.assertGreaterThanOrEqual;
import static io.airlift.units.Duration.nanosSince;
import static io.prestosql.SystemSessionProperties.DYNAMIC_FILTERING_RANGE_ROW_LIMIT_PER_DRIVER;
import static io.prestosql.SystemSessionProperties.JOIN_DISTRIBUTION_TYPE;
import static io.prestosql.SystemSessionProperties.JOIN_REORDERING_STRATEGY;
import static io.prestosql.server.DynamicFilterService.DynamicFilterDomainStats;
Expand Down Expand Up @@ -73,7 +74,7 @@ public void init()
super.init();
// setup partitioned fact table for dynamic partition pruning
@Language("SQL") String sql = format("CREATE TABLE %s WITH (format = 'TEXTFILE', partitioned_by=array['suppkey']) AS " +
"SELECT orderkey, partkey, linenumber, suppkey FROM %s", PARTITIONED_LINEITEM, "tpch.tiny.lineitem");
"SELECT orderkey, partkey, suppkey FROM %s", PARTITIONED_LINEITEM, "tpch.tiny.lineitem");
long start = System.nanoTime();
long rows = (Long) getQueryRunner().execute(sql).getMaterializedRows().get(0).getField(0);
log.info("Imported %s rows for %s in %s", rows, PARTITIONED_LINEITEM, nanosSince(start).convertToMostSuccinctTimeUnit());
Expand All @@ -85,6 +86,7 @@ protected Session getSession()
return Session.builder(super.getSession())
.setSystemProperty(JOIN_REORDERING_STRATEGY, NONE.name())
.setSystemProperty(JOIN_DISTRIBUTION_TYPE, PARTITIONED.name()) // Avoid node local DF
.setSystemProperty(DYNAMIC_FILTERING_RANGE_ROW_LIMIT_PER_DRIVER, "100000")
.build();
}

Expand Down Expand Up @@ -182,7 +184,11 @@ public void testJoinLargeBuildSideNoDynamicFiltering()
assertEquals(dynamicFiltersStats.getDynamicFiltersCompleted(), 1L);

DynamicFilterDomainStats domainStats = getOnlyElement(dynamicFiltersStats.getDynamicFilterDomainStats());
assertEquals(domainStats.getSimplifiedDomain(), Domain.all(INTEGER).toString(getSession().toConnectorSession()));
assertEquals(
domainStats.getSimplifiedDomain(),
Domain.create(
ValueSet.ofRanges(range(INTEGER, 1L, true, 60000L, true)), false)
.toString(getSession().toConnectorSession()));
assertEquals(domainStats.getDiscreteValuesCount(), 0);
assertEquals(domainStats.getRangeCount(), 1);
}
Expand Down Expand Up @@ -310,7 +316,11 @@ public void testSemiJoinLargeBuildSideNoDynamicFiltering()
assertEquals(dynamicFiltersStats.getDynamicFiltersCompleted(), 1L);

DynamicFilterDomainStats domainStats = getOnlyElement(dynamicFiltersStats.getDynamicFilterDomainStats());
assertEquals(domainStats.getSimplifiedDomain(), Domain.all(INTEGER).toString(getSession().toConnectorSession()));
assertEquals(
domainStats.getSimplifiedDomain(),
Domain.create(
ValueSet.ofRanges(range(INTEGER, 1L, true, 60000L, true)), false)
.toString(getSession().toConnectorSession()));
assertEquals(domainStats.getDiscreteValuesCount(), 0);
assertEquals(domainStats.getRangeCount(), 1);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,7 @@ public final class SystemSessionProperties
public static final String QUERY_MAX_TOTAL_MEMORY_PER_NODE = "query_max_total_memory_per_node";
public static final String DYNAMIC_FILTERING_MAX_PER_DRIVER_ROW_COUNT = "dynamic_filtering_max_per_driver_row_count";
public static final String DYNAMIC_FILTERING_MAX_PER_DRIVER_SIZE = "dynamic_filtering_max_per_driver_size";
public static final String DYNAMIC_FILTERING_RANGE_ROW_LIMIT_PER_DRIVER = "dynamic_filtering_range_row_limit_per_driver";
public static final String IGNORE_DOWNSTREAM_PREFERENCES = "ignore_downstream_preferences";
public static final String ITERATIVE_COLUMN_PRUNING = "iterative_rule_based_column_pruning";
public static final String REQUIRED_WORKERS_COUNT = "required_workers_count";
Expand Down Expand Up @@ -548,6 +549,11 @@ public SystemSessionProperties(
"Experimental: maximum number of bytes to be collected for dynamic filtering per-driver",
featuresConfig.getDynamicFilteringMaxPerDriverSize(),
false),
integerProperty(
DYNAMIC_FILTERING_RANGE_ROW_LIMIT_PER_DRIVER,
"Maximum number of build-side rows per driver up to which min and max values will be collected for dynamic filtering",
featuresConfig.getDynamicFilteringRangeRowLimitPerDriver(),
false),
booleanProperty(
IGNORE_DOWNSTREAM_PREFERENCES,
"Ignore Parent's PreferredProperties in AddExchange optimizer",
Expand Down Expand Up @@ -1006,6 +1012,11 @@ public static DataSize getDynamicFilteringMaxPerDriverSize(Session session)
return session.getSystemProperty(DYNAMIC_FILTERING_MAX_PER_DRIVER_SIZE, DataSize.class);
}

/**
 * Looks up the {@code dynamic_filtering_range_row_limit_per_driver} system property on the given session.
 *
 * @param session session whose system properties are consulted
 * @return the configured per-driver row limit for min/max collection
 */
public static int getDynamicFilteringRangeRowLimitPerDriver(Session session)
{
    Integer rangeRowLimit = session.getSystemProperty(DYNAMIC_FILTERING_RANGE_ROW_LIMIT_PER_DRIVER, Integer.class);
    return rangeRowLimit;
}

public static boolean ignoreDownStreamPreferences(Session session)
{
return session.getSystemProperty(IGNORE_DOWNSTREAM_PREFERENCES, Boolean.class);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@
import io.prestosql.spi.predicate.TupleDomain;
import io.prestosql.spi.predicate.ValueSet;
import io.prestosql.spi.type.Type;
import io.prestosql.spi.type.TypeUtils;
import io.prestosql.sql.planner.plan.DynamicFilterId;
import io.prestosql.sql.planner.plan.PlanNodeId;

Expand All @@ -36,14 +35,19 @@

import static com.google.common.base.Preconditions.checkState;
import static com.google.common.base.Verify.verify;
import static io.prestosql.spi.predicate.Range.range;
import static io.prestosql.spi.type.DoubleType.DOUBLE;
import static io.prestosql.spi.type.RealType.REAL;
import static io.prestosql.spi.type.TypeUtils.isFloatingPointNaN;
import static io.prestosql.spi.type.TypeUtils.readNativeValue;
import static java.util.Objects.requireNonNull;
import static java.util.stream.Collectors.toSet;

/**
* This operator acts as a simple "pass-through" pipe, while saving its input pages.
* The values of the collected pages are used to create a run-time filtering constraint (for the probe-side table scan in an inner join).
* We support only small build-side pages (which should be the case when using "broadcast" join).
* We record all values for the run-time filter only for small build-side pages (which should be the case when using "broadcast" join).
* For large inputs on build side, we can optionally record the min and max values per channel for orderable types (except Double and Real).
*/
public class DynamicFilterSourceOperator
implements Operator
Expand Down Expand Up @@ -73,6 +77,7 @@ public static class DynamicFilterSourceOperatorFactory
private final List<Channel> channels;
private final int maxFilterPositionsCount;
private final DataSize maxFilterSize;
private final int minMaxCollectionLimit;

private boolean closed;

Expand All @@ -82,7 +87,8 @@ public DynamicFilterSourceOperatorFactory(
Consumer<TupleDomain<DynamicFilterId>> dynamicPredicateConsumer,
List<Channel> channels,
int maxFilterPositionsCount,
DataSize maxFilterSize)
DataSize maxFilterSize,
int minMaxCollectionLimit)
{
this.operatorId = operatorId;
this.planNodeId = requireNonNull(planNodeId, "planNodeId is null");
Expand All @@ -94,6 +100,7 @@ public DynamicFilterSourceOperatorFactory(
"duplicate channel indices are not allowed");
this.maxFilterPositionsCount = maxFilterPositionsCount;
this.maxFilterSize = maxFilterSize;
this.minMaxCollectionLimit = minMaxCollectionLimit;
}

@Override
Expand All @@ -106,7 +113,8 @@ public Operator createOperator(DriverContext driverContext)
channels,
planNodeId,
maxFilterPositionsCount,
maxFilterSize);
maxFilterSize,
minMaxCollectionLimit);
}

@Override
Expand All @@ -131,20 +139,28 @@ public OperatorFactory duplicate()
private final long maxFilterSizeInBytes;

private final List<Channel> channels;
private final List<Integer> minMaxChannels;

// May be dropped if the predicate becomes too large.
@Nullable
private BlockBuilder[] blockBuilders;
@Nullable
private TypedSet[] valueSets;

private int minMaxCollectionLimit;
@Nullable
private Block[] minValues;
@Nullable
private Block[] maxValues;

private DynamicFilterSourceOperator(
OperatorContext context,
Consumer<TupleDomain<DynamicFilterId>> dynamicPredicateConsumer,
List<Channel> channels,
PlanNodeId planNodeId,
int maxFilterPositionsCount,
DataSize maxFilterSize)
DataSize maxFilterSize,
int minMaxCollectionLimit)
{
this.context = requireNonNull(context, "context is null");
this.maxFilterPositionsCount = maxFilterPositionsCount;
Expand All @@ -155,8 +171,13 @@ private DynamicFilterSourceOperator(

this.blockBuilders = new BlockBuilder[channels.size()];
this.valueSets = new TypedSet[channels.size()];
ImmutableList.Builder<Integer> minMaxChannelsBuilder = ImmutableList.builder();
for (int channelIndex = 0; channelIndex < channels.size(); ++channelIndex) {
Type type = channels.get(channelIndex).type;
// Skipping DOUBLE and REAL in collectMinMaxValues to avoid dealing with NaN values
if (minMaxCollectionLimit > 0 && type.isOrderable() && type != DOUBLE && type != REAL) {
minMaxChannelsBuilder.add(channelIndex);
}
this.blockBuilders[channelIndex] = type.createBlockBuilder(null, EXPECTED_BLOCK_BUILDER_SIZE);
this.valueSets[channelIndex] = new TypedSet(
type,
Expand All @@ -166,6 +187,13 @@ private DynamicFilterSourceOperator(
String.format("DynamicFilterSourceOperator_%s_%d", planNodeId, channelIndex),
Optional.empty() /* maxBlockMemory */);
}

this.minMaxCollectionLimit = minMaxCollectionLimit;
this.minMaxChannels = minMaxChannelsBuilder.build();
if (!minMaxChannels.isEmpty()) {
this.minValues = new Block[channels.size()];
this.maxValues = new Block[channels.size()];
}
}

@Override
Expand All @@ -186,9 +214,23 @@ public void addInput(Page page)
verify(!finished, "DynamicFilterSourceOperator: addInput() may not be called after finish()");
current = page;
if (valueSets == null) {
return; // the predicate became too large.
if (minValues == null) {
// there are too many rows to collect min/max range
return;
}
minMaxCollectionLimit -= page.getPositionCount();
if (minMaxCollectionLimit < 0) {
handleMinMaxCollectionLimitExceeded();
return;
}
// the predicate became too large, record only min and max values for each orderable channel
for (Integer channelIndex : minMaxChannels) {
Block block = page.getBlock(channels.get(channelIndex).index);
updateMinMaxValues(block, channelIndex);
}
return;
}

minMaxCollectionLimit -= page.getPositionCount();
// TODO: we should account for the memory used for collecting build-side values using MemoryContext
long filterSizeInBytes = 0;
int filterPositionsCount = 0;
Expand All @@ -210,13 +252,84 @@ public void addInput(Page page)

/**
 * Invoked once the collected distinct values no longer fit within the per-driver limits.
 * <p>
 * Exactly one of the following happens:
 * <ul>
 * <li>no channel is eligible for min/max collection: the consumer is notified with {@code TupleDomain.all()};</li>
 * <li>the min/max row budget is already exhausted: collection is abandoned via
 * {@link #handleMinMaxCollectionLimitExceeded()}, which notifies the consumer with {@code TupleDomain.all()};</li>
 * <li>otherwise the values gathered so far are folded into per-channel min/max values and the consumer
 * is not notified here.</li>
 * </ul>
 * In every case the references to the collected values are dropped afterwards.
 */
private void handleTooLargePredicate()
{
    // The resulting predicate is too large to be kept as a set of discrete values
    if (minMaxChannels.isEmpty()) {
        // nothing can be summarized as a range, allow all probe-side values to be read
        dynamicPredicateConsumer.accept(TupleDomain.all());
    }
    else if (minMaxCollectionLimit < 0) {
        // too many rows were seen to fall back to a min/max range
        handleMinMaxCollectionLimitExceeded();
    }
    else {
        // convert the values collected so far to min/max per column for orderable types
        for (Integer channelIndex : minMaxChannels) {
            Block block = blockBuilders[channelIndex].build();
            updateMinMaxValues(block, channelIndex);
        }
    }
    // Drop references to collected values.
    valueSets = null;
    blockBuilders = null;
}

/**
 * Abandons min/max collection: publishes an unconstrained filter and releases the tracked extremes.
 */
private void handleMinMaxCollectionLimitExceeded()
{
    // No usable range is available, so every probe-side value must be allowed through
    dynamicPredicateConsumer.accept(TupleDomain.all());
    // Clearing both arrays releases the tracked values; callers test minValues for null
    this.minValues = null;
    this.maxValues = null;
}

/**
 * Folds the non-null values of {@code block} into the running minimum and maximum kept for {@code channelIndex}.
 * Null positions are skipped; a block without any non-null value leaves the tracked state untouched.
 *
 * @param block values of a single channel
 * @param channelIndex index into {@code channels}, {@code minValues} and {@code maxValues}
 */
private void updateMinMaxValues(Block block, int channelIndex)
{
    checkState(minValues != null && maxValues != null);
    Type type = channels.get(channelIndex).type;

    // Scan the block once, remembering where its smallest and largest non-null values live
    int lowest = -1;
    int highest = -1;
    int positionCount = block.getPositionCount();
    for (int i = 0; i < positionCount; i++) {
        if (!block.isNull(i)) {
            if (lowest < 0) {
                // first non-null value seeds both extremes
                lowest = i;
                highest = i;
            }
            else if (type.compareTo(block, i, block, lowest) < 0) {
                lowest = i;
            }
            else if (type.compareTo(block, i, block, highest) > 0) {
                highest = i;
            }
        }
    }

    if (lowest < 0) {
        // the block held nothing but nulls
        return;
    }

    Block knownMin = minValues[channelIndex];
    if (knownMin == null) {
        // no earlier page contributed a non-null value for this channel
        minValues[channelIndex] = block.getSingleValueBlock(lowest);
        maxValues[channelIndex] = block.getSingleValueBlock(highest);
        return;
    }

    // Merge this block's extremes with those remembered from earlier pages
    Block knownMax = maxValues[channelIndex];
    if (type.compareTo(block, lowest, knownMin, 0) < 0) {
        minValues[channelIndex] = block.getSingleValueBlock(lowest);
    }
    if (type.compareTo(block, highest, knownMax, 0) > 0) {
        maxValues[channelIndex] = block.getSingleValueBlock(highest);
    }
}

@Override
public Page getOutput()
{
Expand All @@ -233,11 +346,34 @@ public void finish()
return;
}
finished = true;
ImmutableMap.Builder<DynamicFilterId, Domain> domainsBuilder = new ImmutableMap.Builder<>();
if (valueSets == null) {
return; // the predicate became too large.
if (minValues == null) {
// there were too many rows to collect the min/max range
// dynamicPredicateConsumer was notified with 'all' in handleTooLargePredicate if there are no orderable types,
// else it was notified with 'all' in handleMinMaxCollectionLimitExceeded
return;
}
// valueSets became too large, create TupleDomain from min/max values
for (Integer channelIndex : minMaxChannels) {
Type type = channels.get(channelIndex).type;
if (minValues[channelIndex] == null) {
// all values were null
domainsBuilder.put(channels.get(channelIndex).filterId, Domain.none(type));
continue;
}
Object min = readNativeValue(type, minValues[channelIndex], 0);
Object max = readNativeValue(type, maxValues[channelIndex], 0);
Domain domain = Domain.create(
ValueSet.ofRanges(range(type, min, true, max, true)),
false);
domainsBuilder.put(channels.get(channelIndex).filterId, domain);
}
minValues = null;
maxValues = null;
dynamicPredicateConsumer.accept(TupleDomain.withColumnDomains(domainsBuilder.build()));
return;
}

ImmutableMap.Builder<DynamicFilterId, Domain> domainsBuilder = new ImmutableMap.Builder<>();
for (int channelIndex = 0; channelIndex < channels.size(); ++channelIndex) {
Block block = blockBuilders[channelIndex].build();
Type type = channels.get(channelIndex).type;
Expand All @@ -252,7 +388,7 @@ private Domain convertToDomain(Type type, Block block)
{
ImmutableList.Builder<Object> values = ImmutableList.builder();
for (int position = 0; position < block.getPositionCount(); ++position) {
Object value = TypeUtils.readNativeValue(type, block, position);
Object value = readNativeValue(type, block, position);
if (value != null) {
// join doesn't match rows with NaN values.
if (!isFloatingPointNaN(type, value)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,7 @@ public class FeaturesConfig
private int dynamicFilteringMaxPerDriverRowCount = 100;
private DataSize dynamicFilteringMaxPerDriverSize = DataSize.of(10, KILOBYTE);
private Duration dynamicFilteringRefreshInterval = new Duration(200, MILLISECONDS);
private int dynamicFilteringRangeRowLimitPerDriver;

private DataSize filterAndProjectMinOutputPageSize = DataSize.of(500, KILOBYTE);
private int filterAndProjectMinOutputPageRowCount = 256;
Expand Down Expand Up @@ -796,6 +797,19 @@ public FeaturesConfig setDynamicFilteringRefreshInterval(Duration dynamicFilteri
return this;
}

/**
 * @return the configured value of {@code dynamic-filtering-range-row-limit-per-driver}
 */
public int getDynamicFilteringRangeRowLimitPerDriver()
{
    return this.dynamicFilteringRangeRowLimitPerDriver;
}

/**
 * Sets the maximum number of build-side rows per driver up to which min and max values
 * are collected for dynamic filtering.
 *
 * @param dynamicFilteringRangeRowLimitPerDriver the new per-driver row limit; stored as given, without validation
 * @return this config instance, so that setters can be chained
 */
@Config("dynamic-filtering-range-row-limit-per-driver")
@ConfigDescription("Maximum number of build-side rows per driver up to which min and max values will be collected for dynamic filtering")
public FeaturesConfig setDynamicFilteringRangeRowLimitPerDriver(int dynamicFilteringRangeRowLimitPerDriver)
{
this.dynamicFilteringRangeRowLimitPerDriver = dynamicFilteringRangeRowLimitPerDriver;
return this;
}

public boolean isOptimizeMixedDistinctAggregations()
{
return optimizeMixedDistinctAggregations;
Expand Down
Loading

0 comments on commit f548a1e

Please sign in to comment.