Skip to content

Commit

Permalink
Support arbitrary types in Iceberg materialized view definition
Browse files Browse the repository at this point in the history
A materialized view should be able to capture any query results and
these are not constrained to be limited to types directly supported by
Iceberg.

Coerce unsupported types to supported ones.

When reading, when the storage table has types different than MV output
types, coercions are already applied by the `StatementAnalyzer`.

The MV/storage schema mismatch is not supported in REFRESH MATERIALIZED
VIEW yet.
  • Loading branch information
findepi committed Mar 9, 2023
1 parent 36543e4 commit c5e623c
Show file tree
Hide file tree
Showing 3 changed files with 133 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,15 @@
import io.trino.spi.connector.ConnectorTableMetadata;
import io.trino.spi.connector.ConnectorViewDefinition;
import io.trino.spi.connector.SchemaTableName;
import io.trino.spi.type.ArrayType;
import io.trino.spi.type.CharType;
import io.trino.spi.type.MapType;
import io.trino.spi.type.RowType;
import io.trino.spi.type.TimeType;
import io.trino.spi.type.TimeWithTimeZoneType;
import io.trino.spi.type.TimestampType;
import io.trino.spi.type.TimestampWithTimeZoneType;
import io.trino.spi.type.Type;
import io.trino.spi.type.TypeManager;
import org.apache.iceberg.AppendFiles;
import org.apache.iceberg.PartitionSpec;
Expand Down Expand Up @@ -62,6 +71,12 @@
import static io.trino.plugin.iceberg.IcebergUtil.commit;
import static io.trino.plugin.iceberg.IcebergUtil.getIcebergTableProperties;
import static io.trino.spi.StandardErrorCode.TABLE_NOT_FOUND;
import static io.trino.spi.type.IntegerType.INTEGER;
import static io.trino.spi.type.SmallintType.SMALLINT;
import static io.trino.spi.type.TimeType.TIME_MICROS;
import static io.trino.spi.type.TimestampType.TIMESTAMP_MICROS;
import static io.trino.spi.type.TinyintType.TINYINT;
import static io.trino.spi.type.VarcharType.VARCHAR;
import static java.lang.String.format;
import static java.util.Objects.requireNonNull;
import static java.util.UUID.randomUUID;
Expand Down Expand Up @@ -203,7 +218,7 @@ protected SchemaTableName createMaterializedViewStorageTable(ConnectorSession se
String storageSchema = getStorageSchema(definition.getProperties()).orElse(viewName.getSchemaName());
SchemaTableName storageTable = new SchemaTableName(storageSchema, storageTableName);
List<ColumnMetadata> columns = definition.getColumns().stream()
.map(column -> new ColumnMetadata(column.getName(), typeManager.getType(column.getType())))
.map(column -> new ColumnMetadata(column.getName(), typeForMaterializedViewStorageTable(typeManager.getType(column.getType()))))
.collect(toImmutableList());

ConnectorTableMetadata tableMetadata = new ConnectorTableMetadata(storageTable, columns, storageTableProperties, Optional.empty());
Expand All @@ -214,6 +229,61 @@ protected SchemaTableName createMaterializedViewStorageTable(ConnectorSession se
return storageTable;
}

/**
* Substitutes type not supported by Iceberg with a type that is supported.
* Upon reading from a materialized view, the types will be coerced back to the original ones,
* stored in the materialized view definition.
*/
private Type typeForMaterializedViewStorageTable(Type type)
{
if (type == TINYINT || type == SMALLINT) {
return INTEGER;
}
if (type instanceof CharType) {
return VARCHAR;
}
if (type instanceof TimeType timeType) {
// Iceberg supports microsecond precision only
return timeType.getPrecision() <= 6
? TIME_MICROS
: VARCHAR;
}
if (type instanceof TimeWithTimeZoneType) {
return VARCHAR;
}
if (type instanceof TimestampType timestampType) {
// Iceberg supports microsecond precision only
return timestampType.getPrecision() <= 6
? TIMESTAMP_MICROS
: VARCHAR;
}
if (type instanceof TimestampWithTimeZoneType) {
// Iceberg does not store the time zone
// TODO allow temporal partitioning on these columns, or MV property to
// drop zone info and use timestamptz directly
return VARCHAR;
}
if (type instanceof ArrayType arrayType) {
return new ArrayType(typeForMaterializedViewStorageTable(arrayType.getElementType()));
}
if (type instanceof MapType mapType) {
return new MapType(
typeForMaterializedViewStorageTable(mapType.getKeyType()),
typeForMaterializedViewStorageTable(mapType.getValueType()),
typeManager.getTypeOperators());
}
if (type instanceof RowType rowType) {
return RowType.rowType(
rowType.getFields().stream()
.map(field -> new RowType.Field(field.getName(), typeForMaterializedViewStorageTable(field.getType())))
.toArray(RowType.Field[]::new));
}

// Pass through all the types not explicitly handled above. If a type is not accepted by the connector,
// creation of the storage table will fail anyway.
return type;
}

protected ConnectorMaterializedViewDefinition getMaterializedViewDefinition(
Table icebergTable,
Optional<String> owner,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -197,6 +197,13 @@ public void testMaterializedView()
.hasMessage("createMaterializedView is not supported for Iceberg JDBC catalogs");
}

@Override
public void testMaterializedViewAllTypes()
{
assertThatThrownBy(super::testMaterializedViewAllTypes)
.hasMessage("createMaterializedView is not supported for Iceberg JDBC catalogs");
}

@Override
public void testMaterializedViewBaseTableGone(boolean initialized)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1172,6 +1172,61 @@ public void testMaterializedView()
assertUpdate("DROP SCHEMA " + otherSchema);
}

