Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ConnectorExpression pushdown #7994

Merged
merged 2 commits into from
Mar 2, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 13 additions & 0 deletions core/trino-main/src/main/java/io/trino/FeaturesConfig.java
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,7 @@ public class FeaturesConfig
private boolean optimizeTopNRanking = true;
private boolean lateMaterializationEnabled;
private boolean skipRedundantSort = true;
private boolean complexExpressionPushdownEnabled = true;
private boolean predicatePushdownUseTableProperties = true;
private boolean ignoreDownstreamPreferences;
private boolean rewriteFilteringSemiJoinToInnerJoin = true;
Expand Down Expand Up @@ -948,6 +949,18 @@ public FeaturesConfig setSkipRedundantSort(boolean value)
return this;
}

public boolean isComplexExpressionPushdownEnabled()
{
return complexExpressionPushdownEnabled;
}

@Config("optimizer.complex-expression-pushdown.enabled")
public FeaturesConfig setComplexExpressionPushdownEnabled(boolean complexExpressionPushdownEnabled)
{
this.complexExpressionPushdownEnabled = complexExpressionPushdownEnabled;
return this;
}

public boolean isPredicatePushdownUseTableProperties()
{
return predicatePushdownUseTableProperties;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,7 @@ public final class SystemSessionProperties
public static final String DEFAULT_FILTER_FACTOR_ENABLED = "default_filter_factor_enabled";
public static final String SKIP_REDUNDANT_SORT = "skip_redundant_sort";
public static final String ALLOW_PUSHDOWN_INTO_CONNECTORS = "allow_pushdown_into_connectors";
public static final String COMPLEX_EXPRESSION_PUSHDOWN = "complex_expression_pushdown";
public static final String PREDICATE_PUSHDOWN_USE_TABLE_PROPERTIES = "predicate_pushdown_use_table_properties";
public static final String LATE_MATERIALIZATION = "late_materialization";
public static final String ENABLE_DYNAMIC_FILTERING = "enable_dynamic_filtering";
Expand Down Expand Up @@ -551,6 +552,11 @@ public SystemSessionProperties(
// This is a diagnostic property
true,
true),
booleanProperty(
COMPLEX_EXPRESSION_PUSHDOWN,
"Allow complex expression pushdown into connectors",
featuresConfig.isComplexExpressionPushdownEnabled(),
true),
booleanProperty(
PREDICATE_PUSHDOWN_USE_TABLE_PROPERTIES,
"Use table properties in predicate pushdown",
Expand Down Expand Up @@ -1127,6 +1133,11 @@ public static boolean isAllowPushdownIntoConnectors(Session session)
return session.getSystemProperty(ALLOW_PUSHDOWN_INTO_CONNECTORS, Boolean.class);
}

public static boolean isComplexExpressionPushdown(Session session)
{
return session.getSystemProperty(COMPLEX_EXPRESSION_PUSHDOWN, Boolean.class);
}

public static boolean isPredicatePushdownUseTableProperties(Session session)
{
return session.getSystemProperty(PREDICATE_PUSHDOWN_USE_TABLE_PROPERTIES, Boolean.class);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1722,10 +1722,7 @@ public Optional<ConstraintApplicationResult<TableHandle>> applyFilter(Session se

ConnectorSession connectorSession = session.toConnectorSession(catalogName);
return metadata.applyFilter(connectorSession, table.getConnectorHandle(), constraint)
.map(result -> new ConstraintApplicationResult<>(
new TableHandle(catalogName, result.getHandle(), table.getTransaction()),
result.getRemainingFilter(),
result.isPrecalculateStatistics()));
.map(result -> result.transform(handle -> new TableHandle(catalogName, handle, table.getTransaction())));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,86 +13,163 @@
*/
package io.trino.sql.planner;

import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import io.airlift.slice.Slice;
import io.airlift.slice.Slices;
import io.trino.Session;
import io.trino.metadata.LiteralFunction;
import io.trino.metadata.ResolvedFunction;
import io.trino.security.AllowAllAccessControl;
import io.trino.spi.expression.Call;
import io.trino.spi.expression.ConnectorExpression;
import io.trino.spi.expression.Constant;
import io.trino.spi.expression.FieldDereference;
import io.trino.spi.expression.FunctionName;
import io.trino.spi.expression.Variable;
import io.trino.spi.type.Decimals;
import io.trino.spi.type.RowType;
import io.trino.spi.type.Type;
import io.trino.spi.type.TypeSignature;
import io.trino.spi.type.VarcharType;
import io.trino.sql.PlannerContext;
import io.trino.sql.analyzer.TypeSignatureProvider;
import io.trino.sql.tree.AstVisitor;
import io.trino.sql.tree.BinaryLiteral;
import io.trino.sql.tree.BooleanLiteral;
import io.trino.sql.tree.CharLiteral;
import io.trino.sql.tree.DecimalLiteral;
import io.trino.sql.tree.DoubleLiteral;
import io.trino.sql.tree.Expression;
import io.trino.sql.tree.FunctionCall;
import io.trino.sql.tree.LikePredicate;
import io.trino.sql.tree.LongLiteral;
import io.trino.sql.tree.NodeRef;
import io.trino.sql.tree.NullLiteral;
import io.trino.sql.tree.QualifiedName;
import io.trino.sql.tree.StringLiteral;
import io.trino.sql.tree.SubscriptExpression;
import io.trino.sql.tree.SymbolReference;
import io.trino.type.JoniRegexp;
import io.trino.type.Re2JRegexp;

import java.util.List;
import java.util.Map;
import java.util.Optional;

import static com.google.common.base.Verify.verify;
import static com.google.common.collect.ImmutableList.toImmutableList;
import static io.airlift.slice.SliceUtf8.countCodePoints;
import static io.trino.SystemSessionProperties.isComplexExpressionPushdown;
import static io.trino.sql.planner.ExpressionInterpreter.evaluateConstantExpression;
import static io.trino.type.LikeFunctions.LIKE_PATTERN_FUNCTION_NAME;
import static java.util.Objects.requireNonNull;

public final class ConnectorExpressionTranslator
{
private ConnectorExpressionTranslator() {}

public static Expression translate(Session session, ConnectorExpression expression, Map<String, Symbol> variableMappings, LiteralEncoder literalEncoder)
public static Expression translate(Session session, ConnectorExpression expression, PlannerContext plannerContext, Map<String, Symbol> variableMappings, LiteralEncoder literalEncoder)
{
return new ConnectorToSqlExpressionTranslator(variableMappings, literalEncoder).translate(session, expression);
return new ConnectorToSqlExpressionTranslator(session, plannerContext, literalEncoder, variableMappings)
.translate(session, expression)
.orElseThrow(() -> new UnsupportedOperationException("Expression is not supported: " + expression.toString()));
}

public static Optional<ConnectorExpression> translate(Session session, Expression expression, TypeAnalyzer types, TypeProvider inputTypes)
public static Optional<ConnectorExpression> translate(Session session, Expression expression, TypeAnalyzer types, TypeProvider inputTypes, PlannerContext plannerContext)
{
return new SqlToConnectorExpressionTranslator(types.getTypes(session, inputTypes, expression))
return new SqlToConnectorExpressionTranslator(session, types.getTypes(session, inputTypes, expression), plannerContext)
.process(expression);
}

private static class ConnectorToSqlExpressionTranslator
{
private final Map<String, Symbol> variableMappings;
private final Session session;
private final PlannerContext plannerContext;
private final LiteralEncoder literalEncoder;
private final Map<String, Symbol> variableMappings;

public ConnectorToSqlExpressionTranslator(Map<String, Symbol> variableMappings, LiteralEncoder literalEncoder)
public ConnectorToSqlExpressionTranslator(Session session, PlannerContext plannerContext, LiteralEncoder literalEncoder, Map<String, Symbol> variableMappings)
{
this.variableMappings = requireNonNull(variableMappings, "variableMappings is null");
this.session = requireNonNull(session, "session is null");
this.plannerContext = requireNonNull(plannerContext, "plannerContext is null");
this.literalEncoder = requireNonNull(literalEncoder, "literalEncoder is null");
this.variableMappings = requireNonNull(variableMappings, "variableMappings is null");
}

public Expression translate(Session session, ConnectorExpression expression)
public Optional<Expression> translate(Session session, ConnectorExpression expression)
{
if (expression instanceof Variable) {
return variableMappings.get(((Variable) expression).getName()).toSymbolReference();
String name = ((Variable) expression).getName();
return Optional.of(variableMappings.get(name).toSymbolReference());
}

if (expression instanceof Constant) {
return literalEncoder.toExpression(session, ((Constant) expression).getValue(), expression.getType());
return Optional.of(literalEncoder.toExpression(session, ((Constant) expression).getValue(), expression.getType()));
}

if (expression instanceof FieldDereference) {
FieldDereference dereference = (FieldDereference) expression;
return new SubscriptExpression(translate(session, dereference.getTarget()), new LongLiteral(Long.toString(dereference.getField() + 1)));
return translate(session, dereference.getTarget())
.map(base -> new SubscriptExpression(base, new LongLiteral(Long.toString(dereference.getField() + 1))));
}

if (expression instanceof Call) {
return translateCall((Call) expression);
}

throw new UnsupportedOperationException("Expression type not supported: " + expression.getClass().getName());
return Optional.empty();
}

protected Optional<Expression> translateCall(Call call)
{
if (call.getFunctionName().getCatalogSchema().isPresent()) {
return Optional.empty();
}
QualifiedName name = QualifiedName.of(call.getFunctionName().getName());
List<TypeSignature> argumentTypes = call.getArguments().stream()
.map(argument -> argument.getType().getTypeSignature())
.collect(toImmutableList());
ResolvedFunction resolved = plannerContext.getMetadata().resolveFunction(session, name, TypeSignatureProvider.fromTypeSignatures(argumentTypes));

// TODO Support ESCAPE character
if (LIKE_PATTERN_FUNCTION_NAME.equals(resolved.getSignature().getName()) && call.getArguments().size() == 2) {
return translateLike(call.getArguments().get(0), call.getArguments().get(1));
findepi marked this conversation as resolved.
Show resolved Hide resolved
}

FunctionCallBuilder builder = FunctionCallBuilder.resolve(session, plannerContext.getMetadata())
.setName(name);
for (int i = 0; i < call.getArguments().size(); i++) {
Type type = resolved.getSignature().getArgumentTypes().get(i);
Expression expression = ConnectorExpressionTranslator.translate(session, call.getArguments().get(i), plannerContext, variableMappings, literalEncoder);
builder.addArgument(type, expression);
}
return Optional.of(builder.build());
}

protected Optional<Expression> translateLike(ConnectorExpression value, ConnectorExpression pattern)
{
Optional<Expression> translatedValue = translate(session, value);
Optional<Expression> translatedPattern = translate(session, pattern);
if (translatedValue.isPresent() && translatedPattern.isPresent()) {
return Optional.of(new LikePredicate(translatedValue.get(), translatedPattern.get(), Optional.empty()));
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@martint Shouldn't this use LikeFunctions#LIKE_PATTERN_FUNCTION_NAME, so that we don't have to re-run DesugarLike?

cc @wendigo

}
return Optional.empty();
}
}

static class SqlToConnectorExpressionTranslator
public static class SqlToConnectorExpressionTranslator
extends AstVisitor<Optional<ConnectorExpression>, Void>
{
private final Session session;
private final Map<NodeRef<Expression>, Type> types;
private final PlannerContext plannerContext;

public SqlToConnectorExpressionTranslator(Map<NodeRef<Expression>, Type> types)
public SqlToConnectorExpressionTranslator(Session session, Map<NodeRef<Expression>, Type> types, PlannerContext plannerContext)
{
this.session = requireNonNull(session, "session is null");
this.types = requireNonNull(types, "types is null");
this.plannerContext = requireNonNull(plannerContext, "plannerContext is null");
}

@Override
Expand Down Expand Up @@ -149,6 +226,61 @@ protected Optional<ConnectorExpression> visitNullLiteral(NullLiteral node, Void
return Optional.of(new Constant(null, typeOf(node)));
}

@Override
protected Optional<ConnectorExpression> visitFunctionCall(FunctionCall node, Void context)
{
if (!isComplexExpressionPushdown(session)) {
return Optional.empty();
}

if (node.getFilter().isPresent() || node.getOrderBy().isPresent() || node.getWindow().isPresent() || node.getNullTreatment().isPresent() || node.isDistinct()) {
return Optional.empty();
}

String functionName = ResolvedFunction.extractFunctionName(node.getName());

if (LiteralFunction.LITERAL_FUNCTION_NAME.equalsIgnoreCase(functionName)) {
Object value = evaluateConstant(node);
if (value instanceof JoniRegexp) {
martint marked this conversation as resolved.
Show resolved Hide resolved
Slice pattern = ((JoniRegexp) value).pattern();
return Optional.of(new Constant(pattern, VarcharType.createVarcharType(countCodePoints(pattern))));
}
if (value instanceof Re2JRegexp) {
Slice pattern = Slices.utf8Slice(((Re2JRegexp) value).pattern());
return Optional.of(new Constant(pattern, VarcharType.createVarcharType(countCodePoints(pattern))));
}
return Optional.of(new Constant(value, types.get(NodeRef.of(node))));
}

ImmutableList.Builder<ConnectorExpression> arguments = ImmutableList.builder();
for (Expression argumentExpression : node.getArguments()) {
Optional<ConnectorExpression> argument = process(argumentExpression);
if (argument.isEmpty()) {
return Optional.empty();
}
arguments.add(argument.get());
}

// Currently, plugin-provided and runtime-added functions doesn't have a catalog/schema qualifier.
// TODO Translate catalog/schema qualifier when available.
FunctionName name = new FunctionName(functionName);
return Optional.of(new Call(typeOf(node), name, arguments.build()));
}

@Override
protected Optional<ConnectorExpression> visitLikePredicate(LikePredicate node, Void context)
martint marked this conversation as resolved.
Show resolved Hide resolved
{
// TODO Support ESCAPE character
if (node.getEscape().isEmpty()) {
Optional<ConnectorExpression> value = process(node.getValue());
Optional<ConnectorExpression> pattern = process(node.getPattern());
if (value.isPresent() && pattern.isPresent()) {
return Optional.of(new Call(typeOf(node), new FunctionName(LIKE_PATTERN_FUNCTION_NAME), List.of(value.get(), pattern.get())));
}
}
return Optional.empty();
}

@Override
protected Optional<ConnectorExpression> visitSubscriptExpression(SubscriptExpression node, Void context)
{
Expand All @@ -174,5 +306,19 @@ private Type typeOf(Expression node)
{
return types.get(NodeRef.of(node));
}

private Object evaluateConstant(Expression node)
{
Type type = typeOf(node);
Object value = evaluateConstantExpression(
node,
type,
plannerContext,
session,
new AllowAllAccessControl(),
ImmutableMap.of());
verify(!(value instanceof Expression), "Expression %s did not evaluate to constant: %s", node, value);
return value;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import io.trino.Session;
import io.trino.spi.expression.ConnectorExpression;
import io.trino.spi.type.Type;
import io.trino.sql.PlannerContext;
import io.trino.sql.tree.AstVisitor;
import io.trino.sql.tree.Expression;
import io.trino.sql.tree.LambdaExpression;
Expand All @@ -40,15 +41,16 @@ public static Map<NodeRef<Expression>, ConnectorExpression> extractPartialTransl
Expression inputExpression,
Session session,
TypeAnalyzer typeAnalyzer,
TypeProvider typeProvider)
TypeProvider typeProvider,
PlannerContext plannerContext)
{
requireNonNull(inputExpression, "inputExpression is null");
requireNonNull(session, "session is null");
requireNonNull(typeAnalyzer, "typeAnalyzer is null");
requireNonNull(typeProvider, "typeProvider is null");

Map<NodeRef<Expression>, ConnectorExpression> partialTranslations = new HashMap<>();
new Visitor(typeAnalyzer.getTypes(session, typeProvider, inputExpression), partialTranslations).process(inputExpression);
new Visitor(session, typeAnalyzer.getTypes(session, typeProvider, inputExpression), partialTranslations, plannerContext).process(inputExpression);
return ImmutableMap.copyOf(partialTranslations);
}

Expand All @@ -58,11 +60,11 @@ private static class Visitor
private final Map<NodeRef<Expression>, ConnectorExpression> translatedSubExpressions;
private final ConnectorExpressionTranslator.SqlToConnectorExpressionTranslator translator;

Visitor(Map<NodeRef<Expression>, Type> types, Map<NodeRef<Expression>, ConnectorExpression> translatedSubExpressions)
Visitor(Session session, Map<NodeRef<Expression>, Type> types, Map<NodeRef<Expression>, ConnectorExpression> translatedSubExpressions, PlannerContext plannerContext)
{
requireNonNull(types, "types is null");
this.translatedSubExpressions = requireNonNull(translatedSubExpressions, "translatedSubExpressions is null");
this.translator = new ConnectorExpressionTranslator.SqlToConnectorExpressionTranslator(types);
this.translator = new ConnectorExpressionTranslator.SqlToConnectorExpressionTranslator(session, types, plannerContext);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -179,7 +179,7 @@ public static Optional<PlanNode> pushAggregationIntoTableScan(
}

List<Expression> newProjections = result.getProjections().stream()
.map(expression -> ConnectorExpressionTranslator.translate(context.getSession(), expression, variableMappings, new LiteralEncoder(plannerContext)))
.map(expression -> ConnectorExpressionTranslator.translate(context.getSession(), expression, plannerContext, variableMappings, new LiteralEncoder(plannerContext)))
.collect(toImmutableList());

verify(aggregationOutputSymbols.size() == newProjections.size());
Expand Down
Loading