From 005e84bf5833bc54f20289b3afa52c4d9f6750d1 Mon Sep 17 00:00:00 2001
From: Yuya Ebihara
Date: Wed, 6 Sep 2023 17:11:49 +0900
Subject: [PATCH] Add support for writing timestamp_ntz type in Delta

---
 docs/src/main/sphinx/connector/delta-lake.md  |   2 +
 .../plugin/deltalake/DeltaLakeMetadata.java   |  45 +++-
 .../deltalake/DeltaLakeParquetSchemas.java    |   5 +
 .../DeltaLakeSchemaSupport.java               |   6 +
 .../deltalake/util/DeltaLakeWriteUtils.java   |   3 +-
 .../plugin/deltalake/TestDeltaLakeBasic.java  | 255 ++++++++++++++++--
 .../deltalake/TestDeltaLakeConnectorTest.java |  41 ++-
 ...eltaLakeDatabricksInsertCompatibility.java |  37 +++
 8 files changed, 367 insertions(+), 27 deletions(-)

diff --git a/docs/src/main/sphinx/connector/delta-lake.md b/docs/src/main/sphinx/connector/delta-lake.md
index e89d8fb7b75e..64209fe8e555 100644
--- a/docs/src/main/sphinx/connector/delta-lake.md
+++ b/docs/src/main/sphinx/connector/delta-lake.md
@@ -302,6 +302,8 @@ this table:
   - ``BINARY``
 * - ``DATE``
   - ``DATE``
+* - ``TIMESTAMP``
+  - ``TIMESTAMPNTZ`` (``TIMESTAMP_NTZ``)
 * - ``TIMESTAMP(3) WITH TIME ZONE``
   - ``TIMESTAMP``
 * - ``ARRAY``
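Note (illustrative, not part of the patch): with the mapping above, a Trino timestamp(6)
column named x is serialized into the Delta schema string as the timestamp_ntz primitive
type, roughly as follows, assuming the usual Delta schema JSON layout (see the
DeltaLakeSchemaSupport change below):

    {"name": "x", "type": "timestamp_ntz", "nullable": true, "metadata": {}}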
diff --git a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeMetadata.java b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeMetadata.java
index 2d998ddbc35b..f62a067687af 100644
--- a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeMetadata.java
+++ b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeMetadata.java
@@ -131,6 +131,7 @@
 import io.trino.spi.type.HyperLogLogType;
 import io.trino.spi.type.MapType;
 import io.trino.spi.type.RowType;
+import io.trino.spi.type.TimestampType;
 import io.trino.spi.type.TimestampWithTimeZoneType;
 import io.trino.spi.type.Type;
 import io.trino.spi.type.TypeManager;
@@ -146,6 +147,7 @@
 import java.util.Collections;
 import java.util.Deque;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
@@ -336,6 +338,8 @@ public class DeltaLakeMetadata
     private static final int CDF_SUPPORTED_WRITER_VERSION = 4;
     private static final int COLUMN_MAPPING_MODE_SUPPORTED_READER_VERSION = 2;
     private static final int COLUMN_MAPPING_MODE_SUPPORTED_WRITER_VERSION = 5;
+    private static final int TIMESTAMP_NTZ_SUPPORTED_READER_VERSION = 3;
+    private static final int TIMESTAMP_NTZ_SUPPORTED_WRITER_VERSION = 7;
 
     // Matches the dummy column Databricks stores in the metastore
     private static final List<ColumnMetadata> DUMMY_DATA_COLUMNS = ImmutableList.of(
@@ -879,10 +883,14 @@ public void createTable(ConnectorSession session, ConnectorTableMetadata tableMe
         ImmutableList.Builder<String> columnNames = ImmutableList.builderWithExpectedSize(tableMetadata.getColumns().size());
         ImmutableMap.Builder<String, Object> columnTypes = ImmutableMap.builderWithExpectedSize(tableMetadata.getColumns().size());
         ImmutableMap.Builder<String, Map<String, Object>> columnsMetadata = ImmutableMap.builderWithExpectedSize(tableMetadata.getColumns().size());
+        boolean containsTimestampType = false;
         for (ColumnMetadata column : tableMetadata.getColumns()) {
             columnNames.add(column.getName());
             columnTypes.put(column.getName(), serializeColumnType(columnMappingMode, fieldId, column.getType()));
             columnsMetadata.put(column.getName(), generateColumnMetadata(columnMappingMode, fieldId));
+            if (!containsTimestampType) {
+                containsTimestampType = containsTimestampType(column.getType());
+            }
         }
         Map<String, String> columnComments = tableMetadata.getColumns().stream()
                 .filter(column -> column.getComment() != null)
@@ -909,7 +917,7 @@ public void createTable(ConnectorSession session, ConnectorTableMetadata tableMe
                 CREATE_TABLE_OPERATION,
                 session,
                 tableMetadata.getComment(),
-                protocolEntryForNewTable(tableMetadata.getProperties()));
+                protocolEntryForNewTable(containsTimestampType, tableMetadata.getProperties()));
 
         setRollback(() -> deleteRecursivelyIfExists(fileSystem, deltaLogDirectory));
         transactionLogWriter.flush();
@@ -1009,6 +1017,7 @@ public DeltaLakeOutputTableHandle beginCreateTable(ConnectorSession session, Con
         setRollback(() -> deleteRecursivelyIfExists(fileSystemFactory.create(session), finalLocation));
 
         boolean usePhysicalName = columnMappingMode == ID || columnMappingMode == NAME;
+        boolean containsTimestampType = false;
         int columnSize = tableMetadata.getColumns().size();
         ImmutableList.Builder<String> columnNames = ImmutableList.builderWithExpectedSize(columnSize);
         ImmutableMap.Builder<String, Object> columnTypes = ImmutableMap.builderWithExpectedSize(columnSize);
@@ -1018,6 +1027,7 @@ public DeltaLakeOutputTableHandle beginCreateTable(ConnectorSession session, Con
         for (ColumnMetadata column : tableMetadata.getColumns()) {
             columnNames.add(column.getName());
             columnNullabilities.put(column.getName(), column.isNullable());
+            containsTimestampType |= containsTimestampType(column.getType());
 
             Object serializedType = serializeColumnType(columnMappingMode, fieldId, column.getType());
             Type physicalType = deserializeType(typeManager, serializedType, usePhysicalName);
@@ -1067,7 +1077,7 @@ public DeltaLakeOutputTableHandle beginCreateTable(ConnectorSession session, Con
                 schemaString,
                 columnMappingMode,
                 maxFieldId,
-                protocolEntryForNewTable(tableMetadata.getProperties()));
+                protocolEntryForNewTable(containsTimestampType, tableMetadata.getProperties()));
     }
 
     private Optional<String> getSchemaLocation(Database database)
@@ -1156,6 +1166,21 @@ private static void deleteRecursivelyIfExists(TrinoFileSystem fileSystem, Locati
         }
     }
 
+    private static boolean containsTimestampType(Type type)
+    {
+        if (type instanceof ArrayType arrayType) {
+            return containsTimestampType(arrayType.getElementType());
+        }
+        if (type instanceof MapType mapType) {
+            return containsTimestampType(mapType.getKeyType()) || containsTimestampType(mapType.getValueType());
+        }
+        if (type instanceof RowType rowType) {
+            return rowType.getFields().stream().anyMatch(field -> containsTimestampType(field.getType()));
+        }
+        checkArgument(type.getTypeParameters().isEmpty(), "Unexpected type parameters for type %s", type);
+        return type instanceof TimestampType;
+    }
+
     @Override
     public Optional<ConnectorOutputMetadata> finishCreateTable(
             ConnectorSession session,
@@ -2243,10 +2268,12 @@ private TableSnapshot getSnapshot(SchemaTableName schemaTableName, String tableL
         }
     }
 
-    private ProtocolEntry protocolEntryForNewTable(Map<String, Object> properties)
+    private ProtocolEntry protocolEntryForNewTable(boolean containsTimestampType, Map<String, Object> properties)
     {
         int readerVersion = DEFAULT_READER_VERSION;
         int writerVersion = DEFAULT_WRITER_VERSION;
+        Set<String> readerFeatures = new HashSet<>();
+        Set<String> writerFeatures = new HashSet<>();
         Optional<Boolean> changeDataFeedEnabled = getChangeDataFeedEnabled(properties);
         if (changeDataFeedEnabled.isPresent() && changeDataFeedEnabled.get()) {
             // Enabling cdf (change data feed) requires setting the writer version to 4
@@ -2258,7 +2285,17 @@ private ProtocolEntry protocolEntryForNewTable(Map<String, Object> properties)
             readerVersion = max(readerVersion, COLUMN_MAPPING_MODE_SUPPORTED_READER_VERSION);
             writerVersion = max(writerVersion, COLUMN_MAPPING_MODE_SUPPORTED_WRITER_VERSION);
         }
-        return new ProtocolEntry(readerVersion, writerVersion, Optional.empty(), Optional.empty());
+        if (containsTimestampType) {
+            readerVersion = max(readerVersion, TIMESTAMP_NTZ_SUPPORTED_READER_VERSION);
+            writerVersion = max(writerVersion, TIMESTAMP_NTZ_SUPPORTED_WRITER_VERSION);
+            readerFeatures.add("timestampNtz");
+            writerFeatures.add("timestampNtz");
+        }
+        return new ProtocolEntry(
+                readerVersion,
+                writerVersion,
+                readerFeatures.isEmpty() ? Optional.empty() : Optional.of(readerFeatures),
+                writerFeatures.isEmpty() ? Optional.empty() : Optional.of(writerFeatures));
     }
 
     private void writeCheckpointIfNeeded(ConnectorSession session, SchemaTableName table, String tableLocation, Optional<Long> checkpointInterval, long newVersion)
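Note (illustrative, not part of the patch): for a new table containing a timestamp
column, protocolEntryForNewTable() above yields reader version 3, writer version 7, and
the timestampNtz reader/writer features, so the first commit in _delta_log carries a
protocol action roughly like the following (the values TestDeltaLakeBasic asserts below):

    {"protocol":{"minReaderVersion":3,"minWriterVersion":7,"readerFeatures":["timestampNtz"],"writerFeatures":["timestampNtz"]}}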
diff --git a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeParquetSchemas.java b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeParquetSchemas.java
index b1e290ac6086..9aa38e67ba91 100644
--- a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeParquetSchemas.java
+++ b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeParquetSchemas.java
@@ -53,6 +53,7 @@
 import static io.trino.spi.type.IntegerType.INTEGER;
 import static io.trino.spi.type.RealType.REAL;
 import static io.trino.spi.type.SmallintType.SMALLINT;
+import static io.trino.spi.type.TimestampType.TIMESTAMP_MICROS;
 import static io.trino.spi.type.TimestampType.TIMESTAMP_MILLIS;
 import static io.trino.spi.type.TinyintType.TINYINT;
 import static io.trino.spi.type.VarbinaryType.VARBINARY;
@@ -266,6 +267,10 @@ else if (trinoDecimalType.isShort()) {
                 typeBuilder = Types.primitive(PrimitiveType.PrimitiveTypeName.INT64, repetition).as(LogicalTypeAnnotation.timestampType(false, LogicalTypeAnnotation.TimeUnit.MILLIS));
                 trinoType = TIMESTAMP_MILLIS;
             }
+            case "timestamp_ntz" -> {
+                typeBuilder = Types.primitive(PrimitiveType.PrimitiveTypeName.INT64, repetition).as(LogicalTypeAnnotation.timestampType(false, LogicalTypeAnnotation.TimeUnit.MICROS));
+                trinoType = TIMESTAMP_MICROS;
+            }
             default -> throw new TrinoException(NOT_SUPPORTED, format("Unsupported primitive type: %s", primitiveType));
         }
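Note (illustrative, not part of the patch): the new timestamp_ntz branch maps the column
to a Parquet INT64 annotated as a microsecond timestamp with isAdjustedToUTC=false, so a
column x would appear in the written Parquet file schema roughly as:

    optional int64 x (TIMESTAMP(MICROS,false))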
diff --git a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/transactionlog/DeltaLakeSchemaSupport.java b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/transactionlog/DeltaLakeSchemaSupport.java
index 195fb3f57af4..529a2d84e6d5 100644
--- a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/transactionlog/DeltaLakeSchemaSupport.java
+++ b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/transactionlog/DeltaLakeSchemaSupport.java
@@ -35,6 +35,7 @@
 import io.trino.spi.type.DecimalType;
 import io.trino.spi.type.MapType;
 import io.trino.spi.type.RowType;
+import io.trino.spi.type.TimestampType;
 import io.trino.spi.type.TimestampWithTimeZoneType;
 import io.trino.spi.type.Type;
 import io.trino.spi.type.TypeManager;
@@ -104,6 +105,7 @@ private DeltaLakeSchemaSupport() {}
             .add("checkConstraints")
             .add("changeDataFeed")
             .add("columnMapping")
+            .add("timestampNtz")
             .build();
 
     public enum ColumnMappingMode
@@ -328,6 +330,9 @@ private static String serializePrimitiveType(Type type)
 
     private static Optional<String> serializeSupportedPrimitiveType(Type type)
     {
+        if (type instanceof TimestampType) {
+            return Optional.of("timestamp_ntz");
+        }
         if (type instanceof TimestampWithTimeZoneType) {
             return Optional.of("timestamp");
         }
@@ -374,6 +379,7 @@ private static void validateStructuralType(Optional<Type> rootType, Type type)
     private static void validatePrimitiveType(Type type)
     {
         if (serializeSupportedPrimitiveType(type).isEmpty() ||
+                (type instanceof TimestampType && ((TimestampType) type).getPrecision() != 6) ||
                 (type instanceof TimestampWithTimeZoneType && ((TimestampWithTimeZoneType) type).getPrecision() != 3)) {
             throw new TrinoException(DELTA_LAKE_INVALID_SCHEMA, "Unsupported type: " + type);
         }
diff --git a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/util/DeltaLakeWriteUtils.java b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/util/DeltaLakeWriteUtils.java
index 98e31fc3d61c..a30846588cf2 100644
--- a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/util/DeltaLakeWriteUtils.java
+++ b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/util/DeltaLakeWriteUtils.java
@@ -44,6 +44,7 @@
 import static io.trino.spi.type.IntegerType.INTEGER;
 import static io.trino.spi.type.RealType.REAL;
 import static io.trino.spi.type.SmallintType.SMALLINT;
+import static io.trino.spi.type.TimestampType.TIMESTAMP_MICROS;
 import static io.trino.spi.type.TimestampType.TIMESTAMP_MILLIS;
 import static io.trino.spi.type.TimestampWithTimeZoneType.TIMESTAMP_TZ_MILLIS;
 import static io.trino.spi.type.Timestamps.MICROSECONDS_PER_SECOND;
@@ -112,7 +113,7 @@ private static String toPartitionValue(Type type, Block block, int position)
         if (DATE.equals(type)) {
             return LocalDate.ofEpochDay(DATE.getInt(block, position)).format(DELTA_DATE_FORMATTER);
         }
-        if (TIMESTAMP_MILLIS.equals(type)) {
+        if (TIMESTAMP_MILLIS.equals(type) || TIMESTAMP_MICROS.equals(type)) {
             long epochMicros = type.getLong(block, position);
             long epochSeconds = floorDiv(epochMicros, MICROSECONDS_PER_SECOND);
             int nanosOfSecond = floorMod(epochMicros, MICROSECONDS_PER_SECOND) * NANOSECONDS_PER_MICROSECOND;
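Note (a worked example, not part of the patch): for the value TIMESTAMP
'2023-01-02 03:04:05.123456' used in the tests below, toPartitionValue() receives
epochMicros = 1672628645123456, so floorDiv(...) gives epochSeconds = 1672628645 and
floorMod(...) * NANOSECONDS_PER_MICROSECOND gives nanosOfSecond = 123456000; the
unchanged (not shown) formatting step then renders the partition value
"2023-01-02 03:04:05.123456".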
diff --git a/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestDeltaLakeBasic.java b/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestDeltaLakeBasic.java
index 9dac1a64e2c0..8e6db87edc1a 100644
--- a/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestDeltaLakeBasic.java
+++ b/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestDeltaLakeBasic.java
@@ -18,6 +18,7 @@
 import com.fasterxml.jackson.databind.node.ObjectNode;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.Iterables;
 import com.google.common.io.Resources;
 import io.airlift.json.ObjectMapperProvider;
@@ -31,11 +32,14 @@
 import io.trino.plugin.deltalake.transactionlog.AddFileEntry;
 import io.trino.plugin.deltalake.transactionlog.DeltaLakeTransactionLogEntry;
 import io.trino.plugin.deltalake.transactionlog.MetadataEntry;
+import io.trino.plugin.deltalake.transactionlog.ProtocolEntry;
 import io.trino.plugin.deltalake.transactionlog.statistics.DeltaLakeFileStatistics;
 import io.trino.plugin.hive.FileFormatDataSourceStats;
 import io.trino.plugin.hive.parquet.TrinoParquetDataSource;
+import io.trino.spi.type.TimeZoneKey;
 import io.trino.testing.AbstractTestQueryFramework;
 import io.trino.testing.QueryRunner;
+import io.trino.testing.TestingSession;
 import org.apache.parquet.hadoop.metadata.FileMetaData;
 import org.apache.parquet.hadoop.metadata.ParquetMetadata;
 import org.apache.parquet.schema.PrimitiveType;
@@ -49,10 +53,15 @@
 import java.net.URL;
 import java.nio.file.Files;
 import java.nio.file.Path;
+import java.time.ZoneId;
 import java.util.List;
+import java.util.Map;
 import java.util.Optional;
+import java.util.function.Consumer;
+import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 
+import static com.google.common.base.Verify.verify;
 import static com.google.common.collect.Iterators.getOnlyElement;
 import static com.google.common.collect.MoreCollectors.onlyElement;
 import static com.google.common.io.MoreFiles.deleteRecursively;
@@ -67,6 +76,7 @@
 import static io.trino.plugin.hive.HiveTestUtils.HDFS_FILE_SYSTEM_STATS;
 import static io.trino.testing.TestingNames.randomNameSuffix;
 import static java.lang.String.format;
+import static java.time.ZoneOffset.UTC;
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.assertj.core.api.Assertions.entry;
 import static org.testng.Assert.assertFalse;
@@ -93,6 +103,10 @@ public class TestDeltaLakeBasic
 
     private static final TrinoFileSystem FILE_SYSTEM = new HdfsFileSystemFactory(HDFS_ENVIRONMENT, HDFS_FILE_SYSTEM_STATS).create(SESSION);
 
+    private final ZoneId jvmZone = ZoneId.systemDefault();
+    private final ZoneId vilnius = ZoneId.of("Europe/Vilnius");
+    private final ZoneId kathmandu = ZoneId.of("Asia/Kathmandu");
+
     @Override
     protected QueryRunner createQueryRunner()
             throws Exception
@@ -519,15 +533,25 @@ public Object[][] columnMappingModeDataProvider()
     /**
      * @see databricks131.timestamp_ntz
      */
-    @Test
-    public void testTimestampNtz()
+    @Test(dataProvider = "sessionZonesDataProvider")
+    public void testDeltaTimestampNtz(ZoneId sessionZone)
+            throws Exception
     {
-        // TODO Move this test to product test once new Databricks LTS or OSS Delta Lake supports timestamp_ntz type
+        String tableName = "timestamp_ntz" + randomNameSuffix();
+        Path tableLocation = Files.createTempFile(tableName, null);
+        copyDirectoryContents(new File(Resources.getResource("databricks131/timestamp_ntz").toURI()).toPath(), tableLocation);
+
+        assertUpdate("CALL system.register_table('%s', '%s', '%s')".formatted(getSession().getSchema().orElseThrow(), tableName, tableLocation.toUri()));
+
+        Session session = Session.builder(getSession())
+                .setTimeZoneKey(TimeZoneKey.getTimeZoneKey(sessionZone.getId()))
+                .build();
+
         assertQuery(
-                "DESCRIBE timestamp_ntz",
+                "DESCRIBE " + tableName,
                 "VALUES ('x', 'timestamp(6)', '', '')");
 
-        assertThat(query("SELECT * FROM timestamp_ntz"))
+        assertThat(query(session, "SELECT * FROM " + tableName))
                 .matches("""
                         VALUES
                         NULL,
@@ -540,31 +564,179 @@ public void testTimestampNtz()
                         TIMESTAMP '9999-12-31 23:59:59.999999'
                         """);
         assertQuery(
-                "SHOW STATS FOR timestamp_ntz",
+                "SHOW STATS FOR " + tableName,
                 """
                 VALUES
                 ('x', null, null, 0.125, null, null, null),
                 (null, null, null, null, 8.0, null, null)
                 """);
 
-        // TODO https://github.com/trinodb/trino/issues/15873 Support writing timestamp_ntz type when upgrading the max writer version to 7
-        assertQueryFails("INSERT INTO timestamp_ntz VALUES NULL", "\\QUnsupported writer features: [timestampNtz]");
+        // Verify the connector can insert into tables created by Databricks
+        assertUpdate(session, "INSERT INTO " + tableName + " VALUES TIMESTAMP '2023-01-02 03:04:05.123456'", 1);
+        assertQuery(session, "SELECT true FROM " + tableName + " WHERE x = TIMESTAMP '2023-01-02 03:04:05.123456'", "VALUES true");
+        assertQuery(
+                "SHOW STATS FOR " + tableName,
+                """
+                VALUES
+                ('x', null, 1.0, 0.1111111111111111, null, null, null),
+                (null, null, null, null, 9.0, null, null)
+                """);
+
+        assertUpdate("DROP TABLE " + tableName);
+    }
+
+    @Test(dataProvider = "sessionZonesDataProvider")
+    public void testTrinoCreateTableWithTimestampNtz(ZoneId sessionZone)
+            throws Exception
+    {
+        testTrinoCreateTableWithTimestampNtz(
+                sessionZone,
+                tableName -> {
+                    assertUpdate("CREATE TABLE " + tableName + "(x timestamp(6))");
+                    assertUpdate("INSERT INTO " + tableName + " VALUES timestamp '2023-01-02 03:04:05.123456'", 1);
+                });
+    }
+
+    @Test(dataProvider = "sessionZonesDataProvider")
+    public void testTrinoCreateTableAsSelectWithTimestampNtz(ZoneId sessionZone)
+            throws Exception
+    {
+        testTrinoCreateTableWithTimestampNtz(
+                sessionZone,
+                tableName -> assertUpdate("CREATE TABLE " + tableName + " AS SELECT timestamp '2023-01-02 03:04:05.123456' AS x", 1));
+    }
+
+    private void testTrinoCreateTableWithTimestampNtz(ZoneId sessionZone, Consumer<String> createTable)
+            throws IOException
+    {
+        String tableName = "test_create_table_timestamp_ntz" + randomNameSuffix();
+
+        Session session = Session.builder(getSession())
+                .setTimeZoneKey(TimeZoneKey.getTimeZoneKey(sessionZone.getId()))
+                .build();
+
+        createTable.accept(tableName);
+
+        assertQuery(session, "SELECT * FROM " + tableName, "VALUES TIMESTAMP '2023-01-02 03:04:05.123456'");
+
+        // Verify reader/writer version and features in ProtocolEntry
+        String tableLocation = getTableLocation(tableName);
+        List<DeltaLakeTransactionLogEntry> transactionLogs = getEntriesFromJson(0, tableLocation + "/_delta_log", FILE_SYSTEM).orElseThrow();
+        ProtocolEntry protocolEntry = transactionLogs.get(1).getProtocol();
+        assertThat(protocolEntry).isNotNull();
+        assertThat(protocolEntry.getMinReaderVersion()).isEqualTo(3);
+        assertThat(protocolEntry.getMinWriterVersion()).isEqualTo(7);
+        assertThat(protocolEntry.getReaderFeatures()).isEqualTo(Optional.of(ImmutableSet.of("timestampNtz")));
+        assertThat(protocolEntry.getWriterFeatures()).isEqualTo(Optional.of(ImmutableSet.of("timestampNtz")));
+
+        // Insert rows and verify results
+        assertUpdate(session,
+                "INSERT INTO " + tableName + " " +
+                """
+                VALUES
+                NULL,
+                TIMESTAMP '-9999-12-31 23:59:59.999999',
+                TIMESTAMP '-0001-01-01 00:00:00',
+                TIMESTAMP '0000-01-01 00:00:00',
+                TIMESTAMP '1582-10-05 00:00:00',
+                TIMESTAMP '1582-10-14 23:59:59.999999',
+                TIMESTAMP '2020-12-31 01:02:03.123456',
+                TIMESTAMP '9999-12-31 23:59:59.999999'
+                """,
+                8);
+
+        assertThat(query(session, "SELECT * FROM " + tableName))
+                .matches("""
+                        VALUES
+                        NULL,
+                        TIMESTAMP '-9999-12-31 23:59:59.999999',
+                        TIMESTAMP '-0001-01-01 00:00:00',
+                        TIMESTAMP '0000-01-01 00:00:00',
+                        TIMESTAMP '1582-10-05 00:00:00',
+                        TIMESTAMP '1582-10-14 23:59:59.999999',
+                        TIMESTAMP '2020-12-31 01:02:03.123456',
+                        TIMESTAMP '2023-01-02 03:04:05.123456',
+                        TIMESTAMP '9999-12-31 23:59:59.999999'
+                        """);
+        assertQuery(
+                "SHOW STATS FOR " + tableName,
+                """
+                VALUES
+                ('x', null, 8.0, 0.1111111111111111, null, null, null),
+                (null, null, null, null, 9.0, null, null)
+                """);
+
+        assertUpdate("DROP TABLE " + tableName);
+    }
+
+    @Test(dataProvider = "sessionZonesDataProvider")
+    public void testTrinoTimestampNtzComplexType(ZoneId sessionZone)
+    {
+        String tableName = "test_timestamp_ntz_complex_type" + randomNameSuffix();
+
+        assertUpdate("CREATE TABLE " + tableName + "(id int, array_col array(timestamp(6)), map_col map(timestamp(6), timestamp(6)), row_col row(child timestamp(6)))");
+
+        Session session = Session.builder(getSession())
+                .setTimeZoneKey(TimeZoneKey.getTimeZoneKey(sessionZone.getId()))
+                .build();
+
+        assertUpdate(
+                session,
+                "INSERT INTO " + tableName + " " +
+                """
+                VALUES (
+                    1,
+                    ARRAY[TIMESTAMP '2020-12-31 01:02:03.123456'],
+                    MAP(ARRAY[TIMESTAMP '2021-12-31 01:02:03.123456'], ARRAY[TIMESTAMP '2022-12-31 01:02:03.123456']),
+                    ROW(TIMESTAMP '2023-12-31 01:02:03.123456')
+                )
""", + 1); + + assertThat(query(session, "SELECT * FROM " + tableName)) + .matches(""" + VALUES ( + 1, + ARRAY[TIMESTAMP '2020-12-31 01:02:03.123456'], + MAP(ARRAY[TIMESTAMP '2021-12-31 01:02:03.123456'], ARRAY[TIMESTAMP '2022-12-31 01:02:03.123456']), + CAST(ROW(TIMESTAMP '2023-12-31 01:02:03.123456') AS ROW(child timestamp(6))) + ) + """); + assertQuery( + "SHOW STATS FOR " + tableName, + """ + VALUES + ('id', null, 1.0, 0.0, null, 1, 1), + ('array_col', null, null, null, null, null, null), + ('map_col', null, null, null, null, null, null), + ('row_col', null, null, null, null, null, null), + (null, null, null, null, 1.0, null, null) + """); + + assertUpdate("DROP TABLE " + tableName); } /** * @see databricks131.timestamp_ntz_partition */ - @Test - public void testTimestampNtzPartitioned() + @Test(dataProvider = "sessionZonesDataProvider") + public void testTimestampNtzPartitioned(ZoneId sessionZone) + throws Exception { - // TODO Move this test to product test once new Databricks LTS or OSS Delta Lake supports timestamp_ntz type + String tableName = "timestamp_ntz_partition" + randomNameSuffix(); + Path tableLocation = Files.createTempFile(tableName, null); + copyDirectoryContents(new File(Resources.getResource("databricks131/timestamp_ntz_partition").toURI()).toPath(), tableLocation); + assertUpdate("CALL system.register_table('%s', '%s', '%s')".formatted(getSession().getSchema().orElseThrow(), tableName, tableLocation.toUri())); + + Session session = Session.builder(getSession()) + .setTimeZoneKey(TimeZoneKey.getTimeZoneKey(sessionZone.getId())) + .build(); + assertQuery( - "DESCRIBE timestamp_ntz_partition", + "DESCRIBE " + tableName, "VALUES ('id', 'integer', '', ''), ('part', 'timestamp(6)', '', '')"); - assertThat((String) computeScalar("SHOW CREATE TABLE timestamp_ntz_partition")) + assertThat((String) computeScalar("SHOW CREATE TABLE " + tableName)) .contains("partitioned_by = ARRAY['part']"); - assertThat(query("SELECT * FROM timestamp_ntz_partition")) + assertThat(query(session, "SELECT * FROM " + tableName)) .matches(""" VALUES (1, NULL), @@ -575,11 +747,11 @@ public void testTimestampNtzPartitioned() (6, TIMESTAMP '1582-10-14 23:59:59.999999'), (7, TIMESTAMP '2020-12-31 01:02:03.123456'), (8, TIMESTAMP '9999-12-31 23:59:59.999999') - """); - assertQuery("SELECT id FROM timestamp_ntz_partition WHERE part = TIMESTAMP '2020-12-31 01:02:03.123456'", "VALUES 7"); + """); + assertQuery(session, "SELECT id FROM " + tableName + " WHERE part = TIMESTAMP '2020-12-31 01:02:03.123456'", "VALUES 7"); assertQuery( - "SHOW STATS FOR timestamp_ntz_partition", + "SHOW STATS FOR " + tableName, """ VALUES ('id', null, null, 0.0, null, 1, 8), @@ -587,10 +759,26 @@ public void testTimestampNtzPartitioned() (null, null, null, null, 8.0, null, null) """); - // TODO https://github.com/trinodb/trino/issues/15873 Support writing timestamp_ntz type when upgrading the max writer version to 7 - assertQueryFails( - "INSERT INTO timestamp_ntz_partition VALUES (NULL, NULL)", - "\\QUnsupported writer features: [timestampNtz]"); + // Verify the connector can insert into tables created by Databricks + assertUpdate(session, "INSERT INTO " + tableName + " VALUES (9, TIMESTAMP '2023-01-02 03:04:05.123456')", 1); + assertQuery(session, "SELECT part FROM " + tableName + " WHERE id = 9", "VALUES TIMESTAMP '2023-01-02 03:04:05.123456'"); + assertQuery(session, "SELECT id FROM " + tableName + " WHERE part = TIMESTAMP '2023-01-02 03:04:05.123456'", "VALUES 9"); + assertQuery( + "SHOW STATS FOR " + tableName, + 
""" + VALUES + ('id', null, 1.0, 0.0, null, 1, 9), + ('part', null, 8.0, 0.1111111111111111, null, null, null), + (null, null, null, null, 9.0, null, null) + """); + List transactionLogs = getEntriesFromJson(2, tableLocation.resolve("_delta_log").toString(), FILE_SYSTEM).orElseThrow(); + assertThat(transactionLogs).hasSize(2); + AddFileEntry addFileEntry = transactionLogs.get(1).getAdd(); + assertThat(addFileEntry).isNotNull(); + assertThat(addFileEntry.getPath()).startsWith("part=2023-01-02%2003%253A04%253A05.123456/"); + assertThat(addFileEntry.getPartitionValues()).containsExactly(Map.entry("part", "2023-01-02 03:04:05.123456")); + + assertUpdate("DROP TABLE " + tableName); } /** @@ -756,4 +944,29 @@ private static MetadataEntry loadMetadataEntry(long entryNumber, Path tableLocat .collect(onlyElement()); return transactionLog.getMetaData(); } + + private String getTableLocation(String tableName) + { + Pattern locationPattern = Pattern.compile(".*location = '(.*?)'.*", Pattern.DOTALL); + Matcher m = locationPattern.matcher((String) computeActual("SHOW CREATE TABLE " + tableName).getOnlyValue()); + if (m.find()) { + String location = m.group(1); + verify(!m.find(), "Unexpected second match"); + return location; + } + throw new IllegalStateException("Location not found in SHOW CREATE TABLE result"); + } + + @DataProvider + public Object[][] sessionZonesDataProvider() + { + return new Object[][] { + {UTC}, + {jvmZone}, + // using two non-JVM zones so that we don't need to worry what Postgres system zone is + {vilnius}, + {kathmandu}, + {TestingSession.DEFAULT_TIME_ZONE_KEY.getZoneId()}, + }; + } } diff --git a/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestDeltaLakeConnectorTest.java b/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestDeltaLakeConnectorTest.java index 765c75f8f14a..251326f8d4e6 100644 --- a/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestDeltaLakeConnectorTest.java +++ b/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestDeltaLakeConnectorTest.java @@ -223,7 +223,6 @@ protected Optional filterDataMappingSmokeTestData(DataMapp if (typeName.equals("time") || typeName.equals("time(6)") || typeName.equals("timestamp") || - typeName.equals("timestamp(6)") || typeName.equals("timestamp(6) with time zone") || typeName.equals("char(3)")) { return Optional.of(dataMappingTestSetup.asUnsupported()); @@ -532,6 +531,46 @@ public void testTimestampPredicatePushdown(String value) results -> {}); } + @Test + public void testTimestampPartition() + { + String tableName = "test_timestamp_ntz_partition_" + randomNameSuffix(); + + assertUpdate("DROP TABLE IF EXISTS " + tableName); + assertUpdate("CREATE TABLE " + tableName + "(id INT, part TIMESTAMP(6)) WITH (partitioned_by = ARRAY['part'])"); + assertUpdate( + "INSERT INTO " + tableName + " VALUES " + + "(1, NULL)," + + "(2, TIMESTAMP '0001-01-01 00:00:00.000')," + + "(3, TIMESTAMP '2023-07-20 01:02:03.9999999')," + + "(4, TIMESTAMP '9999-12-31 23:59:59.999999')", + 4); + + assertThat(query("SELECT * FROM " + tableName)) + .matches("VALUES " + + "(1, NULL)," + + "(2, TIMESTAMP '0001-01-01 00:00:00.000000')," + + "(3, TIMESTAMP '2023-07-20 01:02:04.000000')," + + "(4, TIMESTAMP '9999-12-31 23:59:59.999999')"); + assertQuery( + "SHOW STATS FOR " + tableName, + "VALUES " + + "('id', null, 4.0, 0.0, null, 1, 4)," + + "('part', null, 3.0, 0.25, null, null, null)," + + "(null, null, null, null, 4.0, null, null)"); + + assertThat((String) computeScalar("SELECT \"$path\" FROM 
" + tableName + " WHERE id = 1")) + .contains("/part=__HIVE_DEFAULT_PARTITION__/"); + assertThat((String) computeScalar("SELECT \"$path\" FROM " + tableName + " WHERE id = 2")) + .contains("/part=0001-01-01 00%3A00%3A00/"); + assertThat((String) computeScalar("SELECT \"$path\" FROM " + tableName + " WHERE id = 3")) + .contains("/part=2023-07-20 01%3A02%3A04/"); + assertThat((String) computeScalar("SELECT \"$path\" FROM " + tableName + " WHERE id = 4")) + .contains("/part=9999-12-31 23%3A59%3A59.999999/"); + + assertUpdate("DROP TABLE " + tableName); + } + @Test public void testTimestampWithTimeZonePartition() { diff --git a/testing/trino-product-tests/src/main/java/io/trino/tests/product/deltalake/TestDeltaLakeDatabricksInsertCompatibility.java b/testing/trino-product-tests/src/main/java/io/trino/tests/product/deltalake/TestDeltaLakeDatabricksInsertCompatibility.java index f40cff0440cf..d5a59fc2cdfa 100644 --- a/testing/trino-product-tests/src/main/java/io/trino/tests/product/deltalake/TestDeltaLakeDatabricksInsertCompatibility.java +++ b/testing/trino-product-tests/src/main/java/io/trino/tests/product/deltalake/TestDeltaLakeDatabricksInsertCompatibility.java @@ -34,6 +34,7 @@ import static io.trino.tests.product.TestGroups.DELTA_LAKE_DATABRICKS_104; import static io.trino.tests.product.TestGroups.DELTA_LAKE_DATABRICKS_113; import static io.trino.tests.product.TestGroups.DELTA_LAKE_DATABRICKS_122; +import static io.trino.tests.product.TestGroups.DELTA_LAKE_EXCLUDE_91; import static io.trino.tests.product.TestGroups.DELTA_LAKE_OSS; import static io.trino.tests.product.TestGroups.PROFILE_SPECIFIC_TESTS; import static io.trino.tests.product.deltalake.util.DatabricksVersion.DATABRICKS_104_RUNTIME_VERSION; @@ -140,6 +141,42 @@ public void testPartitionedInsertCompatibility() } } + @Test(groups = {DELTA_LAKE_DATABRICKS, DELTA_LAKE_EXCLUDE_91, DELTA_LAKE_OSS, PROFILE_SPECIFIC_TESTS}) + @Flaky(issue = DATABRICKS_COMMUNICATION_FAILURE_ISSUE, match = DATABRICKS_COMMUNICATION_FAILURE_MATCH) + public void testTimestampInsertCompatibility() + { + String tableName = "test_dl_timestamp_ntz_insert_" + randomNameSuffix(); + + onTrino().executeQuery("" + + "CREATE TABLE delta.default." + tableName + + "(id INT, ts TIMESTAMP(6))" + + "WITH (location = 's3://" + bucketName + "/databricks-compatibility-test-" + tableName + "')"); + try { + onDelta().executeQuery("INSERT INTO default." + tableName + " VALUES" + + "(1, TIMESTAMP '0001-01-01 00:00:00.000')," + + "(2, TIMESTAMP '2023-01-02 01:02:03.999')"); + onTrino().executeQuery("INSERT INTO delta.default." + tableName + " VALUES" + + "(3, TIMESTAMP '2023-03-04 01:02:03.999')," + + "(4, TIMESTAMP '9999-12-31 23:59:59.999')"); + + assertThat(onDelta().executeQuery("SELECT id, date_format(ts, \"yyyy-MM-dd HH:mm:ss.SSS\") FROM default." + tableName)) + .containsOnly( + row(1, databricksRuntimeVersion.isPresent() ? "0001-01-03 00:00:00.000" : "0001-01-01 00:00:00.000"), // Databricks returns incorrect results + row(2, "2023-01-02 01:02:03.999"), + row(3, "2023-03-04 01:02:03.999"), + row(4, "9999-12-31 23:59:59.999")); + assertThat(onTrino().executeQuery("SELECT id, format_datetime(ts, 'yyyy-MM-dd HH:mm:ss.SSS') FROM delta.default." + tableName)) + .containsOnly( + row(1, "0001-01-01 00:00:00.000"), + row(2, "2023-01-02 01:02:03.999"), + row(3, "2023-03-04 01:02:03.999"), + row(4, "9999-12-31 23:59:59.999")); + } + finally { + onTrino().executeQuery("DROP TABLE delta.default." 
+        }
+    }
+
     @Test(groups = {DELTA_LAKE_DATABRICKS, DELTA_LAKE_OSS, PROFILE_SPECIFIC_TESTS})
     @Flaky(issue = DATABRICKS_COMMUNICATION_FAILURE_ISSUE, match = DATABRICKS_COMMUNICATION_FAILURE_MATCH)
     public void testTimestampWithTimeZonePartitionedInsertCompatibility()