Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix null handling in Pinot predicate pushdown #16979

Merged
merged 2 commits into from
May 15, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@
import io.trino.spi.expression.Variable;
import io.trino.spi.predicate.Domain;
import io.trino.spi.predicate.TupleDomain;
import io.trino.spi.predicate.ValueSet;
import io.trino.spi.type.ArrayType;
import io.trino.spi.type.Type;
import org.apache.pinot.spi.data.Schema;
Expand All @@ -63,6 +64,7 @@
import java.util.Map;
import java.util.Optional;
import java.util.OptionalLong;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
Expand All @@ -78,6 +80,10 @@
import static io.trino.plugin.pinot.PinotSessionProperties.isAggregationPushdownEnabled;
import static io.trino.plugin.pinot.query.AggregateExpression.replaceIdentifier;
import static io.trino.plugin.pinot.query.DynamicTablePqlExtractor.quoteIdentifier;
import static io.trino.spi.type.BigintType.BIGINT;
import static io.trino.spi.type.DoubleType.DOUBLE;
import static io.trino.spi.type.IntegerType.INTEGER;
import static io.trino.spi.type.RealType.REAL;
import static java.util.Locale.ENGLISH;
import static java.util.Objects.requireNonNull;
import static java.util.function.UnaryOperator.identity;
Expand All @@ -87,6 +93,10 @@ public class PinotMetadata
{
public static final String SCHEMA_NAME = "default";

// Pinot does not yet have full support for predicates that are always TRUE/FALSE
// See https://github.com/apache/incubator-pinot/issues/10601
private static final Set<Type> SUPPORTS_ALWAYS_FALSE = Set.of(BIGINT, INTEGER, REAL, DOUBLE);

private final NonEvictableLoadingCache<String, List<PinotColumnHandle>> pinotTableColumnCache;
private final int maxRowsPerBrokerQuery;
private final AggregateFunctionRewriter<AggregateExpression, Void> aggregateFunctionRewriter;
Expand Down Expand Up @@ -292,6 +302,9 @@ else if (typeConverter.isJsonType(columnType)) {
// Pinot does not support filtering on json values
unsupported.put(entry.getKey(), entry.getValue());
}
else if (isFilterPushdownUnsupported(entry.getValue())) {
unsupported.put(entry.getKey(), entry.getValue());
}
else {
supported.put(entry.getKey(), entry.getValue());
}
Expand All @@ -313,6 +326,20 @@ else if (typeConverter.isJsonType(columnType)) {
return Optional.of(new ConstraintApplicationResult<>(handle, remainingFilter, false));
}

// IS NULL and IS NOT NULL are handled differently in Pinot, pushing down would lead to inconsistent results.
// See https://docs.pinot.apache.org/developers/advanced/null-value-support for more info.
private boolean isFilterPushdownUnsupported(Domain domain)
{
ValueSet valueSet = domain.getValues();
boolean isNotNull = valueSet.isAll() && !domain.isNullAllowed();
boolean isUnsupportedAlwaysFalse = domain.isNone() && !SUPPORTS_ALWAYS_FALSE.contains(domain.getType());
boolean isInOrNull = !valueSet.getRanges().getOrderedRanges().isEmpty() && domain.isNullAllowed();
return isNotNull ||
domain.isOnlyNull() ||
isUnsupportedAlwaysFalse ||
isInOrNull;
}

@Override
public Optional<AggregationApplicationResult<ConnectorTableHandle>> applyAggregation(
ConnectorSession session,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import io.trino.spi.predicate.Domain;
import io.trino.spi.predicate.Range;
import io.trino.spi.predicate.TupleDomain;
import io.trino.spi.predicate.ValueSet;
import io.trino.spi.type.RealType;
import io.trino.spi.type.TimestampType;
import io.trino.spi.type.Timestamps;
Expand All @@ -37,6 +38,7 @@
import java.util.OptionalLong;

import static com.google.common.base.Preconditions.checkState;
import static com.google.common.base.Verify.verify;
import static com.google.common.collect.ImmutableList.toImmutableList;
import static com.google.common.collect.Iterables.getOnlyElement;
import static java.lang.Float.intBitsToFloat;
Expand Down Expand Up @@ -103,19 +105,18 @@ private static void generateFilterPql(StringBuilder pqlBuilder, PinotTableHandle

public static Optional<String> getFilterClause(TupleDomain<ColumnHandle> tupleDomain, Optional<String> timePredicate, boolean forHavingClause)
{
checkState(!tupleDomain.isNone(), "Pinot does not support 1 = 0 syntax, as a workaround use <column> != <column>");
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you explain the relationship between TupleDomain being None and Pinot's workaround.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sure: one of the sides of a filter must contain an expression with a column in it, constants on both sides are not supported (throws a "BAD REQUEST" error), so instead of 1 = 0 you can do <column> != <column>. lmk if you need any more info.

ImmutableList.Builder<String> conjunctsBuilder = ImmutableList.builder();
checkState((forHavingClause && timePredicate.isEmpty()) || !forHavingClause, "Unexpected time predicate with having clause");
timePredicate.ifPresent(conjunctsBuilder::add);
if (!tupleDomain.equals(TupleDomain.all())) {
Map<ColumnHandle, Domain> domains = tupleDomain.getDomains().orElseThrow();
for (Map.Entry<ColumnHandle, Domain> entry : domains.entrySet()) {
PinotColumnHandle pinotColumnHandle = (PinotColumnHandle) entry.getKey();
// If this is for a having clause, only include aggregate columns.
// If this is for a where clause, only include non-aggregate columns.
// i.e. (forHavingClause && isAggregate) || (!forHavingClause && !isAggregate)
if (forHavingClause == pinotColumnHandle.isAggregate()) {
conjunctsBuilder.add(toPredicate(pinotColumnHandle, entry.getValue()));
}
Map<ColumnHandle, Domain> domains = tupleDomain.getDomains().orElseThrow();
elonazoulay marked this conversation as resolved.
Show resolved Hide resolved
for (Map.Entry<ColumnHandle, Domain> entry : domains.entrySet()) {
PinotColumnHandle pinotColumnHandle = (PinotColumnHandle) entry.getKey();
// If this is for a having clause, only include aggregate columns.
// If this is for a where clause, only include non-aggregate columns.
// i.e. (forHavingClause && isAggregate) || (!forHavingClause && !isAggregate)
if (forHavingClause == pinotColumnHandle.isAggregate()) {
toPredicate(pinotColumnHandle, entry.getValue()).ifPresent(conjunctsBuilder::add);
}
}
List<String> conjuncts = conjunctsBuilder.build();
Expand All @@ -125,12 +126,32 @@ public static Optional<String> getFilterClause(TupleDomain<ColumnHandle> tupleDo
return Optional.empty();
}

private static String toPredicate(PinotColumnHandle pinotColumnHandle, Domain domain)
private static Optional<String> toPredicate(PinotColumnHandle pinotColumnHandle, Domain domain)
{
String predicateArgument = pinotColumnHandle.isAggregate() ? pinotColumnHandle.getExpression() : quoteIdentifier(pinotColumnHandle.getColumnName());
ValueSet valueSet = domain.getValues();
if (valueSet.isNone()) {
verify(!domain.isNullAllowed(), "IS NULL is not supported due to different null handling semantics. See https://docs.pinot.apache.org/developers/advanced/null-value-support");
return Optional.of(format("(%s != %s)", predicateArgument, predicateArgument));
}
if (valueSet.isAll()) {
verify(domain.isNullAllowed(), "IS NOT NULL is not supported due to different null handling semantics. See https://docs.pinot.apache.org/developers/advanced/null-value-support");
// Pinot does not support "1 = 1" syntax: see https://github.com/apache/pinot/issues/10600
// As a workaround, skip adding always true to conjuncts
return Optional.empty();
}
verify(!domain.getValues().getRanges().getOrderedRanges().isEmpty() && !domain.isNullAllowed(), "IS NULL is not supported due to different null handling semantics. See https://docs.pinot.apache.org/developers/advanced/null-value-support");
List<String> disjuncts = new ArrayList<>();
List<Object> singleValues = new ArrayList<>();
for (Range range : domain.getValues().getRanges().getOrderedRanges()) {
boolean invertPredicate = false;
if (!valueSet.isDiscreteSet()) {
ValueSet complement = domain.getValues().complement();
if (complement.isDiscreteSet()) {
invertPredicate = complement.isDiscreteSet();
valueSet = complement;
}
}
Comment on lines +147 to +153
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit : Maybe it could be implemented a different PR. This logic allows us to check if its for NOT IN types of filter condition ? Can we expose them as an API so that connectors could identify them like DefaultQueryBuilder and SqlServerClient use similar logic.

cc: @findepi Is it possible to expose them as an API or some Util method ?

cc: @findepi Can we extract them to a Util method ?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

lmk - I can extract it, or I can wait for the pr (or even help with that) and then change this to use the utility method or api.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I added #17189 for the api method and found (and replaced) 2 use cases outside of this pr.

for (Range range : valueSet.getRanges().getOrderedRanges()) {
Comment on lines +146 to +154
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For my understanding, these changes are for null handling or for additional predicate types ?

Copy link
Member Author

@elonazoulay elonazoulay Apr 14, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is for != and NOT IN - I noticed the issue when adding back the test. lmk if I should do anything - also added tests for NOT IN/IN with IS NULL, etc.

checkState(!range.isAll()); // Already checked
if (range.isSingleValue()) {
singleValues.add(convertValue(range.getType(), range.getSingleValue()));
Expand All @@ -150,12 +171,14 @@ private static String toPredicate(PinotColumnHandle pinotColumnHandle, Domain do
}
// Add back all of the possible single values either as an equality or an IN predicate
if (singleValues.size() == 1) {
disjuncts.add(toConjunct(predicateArgument, "=", getOnlyElement(singleValues)));
String operator = invertPredicate ? "!=" : "=";
disjuncts.add(toConjunct(predicateArgument, operator, getOnlyElement(singleValues)));
}
else if (singleValues.size() > 1) {
disjuncts.add(inClauseValues(predicateArgument, singleValues));
String operator = invertPredicate ? "NOT IN" : "IN";
disjuncts.add(inClauseValues(predicateArgument, operator, singleValues));
}
return "(" + Joiner.on(" OR ").join(disjuncts) + ")";
return Optional.of("(" + Joiner.on(" OR ").join(disjuncts) + ")");
}

private static Object convertValue(Type type, Object value)
Expand Down Expand Up @@ -191,9 +214,9 @@ private static String toConjunct(String columnName, String operator, Object valu
return format("%s %s %s", columnName, operator, singleQuote(value));
}

private static String inClauseValues(String columnName, List<Object> singleValues)
private static String inClauseValues(String columnName, String operator, List<Object> singleValues)
{
return format("%s IN (%s)", columnName, singleValues.stream()
return format("%s %s (%s)", columnName, operator, singleValues.stream()
.map(PinotQueryBuilder::singleQuote)
.collect(joining(", ")));
}
Expand Down
Loading