Skip to content

Commit

Permalink
Support ConnectorExpression pushdown and introduce functions and LIKE…
Browse files Browse the repository at this point in the history
… pushdown
  • Loading branch information
assaf2 committed Mar 1, 2022
1 parent 33277e7 commit 8d6f162
Show file tree
Hide file tree
Showing 18 changed files with 655 additions and 55 deletions.
13 changes: 13 additions & 0 deletions core/trino-main/src/main/java/io/trino/FeaturesConfig.java
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,7 @@ public class FeaturesConfig
private boolean optimizeTopNRanking = true;
private boolean lateMaterializationEnabled;
private boolean skipRedundantSort = true;
private boolean complexExpressionPushdownEnabled = true;
private boolean predicatePushdownUseTableProperties = true;
private boolean ignoreDownstreamPreferences;
private boolean rewriteFilteringSemiJoinToInnerJoin = true;
Expand Down Expand Up @@ -948,6 +949,18 @@ public FeaturesConfig setSkipRedundantSort(boolean value)
return this;
}

public boolean isComplexExpressionPushdownEnabled()
{
return complexExpressionPushdownEnabled;
}

@Config("optimizer.complex-expression-pushdown.enabled")
public FeaturesConfig setComplexExpressionPushdownEnabled(boolean complexExpressionPushdownEnabled)
{
this.complexExpressionPushdownEnabled = complexExpressionPushdownEnabled;
return this;
}

