Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support Iceberg materialized views with types not supported directly in Iceberg library #16050

Merged
merged 4 commits into from
Mar 9, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -689,19 +689,6 @@ protected Scope visitRefreshMaterializedView(RefreshMaterializedView refreshMate
.map(insertColumn -> tableMetadata.getColumn(insertColumn).getType())
.collect(toImmutableList());

List<Type> queryTypes = queryScope.getRelationType().getVisibleFields().stream()
findepi marked this conversation as resolved.
Show resolved Hide resolved
.map(Field::getType)
.collect(toImmutableList());

if (!typesMatchForInsert(tableTypes, queryTypes)) {
throw semanticException(
TYPE_MISMATCH,
refreshMaterializedView,
"Insert query has mismatched column types: Table: [%s], Query: [%s]",
Joiner.on(", ").join(tableTypes),
Joiner.on(", ").join(queryTypes));
}

Stream<Column> columns = Streams.zip(
insertColumns.stream(),
tableTypes.stream()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -279,6 +279,18 @@ public void markMaterializedViewIsFresh(SchemaTableName name)
freshMaterializedViews.add(name);
}

// Testing stub: returning false means refresh is NOT delegated to the connector,
// so the engine drives the refresh through beginRefreshMaterializedView below.
@Override
public boolean delegateMaterializedViewRefreshToConnector(ConnectorSession session, SchemaTableName viewName)
{
return false;
}

// Testing stub: starts a materialized view refresh by handing back the shared
// singleton insert handle; no real state is created or tracked.
@Override
public ConnectorInsertTableHandle beginRefreshMaterializedView(ConnectorSession session, ConnectorTableHandle tableHandle, List<ConnectorTableHandle> sourceTableHandles, RetryMode retryMode)
{
return TestingHandle.INSTANCE;
}

@Override
public ConnectorOutputTableHandle beginCreateTable(ConnectorSession session, ConnectorTableMetadata tableMetadata, Optional<ConnectorTableLayout> layout, RetryMode retryMode)
{
Expand Down Expand Up @@ -378,6 +390,12 @@ public SchemaTableName getTableName()
{
return tableName;
}

@Override
public String toString()
{
// Delegate to the schema-qualified table name's own string form,
// which makes test/plan diagnostics involving this handle readable.
return tableName.toString();
}
}

public static class TestingColumnHandle
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,20 +33,28 @@
import io.trino.spi.security.ViewExpression;
import io.trino.spi.transaction.IsolationLevel;
import io.trino.sql.planner.assertions.BasePlanTest;
import io.trino.sql.tree.GenericLiteral;
import io.trino.testing.LocalQueryRunner;
import io.trino.testing.TestingAccessControlManager;
import io.trino.testing.TestingMetadata;
import org.testng.annotations.Test;

import java.util.List;
import java.util.Map;
import java.util.Optional;

import static io.trino.spi.type.BigintType.BIGINT;
import static io.trino.spi.type.TinyintType.TINYINT;
import static io.trino.spi.type.VarcharType.VARCHAR;
import static io.trino.sql.planner.assertions.PlanMatchPattern.anyTree;
import static io.trino.sql.planner.assertions.PlanMatchPattern.exchange;
import static io.trino.sql.planner.assertions.PlanMatchPattern.expression;
import static io.trino.sql.planner.assertions.PlanMatchPattern.output;
import static io.trino.sql.planner.assertions.PlanMatchPattern.project;
import static io.trino.sql.planner.assertions.PlanMatchPattern.tableScan;
import static io.trino.sql.planner.assertions.PlanMatchPattern.tableWriter;
import static io.trino.sql.planner.assertions.PlanMatchPattern.values;
import static io.trino.sql.planner.plan.ExchangeNode.Scope.LOCAL;
import static io.trino.testing.TestingHandles.TEST_CATALOG_NAME;
import static io.trino.testing.TestingSession.testSessionBuilder;

Expand Down Expand Up @@ -164,6 +172,16 @@ protected LocalQueryRunner createLocalQueryRunner()
});
testingConnectorMetadata.markMaterializedViewIsFresh(materializedViewWithCasts.asSchemaTableName());

queryRunner.inTransaction(session -> {
metadata.createMaterializedView(
session,
new QualifiedObjectName(TEST_CATALOG_NAME, SCHEMA, "stale_materialized_view_with_casts"),
materializedViewDefinitionWithCasts,
false,
false);
return null;
});

return queryRunner;
}

