Skip to content

Commit

Permalink
Enable varchar equality predicate pushdown for ClickHouse-connector
Browse files Browse the repository at this point in the history
  • Loading branch information
sylph-eu authored and ebyhr committed May 21, 2024
1 parent 06bab6c commit d035094
Show file tree
Hide file tree
Showing 3 changed files with 75 additions and 5 deletions.
2 changes: 1 addition & 1 deletion docs/src/main/sphinx/connector/clickhouse.md
Original file line number Diff line number Diff line change
Expand Up @@ -360,5 +360,5 @@ The connector supports pushdown for a number of operations:
```{include} pushdown-correctness-behavior.fragment
```

```{include} no-pushdown-text-type.fragment
```{include} no-inequality-pushdown-text-type.fragment
```
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
import io.trino.plugin.jdbc.LongReadFunction;
import io.trino.plugin.jdbc.LongWriteFunction;
import io.trino.plugin.jdbc.ObjectWriteFunction;
import io.trino.plugin.jdbc.PredicatePushdownController;
import io.trino.plugin.jdbc.QueryBuilder;
import io.trino.plugin.jdbc.RemoteTableName;
import io.trino.plugin.jdbc.SliceWriteFunction;
Expand All @@ -58,6 +59,7 @@
import io.trino.spi.connector.ColumnMetadata;
import io.trino.spi.connector.ConnectorSession;
import io.trino.spi.connector.ConnectorTableMetadata;
import io.trino.spi.predicate.Domain;
import io.trino.spi.type.CharType;
import io.trino.spi.type.DecimalType;
import io.trino.spi.type.Decimals;
Expand Down Expand Up @@ -121,7 +123,9 @@
import static io.trino.plugin.jdbc.DecimalSessionSessionProperties.getDecimalRounding;
import static io.trino.plugin.jdbc.DecimalSessionSessionProperties.getDecimalRoundingMode;
import static io.trino.plugin.jdbc.JdbcErrorCode.JDBC_ERROR;
import static io.trino.plugin.jdbc.JdbcMetadataSessionProperties.getDomainCompactionThreshold;
import static io.trino.plugin.jdbc.PredicatePushdownController.DISABLE_PUSHDOWN;
import static io.trino.plugin.jdbc.PredicatePushdownController.FULL_PUSHDOWN;
import static io.trino.plugin.jdbc.StandardColumnMappings.bigintColumnMapping;
import static io.trino.plugin.jdbc.StandardColumnMappings.bigintWriteFunction;
import static io.trino.plugin.jdbc.StandardColumnMappings.booleanWriteFunction;
Expand Down Expand Up @@ -197,6 +201,20 @@ public class ClickHouseClient

public static final int DEFAULT_DOMAIN_COMPACTION_THRESHOLD = 1_000;

private static final PredicatePushdownController CLICKHOUSE_PUSHDOWN_CONTROLLER = (session, domain) -> {
if (domain.isOnlyNull()) {
return FULL_PUSHDOWN.apply(session, domain);
}

Domain simplifiedDomain = domain.simplify(getDomainCompactionThreshold(session));
if (!simplifiedDomain.getValues().isDiscreteSet()) {
// Domain#simplify can turn a discrete set into a range predicate
return DISABLE_PUSHDOWN.apply(session, domain);
}

return FULL_PUSHDOWN.apply(session, simplifiedDomain);
};

private final ConnectorExpressionRewriter<ParameterizedExpression> connectorExpressionRewriter;
private final AggregateFunctionRewriter<JdbcExpression, ?> aggregateFunctionRewriter;
private final Type uuidType;
Expand Down Expand Up @@ -612,9 +630,8 @@ public Optional<ColumnMapping> toColumnMapping(ConnectorSession session, Connect
createUnboundedVarcharType(),
varcharReadFunction(createUnboundedVarcharType()),
varcharWriteFunction(),
DISABLE_PUSHDOWN));
CLICKHOUSE_PUSHDOWN_CONTROLLER));
}
// TODO (https://github.com/trinodb/trino/issues/7100) test & enable predicate pushdown
return Optional.of(varbinaryColumnMapping());
case UUID:
return Optional.of(uuidColumnMapping());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,12 @@

import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import io.trino.Session;
import io.trino.plugin.jdbc.BaseJdbcConnectorTest;
import io.trino.plugin.jdbc.JdbcTableHandle;
import io.trino.spi.predicate.TupleDomain;
import io.trino.sql.planner.plan.AggregationNode;
import io.trino.sql.planner.plan.FilterNode;
import io.trino.testing.MaterializedResult;
import io.trino.testing.QueryRunner;
import io.trino.testing.TestingConnectorBehavior;
Expand Down Expand Up @@ -44,6 +48,8 @@
import static io.trino.plugin.clickhouse.TestingClickHouseServer.CLICKHOUSE_LATEST_IMAGE;
import static io.trino.plugin.jdbc.JdbcMetadataSessionProperties.DOMAIN_COMPACTION_THRESHOLD;
import static io.trino.spi.type.VarcharType.VARCHAR;
import static io.trino.sql.planner.assertions.PlanMatchPattern.node;
import static io.trino.sql.planner.assertions.PlanMatchPattern.tableScan;
import static io.trino.testing.MaterializedResult.resultBuilder;
import static io.trino.testing.TestingNames.randomNameSuffix;
import static java.lang.String.format;
Expand All @@ -60,7 +66,8 @@ public class TestClickHouseConnectorTest
protected boolean hasBehavior(TestingConnectorBehavior connectorBehavior)
{
return switch (connectorBehavior) {
case SUPPORTS_TRUNCATE -> true;
case SUPPORTS_PREDICATE_PUSHDOWN_WITH_VARCHAR_EQUALITY,
SUPPORTS_TRUNCATE -> true;
case SUPPORTS_AGGREGATION_PUSHDOWN_CORRELATION,
SUPPORTS_AGGREGATION_PUSHDOWN_COUNT_DISTINCT,
SUPPORTS_AGGREGATION_PUSHDOWN_COVARIANCE,
Expand All @@ -72,7 +79,6 @@ protected boolean hasBehavior(TestingConnectorBehavior connectorBehavior)
SUPPORTS_DROP_NOT_NULL_CONSTRAINT,
SUPPORTS_NATIVE_QUERY,
SUPPORTS_NEGATIVE_DATE,
SUPPORTS_PREDICATE_PUSHDOWN_WITH_VARCHAR_EQUALITY,
SUPPORTS_PREDICATE_PUSHDOWN_WITH_VARCHAR_INEQUALITY,
SUPPORTS_ROW_TYPE,
SUPPORTS_SET_COLUMN_TYPE,
Expand Down Expand Up @@ -793,6 +799,53 @@ public void testLargeDefaultDomainCompactionThreshold()
"VALUES('" + propertyName + "','1000', '1000', 'integer', 'Maximum ranges to allow in a tuple domain without simplifying it')");
}

@Test
public void testTextualPredicatePushdown()
{
// varchar equality
assertThat(query("SELECT regionkey, nationkey, name FROM nation WHERE name = 'ROMANIA'"))
.matches("VALUES (BIGINT '3', BIGINT '19', CAST('ROMANIA' AS varchar))")
.isFullyPushedDown();

// varchar range
assertThat(query("SELECT regionkey, nationkey, name FROM nation WHERE name BETWEEN 'POLAND' AND 'RPA'"))
.matches("VALUES (BIGINT '3', BIGINT '19', CAST('ROMANIA' AS varchar))")
.isNotFullyPushedDown(FilterNode.class);

// varchar IN without domain compaction
assertThat(query("SELECT regionkey, nationkey, name FROM nation WHERE name IN ('POLAND', 'ROMANIA', 'VIETNAM')"))
.matches("VALUES " +
"(BIGINT '3', BIGINT '19', CAST('ROMANIA' AS varchar)), " +
"(BIGINT '2', BIGINT '21', CAST('VIETNAM' AS varchar))")
.isFullyPushedDown();

// varchar IN with small compaction threshold
assertThat(query(
Session.builder(getSession())
.setCatalogSessionProperty("clickhouse", "domain_compaction_threshold", "1")
.build(),
"SELECT regionkey, nationkey, name FROM nation WHERE name IN ('POLAND', 'ROMANIA', 'VIETNAM')"))
.matches("VALUES " +
"(BIGINT '3', BIGINT '19', CAST('ROMANIA' AS varchar)), " +
"(BIGINT '2', BIGINT '21', CAST('VIETNAM' AS varchar))")
// Filter node is retained as no constraint is pushed into connector.
// The compacted domain is a range predicate which can give wrong results
// if pushed down as ClickHouse has different sort ordering for letters from Trino
.isNotFullyPushedDown(
node(
FilterNode.class,
// verify that no constraint is applied by the connector
tableScan(
tableHandle -> ((JdbcTableHandle) tableHandle).getConstraint().isAll(),
TupleDomain.all(),
ImmutableMap.of())));

// varchar different case
assertThat(query("SELECT regionkey, nationkey, name FROM nation WHERE name = 'romania'"))
.returnsEmptyResult()
.isFullyPushedDown();
}

@Override
protected OptionalInt maxTableNameLength()
{
Expand Down

0 comments on commit d035094

Please sign in to comment.