From c5e623c88d5aca142c3a4cfe1ff962b98faf43d2 Mon Sep 17 00:00:00 2001 From: Piotr Findeisen Date: Thu, 9 Feb 2023 20:50:26 +0100 Subject: [PATCH] Support arbitrary types in Iceberg materialized view definition A materialized view should be able to capture the results of any query, and those results are not limited to types directly supported by Iceberg. Coerce unsupported types to supported ones. On read, when the storage table's types differ from the MV output types, coercions are already applied by the `StatementAnalyzer`. The MV/storage schema mismatch is not supported in REFRESH MATERIALIZED VIEW yet. --- .../iceberg/catalog/AbstractTrinoCatalog.java | 72 ++++++++++++++++++- .../jdbc/TestIcebergJdbcConnectorTest.java | 7 ++ .../io/trino/testing/BaseConnectorTest.java | 55 ++++++++++++++ 3 files changed, 133 insertions(+), 1 deletion(-) diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/AbstractTrinoCatalog.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/AbstractTrinoCatalog.java index 88ec1508be7d..002fb37b515c 100644 --- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/AbstractTrinoCatalog.java +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/AbstractTrinoCatalog.java @@ -30,6 +30,15 @@ import io.trino.spi.connector.ConnectorTableMetadata; import io.trino.spi.connector.ConnectorViewDefinition; import io.trino.spi.connector.SchemaTableName; +import io.trino.spi.type.ArrayType; +import io.trino.spi.type.CharType; +import io.trino.spi.type.MapType; +import io.trino.spi.type.RowType; +import io.trino.spi.type.TimeType; +import io.trino.spi.type.TimeWithTimeZoneType; +import io.trino.spi.type.TimestampType; +import io.trino.spi.type.TimestampWithTimeZoneType; +import io.trino.spi.type.Type; import io.trino.spi.type.TypeManager; import org.apache.iceberg.AppendFiles; import org.apache.iceberg.PartitionSpec; @@ -62,6 +71,12 @@ import static 
io.trino.plugin.iceberg.IcebergUtil.commit; import static io.trino.plugin.iceberg.IcebergUtil.getIcebergTableProperties; import static io.trino.spi.StandardErrorCode.TABLE_NOT_FOUND; +import static io.trino.spi.type.IntegerType.INTEGER; +import static io.trino.spi.type.SmallintType.SMALLINT; +import static io.trino.spi.type.TimeType.TIME_MICROS; +import static io.trino.spi.type.TimestampType.TIMESTAMP_MICROS; +import static io.trino.spi.type.TinyintType.TINYINT; +import static io.trino.spi.type.VarcharType.VARCHAR; import static java.lang.String.format; import static java.util.Objects.requireNonNull; import static java.util.UUID.randomUUID; @@ -203,7 +218,7 @@ protected SchemaTableName createMaterializedViewStorageTable(ConnectorSession se String storageSchema = getStorageSchema(definition.getProperties()).orElse(viewName.getSchemaName()); SchemaTableName storageTable = new SchemaTableName(storageSchema, storageTableName); List columns = definition.getColumns().stream() - .map(column -> new ColumnMetadata(column.getName(), typeManager.getType(column.getType()))) + .map(column -> new ColumnMetadata(column.getName(), typeForMaterializedViewStorageTable(typeManager.getType(column.getType())))) .collect(toImmutableList()); ConnectorTableMetadata tableMetadata = new ConnectorTableMetadata(storageTable, columns, storageTableProperties, Optional.empty()); @@ -214,6 +229,61 @@ protected SchemaTableName createMaterializedViewStorageTable(ConnectorSession se return storageTable; } + /** + * Substitutes a type not supported by Iceberg with a type that is supported, recursing into array, map and row types. + * Upon reading from a materialized view, the values will be coerced back to the original types, + * stored in the materialized view definition. 
+ */ + private Type typeForMaterializedViewStorageTable(Type type) + { + if (type == TINYINT || type == SMALLINT) { + return INTEGER; + } + if (type instanceof CharType) { + return VARCHAR; + } + if (type instanceof TimeType timeType) { + // Iceberg supports microsecond precision only; higher precisions are stored as varchar + return timeType.getPrecision() <= 6 + ? TIME_MICROS + : VARCHAR; + } + if (type instanceof TimeWithTimeZoneType) { + return VARCHAR; + } + if (type instanceof TimestampType timestampType) { + // Iceberg supports microsecond precision only; higher precisions are stored as varchar + return timestampType.getPrecision() <= 6 + ? TIMESTAMP_MICROS + : VARCHAR; + } + if (type instanceof TimestampWithTimeZoneType) { + // Iceberg does not store the time zone + // TODO allow temporal partitioning on these columns, or MV property to + // drop zone info and use timestamptz directly + return VARCHAR; + } + if (type instanceof ArrayType arrayType) { + return new ArrayType(typeForMaterializedViewStorageTable(arrayType.getElementType())); + } + if (type instanceof MapType mapType) { + return new MapType( + typeForMaterializedViewStorageTable(mapType.getKeyType()), + typeForMaterializedViewStorageTable(mapType.getValueType()), + typeManager.getTypeOperators()); + } + if (type instanceof RowType rowType) { + return RowType.rowType( + rowType.getFields().stream() + .map(field -> new RowType.Field(field.getName(), typeForMaterializedViewStorageTable(field.getType()))) + .toArray(RowType.Field[]::new)); + } + + // Pass through all the types not explicitly handled above. If a type is not accepted by the connector, + // creation of the storage table will fail anyway. 
+ return type; + } + protected ConnectorMaterializedViewDefinition getMaterializedViewDefinition( Table icebergTable, Optional owner, diff --git a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/catalog/jdbc/TestIcebergJdbcConnectorTest.java b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/catalog/jdbc/TestIcebergJdbcConnectorTest.java index a976c3db0ddd..9b70b1284471 100644 --- a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/catalog/jdbc/TestIcebergJdbcConnectorTest.java +++ b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/catalog/jdbc/TestIcebergJdbcConnectorTest.java @@ -197,6 +197,13 @@ public void testMaterializedView() .hasMessage("createMaterializedView is not supported for Iceberg JDBC catalogs"); } + @Override + public void testMaterializedViewAllTypes() + { + assertThatThrownBy(super::testMaterializedViewAllTypes) + .hasMessage("createMaterializedView is not supported for Iceberg JDBC catalogs"); + } + @Override public void testMaterializedViewBaseTableGone(boolean initialized) { diff --git a/testing/trino-testing/src/main/java/io/trino/testing/BaseConnectorTest.java b/testing/trino-testing/src/main/java/io/trino/testing/BaseConnectorTest.java index 49528ff44bbc..46921e7b361d 100644 --- a/testing/trino-testing/src/main/java/io/trino/testing/BaseConnectorTest.java +++ b/testing/trino-testing/src/main/java/io/trino/testing/BaseConnectorTest.java @@ -1172,6 +1172,61 @@ public void testMaterializedView() assertUpdate("DROP SCHEMA " + otherSchema); } + @Test + public void testMaterializedViewAllTypes() + { + skipTestUnless(hasBehavior(SUPPORTS_CREATE_MATERIALIZED_VIEW)); + + String viewName = "test_mv_all_types_" + randomNameSuffix(); + + String values = """ + SELECT + true a_boolean, + TINYINT '67' a_tinyint, + SMALLINT '35' a_smallint, + INTEGER '-1546831166' an_integer, + 1544323431676534245 a_bigint, + REAL '12345.67' a_real, + DOUBLE '12345.678901234' a_double, + CAST('1234567.8901' AS decimal(11, 4)) 
a_short_decimal, + CAST('1234567890123456789.0123456' AS decimal(26, 7)) a_long_decimal, + CHAR 'few chars ' a_char, + CAST('some string' AS varchar(33)) a_bounded_varchar, + CAST('some longer string' AS varchar) an_unbounded_varchar, + X'65683F' a_varbinary, + DATE '2005-09-10' a_date, + TIME '13:00:00' a_time_seconds, + TIME '13:00:00.123' a_time_millis, + TIME '13:00:00.123456' a_time_micros, + TIME '13:00:00.123456789' a_time_nanos, + TIME '13:00:00 +02:00' a_time_tz__seconds, + TIME '13:00:00.123 +02:00' a_time_tz__millis, + TIME '13:00:00.123456 +02:00' a_time_tz__micros, + TIME '13:00:00.123456789 +02:00' a_time_tz__nanos, + TIMESTAMP '2005-09-10 13:00:00' a_timestamp_seconds, + TIMESTAMP '2005-09-10 13:00:00.123' a_timestamp_millis, + TIMESTAMP '2005-09-10 13:00:00.123456' a_timestamp_micros, + TIMESTAMP '2005-09-10 13:00:00.123456789' a_timestamp_nanos, + TIMESTAMP '2005-09-10 13:00:00 Europe/Warsaw' a_timestamp_tz_seconds, + TIMESTAMP '2005-09-10 13:00:00.123 Europe/Warsaw' a_timestamp_tz_millis, + TIMESTAMP '2005-09-10 13:00:00.123456 Europe/Warsaw' a_timestamp_tz_micros, + TIMESTAMP '2005-09-10 13:00:00.123456789 Europe/Warsaw' a_timestamp_tz_nanos, + UUID '12151fd2-7586-11e9-8f9e-2a86e4085a59' a_uuid, + ARRAY[TIMESTAMP '2005-09-10 13:00:00.123456789'] an_array_of_timestamp_nanos, + map(ARRAY['key'], ARRAY[TIMESTAMP '2005-09-10 13:00:00.123456789']) a_map_with_timestamp_nanos, + CAST(ROW(TIMESTAMP '2005-09-10 13:00:00.123456789') AS ROW(key timestamp(9))) a_row_with_timestamp_nanos, + """ + + // TODO JSON (requires json_format & json_parse instead of CASTs for the conversion) + // TODO interval, IPAddress, Geo types? + " 'a dummy' a_dummy"; + + assertUpdate("CREATE MATERIALIZED VIEW %s AS %s".formatted(viewName, values)); + assertThat(query("TABLE " + viewName)) + .matches(values); + + assertUpdate("DROP MATERIALIZED VIEW " + viewName); + } + @Test public void testFederatedMaterializedView() {