Expand Down Expand Up @@ -201,6 +219,22 @@ public void testMaterializedViewWithCasts()
tableScan("storage_table_with_casts", ImmutableMap.of("A", "a", "B", "b")))));
}

@Test
public void testRefreshMaterializedViewWithCasts()
{
// A stale materialized view must be refreshed: expect a plan that scans the
// base table, casts each column to the storage table's column type
// (tinyint/varchar here), and writes the result into the storage table.
assertPlan("REFRESH MATERIALIZED VIEW stale_materialized_view_with_casts",
anyTree(
tableWriter(List.of("A_CAST", "B_CAST"), List.of("a", "b"),
exchange(LOCAL,
project(Map.of("A_CAST", expression("CAST(A AS tinyint)"), "B_CAST", expression("CAST(B AS varchar)")),
tableScan("test_table", Map.of("A", "a", "B", "b")))))));

// No-op REFRESH
// The fresh materialized view needs no work: the plan collapses to a single
// VALUES row reporting 0 written rows.
assertPlan("REFRESH MATERIALIZED VIEW materialized_view_with_casts",
output(
values(List.of("rows"), List.of(List.of(new GenericLiteral("BIGINT", "0"))))));
}

private static class TestMaterializedViewConnector
implements Connector
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,15 @@
import io.trino.spi.connector.ConnectorTableMetadata;
import io.trino.spi.connector.ConnectorViewDefinition;
import io.trino.spi.connector.SchemaTableName;
import io.trino.spi.type.ArrayType;
import io.trino.spi.type.CharType;
import io.trino.spi.type.MapType;
import io.trino.spi.type.RowType;
import io.trino.spi.type.TimeType;
import io.trino.spi.type.TimeWithTimeZoneType;
import io.trino.spi.type.TimestampType;
import io.trino.spi.type.TimestampWithTimeZoneType;
import io.trino.spi.type.Type;
import io.trino.spi.type.TypeManager;
import org.apache.iceberg.AppendFiles;
import org.apache.iceberg.PartitionSpec;
Expand Down Expand Up @@ -62,6 +71,12 @@
import static io.trino.plugin.iceberg.IcebergUtil.commit;
import static io.trino.plugin.iceberg.IcebergUtil.getIcebergTableProperties;
import static io.trino.spi.StandardErrorCode.TABLE_NOT_FOUND;
import static io.trino.spi.type.IntegerType.INTEGER;
import static io.trino.spi.type.SmallintType.SMALLINT;
import static io.trino.spi.type.TimeType.TIME_MICROS;
import static io.trino.spi.type.TimestampType.TIMESTAMP_MICROS;
import static io.trino.spi.type.TinyintType.TINYINT;
import static io.trino.spi.type.VarcharType.VARCHAR;
import static java.lang.String.format;
import static java.util.Objects.requireNonNull;
import static java.util.UUID.randomUUID;
Expand Down Expand Up @@ -203,7 +218,7 @@ protected SchemaTableName createMaterializedViewStorageTable(ConnectorSession se
String storageSchema = getStorageSchema(definition.getProperties()).orElse(viewName.getSchemaName());
SchemaTableName storageTable = new SchemaTableName(storageSchema, storageTableName);
List<ColumnMetadata> columns = definition.getColumns().stream()
.map(column -> new ColumnMetadata(column.getName(), typeManager.getType(column.getType())))
.map(column -> new ColumnMetadata(column.getName(), typeForMaterializedViewStorageTable(typeManager.getType(column.getType()))))
.collect(toImmutableList());

ConnectorTableMetadata tableMetadata = new ConnectorTableMetadata(storageTable, columns, storageTableProperties, Optional.empty());
Expand All @@ -214,6 +229,61 @@ protected SchemaTableName createMaterializedViewStorageTable(ConnectorSession se
return storageTable;
}

/**
 * Maps a Trino type to a type the Iceberg library can store in a materialized
 * view storage table. When the materialized view is later read, values are
 * coerced back to the original types recorded in the view definition.
 */
private Type typeForMaterializedViewStorageTable(Type type)
{
    // Narrow integers: substitute with 32-bit integer
    if (type == TINYINT || type == SMALLINT) {
        return INTEGER;
    }
    // Fixed-length character data: store as varchar
    if (type instanceof CharType) {
        return VARCHAR;
    }
    if (type instanceof TimeType time) {
        // Iceberg supports microsecond precision only; higher precision is
        // stored as varchar
        if (time.getPrecision() <= 6) {
            return TIME_MICROS;
        }
        return VARCHAR;
    }
    if (type instanceof TimeWithTimeZoneType) {
        return VARCHAR;
    }
    if (type instanceof TimestampType timestamp) {
        // Iceberg supports microsecond precision only; higher precision is
        // stored as varchar
        if (timestamp.getPrecision() <= 6) {
            return TIMESTAMP_MICROS;
        }
        return VARCHAR;
    }
    if (type instanceof TimestampWithTimeZoneType) {
        // Iceberg does not store the time zone
        // TODO allow temporal partitioning on these columns, or MV property to
        // drop zone info and use timestamptz directly
        return VARCHAR;
    }
    // Structural types: substitute recursively within each component type
    if (type instanceof ArrayType array) {
        Type element = typeForMaterializedViewStorageTable(array.getElementType());
        return new ArrayType(element);
    }
    if (type instanceof MapType map) {
        Type key = typeForMaterializedViewStorageTable(map.getKeyType());
        Type value = typeForMaterializedViewStorageTable(map.getValueType());
        return new MapType(key, value, typeManager.getTypeOperators());
    }
    if (type instanceof RowType row) {
        RowType.Field[] mappedFields = row.getFields().stream()
                .map(field -> new RowType.Field(field.getName(), typeForMaterializedViewStorageTable(field.getType())))
                .toArray(RowType.Field[]::new);
        return RowType.rowType(mappedFields);
    }

    // Pass through all the types not explicitly handled above. If a type is not accepted by the connector,
    // creation of the storage table will fail anyway.
    return type;
}

protected ConnectorMaterializedViewDefinition getMaterializedViewDefinition(
Table icebergTable,
Optional<String> owner,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ public void testMaterializedViewsMetadata()
{
String catalogName = getSession().getCatalog().orElseThrow();
String schemaName = getSession().getSchema().orElseThrow();
String materializedViewName = format("test_materialized_view_%s", randomNameSuffix());
String materializedViewName = "test_materialized_view_" + randomNameSuffix();

computeActual("CREATE TABLE small_region AS SELECT * FROM tpch.tiny.region LIMIT 1");
computeActual(format("CREATE MATERIALIZED VIEW %s AS SELECT * FROM small_region LIMIT 1", materializedViewName));
Expand Down Expand Up @@ -122,7 +122,7 @@ public void testMaterializedViewsMetadata()
"VALUES 'FRESH'");

assertUpdate("DROP TABLE small_region");
assertUpdate(format("DROP MATERIALIZED VIEW %s", materializedViewName));
assertUpdate("DROP MATERIALIZED VIEW " + materializedViewName);
}

@Test
Expand Down Expand Up @@ -154,17 +154,17 @@ public void testShowCreate()
{
String schema = getSession().getSchema().orElseThrow();

assertUpdate("CREATE MATERIALIZED VIEW materialized_view_with_property " +
assertUpdate("CREATE MATERIALIZED VIEW test_mv_show_create " +
"WITH (\n" +
" partitioning = ARRAY['_date'],\n" +
" orc_bloom_filter_columns = ARRAY['_date'],\n" +
" orc_bloom_filter_fpp = 0.1) AS " +
"SELECT _bigint, _date FROM base_table1");
assertQuery("SELECT COUNT(*) FROM materialized_view_with_property", "VALUES 6");
assertQuery("SELECT COUNT(*) FROM test_mv_show_create", "VALUES 6");

assertThat((String) computeScalar("SHOW CREATE MATERIALIZED VIEW materialized_view_with_property"))
assertThat((String) computeScalar("SHOW CREATE MATERIALIZED VIEW test_mv_show_create"))
.matches(
"\\QCREATE MATERIALIZED VIEW iceberg." + schema + ".materialized_view_with_property\n" +
"\\QCREATE MATERIALIZED VIEW iceberg." + schema + ".test_mv_show_create\n" +
"WITH (\n" +
" format = 'ORC',\n" +
" format_version = 2,\n" +
Expand All @@ -179,7 +179,7 @@ public void testShowCreate()
", _date\n" +
"FROM\n" +
" base_table1");
assertUpdate("DROP MATERIALIZED VIEW materialized_view_with_property");
assertUpdate("DROP MATERIALIZED VIEW test_mv_show_create");
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -197,6 +197,13 @@ public void testMaterializedView()
.hasMessage("createMaterializedView is not supported for Iceberg JDBC catalogs");
}

// JDBC catalogs cannot create materialized views, so the inherited all-types
// test is expected to fail with exactly this unsupported-operation message.
@Override
public void testMaterializedViewAllTypes()
{
assertThatThrownBy(super::testMaterializedViewAllTypes)
.hasMessage("createMaterializedView is not supported for Iceberg JDBC catalogs");
}

@Override
public void testMaterializedViewBaseTableGone(boolean initialized)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1177,6 +1177,64 @@ public void testMaterializedView()
assertUpdate("DROP SCHEMA " + otherSchema);
}

@Test
public void testMaterializedViewAllTypes()
{
skipTestUnless(hasBehavior(SUPPORTS_CREATE_MATERIALIZED_VIEW));

// Random suffix avoids collisions when the test schema is shared across runs.
String viewName = "test_mv_all_types_" + randomNameSuffix();

// One column per type, covering types a connector may not support natively
// (tinyint/smallint, char, high-precision time/timestamp, time/timestamp with
// time zone, and containers of those) so that storage-type substitution and
// the coercion back to the declared types are both exercised.
String values = """
SELECT
true a_boolean,
TINYINT '67' a_tinyint,
SMALLINT '35' a_smallint,
INTEGER '-1546831166' an_integer,
1544323431676534245 a_bigint,
REAL '12345.67' a_real,
DOUBLE '12345.678901234' a_double,
CAST('1234567.8901' AS decimal(11, 4)) a_short_decimal,
CAST('1234567890123456789.0123456' AS decimal(26, 7)) a_long_decimal,
CHAR 'few chars  ' a_char,
CAST('some string' AS varchar(33)) a_bounded_varchar,
CAST('some longer string' AS varchar) an_unbounded_varchar,
X'65683F' a_varbinary,
DATE '2005-09-10' a_date,
TIME '13:00:00' a_time_seconds,
TIME '13:00:00.123' a_time_millis,
TIME '13:00:00.123456' a_time_micros,
TIME '13:00:00.123456789' a_time_nanos,
TIME '13:00:00 +02:00' a_time_tz__seconds,
TIME '13:00:00.123 +02:00' a_time_tz__millis,
TIME '13:00:00.123456 +02:00' a_time_tz__micros,
TIME '13:00:00.123456789 +02:00' a_time_tz__nanos,
TIMESTAMP '2005-09-10 13:00:00' a_timestamp_seconds,
TIMESTAMP '2005-09-10 13:00:00.123' a_timestamp_millis,
TIMESTAMP '2005-09-10 13:00:00.123456' a_timestamp_micros,
TIMESTAMP '2005-09-10 13:00:00.123456789' a_timestamp_nanos,
TIMESTAMP '2005-09-10 13:00:00 Europe/Warsaw' a_timestamp_tz_seconds,
TIMESTAMP '2005-09-10 13:00:00.123 Europe/Warsaw' a_timestamp_tz_millis,
TIMESTAMP '2005-09-10 13:00:00.123456 Europe/Warsaw' a_timestamp_tz_micros,
TIMESTAMP '2005-09-10 13:00:00.123456789 Europe/Warsaw' a_timestamp_tz_nanos,
UUID '12151fd2-7586-11e9-8f9e-2a86e4085a59' a_uuid,
ARRAY[TIMESTAMP '2005-09-10 13:00:00.123456789'] an_array_of_timestamp_nanos,
map(ARRAY['key'], ARRAY[TIMESTAMP '2005-09-10 13:00:00.123456789']) a_map_with_timestamp_nanos,
CAST(ROW(TIMESTAMP '2005-09-10 13:00:00.123456789') AS ROW(key timestamp(9))) a_row_with_timestamp_nanos,
""" +
// TODO JSON (requires json_format & json_parse instead of CASTs for the conversion)
// TODO interval, IPAddress, Geo types?
" 'a dummy' a_dummy";

// The view must return the declared types/values both right after creation
// and again after an explicit refresh (which rewrites the storage table).
assertUpdate("CREATE MATERIALIZED VIEW %s AS %s".formatted(viewName, values));
assertThat(query("TABLE " + viewName))
.matches(values);
assertUpdate("REFRESH MATERIALIZED VIEW " + viewName, 1);
assertThat(query("TABLE " + viewName))
.matches(values);

assertUpdate("DROP MATERIALIZED VIEW " + viewName);
}

@Test
public void testFederatedMaterializedView()
{
Expand Down