Skip to content

Commit

Permalink
Disable predicate pushdown on ARRAY(CHAR) type in Phoenix
Browse files Browse the repository at this point in the history
Fix query failure for predicate pushdown on ARRAY(CHAR) type.
  • Loading branch information
ebyhr committed Feb 22, 2022
1 parent a44af8b commit c199fac
Show file tree
Hide file tree
Showing 4 changed files with 38 additions and 90 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import io.trino.plugin.jdbc.LongWriteFunction;
import io.trino.plugin.jdbc.ObjectReadFunction;
import io.trino.plugin.jdbc.ObjectWriteFunction;
import io.trino.plugin.jdbc.PredicatePushdownController;
import io.trino.plugin.jdbc.PreparedQuery;
import io.trino.plugin.jdbc.QueryBuilder;
import io.trino.plugin.jdbc.WriteFunction;
Expand Down Expand Up @@ -115,6 +116,8 @@
import static io.trino.plugin.jdbc.DecimalSessionSessionProperties.getDecimalDefaultScale;
import static io.trino.plugin.jdbc.DecimalSessionSessionProperties.getDecimalRounding;
import static io.trino.plugin.jdbc.DecimalSessionSessionProperties.getDecimalRoundingMode;
import static io.trino.plugin.jdbc.PredicatePushdownController.DISABLE_PUSHDOWN;
import static io.trino.plugin.jdbc.PredicatePushdownController.FULL_PUSHDOWN;
import static io.trino.plugin.jdbc.StandardColumnMappings.bigintColumnMapping;
import static io.trino.plugin.jdbc.StandardColumnMappings.bigintWriteFunction;
import static io.trino.plugin.jdbc.StandardColumnMappings.booleanColumnMapping;
Expand Down Expand Up @@ -460,7 +463,9 @@ public Optional<ColumnMapping> toColumnMapping(ConnectorSession session, Connect
.orElseThrow(() -> new TrinoException(
PHOENIX_METADATA_ERROR,
"Type name is missing for jdbc type: " + JDBCType.valueOf(elementTypeHandle.getJdbcType())));
return arrayColumnMapping(session, trinoArrayType, jdbcTypeName);
// TODO (https://github.com/trinodb/trino/issues/11132) Enable predicate pushdown on ARRAY(CHAR) type in Phoenix
PredicatePushdownController pushdown = elementTypeHandle.getJdbcType() == Types.CHAR ? DISABLE_PUSHDOWN : FULL_PUSHDOWN;
return arrayColumnMapping(session, trinoArrayType, jdbcTypeName, pushdown);
});
}
if (getUnsupportedTypeHandling(session) == CONVERT_TO_VARCHAR) {
Expand Down Expand Up @@ -732,12 +737,13 @@ public void set(PreparedStatement statement, int index, long value)
};
}

private static ColumnMapping arrayColumnMapping(ConnectorSession session, ArrayType arrayType, String elementJdbcTypeName)
private static ColumnMapping arrayColumnMapping(ConnectorSession session, ArrayType arrayType, String elementJdbcTypeName, PredicatePushdownController pushdownController)
{
return ColumnMapping.objectMapping(
arrayType,
arrayReadFunction(session, arrayType.getElementType()),
arrayWriteFunction(session, arrayType.getElementType(), elementJdbcTypeName));
arrayWriteFunction(session, arrayType.getElementType(), elementJdbcTypeName),
pushdownController);
}

private static ObjectReadFunction arrayReadFunction(ConnectorSession session, Type elementType)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,6 @@
import io.trino.testing.datatype.CreateAndInsertDataSetup;
import io.trino.testing.datatype.CreateAsSelectDataSetup;
import io.trino.testing.datatype.DataSetup;
import io.trino.testing.datatype.DataType;
import io.trino.testing.datatype.DataTypeTest;
import io.trino.testing.datatype.SqlDataTypeTest;
import io.trino.testing.sql.TestTable;
import io.trino.testing.sql.TrinoSqlExecutor;
Expand All @@ -39,8 +37,6 @@
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.util.List;
import java.util.function.Function;