@Test
public void testMaterializedViewAllTypes()
{
skipTestUnless(hasBehavior(SUPPORTS_CREATE_MATERIALIZED_VIEW));

String viewName = "test_mv_all_types_" + randomNameSuffix();

String values = """
SELECT
true a_boolean,
TINYINT '67' a_tinyint,
SMALLINT '35' a_smallint,
INTEGER '-1546831166' an_integer,
1544323431676534245 a_bigint,
REAL '12345.67' a_real,
DOUBLE '12345.678901234' a_double,
CAST('1234567.8901' AS decimal(11, 4)) a_short_decimal,
CAST('1234567890123456789.0123456' AS decimal(26, 7)) a_long_decimal,
CHAR 'few chars ' a_char,
CAST('some string' AS varchar(33)) a_bounded_varchar,
CAST('some longer string' AS varchar) an_unbounded_varchar,
X'65683F' a_varbinary,
DATE '2005-09-10' a_date,
TIME '13:00:00' a_time_seconds,
TIME '13:00:00.123' a_time_millis,
TIME '13:00:00.123456' a_time_micros,
TIME '13:00:00.123456789' a_time_nanos,
TIME '13:00:00 +02:00' a_time_tz__seconds,
TIME '13:00:00.123 +02:00' a_time_tz__millis,
TIME '13:00:00.123456 +02:00' a_time_tz__micros,
TIME '13:00:00.123456789 +02:00' a_time_tz__nanos,
TIMESTAMP '2005-09-10 13:00:00' a_timestamp_seconds,
TIMESTAMP '2005-09-10 13:00:00.123' a_timestamp_millis,
TIMESTAMP '2005-09-10 13:00:00.123456' a_timestamp_micros,
TIMESTAMP '2005-09-10 13:00:00.123456789' a_timestamp_nanos,
TIMESTAMP '2005-09-10 13:00:00 Europe/Warsaw' a_timestamp_tz_seconds,
TIMESTAMP '2005-09-10 13:00:00.123 Europe/Warsaw' a_timestamp_tz_millis,
TIMESTAMP '2005-09-10 13:00:00.123456 Europe/Warsaw' a_timestamp_tz_micros,
TIMESTAMP '2005-09-10 13:00:00.123456789 Europe/Warsaw' a_timestamp_tz_nanos,
UUID '12151fd2-7586-11e9-8f9e-2a86e4085a59' a_uuid,
ARRAY[TIMESTAMP '2005-09-10 13:00:00.123456789'] an_array_of_timestamp_nanos,
map(ARRAY['key'], ARRAY[TIMESTAMP '2005-09-10 13:00:00.123456789']) a_map_with_timestamp_nanos,
CAST(ROW(TIMESTAMP '2005-09-10 13:00:00.123456789') AS ROW(key timestamp(9))) a_row_with_timestamp_nanos,
""" +
// TODO JSON (requires json_format & json_parse instead of CASTs for the conversion)
// TODO interval, IPAddress, Geo types?
" 'a dummy' a_dummy";

assertUpdate("CREATE MATERIALIZED VIEW %s AS %s".formatted(viewName, values));
assertThat(query("TABLE " + viewName))
.matches(values);

assertUpdate("DROP MATERIALIZED VIEW " + viewName);
}

@Test
public void testFederatedMaterializedView()
{
Expand Down

0 comments on commit c5e623c

Please sign in to comment.