Skip to content

Commit

Permalink
Pushdown CAST in JOIN criteria for PostgreSql
Browse files Browse the repository at this point in the history
Co-authored-by: Semion Paramonow <[email protected]>
  • Loading branch information
ssheikin and SemionPar committed May 11, 2024
1 parent 173acd3 commit 2b19480
Show file tree
Hide file tree
Showing 3 changed files with 143 additions and 5 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.trino.plugin.jdbc.expression;

import io.trino.matching.Capture;
import io.trino.matching.Captures;
import io.trino.matching.Pattern;
import io.trino.plugin.base.expression.ConnectorExpressionRule;
import io.trino.plugin.jdbc.WriteMapping;
import io.trino.spi.connector.ConnectorSession;
import io.trino.spi.expression.Call;
import io.trino.spi.expression.ConnectorExpression;
import io.trino.spi.type.Type;

import java.util.Optional;
import java.util.function.BiFunction;

import static io.trino.matching.Capture.newCapture;
import static io.trino.plugin.base.expression.ConnectorExpressionPatterns.argument;
import static io.trino.plugin.base.expression.ConnectorExpressionPatterns.argumentCount;
import static io.trino.plugin.base.expression.ConnectorExpressionPatterns.call;
import static io.trino.plugin.base.expression.ConnectorExpressionPatterns.expression;
import static io.trino.plugin.base.expression.ConnectorExpressionPatterns.functionName;
import static io.trino.spi.expression.StandardFunctions.CAST_FUNCTION_NAME;
import static java.lang.String.format;
import static java.util.Objects.requireNonNull;

/**
 * Connector expression rule that renders a single-argument {@code CAST} call as a
 * {@code CAST(<expression> AS <type>)} SQL fragment.
 * <p>
 * The type name placed after {@code AS} is the data type reported by the
 * {@link WriteMapping} that the supplied function returns for the call's Trino type.
 */
public class RewriteCast
        implements ConnectorExpressionRule<Call, ParameterizedExpression>
{
    // Captures the sole argument of the matched CAST call
    private static final Capture<ConnectorExpression> VALUE = newCapture();

    private final BiFunction<ConnectorSession, Type, WriteMapping> toWriteMapping;

    /**
     * @param toWriteMapping resolves the write mapping for a Trino type in a given session; must not be null
     */
    public RewriteCast(BiFunction<ConnectorSession, Type, WriteMapping> toWriteMapping)
    {
        this.toWriteMapping = requireNonNull(toWriteMapping, "toWriteMapping is null");
    }

    /**
     * Matches calls to the CAST function that have exactly one argument, capturing that argument.
     */
    @Override
    public Pattern<Call> getPattern()
    {
        return call()
                .with(functionName().equalTo(CAST_FUNCTION_NAME))
                .with(argumentCount().equalTo(1))
                .with(argument(0).matching(expression().capturedAs(VALUE)));
    }

    /**
     * Rewrites the matched call into a parameterized {@code CAST} expression.
     *
     * @return the rewritten expression, or empty when the argument itself cannot be rewritten
     */
    @Override
    public Optional<ParameterizedExpression> rewrite(Call call, Captures captures, RewriteContext<ParameterizedExpression> context)
    {
        Type castTo = call.getType();
        ConnectorExpression castOperand = captures.get(VALUE);

        // An operand that has no rewrite leaves the whole cast without one (empty result)
        return context.defaultRewrite(castOperand)
                .map(operand -> {
                    String remoteTypeName = toWriteMapping.apply(context.getSession(), castTo).getDataType();
                    String sql = format("CAST(%s AS %s)", operand.expression(), remoteTypeName);
                    // The cast adds no bind parameters of its own; the operand's are passed through
                    return new ParameterizedExpression(sql, operand.parameters());
                });
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@
import io.trino.plugin.jdbc.aggregation.ImplementVarianceSamp;
import io.trino.plugin.jdbc.expression.JdbcConnectorExpressionRewriterBuilder;
import io.trino.plugin.jdbc.expression.ParameterizedExpression;
import io.trino.plugin.jdbc.expression.RewriteCast;
import io.trino.plugin.jdbc.expression.RewriteIn;
import io.trino.plugin.jdbc.logging.RemoteQueryModifier;
import io.trino.plugin.postgresql.PostgreSqlConfig.ArrayMapping;
Expand Down Expand Up @@ -307,6 +308,7 @@ public PostgreSqlClient(
this.connectorExpressionRewriter = JdbcConnectorExpressionRewriterBuilder.newBuilder()
.addStandardRules(this::quoted)
.add(new RewriteIn())
.add(new RewriteCast(this::toWriteMapping))
.withTypeClass("integer_type", ImmutableSet.of("tinyint", "smallint", "integer", "bigint"))
.withTypeClass("numeric_type", ImmutableSet.of("tinyint", "smallint", "integer", "bigint", "decimal", "real", "double"))
.withTypeClass("string_type", ImmutableSet.of("char", "varchar"))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@
import static io.trino.sql.planner.assertions.PlanMatchPattern.node;
import static io.trino.sql.planner.assertions.PlanMatchPattern.tableScan;
import static io.trino.testing.TestingConnectorBehavior.SUPPORTS_AGGREGATION_PUSHDOWN;
import static io.trino.testing.TestingConnectorBehavior.SUPPORTS_JOIN_PUSHDOWN;
import static io.trino.testing.TestingConnectorBehavior.SUPPORTS_JOIN_PUSHDOWN_WITH_FULL_JOIN;
import static io.trino.testing.TestingConnectorBehavior.SUPPORTS_LIMIT_PUSHDOWN;
import static io.trino.testing.TestingConnectorBehavior.SUPPORTS_PREDICATE_PUSHDOWN_WITH_VARCHAR_EQUALITY;
Expand Down Expand Up @@ -630,12 +631,10 @@ public void testStringPushdownWithCollate()
assertThat(query(joinPushdownEnabled, "SELECT c.name, n.name FROM customer c JOIN nation n ON c.custkey = n.nationkey WHERE address < 'TcGe5gaZNgVePxU5kRrvXBfkasDTea'"))
.isFullyPushedDown();

// join on varchar columns is not pushed down
// join on varchar columns is pushed down even when synthetic cast to align length is added
// address: varchar(40), name: varchar(25) => address = CAST(name AS varchar(40))
assertThat(query(joinPushdownEnabled, "SELECT c.name, n.name FROM customer c JOIN nation n ON c.address = n.name"))
.isNotFullyPushedDown(
node(JoinNode.class,
anyTree(node(TableScanNode.class)),
anyTree(node(TableScanNode.class))));
.isFullyPushedDown();
}

@Test
Expand Down Expand Up @@ -1013,6 +1012,66 @@ public void testInPredicatePushdown()
}
}