public boolean isPredicatePushdownUseTableProperties()
{
return predicatePushdownUseTableProperties;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,7 @@ public final class SystemSessionProperties
public static final String DEFAULT_FILTER_FACTOR_ENABLED = "default_filter_factor_enabled";
public static final String SKIP_REDUNDANT_SORT = "skip_redundant_sort";
public static final String ALLOW_PUSHDOWN_INTO_CONNECTORS = "allow_pushdown_into_connectors";
public static final String COMPLEX_EXPRESSION_PUSHDOWN = "complex_expression_pushdown";
public static final String PREDICATE_PUSHDOWN_USE_TABLE_PROPERTIES = "predicate_pushdown_use_table_properties";
public static final String LATE_MATERIALIZATION = "late_materialization";
public static final String ENABLE_DYNAMIC_FILTERING = "enable_dynamic_filtering";
Expand Down Expand Up @@ -551,6 +552,11 @@ public SystemSessionProperties(
// This is a diagnostic property
true,
true),
booleanProperty(
COMPLEX_EXPRESSION_PUSHDOWN,
"Allow complex expression pushdown into connectors",
featuresConfig.isComplexExpressionPushdownEnabled(),
true),
booleanProperty(
PREDICATE_PUSHDOWN_USE_TABLE_PROPERTIES,
"Use table properties in predicate pushdown",
Expand Down Expand Up @@ -1127,6 +1133,11 @@ public static boolean isAllowPushdownIntoConnectors(Session session)
return session.getSystemProperty(ALLOW_PUSHDOWN_INTO_CONNECTORS, Boolean.class);
}

public static boolean isComplexExpressionPushdown(Session session)
{
return session.getSystemProperty(COMPLEX_EXPRESSION_PUSHDOWN, Boolean.class);
}

public static boolean isPredicatePushdownUseTableProperties(Session session)
{
return session.getSystemProperty(PREDICATE_PUSHDOWN_USE_TABLE_PROPERTIES, Boolean.class);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1722,10 +1722,7 @@ public Optional<ConstraintApplicationResult<TableHandle>> applyFilter(Session se

ConnectorSession connectorSession = session.toConnectorSession(catalogName);
return metadata.applyFilter(connectorSession, table.getConnectorHandle(), constraint)
.map(result -> new ConstraintApplicationResult<>(
new TableHandle(catalogName, result.getHandle(), table.getTransaction()),
result.getRemainingFilter(),
result.isPrecalculateStatistics()));
.map(result -> result.transform(handle -> new TableHandle(catalogName, handle, table.getTransaction())));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,86 +13,163 @@
*/
package io.trino.sql.planner;

import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import io.airlift.slice.Slice;
import io.airlift.slice.Slices;
import io.trino.Session;
import io.trino.metadata.LiteralFunction;
import io.trino.metadata.ResolvedFunction;
import io.trino.security.AllowAllAccessControl;
import io.trino.spi.expression.Call;
import io.trino.spi.expression.ConnectorExpression;
import io.trino.spi.expression.Constant;
import io.trino.spi.expression.FieldDereference;
import io.trino.spi.expression.FunctionName;
import io.trino.spi.expression.Variable;
import io.trino.spi.type.Decimals;
import io.trino.spi.type.RowType;
import io.trino.spi.type.Type;
import io.trino.spi.type.TypeSignature;
import io.trino.spi.type.VarcharType;
import io.trino.sql.PlannerContext;
import io.trino.sql.analyzer.TypeSignatureProvider;
import io.trino.sql.tree.AstVisitor;
import io.trino.sql.tree.BinaryLiteral;
import io.trino.sql.tree.BooleanLiteral;
import io.trino.sql.tree.CharLiteral;
import io.trino.sql.tree.DecimalLiteral;
import io.trino.sql.tree.DoubleLiteral;
import io.trino.sql.tree.Expression;
import io.trino.sql.tree.FunctionCall;
import io.trino.sql.tree.LikePredicate;
import io.trino.sql.tree.LongLiteral;
import io.trino.sql.tree.NodeRef;
import io.trino.sql.tree.NullLiteral;
import io.trino.sql.tree.QualifiedName;
import io.trino.sql.tree.StringLiteral;
import io.trino.sql.tree.SubscriptExpression;
import io.trino.sql.tree.SymbolReference;
import io.trino.type.JoniRegexp;
import io.trino.type.Re2JRegexp;

import java.util.List;
import java.util.Map;
import java.util.Optional;

import static com.google.common.base.Verify.verify;
import static com.google.common.collect.ImmutableList.toImmutableList;
import static io.airlift.slice.SliceUtf8.countCodePoints;
import static io.trino.SystemSessionProperties.isComplexExpressionPushdown;
import static io.trino.sql.planner.ExpressionInterpreter.evaluateConstantExpression;
import static io.trino.type.LikeFunctions.LIKE_PATTERN_FUNCTION_NAME;
import static java.util.Objects.requireNonNull;

public final class ConnectorExpressionTranslator
{
private ConnectorExpressionTranslator() {}

public static Expression translate(Session session, ConnectorExpression expression, Map<String, Symbol> variableMappings, LiteralEncoder literalEncoder)
public static Expression translate(Session session, ConnectorExpression expression, PlannerContext plannerContext, Map<String, Symbol> variableMappings, LiteralEncoder literalEncoder)
{
return new ConnectorToSqlExpressionTranslator(variableMappings, literalEncoder).translate(session, expression);
return new ConnectorToSqlExpressionTranslator(session, plannerContext, literalEncoder, variableMappings)
.translate(session, expression)
.orElseThrow(() -> new UnsupportedOperationException("Expression is not supported: " + expression.toString()));
}

public static Optional<ConnectorExpression> translate(Session session, Expression expression, TypeAnalyzer types, TypeProvider inputTypes)
public static Optional<ConnectorExpression> translate(Session session, Expression expression, TypeAnalyzer types, TypeProvider inputTypes, PlannerContext plannerContext)
{
return new SqlToConnectorExpressionTranslator(types.getTypes(session, inputTypes, expression))
return new SqlToConnectorExpressionTranslator(session, types.getTypes(session, inputTypes, expression), plannerContext)
.process(expression);
}

private static class ConnectorToSqlExpressionTranslator
{
private final Map<String, Symbol> variableMappings;
private final Session session;
private final PlannerContext plannerContext;
private final LiteralEncoder literalEncoder;
private final Map<String, Symbol> variableMappings;

public ConnectorToSqlExpressionTranslator(Map<String, Symbol> variableMappings, LiteralEncoder literalEncoder)
public ConnectorToSqlExpressionTranslator(Session session, PlannerContext plannerContext, LiteralEncoder literalEncoder, Map<String, Symbol> variableMappings)
{
this.variableMappings = requireNonNull(variableMappings, "variableMappings is null");
this.session = requireNonNull(session, "session is null");
this.plannerContext = requireNonNull(plannerContext, "plannerContext is null");
this.literalEncoder = requireNonNull(literalEncoder, "literalEncoder is null");
this.variableMappings = requireNonNull(variableMappings, "variableMappings is null");
}

public Expression translate(Session session, ConnectorExpression expression)
public Optional<Expression> translate(Session session, ConnectorExpression expression)
{
if (expression instanceof Variable) {
return variableMappings.get(((Variable) expression).getName()).toSymbolReference();
String name = ((Variable) expression).getName();
return Optional.of(variableMappings.get(name).toSymbolReference());
}

if (expression instanceof Constant) {
return literalEncoder.toExpression(session, ((Constant) expression).getValue(), expression.getType());
return Optional.of(literalEncoder.toExpression(session, ((Constant) expression).getValue(), expression.getType()));
}

if (expression instanceof FieldDereference) {
FieldDereference dereference = (FieldDereference) expression;
return new SubscriptExpression(translate(session, dereference.getTarget()), new LongLiteral(Long.toString(dereference.getField() + 1)));
return translate(session, dereference.getTarget())
.map(base -> new SubscriptExpression(base, new LongLiteral(Long.toString(dereference.getField() + 1))));
}

if (expression instanceof Call) {
return translateCall((Call) expression);
}

throw new UnsupportedOperationException("Expression type not supported: " + expression.getClass().getName());
return Optional.empty();
}

protected Optional<Expression> translateCall(Call call)
{
if (call.getFunctionName().getCatalogSchema().isPresent()) {
return Optional.empty();
}
QualifiedName name = QualifiedName.of(call.getFunctionName().getName());
List<TypeSignature> argumentTypes = call.getArguments().stream()
.map(argument -> argument.getType().getTypeSignature())
.collect(toImmutableList());
ResolvedFunction resolved = plannerContext.getMetadata().resolveFunction(session, name, TypeSignatureProvider.fromTypeSignatures(argumentTypes));

// TODO Support ESCAPE character
if (LIKE_PATTERN_FUNCTION_NAME.equals(resolved.getSignature().getName()) && call.getArguments().size() == 2) {
return translateLike(call.getArguments().get(0), call.getArguments().get(1));
}

FunctionCallBuilder builder = FunctionCallBuilder.resolve(session, plannerContext.getMetadata())
.setName(name);
for (int i = 0; i < call.getArguments().size(); i++) {
Type type = resolved.getSignature().getArgumentTypes().get(i);
Expression expression = ConnectorExpressionTranslator.translate(session, call.getArguments().get(i), plannerContext, variableMappings, literalEncoder);
builder.addArgument(type, expression);
}
return Optional.of(builder.build());
}

protected Optional<Expression> translateLike(ConnectorExpression value, ConnectorExpression pattern)
{
Optional<Expression> translatedValue = translate(session, value);
Optional<Expression> translatedPattern = translate(session, pattern);
if (translatedValue.isPresent() && translatedPattern.isPresent()) {
return Optional.of(new LikePredicate(translatedValue.get(), translatedPattern.get(), Optional.empty()));
}
return Optional.empty();
}
}

static class SqlToConnectorExpressionTranslator
public static class SqlToConnectorExpressionTranslator
extends AstVisitor<Optional<ConnectorExpression>, Void>
{
private final Session session;
private final Map<NodeRef<Expression>, Type> types;
private final PlannerContext plannerContext;

public SqlToConnectorExpressionTranslator(Map<NodeRef<Expression>, Type> types)
public SqlToConnectorExpressionTranslator(Session session, Map<NodeRef<Expression>, Type> types, PlannerContext plannerContext)
{
this.session = requireNonNull(session, "session is null");
this.types = requireNonNull(types, "types is null");
this.plannerContext = requireNonNull(plannerContext, "plannerContext is null");
}

@Override
Expand Down Expand Up @@ -149,6 +226,61 @@ protected Optional<ConnectorExpression> visitNullLiteral(NullLiteral node, Void
return Optional.of(new Constant(null, typeOf(node)));
}

@Override
protected Optional<ConnectorExpression> visitFunctionCall(FunctionCall node, Void context)
{
if (!isComplexExpressionPushdown(session)) {
return Optional.empty();
}

if (node.getFilter().isPresent() || node.getOrderBy().isPresent() || node.getWindow().isPresent() || node.getNullTreatment().isPresent() || node.isDistinct()) {
return Optional.empty();
}

String functionName = ResolvedFunction.extractFunctionName(node.getName());

if (LiteralFunction.LITERAL_FUNCTION_NAME.equalsIgnoreCase(functionName)) {
Object value = evaluateConstant(node);
if (value instanceof JoniRegexp) {
Slice pattern = ((JoniRegexp) value).pattern();
return Optional.of(new Constant(pattern, VarcharType.createVarcharType(countCodePoints(pattern))));
}
if (value instanceof Re2JRegexp) {
Slice pattern = Slices.utf8Slice(((Re2JRegexp) value).pattern());
return Optional.of(new Constant(pattern, VarcharType.createVarcharType(countCodePoints(pattern))));
}
return Optional.of(new Constant(value, types.get(NodeRef.of(node))));
}

ImmutableList.Builder<ConnectorExpression> arguments = ImmutableList.builder();
for (Expression argumentExpression : node.getArguments()) {
Optional<ConnectorExpression> argument = process(argumentExpression);
if (argument.isEmpty()) {
return Optional.empty();
}
arguments.add(argument.get());
}

// Currently, plugin-provided and runtime-added functions doesn't have a catalog/schema qualifier.
// TODO Translate catalog/schema qualifier when available.
FunctionName name = new FunctionName(functionName);
return Optional.of(new Call(typeOf(node), name, arguments.build()));
}

@Override
protected Optional<ConnectorExpression> visitLikePredicate(LikePredicate node, Void context)
{
// TODO Support ESCAPE character
if (node.getEscape().isEmpty()) {
Optional<ConnectorExpression> value = process(node.getValue());
Optional<ConnectorExpression> pattern = process(node.getPattern());
if (value.isPresent() && pattern.isPresent()) {
return Optional.of(new Call(typeOf(node), new FunctionName(LIKE_PATTERN_FUNCTION_NAME), List.of(value.get(), pattern.get())));
}
}
return Optional.empty();
}

@Override
protected Optional<ConnectorExpression> visitSubscriptExpression(SubscriptExpression node, Void context)
{
Expand All @@ -174,5 +306,19 @@ private Type typeOf(Expression node)
{
return types.get(NodeRef.of(node));
}

private Object evaluateConstant(Expression node)
{
Type type = typeOf(node);
Object value = evaluateConstantExpression(
node,
type,
plannerContext,
session,
new AllowAllAccessControl(),
ImmutableMap.of());
verify(!(value instanceof Expression), "Expression %s did not evaluate to constant: %s", node, value);
return value;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import io.trino.Session;
import io.trino.spi.expression.ConnectorExpression;
import io.trino.spi.type.Type;
import io.trino.sql.PlannerContext;
import io.trino.sql.tree.AstVisitor;
import io.trino.sql.tree.Expression;
import io.trino.sql.tree.LambdaExpression;
Expand All @@ -40,15 +41,16 @@ public static Map<NodeRef<Expression>, ConnectorExpression> extractPartialTransl
Expression inputExpression,
Session session,
TypeAnalyzer typeAnalyzer,
TypeProvider typeProvider)
TypeProvider typeProvider,
PlannerContext plannerContext)
{
requireNonNull(inputExpression, "inputExpression is null");
requireNonNull(session, "session is null");
requireNonNull(typeAnalyzer, "typeAnalyzer is null");
requireNonNull(typeProvider, "typeProvider is null");

Map<NodeRef<Expression>, ConnectorExpression> partialTranslations = new HashMap<>();
new Visitor(typeAnalyzer.getTypes(session, typeProvider, inputExpression), partialTranslations).process(inputExpression);
new Visitor(session, typeAnalyzer.getTypes(session, typeProvider, inputExpression), partialTranslations, plannerContext).process(inputExpression);
return ImmutableMap.copyOf(partialTranslations);
}

Expand All @@ -58,11 +60,11 @@ private static class Visitor
private final Map<NodeRef<Expression>, ConnectorExpression> translatedSubExpressions;
private final ConnectorExpressionTranslator.SqlToConnectorExpressionTranslator translator;

Visitor(Map<NodeRef<Expression>, Type> types, Map<NodeRef<Expression>, ConnectorExpression> translatedSubExpressions)
Visitor(Session session, Map<NodeRef<Expression>, Type> types, Map<NodeRef<Expression>, ConnectorExpression> translatedSubExpressions, PlannerContext plannerContext)
{
requireNonNull(types, "types is null");
this.translatedSubExpressions = requireNonNull(translatedSubExpressions, "translatedSubExpressions is null");
this.translator = new ConnectorExpressionTranslator.SqlToConnectorExpressionTranslator(types);
this.translator = new ConnectorExpressionTranslator.SqlToConnectorExpressionTranslator(session, types, plannerContext);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -179,7 +179,7 @@ public static Optional<PlanNode> pushAggregationIntoTableScan(
}

List<Expression> newProjections = result.getProjections().stream()
.map(expression -> ConnectorExpressionTranslator.translate(context.getSession(), expression, variableMappings, new LiteralEncoder(plannerContext)))
.map(expression -> ConnectorExpressionTranslator.translate(context.getSession(), expression, plannerContext, variableMappings, new LiteralEncoder(plannerContext)))
.collect(toImmutableList());

verify(aggregationOutputSymbols.size() == newProjections.size());
Expand Down
Loading

0 comments on commit 8d6f162

Please sign in to comment.