import static com.google.common.base.Preconditions.checkState;
import static com.google.common.base.Verify.verify;
Expand All @@ -66,13 +62,11 @@
import static io.trino.spi.type.VarbinaryType.VARBINARY;
import static io.trino.spi.type.VarcharType.VARCHAR;
import static io.trino.spi.type.VarcharType.createVarcharType;
import static io.trino.testing.datatype.DataType.dataType;
import static java.lang.String.format;
import static java.math.RoundingMode.HALF_UP;
import static java.math.RoundingMode.UNNECESSARY;
import static java.time.ZoneOffset.UTC;
import static java.util.Arrays.asList;
import static java.util.stream.Collectors.toList;
import static org.assertj.core.api.Assertions.assertThat;

/**
Expand Down Expand Up @@ -617,11 +611,17 @@ public void testArray()
.addRoundTrip("integer primary key", "1", INTEGER, "1")
.execute(getQueryRunner(), phoenixCreateAndInsert("tpch.test_array_decimal"));

// TODO (https://github.com/trinodb/trino/issues/10451) Migrate to SqlDataTypeTest after fixing predicate pushdown failure on array(char) type
arrayStringDataTypeTest(TestPhoenixTypeMapping::arrayDataType, DataType::charDataType)
SqlDataTypeTest.create()
.addRoundTrip("ARRAY(char(10))", "ARRAY['text_a']", new ArrayType(createCharType(10)), "ARRAY[CAST('text_a' AS char(10))]")
.addRoundTrip("ARRAY(char(255))", "ARRAY['text_b']", new ArrayType(createCharType(255)), "ARRAY[CAST('text_b' AS char(255))]")
.addRoundTrip("ARRAY(char(65535))", "ARRAY['text_d']", new ArrayType(createCharType(65535)), "ARRAY[CAST('text_d' AS char(65535))]")
.execute(getQueryRunner(), trinoCreateAsSelect("test_array_char"));
arrayStringDataTypeTest(TestPhoenixTypeMapping::phoenixArrayDataType, DataType::charDataType)
.addRoundTrip(primaryKey(), 1)

SqlDataTypeTest.create()
.addRoundTrip("char(10) ARRAY", "ARRAY['text_a']", new ArrayType(createCharType(10)), "ARRAY[CAST('text_a' AS char(10))]")
.addRoundTrip("char(255) ARRAY", "ARRAY['text_b']", new ArrayType(createCharType(255)), "ARRAY[CAST('text_b' AS char(255))]")
.addRoundTrip("char(65535) ARRAY", "ARRAY['text_d']", new ArrayType(createCharType(65535)), "ARRAY[CAST('text_d' AS char(65535))]")
.addRoundTrip("integer primary key", "1", INTEGER, "1")
.execute(getQueryRunner(), phoenixCreateAndInsert("tpch.test_array_char"));

SqlDataTypeTest.create()
Expand Down Expand Up @@ -653,33 +653,6 @@ public void testArrayNulls()
}
}

private DataTypeTest arrayStringDataTypeTest(Function<DataType<String>, DataType<List<String>>> arrayTypeFactory, Function<Integer, DataType<String>> dataTypeFactory)
{
return DataTypeTest.create()
.addRoundTrip(arrayTypeFactory.apply(dataTypeFactory.apply(10)), asList("text_a"))
.addRoundTrip(arrayTypeFactory.apply(dataTypeFactory.apply(255)), asList("text_b"))
.addRoundTrip(arrayTypeFactory.apply(dataTypeFactory.apply(65535)), asList("text_d"));
}

private static <E> DataType<List<E>> arrayDataType(DataType<E> elementType)
{
return arrayDataType(elementType, format("ARRAY(%s)", elementType.getInsertType()));
}

private static <E> DataType<List<E>> phoenixArrayDataType(DataType<E> elementType)
{
return arrayDataType(elementType, elementType.getInsertType() + " ARRAY");
}

private static <E> DataType<List<E>> arrayDataType(DataType<E> elementType, String insertType)
{
return dataType(
insertType,
new ArrayType(elementType.getTrinoResultType()),
valuesList -> "ARRAY" + valuesList.stream().map(elementType::toLiteral).collect(toList()),
valuesList -> valuesList == null ? null : valuesList.stream().map(elementType::toTrinoQueryResult).collect(toList()));
}

@DataProvider
public Object[][] sessionZonesDataProvider()
{
Expand Down Expand Up @@ -708,11 +681,6 @@ private static void checkIsDoubled(ZoneId zone, LocalDateTime dateTime)
verify(zone.getRules().getValidOffsets(dateTime).size() == 2, "Expected %s to be doubled in %s", dateTime, zone);
}

private DataType<Integer> primaryKey()
{
return dataType("integer primary key", INTEGER, Object::toString);
}

private DataSetup trinoCreateAsSelect(String tableNamePrefix)
{
return trinoCreateAsSelect(getSession(), tableNamePrefix);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import io.trino.plugin.jdbc.LongWriteFunction;
import io.trino.plugin.jdbc.ObjectReadFunction;
import io.trino.plugin.jdbc.ObjectWriteFunction;
import io.trino.plugin.jdbc.PredicatePushdownController;
import io.trino.plugin.jdbc.PreparedQuery;
import io.trino.plugin.jdbc.QueryBuilder;
import io.trino.plugin.jdbc.WriteFunction;
Expand Down Expand Up @@ -116,6 +117,8 @@
import static io.trino.plugin.jdbc.DecimalSessionSessionProperties.getDecimalDefaultScale;
import static io.trino.plugin.jdbc.DecimalSessionSessionProperties.getDecimalRounding;
import static io.trino.plugin.jdbc.DecimalSessionSessionProperties.getDecimalRoundingMode;
import static io.trino.plugin.jdbc.PredicatePushdownController.DISABLE_PUSHDOWN;
import static io.trino.plugin.jdbc.PredicatePushdownController.FULL_PUSHDOWN;
import static io.trino.plugin.jdbc.StandardColumnMappings.bigintColumnMapping;
import static io.trino.plugin.jdbc.StandardColumnMappings.bigintWriteFunction;
import static io.trino.plugin.jdbc.StandardColumnMappings.booleanColumnMapping;
Expand Down Expand Up @@ -452,7 +455,9 @@ public Optional<ColumnMapping> toColumnMapping(ConnectorSession session, Connect
.orElseThrow(() -> new TrinoException(
PHOENIX_METADATA_ERROR,
"Type name is missing for jdbc type: " + JDBCType.valueOf(elementTypeHandle.getJdbcType())));
return arrayColumnMapping(session, trinoArrayType, jdbcTypeName);
// TODO (https://github.com/trinodb/trino/issues/11132) Enable predicate pushdown on ARRAY(CHAR) type in Phoenix
PredicatePushdownController pushdownController = elementTypeHandle.getJdbcType() == Types.CHAR ? DISABLE_PUSHDOWN : FULL_PUSHDOWN;
return arrayColumnMapping(session, trinoArrayType, jdbcTypeName, pushdownController);
});
}
if (getUnsupportedTypeHandling(session) == CONVERT_TO_VARCHAR) {
Expand Down Expand Up @@ -724,12 +729,13 @@ public void set(PreparedStatement statement, int index, long value)
};
}

private static ColumnMapping arrayColumnMapping(ConnectorSession session, ArrayType arrayType, String elementJdbcTypeName)
private static ColumnMapping arrayColumnMapping(ConnectorSession session, ArrayType arrayType, String elementJdbcTypeName, PredicatePushdownController pushdownController)
{
return ColumnMapping.objectMapping(
arrayType,
arrayReadFunction(session, arrayType.getElementType()),
arrayWriteFunction(session, arrayType.getElementType(), elementJdbcTypeName));
arrayWriteFunction(session, arrayType.getElementType(), elementJdbcTypeName),
pushdownController);
}

private static ObjectReadFunction arrayReadFunction(ConnectorSession session, Type elementType)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,6 @@
import io.trino.testing.datatype.CreateAndInsertDataSetup;
import io.trino.testing.datatype.CreateAsSelectDataSetup;
import io.trino.testing.datatype.DataSetup;
import io.trino.testing.datatype.DataType;
import io.trino.testing.datatype.DataTypeTest;
import io.trino.testing.datatype.SqlDataTypeTest;
import io.trino.testing.sql.TestTable;
import io.trino.testing.sql.TrinoSqlExecutor;
Expand All @@ -39,8 +37,6 @@
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.util.List;
import java.util.function.Function;

import static com.google.common.base.Preconditions.checkState;
import static com.google.common.base.Verify.verify;
Expand All @@ -66,13 +62,11 @@
import static io.trino.spi.type.VarbinaryType.VARBINARY;
import static io.trino.spi.type.VarcharType.VARCHAR;
import static io.trino.spi.type.VarcharType.createVarcharType;
import static io.trino.testing.datatype.DataType.dataType;
import static java.lang.String.format;
import static java.math.RoundingMode.HALF_UP;
import static java.math.RoundingMode.UNNECESSARY;
import static java.time.ZoneOffset.UTC;
import static java.util.Arrays.asList;
import static java.util.stream.Collectors.toList;
import static org.assertj.core.api.Assertions.assertThat;

/**
Expand Down Expand Up @@ -617,11 +611,17 @@ public void testArray()
.addRoundTrip("integer primary key", "1", INTEGER, "1")
.execute(getQueryRunner(), phoenixCreateAndInsert("tpch.test_array_decimal"));

// TODO (https://github.com/trinodb/trino/issues/10451) Migrate to SqlDataTypeTest after fixing predicate pushdown failure on array(char) type
arrayStringDataTypeTest(TestPhoenixTypeMapping::arrayDataType, DataType::charDataType)
SqlDataTypeTest.create()
.addRoundTrip("ARRAY(char(10))", "ARRAY['text_a']", new ArrayType(createCharType(10)), "ARRAY[CAST('text_a' AS char(10))]")
.addRoundTrip("ARRAY(char(255))", "ARRAY['text_b']", new ArrayType(createCharType(255)), "ARRAY[CAST('text_b' AS char(255))]")
.addRoundTrip("ARRAY(char(65535))", "ARRAY['text_d']", new ArrayType(createCharType(65535)), "ARRAY[CAST('text_d' AS char(65535))]")
.execute(getQueryRunner(), trinoCreateAsSelect("test_array_char"));
arrayStringDataTypeTest(TestPhoenixTypeMapping::phoenixArrayDataType, DataType::charDataType)
.addRoundTrip(primaryKey(), 1)

SqlDataTypeTest.create()
.addRoundTrip("char(10) ARRAY", "ARRAY['text_a']", new ArrayType(createCharType(10)), "ARRAY[CAST('text_a' AS char(10))]")
.addRoundTrip("char(255) ARRAY", "ARRAY['text_b']", new ArrayType(createCharType(255)), "ARRAY[CAST('text_b' AS char(255))]")
.addRoundTrip("char(65535) ARRAY", "ARRAY['text_d']", new ArrayType(createCharType(65535)), "ARRAY[CAST('text_d' AS char(65535))]")
.addRoundTrip("integer primary key", "1", INTEGER, "1")
.execute(getQueryRunner(), phoenixCreateAndInsert("tpch.test_array_char"));

SqlDataTypeTest.create()
Expand Down Expand Up @@ -653,33 +653,6 @@ public void testArrayNulls()
}
}

private DataTypeTest arrayStringDataTypeTest(Function<DataType<String>, DataType<List<String>>> arrayTypeFactory, Function<Integer, DataType<String>> dataTypeFactory)
{
return DataTypeTest.create()
.addRoundTrip(arrayTypeFactory.apply(dataTypeFactory.apply(10)), asList("text_a"))
.addRoundTrip(arrayTypeFactory.apply(dataTypeFactory.apply(255)), asList("text_b"))
.addRoundTrip(arrayTypeFactory.apply(dataTypeFactory.apply(65535)), asList("text_d"));
}

private static <E> DataType<List<E>> arrayDataType(DataType<E> elementType)
{
return arrayDataType(elementType, format("ARRAY(%s)", elementType.getInsertType()));
}

private static <E> DataType<List<E>> phoenixArrayDataType(DataType<E> elementType)
{
return arrayDataType(elementType, elementType.getInsertType() + " ARRAY");
}

private static <E> DataType<List<E>> arrayDataType(DataType<E> elementType, String insertType)
{
return dataType(
insertType,
new ArrayType(elementType.getTrinoResultType()),
valuesList -> "ARRAY" + valuesList.stream().map(elementType::toLiteral).collect(toList()),
valuesList -> valuesList == null ? null : valuesList.stream().map(elementType::toTrinoQueryResult).collect(toList()));
}

@DataProvider
public Object[][] sessionZonesDataProvider()
{
Expand Down Expand Up @@ -708,11 +681,6 @@ private static void checkIsDoubled(ZoneId zone, LocalDateTime dateTime)
verify(zone.getRules().getValidOffsets(dateTime).size() == 2, "Expected %s to be doubled in %s", dateTime, zone);
}

private DataType<Integer> primaryKey()
{
return dataType("integer primary key", INTEGER, Object::toString);
}

private DataSetup trinoCreateAsSelect(String tableNamePrefix)
{
return trinoCreateAsSelect(getSession(), tableNamePrefix);
Expand Down

0 comments on commit c199fac

Please sign in to comment.