From cf825a6e492b6c7c7397e0879cb6934b8440f9d7 Mon Sep 17 00:00:00 2001 From: Piotr Findeisen Date: Thu, 9 Feb 2023 13:44:21 +0100 Subject: [PATCH 1/4] Remove redundant format call Per code style. --- .../trino/plugin/iceberg/BaseIcebergMaterializedViewTest.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/BaseIcebergMaterializedViewTest.java b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/BaseIcebergMaterializedViewTest.java index 2e7a69b0c922..16fba9639d63 100644 --- a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/BaseIcebergMaterializedViewTest.java +++ b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/BaseIcebergMaterializedViewTest.java @@ -89,7 +89,7 @@ public void testMaterializedViewsMetadata() { String catalogName = getSession().getCatalog().orElseThrow(); String schemaName = getSession().getSchema().orElseThrow(); - String materializedViewName = format("test_materialized_view_%s", randomNameSuffix()); + String materializedViewName = "test_materialized_view_" + randomNameSuffix(); computeActual("CREATE TABLE small_region AS SELECT * FROM tpch.tiny.region LIMIT 1"); computeActual(format("CREATE MATERIALIZED VIEW %s AS SELECT * FROM small_region LIMIT 1", materializedViewName)); @@ -122,7 +122,7 @@ public void testMaterializedViewsMetadata() "VALUES 'FRESH'"); assertUpdate("DROP TABLE small_region"); - assertUpdate(format("DROP MATERIALIZED VIEW %s", materializedViewName)); + assertUpdate("DROP MATERIALIZED VIEW " + materializedViewName); } @Test From ae210540aac4723bf818becfe2e454c901abf9ec Mon Sep 17 00:00:00 2001 From: Piotr Findeisen Date: Thu, 9 Feb 2023 13:49:44 +0100 Subject: [PATCH 2/4] Sync test relation name with test name --- .../iceberg/BaseIcebergMaterializedViewTest.java | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/BaseIcebergMaterializedViewTest.java b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/BaseIcebergMaterializedViewTest.java index 16fba9639d63..4d16b7a09048 100644 --- a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/BaseIcebergMaterializedViewTest.java +++ b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/BaseIcebergMaterializedViewTest.java @@ -154,17 +154,17 @@ public void testShowCreate() { String schema = getSession().getSchema().orElseThrow(); - assertUpdate("CREATE MATERIALIZED VIEW materialized_view_with_property " + + assertUpdate("CREATE MATERIALIZED VIEW test_mv_show_create " + "WITH (\n" + " partitioning = ARRAY['_date'],\n" + " orc_bloom_filter_columns = ARRAY['_date'],\n" + " orc_bloom_filter_fpp = 0.1) AS " + "SELECT _bigint, _date FROM base_table1"); - assertQuery("SELECT COUNT(*) FROM materialized_view_with_property", "VALUES 6"); + assertQuery("SELECT COUNT(*) FROM test_mv_show_create", "VALUES 6"); - assertThat((String) computeScalar("SHOW CREATE MATERIALIZED VIEW materialized_view_with_property")) + assertThat((String) computeScalar("SHOW CREATE MATERIALIZED VIEW test_mv_show_create")) .matches( - "\\QCREATE MATERIALIZED VIEW iceberg." + schema + ".materialized_view_with_property\n" + + "\\QCREATE MATERIALIZED VIEW iceberg." + schema + ".test_mv_show_create\n" + "WITH (\n" + " format = 'ORC',\n" + " format_version = 2,\n" + @@ -179,7 +179,7 @@ public void testShowCreate() ", _date\n" + "FROM\n" + " base_table1"); - assertUpdate("DROP MATERIALIZED VIEW materialized_view_with_property"); + assertUpdate("DROP MATERIALIZED VIEW test_mv_show_create"); } @Test From ac24bbf38e7edc115a4324d93428fc9506dd0ba8 Mon Sep 17 00:00:00 2001 From: Piotr Findeisen Date: Thu, 9 Feb 2023 20:50:26 +0100 Subject: [PATCH 3/4] Support arbitrary types in Iceberg materialized view definition A materialized view should be able to capture any query results and these are not constrained to be limited to types directly supported by Iceberg. Coerce unsupported types to supported ones. When reading, when the storage table has types different than MV output types, coercions are already applied by the `StatementAnalyzer`. The MV/storage schema mismatch is not supported in REFRESH MATERIALIZED VIEW yet. --- .../iceberg/catalog/AbstractTrinoCatalog.java | 72 ++++++++++++++++++- .../jdbc/TestIcebergJdbcConnectorTest.java | 7 ++ .../io/trino/testing/BaseConnectorTest.java | 55 ++++++++++++++ 3 files changed, 133 insertions(+), 1 deletion(-) diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/AbstractTrinoCatalog.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/AbstractTrinoCatalog.java index 88ec1508be7d..002fb37b515c 100644 --- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/AbstractTrinoCatalog.java +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/AbstractTrinoCatalog.java @@ -30,6 +30,15 @@ import io.trino.spi.connector.ConnectorTableMetadata; import io.trino.spi.connector.ConnectorViewDefinition; import io.trino.spi.connector.SchemaTableName; +import io.trino.spi.type.ArrayType; +import io.trino.spi.type.CharType; +import io.trino.spi.type.MapType; +import io.trino.spi.type.RowType; +import io.trino.spi.type.TimeType; +import io.trino.spi.type.TimeWithTimeZoneType; +import io.trino.spi.type.TimestampType; +import io.trino.spi.type.TimestampWithTimeZoneType; +import io.trino.spi.type.Type; import io.trino.spi.type.TypeManager; import org.apache.iceberg.AppendFiles; import org.apache.iceberg.PartitionSpec; @@ -62,6 +71,12 @@ import static io.trino.plugin.iceberg.IcebergUtil.commit; import static io.trino.plugin.iceberg.IcebergUtil.getIcebergTableProperties; import static io.trino.spi.StandardErrorCode.TABLE_NOT_FOUND; +import static io.trino.spi.type.IntegerType.INTEGER; +import static io.trino.spi.type.SmallintType.SMALLINT; +import static io.trino.spi.type.TimeType.TIME_MICROS; +import static io.trino.spi.type.TimestampType.TIMESTAMP_MICROS; +import static io.trino.spi.type.TinyintType.TINYINT; +import static io.trino.spi.type.VarcharType.VARCHAR; import static java.lang.String.format; import static java.util.Objects.requireNonNull; import static java.util.UUID.randomUUID; @@ -203,7 +218,7 @@ protected SchemaTableName createMaterializedViewStorageTable(ConnectorSession se String storageSchema = getStorageSchema(definition.getProperties()).orElse(viewName.getSchemaName()); SchemaTableName storageTable = new SchemaTableName(storageSchema, storageTableName); List columns = definition.getColumns().stream() - .map(column -> new ColumnMetadata(column.getName(), typeManager.getType(column.getType()))) + .map(column -> new ColumnMetadata(column.getName(), typeForMaterializedViewStorageTable(typeManager.getType(column.getType())))) .collect(toImmutableList()); ConnectorTableMetadata tableMetadata = new ConnectorTableMetadata(storageTable, columns, storageTableProperties, Optional.empty()); @@ -214,6 +229,61 @@ protected SchemaTableName createMaterializedViewStorageTable(ConnectorSession se return storageTable; } + /** + * Substitutes type not supported by Iceberg with a type that is supported. + * Upon reading from a materialized view, the types will be coerced back to the original ones, + * stored in the materialized view definition. + */ + private Type typeForMaterializedViewStorageTable(Type type) + { + if (type == TINYINT || type == SMALLINT) { + return INTEGER; + } + if (type instanceof CharType) { + return VARCHAR; + } + if (type instanceof TimeType timeType) { + // Iceberg supports microsecond precision only + return timeType.getPrecision() <= 6 + ? TIME_MICROS + : VARCHAR; + } + if (type instanceof TimeWithTimeZoneType) { + return VARCHAR; + } + if (type instanceof TimestampType timestampType) { + // Iceberg supports microsecond precision only + return timestampType.getPrecision() <= 6 + ? TIMESTAMP_MICROS + : VARCHAR; + } + if (type instanceof TimestampWithTimeZoneType) { + // Iceberg does not store the time zone + // TODO allow temporal partitioning on these columns, or MV property to + // drop zone info and use timestamptz directly + return VARCHAR; + } + if (type instanceof ArrayType arrayType) { + return new ArrayType(typeForMaterializedViewStorageTable(arrayType.getElementType())); + } + if (type instanceof MapType mapType) { + return new MapType( + typeForMaterializedViewStorageTable(mapType.getKeyType()), + typeForMaterializedViewStorageTable(mapType.getValueType()), + typeManager.getTypeOperators()); + } + if (type instanceof RowType rowType) { + return RowType.rowType( + rowType.getFields().stream() + .map(field -> new RowType.Field(field.getName(), typeForMaterializedViewStorageTable(field.getType()))) + .toArray(RowType.Field[]::new)); + } + + // Pass through all the types not explicitly handled above. If a type is not accepted by the connector, + // creation of the storage table will fail anyway. + return type; + } + protected ConnectorMaterializedViewDefinition getMaterializedViewDefinition( Table icebergTable, Optional owner, diff --git a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/catalog/jdbc/TestIcebergJdbcConnectorTest.java b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/catalog/jdbc/TestIcebergJdbcConnectorTest.java index a976c3db0ddd..9b70b1284471 100644 --- a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/catalog/jdbc/TestIcebergJdbcConnectorTest.java +++ b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/catalog/jdbc/TestIcebergJdbcConnectorTest.java @@ -197,6 +197,13 @@ public void testMaterializedView() .hasMessage("createMaterializedView is not supported for Iceberg JDBC catalogs"); } + @Override + public void testMaterializedViewAllTypes() + { + assertThatThrownBy(super::testMaterializedViewAllTypes) + .hasMessage("createMaterializedView is not supported for Iceberg JDBC catalogs"); + } + @Override public void testMaterializedViewBaseTableGone(boolean initialized) { diff --git a/testing/trino-testing/src/main/java/io/trino/testing/BaseConnectorTest.java b/testing/trino-testing/src/main/java/io/trino/testing/BaseConnectorTest.java index e76ca3763a5a..609bf34c621b 100644 --- a/testing/trino-testing/src/main/java/io/trino/testing/BaseConnectorTest.java +++ b/testing/trino-testing/src/main/java/io/trino/testing/BaseConnectorTest.java @@ -1177,6 +1177,61 @@ public void testMaterializedView() assertUpdate("DROP SCHEMA " + otherSchema); } + @Test + public void testMaterializedViewAllTypes() + { + skipTestUnless(hasBehavior(SUPPORTS_CREATE_MATERIALIZED_VIEW)); + + String viewName = "test_mv_all_types_" + randomNameSuffix(); + + String values = """ + SELECT + true a_boolean, + TINYINT '67' a_tinyint, + SMALLINT '35' a_smallint, + INTEGER '-1546831166' an_integer, + 1544323431676534245 a_bigint, + REAL '12345.67' a_real, + DOUBLE '12345.678901234' a_double, + CAST('1234567.8901' AS decimal(11, 4)) a_short_decimal, + CAST('1234567890123456789.0123456' AS decimal(26, 7)) a_long_decimal, + CHAR 'few chars ' a_char, + CAST('some string' AS varchar(33)) a_bounded_varchar, + CAST('some longer string' AS varchar) an_unbounded_varchar, + X'65683F' a_varbinary, + DATE '2005-09-10' a_date, + TIME '13:00:00' a_time_seconds, + TIME '13:00:00.123' a_time_millis, + TIME '13:00:00.123456' a_time_micros, + TIME '13:00:00.123456789' a_time_nanos, + TIME '13:00:00 +02:00' a_time_tz__seconds, + TIME '13:00:00.123 +02:00' a_time_tz__millis, + TIME '13:00:00.123456 +02:00' a_time_tz__micros, + TIME '13:00:00.123456789 +02:00' a_time_tz__nanos, + TIMESTAMP '2005-09-10 13:00:00' a_timestamp_seconds, + TIMESTAMP '2005-09-10 13:00:00.123' a_timestamp_millis, + TIMESTAMP '2005-09-10 13:00:00.123456' a_timestamp_micros, + TIMESTAMP '2005-09-10 13:00:00.123456789' a_timestamp_nanos, + TIMESTAMP '2005-09-10 13:00:00 Europe/Warsaw' a_timestamp_tz_seconds, + TIMESTAMP '2005-09-10 13:00:00.123 Europe/Warsaw' a_timestamp_tz_millis, + TIMESTAMP '2005-09-10 13:00:00.123456 Europe/Warsaw' a_timestamp_tz_micros, + TIMESTAMP '2005-09-10 13:00:00.123456789 Europe/Warsaw' a_timestamp_tz_nanos, + UUID '12151fd2-7586-11e9-8f9e-2a86e4085a59' a_uuid, + ARRAY[TIMESTAMP '2005-09-10 13:00:00.123456789'] an_array_of_timestamp_nanos, + map(ARRAY['key'], ARRAY[TIMESTAMP '2005-09-10 13:00:00.123456789']) a_map_with_timestamp_nanos, + CAST(ROW(TIMESTAMP '2005-09-10 13:00:00.123456789') AS ROW(key timestamp(9))) a_row_with_timestamp_nanos, + """ + + // TODO JSON (requires json_format & json_parse instead of CASTs for the conversion) + // TODO interval, IPAddress, Geo types? + " 'a dummy' a_dummy"; + + assertUpdate("CREATE MATERIALIZED VIEW %s AS %s".formatted(viewName, values)); + assertThat(query("TABLE " + viewName)) + .matches(values); + + assertUpdate("DROP MATERIALIZED VIEW " + viewName); + } + @Test public void testFederatedMaterializedView() { From b5e0c23979da4cce0b0940aa03f498c287745c9e Mon Sep 17 00:00:00 2001 From: Piotr Findeisen Date: Thu, 9 Feb 2023 13:37:40 +0100 Subject: [PATCH 4/4] Support REFRESH MATERIALIZED VIEW when storage differs in schema The materialized view's storage table may have a schema different than the materialized view itself. This is already supported when reading (the coercions are getting applied as necessary), but was not supported for REFRESH. In the REFRESH normal INSERT constraints where getting applied, thus not allowing the materialized view to, for example, store temporal values in a varchar column. The change removes the constraint, and therefore a storage table may use any type that is coercible to from the view definition type. Of course, the implementations should only use types where coercion to the storage and backwards does round trip. --- .../trino/sql/analyzer/StatementAnalyzer.java | 13 ------- .../io/trino/testing/TestingMetadata.java | 18 ++++++++++ .../sql/planner/TestMaterializedViews.java | 34 +++++++++++++++++++ .../io/trino/testing/BaseConnectorTest.java | 3 ++ 4 files changed, 55 insertions(+), 13 deletions(-) diff --git a/core/trino-main/src/main/java/io/trino/sql/analyzer/StatementAnalyzer.java b/core/trino-main/src/main/java/io/trino/sql/analyzer/StatementAnalyzer.java index 48309d0f0d4d..8c5d7152eb69 100644 --- a/core/trino-main/src/main/java/io/trino/sql/analyzer/StatementAnalyzer.java +++ b/core/trino-main/src/main/java/io/trino/sql/analyzer/StatementAnalyzer.java @@ -689,19 +689,6 @@ protected Scope visitRefreshMaterializedView(RefreshMaterializedView refreshMate .map(insertColumn -> tableMetadata.getColumn(insertColumn).getType()) .collect(toImmutableList()); - List queryTypes = queryScope.getRelationType().getVisibleFields().stream() - .map(Field::getType) - .collect(toImmutableList()); - - if (!typesMatchForInsert(tableTypes, queryTypes)) { - throw semanticException( - TYPE_MISMATCH, - refreshMaterializedView, - "Insert query has mismatched column types: Table: [%s], Query: [%s]", - Joiner.on(", ").join(tableTypes), - Joiner.on(", ").join(queryTypes)); - } - Stream columns = Streams.zip( insertColumns.stream(), tableTypes.stream() diff --git a/core/trino-main/src/main/java/io/trino/testing/TestingMetadata.java b/core/trino-main/src/main/java/io/trino/testing/TestingMetadata.java index 46320df3c19d..23bdbf44c6cf 100644 --- a/core/trino-main/src/main/java/io/trino/testing/TestingMetadata.java +++ b/core/trino-main/src/main/java/io/trino/testing/TestingMetadata.java @@ -279,6 +279,18 @@ public void markMaterializedViewIsFresh(SchemaTableName name) freshMaterializedViews.add(name); } + @Override + public boolean delegateMaterializedViewRefreshToConnector(ConnectorSession session, SchemaTableName viewName) + { + return false; + } + + @Override + public ConnectorInsertTableHandle beginRefreshMaterializedView(ConnectorSession session, ConnectorTableHandle tableHandle, List sourceTableHandles, RetryMode retryMode) + { + return TestingHandle.INSTANCE; + } + @Override public ConnectorOutputTableHandle beginCreateTable(ConnectorSession session, ConnectorTableMetadata tableMetadata, Optional layout, RetryMode retryMode) { @@ -378,6 +390,12 @@ public SchemaTableName getTableName() { return tableName; } + + @Override + public String toString() + { + return tableName.toString(); + } } public static class TestingColumnHandle diff --git a/core/trino-main/src/test/java/io/trino/sql/planner/TestMaterializedViews.java b/core/trino-main/src/test/java/io/trino/sql/planner/TestMaterializedViews.java index 93b1ca435574..c988982ec623 100644 --- a/core/trino-main/src/test/java/io/trino/sql/planner/TestMaterializedViews.java +++ b/core/trino-main/src/test/java/io/trino/sql/planner/TestMaterializedViews.java @@ -33,20 +33,28 @@ import io.trino.spi.security.ViewExpression; import io.trino.spi.transaction.IsolationLevel; import io.trino.sql.planner.assertions.BasePlanTest; +import io.trino.sql.tree.GenericLiteral; import io.trino.testing.LocalQueryRunner; import io.trino.testing.TestingAccessControlManager; import io.trino.testing.TestingMetadata; import org.testng.annotations.Test; +import java.util.List; +import java.util.Map; import java.util.Optional; import static io.trino.spi.type.BigintType.BIGINT; import static io.trino.spi.type.TinyintType.TINYINT; import static io.trino.spi.type.VarcharType.VARCHAR; import static io.trino.sql.planner.assertions.PlanMatchPattern.anyTree; +import static io.trino.sql.planner.assertions.PlanMatchPattern.exchange; import static io.trino.sql.planner.assertions.PlanMatchPattern.expression; +import static io.trino.sql.planner.assertions.PlanMatchPattern.output; import static io.trino.sql.planner.assertions.PlanMatchPattern.project; import static io.trino.sql.planner.assertions.PlanMatchPattern.tableScan; +import static io.trino.sql.planner.assertions.PlanMatchPattern.tableWriter; +import static io.trino.sql.planner.assertions.PlanMatchPattern.values; +import static io.trino.sql.planner.plan.ExchangeNode.Scope.LOCAL; import static io.trino.testing.TestingHandles.TEST_CATALOG_NAME; import static io.trino.testing.TestingSession.testSessionBuilder; @@ -164,6 +172,16 @@ protected LocalQueryRunner createLocalQueryRunner() }); testingConnectorMetadata.markMaterializedViewIsFresh(materializedViewWithCasts.asSchemaTableName()); + queryRunner.inTransaction(session -> { + metadata.createMaterializedView( + session, + new QualifiedObjectName(TEST_CATALOG_NAME, SCHEMA, "stale_materialized_view_with_casts"), + materializedViewDefinitionWithCasts, + false, + false); + return null; + }); + return queryRunner; } @@ -201,6 +219,22 @@ public void testMaterializedViewWithCasts() tableScan("storage_table_with_casts", ImmutableMap.of("A", "a", "B", "b"))))); } + @Test + public void testRefreshMaterializedViewWithCasts() + { + assertPlan("REFRESH MATERIALIZED VIEW stale_materialized_view_with_casts", + anyTree( + tableWriter(List.of("A_CAST", "B_CAST"), List.of("a", "b"), + exchange(LOCAL, + project(Map.of("A_CAST", expression("CAST(A AS tinyint)"), "B_CAST", expression("CAST(B AS varchar)")), + tableScan("test_table", Map.of("A", "a", "B", "b"))))))); + + // No-op REFRESH + assertPlan("REFRESH MATERIALIZED VIEW materialized_view_with_casts", + output( + values(List.of("rows"), List.of(List.of(new GenericLiteral("BIGINT", "0")))))); + } + private static class TestMaterializedViewConnector implements Connector { diff --git a/testing/trino-testing/src/main/java/io/trino/testing/BaseConnectorTest.java b/testing/trino-testing/src/main/java/io/trino/testing/BaseConnectorTest.java index 609bf34c621b..692d32fdcbbf 100644 --- a/testing/trino-testing/src/main/java/io/trino/testing/BaseConnectorTest.java +++ b/testing/trino-testing/src/main/java/io/trino/testing/BaseConnectorTest.java @@ -1226,6 +1226,9 @@ public void testMaterializedViewAllTypes() " 'a dummy' a_dummy"; assertUpdate("CREATE MATERIALIZED VIEW %s AS %s".formatted(viewName, values)); + assertThat(query("TABLE " + viewName)) + .matches(values); + assertUpdate("REFRESH MATERIALIZED VIEW " + viewName, 1); assertThat(query("TABLE " + viewName)) .matches(values);