/**
 * Checks whether joins whose criteria contain a CAST are pushed down:
 * plain column casts, casts over expressions expected to be pushed down,
 * and a cast over an expression where the join is expected not to be pushed down.
 */
@Test
public void testJoinWithCastInCriteriaPushdown()
{
    // Nothing to verify when the connector does not declare join pushdown support
    if (!hasBehavior(SUPPORTS_JOIN_PUSHDOWN)) {
        return;
    }

    Session pushdownSession = joinPushdownEnabled(getSession());

    // cast applied directly to the join column
    ImmutableList<CastTestCase> directCasts = ImmutableList.of(
            new CastTestCase("integer", "tinyint", "smallint"),
            new CastTestCase("integer", "double", "double precision"),
            new CastTestCase("integer", "varchar"),
            new CastTestCase("integer", "varchar(10)"),
            new CastTestCase("varchar", "tinyint", "smallint"),
            new CastTestCase("varchar", "double", "double precision"),
            new CastTestCase("varchar", "integer"),
            new CastTestCase("varchar", "varchar(10)"));
    for (CastTestCase directCast : directCasts) {
        try (TestTable left = createCastJoinLeftTable(directCast);
                TestTable right = createCastJoinRightTable(directCast)) {
            String sql = "SELECT a.a2, b.b2 FROM %s a JOIN %s b ON CAST(a.a1 AS %s) = b.b1".formatted(left.getName(), right.getName(), directCast.castType());
            assertThat(query(pushdownSession, sql)).isFullyPushedDown();
        }
    }

    // cast with pushdownable complex expression
    CastTestCase arithmeticCast = new CastTestCase("integer", "varchar");
    try (TestTable left = createCastJoinLeftTable(arithmeticCast);
            TestTable right = createCastJoinRightTable(arithmeticCast)) {
        String sql = "SELECT a.a2, b.b2 FROM %s a JOIN %s b ON CAST(a.a1 + 123 AS %s) = b.b1".formatted(left.getName(), right.getName(), arithmeticCast.castType());
        assertThat(query(pushdownSession, sql)).isFullyPushedDown();
    }

    // cast with pushdownable complex expression with function
    CastTestCase functionCast = new CastTestCase("varchar", "tinyint", "smallint");
    try (TestTable left = createCastJoinLeftTable(functionCast);
            TestTable right = createCastJoinRightTable(functionCast)) {
        String sql = "SELECT a.a2, b.b2 FROM %s a JOIN %s b ON CAST(UPPER(a.a1) AS %s) = b.b1".formatted(left.getName(), right.getName(), functionCast.castType());
        assertThat(query(pushdownSession, sql)).isFullyPushedDown();
    }

    // cast with non-pushdownable complex expression
    CastTestCase nonPushdownableCast = new CastTestCase("date", "varchar");
    try (TestTable left = createCastJoinLeftTable(nonPushdownableCast);
            TestTable right = createCastJoinRightTable(nonPushdownableCast)) {
        String sql = "SELECT a.a2, b.b2 FROM %s a JOIN %s b ON CAST(year(a.a1) AS %s) = b.b1".formatted(left.getName(), right.getName(), nonPushdownableCast.castType());
        assertThat(query(pushdownSession, sql)).joinIsNotFullyPushedDown();
    }
}

// Creates the left-hand join table: both columns use the case's source remote type
private TestTable createCastJoinLeftTable(CastTestCase testCase)
{
    return new TestTable(onRemoteDatabase(), "test_join_cast_a_", "(a1 %s, a2 %s)".formatted(testCase.fromRemoteType(), testCase.fromRemoteType()));
}

// Creates the right-hand join table: b1 uses the target remote type, b2 the source remote type
private TestTable createCastJoinRightTable(CastTestCase testCase)
{
    return new TestTable(onRemoteDatabase(), "test_join_cast_b_", "(b1 %s, b2 %s)".formatted(testCase.targetRemoteType(), testCase.fromRemoteType()));
}

/**
 * One CAST scenario for testJoinWithCastInCriteriaPushdown.
 *
 * fromRemoteType is the remote column type of the column being cast (a1) and of the
 * projected columns (a2, b2); castType is the type named in the CAST expression of the
 * query; targetRemoteType is the remote column type of the column the cast result is
 * compared with (b1).
 */
private record CastTestCase(String fromRemoteType, String castType, String targetRemoteType)
{
// Shorthand for cases where the remote column type is spelled the same as the cast type
private CastTestCase(String fromRemoteType, String castType)
{
this(fromRemoteType, castType, castType);
}
}

@Override
protected String errorMessageForInsertIntoNotNullColumn(String columnName)
{
Expand Down

0 comments on commit 2b19480

Please sign in to comment.