Support reading timestamp_ntz type in Delta Lake
ebyhr committed Jul 21, 2023
1 parent ada049b commit 68ad854
Showing 21 changed files with 186 additions and 9 deletions.
2 changes: 2 additions & 0 deletions docs/src/main/sphinx/connector/delta-lake.rst
@@ -265,6 +265,8 @@ this table:
- ``VARBINARY``
* - ``DATE``
- ``DATE``
* - ``TIMESTAMPNTZ`` (``TIMESTAMP_NTZ``)
- ``TIMESTAMP(6)``
* - ``TIMESTAMP``
- ``TIMESTAMP(3) WITH TIME ZONE``
* - ``ARRAY``
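The added rows map Delta's zone-less `timestamp_ntz` to Trino's microsecond-precision timestamp, alongside the existing zone-adjusted mapping. A small sketch of the two target types on the Trino side (assuming the Trino SPI on the classpath; `getDisplayName` renders the SQL name):

```java
import io.trino.spi.type.Type;

import static io.trino.spi.type.TimestampType.TIMESTAMP_MICROS;
import static io.trino.spi.type.TimestampWithTimeZoneType.TIMESTAMP_TZ_MILLIS;

public final class MappedTimestampTypes
{
    public static void main(String[] args)
    {
        Type ntz = TIMESTAMP_MICROS;   // target of Delta's timestamp_ntz
        Type tz = TIMESTAMP_TZ_MILLIS; // target of Delta's timestamp
        System.out.println(ntz.getDisplayName()); // timestamp(6)
        System.out.println(tz.getDisplayName());  // timestamp(3) with time zone
    }
}
```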
@@ -71,7 +71,7 @@ public static HiveType toHiveType(Type type)
return HiveType.toHiveType(translate(type));
}

-// Copy from HiveTypeTranslator with a custom mapping for TimestampWithTimeZone
+// Copy from HiveTypeTranslator with custom mappings for TimestampType and TimestampWithTimeZone
public static TypeInfo translate(Type type)
{
requireNonNull(type, "type is null");
@@ -122,8 +122,8 @@ public static TypeInfo translate(Type type)
verify(((TimestampWithTimeZoneType) type).getPrecision() == 3, "Unsupported type: %s", type);
return HIVE_TIMESTAMP.getTypeInfo();
}
-if (type instanceof TimestampType) {
-    verify(((TimestampType) type).getPrecision() == 3, "Unsupported type: %s", type);
+if (type instanceof TimestampType timestampType) {
+    verify(timestampType.getPrecision() == 3 || timestampType.getPrecision() == 6, "Unsupported type: %s", type);
return HIVE_TIMESTAMP.getTypeInfo();
}
if (type instanceof DecimalType decimalType) {
@@ -68,6 +68,7 @@
import static io.trino.spi.type.IntegerType.INTEGER;
import static io.trino.spi.type.RealType.REAL;
import static io.trino.spi.type.SmallintType.SMALLINT;
import static io.trino.spi.type.TimestampType.TIMESTAMP_MICROS;
import static io.trino.spi.type.TimestampWithTimeZoneType.TIMESTAMP_TZ_MILLIS;
import static io.trino.spi.type.TinyintType.TINYINT;
import static io.trino.spi.type.VarbinaryType.VARBINARY;
@@ -87,9 +88,10 @@ private DeltaLakeSchemaSupport() {}
public static final String MAX_COLUMN_ID_CONFIGURATION_KEY = "delta.columnMapping.maxColumnId";

// https://github.com/delta-io/delta/blob/master/PROTOCOL.md#valid-feature-names-in-table-features
-// TODO: Add support for 'deletionVectors' and 'timestampNTZ' reader features
+// TODO: Add support for 'deletionVectors' reader features
private static final Set<String> SUPPORTED_READER_FEATURES = ImmutableSet.<String>builder()
.add("columnMapping")
.add("timestampNtz")
.build();

public enum ColumnMappingMode
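The set above is how the connector gates tables that use Delta's table-features protocol: a table is readable only if every feature named in its protocol entry's readerFeatures is understood. A standalone sketch of that gate (class and method names here are illustrative, not the connector's actual entry point; Guava assumed on the classpath):

```java
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Sets;

import java.util.Set;

public final class ReaderFeatureGate
{
    // Mirrors the set in the diff: columnMapping was already supported,
    // timestampNtz is newly added.
    private static final Set<String> SUPPORTED_READER_FEATURES =
            ImmutableSet.of("columnMapping", "timestampNtz");

    // Per the Delta protocol, refuse to read when the table lists a reader
    // feature this reader does not implement.
    static void checkReadable(Set<String> tableReaderFeatures)
    {
        Set<String> unsupported = Sets.difference(tableReaderFeatures, SUPPORTED_READER_FEATURES);
        if (!unsupported.isEmpty()) {
            throw new IllegalArgumentException("Unsupported reader features: " + unsupported);
        }
    }

    public static void main(String[] args)
    {
        checkReadable(ImmutableSet.of("timestampNtz")); // passes
        try {
            checkReadable(ImmutableSet.of("deletionVectors"));
        }
        catch (IllegalArgumentException expected) {
            System.out.println(expected.getMessage()); // deletionVectors is still a TODO
        }
    }
}
```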
@@ -576,6 +578,9 @@ private static Type buildType(TypeManager typeManager, JsonNode typeNode, boolea
return VARBINARY;
case "date":
return DATE;
case "timestamp_ntz":
// https://github.com/delta-io/delta/blob/master/PROTOCOL.md#timestamp-without-timezone-timestampntz
return TIMESTAMP_MICROS;
case "timestamp":
// Spark/DeltaLake stores timestamps in UTC, but renders them in session time zone.
// For more info, see https://delta-users.slack.com/archives/GKTUWT03T/p1585760533005400
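A runnable distillation of the new case: Delta serializes the zone-less type as the name timestamp_ntz in its schemaString, and the connector resolves it to TIMESTAMP(6), while plain timestamp stays on the zone-adjusted path. This is a sketch condensed from buildType, not the connector's full method (Trino SPI assumed on the classpath):

```java
import io.trino.spi.type.Type;

import static io.trino.spi.type.DateType.DATE;
import static io.trino.spi.type.TimestampType.TIMESTAMP_MICROS;
import static io.trino.spi.type.TimestampWithTimeZoneType.TIMESTAMP_TZ_MILLIS;

public final class DeltaPrimitiveTypes
{
    // Condensed from buildType: only the date/timestamp cases are shown.
    static Type fromDeltaName(String name)
    {
        return switch (name) {
            case "date" -> DATE;
            // https://github.com/delta-io/delta/blob/master/PROTOCOL.md#timestamp-without-timezone-timestampntz
            case "timestamp_ntz" -> TIMESTAMP_MICROS;
            // Stored in UTC, rendered in the session zone
            case "timestamp" -> TIMESTAMP_TZ_MILLIS;
            default -> throw new IllegalArgumentException("Unsupported type: " + name);
        };
    }

    public static void main(String[] args)
    {
        // timestamp_ntz resolves to Trino's TIMESTAMP(6)
        System.out.println(fromDeltaName("timestamp_ntz").equals(TIMESTAMP_MICROS)); // true
    }
}
```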
@@ -39,6 +39,7 @@
import java.time.Duration;
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.time.LocalTime;
import java.time.ZonedDateTime;
import java.time.chrono.IsoChronology;
import java.time.format.DateTimeFormatter;
@@ -51,6 +52,7 @@
import java.util.function.Function;

import static com.google.common.base.Verify.verify;
import static com.google.common.math.LongMath.divide;
import static io.airlift.slice.Slices.utf8Slice;
import static io.trino.plugin.deltalake.transactionlog.TransactionLogUtil.getTransactionLogDir;
import static io.trino.plugin.deltalake.transactionlog.TransactionLogUtil.getTransactionLogJsonEntryPath;
@@ -64,7 +66,10 @@
import static io.trino.spi.type.RealType.REAL;
import static io.trino.spi.type.SmallintType.SMALLINT;
import static io.trino.spi.type.TimeZoneKey.UTC_KEY;
import static io.trino.spi.type.TimestampType.TIMESTAMP_MICROS;
import static io.trino.spi.type.TimestampWithTimeZoneType.TIMESTAMP_TZ_MILLIS;
import static io.trino.spi.type.Timestamps.MICROSECONDS_PER_SECOND;
import static io.trino.spi.type.Timestamps.NANOSECONDS_PER_MICROSECOND;
import static io.trino.spi.type.TinyintType.TINYINT;
import static io.trino.spi.type.VarcharType.VARCHAR;
import static java.lang.Double.parseDouble;
@@ -73,6 +78,7 @@
import static java.lang.Integer.parseInt;
import static java.lang.Long.parseLong;
import static java.lang.String.format;
import static java.math.RoundingMode.UNNECESSARY;
import static java.time.ZoneOffset.UTC;
import static java.time.format.DateTimeFormatter.ISO_LOCAL_TIME;
import static java.time.temporal.ChronoField.DAY_OF_MONTH;
@@ -85,7 +91,9 @@ public final class TransactionLogParser

// Before 1900, Java Time and Joda Time are not consistent with java.sql.Date and java.util.Calendar
// Since January 1, 1900 UTC is still December 31, 1899 in other zones, we are adding a 1 day margin.
-public static final long START_OF_MODERN_ERA_EPOCH_DAY = LocalDate.of(1900, 1, 2).toEpochDay();
+private static final LocalDate START_OF_MODERN_ERA_DATE = LocalDate.of(1900, 1, 2);
+public static final long START_OF_MODERN_ERA_EPOCH_DAY = START_OF_MODERN_ERA_DATE.toEpochDay();
+public static final long START_OF_MODERN_ERA_EPOCH_MICROS = LocalDateTime.of(START_OF_MODERN_ERA_DATE, LocalTime.MIN).toEpochSecond(UTC) * MICROSECONDS_PER_SECOND;

public static final String LAST_CHECKPOINT_FILENAME = "_last_checkpoint";

@@ -149,7 +157,13 @@ private static Object parseDecimal(DecimalType type, String valueString)
@Nullable
public static Object deserializePartitionValue(DeltaLakeColumnHandle column, Optional<String> valueString)
{
-return valueString.map(value -> deserializeColumnValue(column, value, TransactionLogParser::readPartitionTimestampWithZone)).orElse(null);
+return valueString.map(value -> deserializeColumnValue(column, value, TransactionLogParser::readPartitionTimestamp, TransactionLogParser::readPartitionTimestampWithZone)).orElse(null);
}

private static Long readPartitionTimestamp(String timestamp)
{
LocalDateTime localDateTime = LocalDateTime.parse(timestamp, PARTITION_TIMESTAMP_FORMATTER);
return localDateTime.toEpochSecond(UTC) * MICROSECONDS_PER_SECOND + divide(localDateTime.getNano(), NANOSECONDS_PER_MICROSECOND, UNNECESSARY);
}

private static Long readPartitionTimestampWithZone(String timestamp)
@@ -158,7 +172,7 @@ private static Long readPartitionTimestampWithZone(String timestamp)
return packDateTimeWithZone(zonedDateTime.toInstant().toEpochMilli(), UTC_KEY);
}

-public static Object deserializeColumnValue(DeltaLakeColumnHandle column, String valueString, Function<String, Long> timestampWithZoneReader)
+public static Object deserializeColumnValue(DeltaLakeColumnHandle column, String valueString, Function<String, Long> timestampReader, Function<String, Long> timestampWithZoneReader)
{
verify(column.isBaseColumn(), "Unexpected dereference: %s", column);
Type type = column.getBaseType();
@@ -196,6 +210,9 @@ public static Object deserializeColumnValue(DeltaLakeColumnHandle column, String
// date values are represented as yyyy-MM-dd
return LocalDate.parse(valueString).toEpochDay();
}
if (type.equals(TIMESTAMP_MICROS)) {
return timestampReader.apply(valueString);
}
if (type.equals(TIMESTAMP_TZ_MILLIS)) {
return timestampWithZoneReader.apply(valueString);
}
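readPartitionTimestamp above parses with PARTITION_TIMESTAMP_FORMATTER and converts the LocalDateTime straight to epoch microseconds at UTC; LongMath.divide with RoundingMode.UNNECESSARY doubles as an assertion that the value carries nothing finer than microseconds. The same arithmetic, self-contained (a sketch; plain ISO parsing stands in for the connector's formatter; Guava assumed on the classpath):

```java
import com.google.common.math.LongMath;

import java.time.LocalDateTime;
import java.time.ZoneOffset;

import static java.math.RoundingMode.UNNECESSARY;

public final class EpochMicros
{
    private static final long MICROSECONDS_PER_SECOND = 1_000_000;
    private static final long NANOSECONDS_PER_MICROSECOND = 1_000;

    // Zone-less value -> microseconds since the 1970-01-01T00:00:00 UTC epoch.
    // RoundingMode.UNNECESSARY makes the division throw if the value somehow
    // carried sub-microsecond digits, rather than silently truncating them.
    static long toEpochMicros(LocalDateTime dateTime)
    {
        return dateTime.toEpochSecond(ZoneOffset.UTC) * MICROSECONDS_PER_SECOND
                + LongMath.divide(dateTime.getNano(), NANOSECONDS_PER_MICROSECOND, UNNECESSARY);
    }

    public static void main(String[] args)
    {
        // The value used throughout the test tables below:
        System.out.println(toEpochMicros(LocalDateTime.parse("2020-12-31T01:02:03.123456")));
        // prints 1609376523123456
    }
}
```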
@@ -23,29 +23,37 @@
import io.trino.plugin.deltalake.DeltaLakeColumnHandle;
import io.trino.plugin.deltalake.transactionlog.CanonicalColumnName;
import io.trino.plugin.deltalake.transactionlog.TransactionLogAccess;
import io.trino.spi.type.TimestampType;
import io.trino.spi.type.TimestampWithTimeZoneType;
import io.trino.spi.type.Type;

import java.time.LocalDateTime;
import java.time.ZonedDateTime;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;

import static com.google.common.base.MoreObjects.toStringHelper;
import static com.google.common.math.LongMath.divide;
import static io.airlift.slice.SizeOf.estimatedSizeOf;
import static io.airlift.slice.SizeOf.instanceSize;
import static io.trino.plugin.base.util.JsonUtils.parseJson;
import static io.trino.plugin.deltalake.transactionlog.TransactionLogAccess.toCanonicalNameKeyedMap;
import static io.trino.plugin.deltalake.transactionlog.TransactionLogParser.JSON_STATISTICS_TIMESTAMP_FORMATTER;
import static io.trino.plugin.deltalake.transactionlog.TransactionLogParser.START_OF_MODERN_ERA_EPOCH_DAY;
import static io.trino.plugin.deltalake.transactionlog.TransactionLogParser.START_OF_MODERN_ERA_EPOCH_MICROS;
import static io.trino.plugin.deltalake.transactionlog.TransactionLogParser.deserializeColumnValue;
import static io.trino.spi.type.DateTimeEncoding.packDateTimeWithZone;
import static io.trino.spi.type.DateTimeEncoding.unpackMillisUtc;
import static io.trino.spi.type.DateType.DATE;
import static io.trino.spi.type.TimeZoneKey.UTC_KEY;
import static io.trino.spi.type.Timestamps.MICROSECONDS_PER_SECOND;
import static io.trino.spi.type.Timestamps.MILLISECONDS_PER_DAY;
import static io.trino.spi.type.Timestamps.NANOSECONDS_PER_MICROSECOND;
import static java.lang.Math.floorDiv;
import static java.math.RoundingMode.UNNECESSARY;
import static java.time.ZoneOffset.UTC;

public class DeltaLakeJsonFileStatistics
implements DeltaLakeFileStatistics
@@ -133,7 +141,7 @@ private Optional<Object> deserializeStatisticsValue(DeltaLakeColumnHandle column
if (!columnHandle.isBaseColumn()) {
return Optional.empty();
}
-Object columnValue = deserializeColumnValue(columnHandle, statValue, DeltaLakeJsonFileStatistics::readStatisticsTimestampWithZone);
+Object columnValue = deserializeColumnValue(columnHandle, statValue, DeltaLakeJsonFileStatistics::readStatisticsTimestamp, DeltaLakeJsonFileStatistics::readStatisticsTimestampWithZone);

Type columnType = columnHandle.getBaseType();
if (columnType.equals(DATE)) {
@@ -142,6 +150,12 @@ private Optional<Object> deserializeStatisticsValue(DeltaLakeColumnHandle column
return Optional.empty();
}
}
if (columnType instanceof TimestampType) {
long epochMicros = (long) columnValue;
if (epochMicros < START_OF_MODERN_ERA_EPOCH_MICROS) {
return Optional.empty();
}
}
if (columnType instanceof TimestampWithTimeZoneType) {
long packedTimestamp = (long) columnValue;
long epochMillis = unpackMillisUtc(packedTimestamp);
@@ -153,6 +167,12 @@ private Optional<Object> deserializeStatisticsValue(DeltaLakeColumnHandle column
return Optional.of(columnValue);
}

private static Long readStatisticsTimestamp(String timestamp)
{
LocalDateTime localDateTime = LocalDateTime.parse(timestamp, JSON_STATISTICS_TIMESTAMP_FORMATTER);
return localDateTime.toEpochSecond(UTC) * MICROSECONDS_PER_SECOND + divide(localDateTime.getNano(), NANOSECONDS_PER_MICROSECOND, UNNECESSARY);
}

private static Long readStatisticsTimestampWithZone(String timestamp)
{
ZonedDateTime zonedDateTime = ZonedDateTime.parse(timestamp, JSON_STATISTICS_TIMESTAMP_FORMATTER);
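START_OF_MODERN_ERA_EPOCH_MICROS extends the existing epoch-day cutoff into the new microsecond domain: a min/max statistic that falls before 1900-01-02 UTC is discarded (Optional.empty()) instead of trusted, mirroring the DATE and TIMESTAMP WITH TIME ZONE branches. A quick check of what the two constants evaluate to (a sketch, not the connector's code):

```java
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.time.LocalTime;
import java.time.ZoneOffset;

public final class ModernEraCutoffs
{
    private static final long MICROSECONDS_PER_SECOND = 1_000_000;

    public static void main(String[] args)
    {
        LocalDate startOfModernEra = LocalDate.of(1900, 1, 2);
        long epochDay = startOfModernEra.toEpochDay();
        long epochMicros = LocalDateTime.of(startOfModernEra, LocalTime.MIN)
                .toEpochSecond(ZoneOffset.UTC) * MICROSECONDS_PER_SECOND;
        System.out.println(epochDay);    // -25566
        System.out.println(epochMicros); // -2208902400000000
        // Statistics below either cutoff are dropped because pre-1900 values
        // round-trip inconsistently across the legacy date/time APIs.
    }
}
```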
@@ -76,7 +76,7 @@ public class TestDeltaLakeBasic

private static final List<String> PERSON_TABLES = ImmutableList.of(
"person", "person_without_last_checkpoint", "person_without_old_jsons", "person_without_checkpoints");
-private static final List<String> OTHER_TABLES = ImmutableList.of("no_column_stats");
+private static final List<String> OTHER_TABLES = ImmutableList.of("no_column_stats", "timestamp_ntz", "timestamp_ntz_partition");

// The col-{uuid} pattern for delta.columnMapping.physicalName
private static final Pattern PHYSICAL_COLUMN_NAME_PATTERN = Pattern.compile("^col-[0-9a-fA-F]{8}-[0-9a-fA-F]{4}-[0-9a-fA-F]{4}-[0-9a-fA-F]{4}-[0-9a-fA-F]{12}$");
@@ -424,6 +424,83 @@ public Object[][] columnMappingModeDataProvider()
};
}

/**
* @see databricks.timestamp_ntz
*/
@Test
public void testTimestampNtz()
{
// TODO Move this test to product test once new Databricks LTS or OSS Delta Lake supports timestamp_ntz type
assertQuery(
"DESCRIBE timestamp_ntz",
"VALUES ('x', 'timestamp(6)', '', '')");

assertThat(query("SELECT * FROM timestamp_ntz"))
.matches("""
VALUES
NULL,
TIMESTAMP '-9999-12-31 23:59:59.999999',
TIMESTAMP '-0001-01-01 00:00:00',
TIMESTAMP '0000-01-01 00:00:00',
TIMESTAMP '1582-10-05 00:00:00',
TIMESTAMP '1582-10-14 23:59:59.999999',
TIMESTAMP '2020-12-31 01:02:03.123456',
TIMESTAMP '9999-12-31 23:59:59.999999'
""");
assertQuery(
"SHOW STATS FOR timestamp_ntz",
"""
VALUES
('x', null, null, 0.125, null, null, null),
(null, null, null, null, 8.0, null, null)
""");

// TODO https://github.com/trinodb/trino/issues/15873 Support writing timestamp_ntz type when upgrading the max writer version to 7
assertQueryFails("INSERT INTO timestamp_ntz VALUES NULL", "Table .* requires Delta Lake writer version 7 which is not supported");
}

/**
* @see databricks.timestamp_ntz_partition
*/
@Test
public void testTimestampNtzPartitioned()
{
// TODO Move this test to product test once new Databricks LTS or OSS Delta Lake supports timestamp_ntz type
assertQuery(
"DESCRIBE timestamp_ntz_partition",
"VALUES ('id', 'integer', '', ''), ('part', 'timestamp(6)', '', '')");
assertThat((String) computeScalar("SHOW CREATE TABLE timestamp_ntz_partition"))
.contains("partitioned_by = ARRAY['part']");

assertThat(query("SELECT * FROM timestamp_ntz_partition"))
.matches("""
VALUES
(1, NULL),
(2, TIMESTAMP '-9999-12-31 23:59:59.999999'),
(3, TIMESTAMP '-0001-01-01 00:00:00'),
(4, TIMESTAMP '0000-01-01 00:00:00'),
(5, TIMESTAMP '1582-10-05 00:00:00'),
(6, TIMESTAMP '1582-10-14 23:59:59.999999'),
(7, TIMESTAMP '2020-12-31 01:02:03.123456'),
(8, TIMESTAMP '9999-12-31 23:59:59.999999')
""");
assertQuery("SELECT id FROM timestamp_ntz_partition WHERE part = TIMESTAMP '2020-12-31 01:02:03.123456'", "VALUES 7");

assertQuery(
"SHOW STATS FOR timestamp_ntz_partition",
"""
VALUES
('id', null, null, 0.0, null, 1, 8),
('part', null, 7.0, 0.125, null, null, null),
(null, null, null, null, 8.0, null, null)
""");

// TODO https://github.com/trinodb/trino/issues/15873 Support writing timestamp_ntz type when upgrading the max writer version to 7
assertQueryFails(
"INSERT INTO timestamp_ntz_partition VALUES (NULL, NULL)",
"Table .* requires Delta Lake writer version 7 which is not supported");
}

@Test
public void testCorruptedManagedTableLocation()
throws Exception
@@ -0,0 +1,19 @@
Data generated using Databricks 13.1:

```sql
CREATE TABLE default.test_timestamp_ntz
(x timestamp_ntz)
USING delta
LOCATION 's3://bucket/table'
TBLPROPERTIES ('delta.feature.timestampNtz' = 'supported');

INSERT INTO default.test_timestamp_ntz VALUES
(NULL),
(TIMESTAMP_NTZ '-9999-12-31T23:59:59.999999'),
(TIMESTAMP_NTZ '-0001-01-01T00:00:00.000000'),
(TIMESTAMP_NTZ '0000-01-01T00:00:00.000000'),
(TIMESTAMP_NTZ '1582-10-05T00:00:00.000000'),
(TIMESTAMP_NTZ '1582-10-14T23:59:59.999999'),
(TIMESTAMP_NTZ '2020-12-31T01:02:03.123456'),
(TIMESTAMP_NTZ '9999-12-31T23:59:59.999999');
```
@@ -0,0 +1,3 @@
{"commitInfo":{"timestamp":1684140545733,"userId":"7853186923043731","userName":"[email protected]","operation":"CREATE TABLE","operationParameters":{"isManaged":"false","description":null,"partitionBy":"[]","properties":"{\"delta.feature.timestampntz\":\"supported\"}"},"notebook":{"notebookId":"824234330454407"},"clusterId":"0515-061003-bamkihe4","isolationLevel":"WriteSerializable","isBlindAppend":true,"operationMetrics":{},"engineInfo":"Databricks-Runtime/13.1.x-scala2.12","txnId":"74bed3b7-6080-4ca8-9ea7-536505dc6e24"}}
{"protocol":{"minReaderVersion":3,"minWriterVersion":7,"readerFeatures":["timestampNtz"],"writerFeatures":["timestampNtz"]}}
{"metaData":{"id":"54a9d74c-2777-4eed-85aa-8a95732f5a74","format":{"provider":"parquet","options":{}},"schemaString":"{\"type\":\"struct\",\"fields\":[{\"name\":\"x\",\"type\":\"timestamp_ntz\",\"nullable\":true,\"metadata\":{}}]}","partitionColumns":[],"configuration":{},"createdTime":1684140544719}}
@@ -0,0 +1,2 @@
{"commitInfo":{"timestamp":1684140582215,"userId":"7853186923043731","userName":"[email protected]","operation":"WRITE","operationParameters":{"mode":"Append","partitionBy":"[]"},"notebook":{"notebookId":"824234330454407"},"clusterId":"0515-061003-bamkihe4","readVersion":0,"isolationLevel":"WriteSerializable","isBlindAppend":true,"operationMetrics":{"numFiles":"1","numOutputRows":"8","numOutputBytes":"673"},"tags":{"restoresDeletedRows":"false"},"engineInfo":"Databricks-Runtime/13.1.x-scala2.12","txnId":"38fb0b0b-17c0-4263-bc78-93b74a459cce"}}
{"add":{"path":"part-00000-0ca99241-8aa7-4f33-981b-e6fd611fe062-c000.snappy.parquet","partitionValues":{},"size":673,"modificationTime":1684140581000,"dataChange":true,"stats":"{\"numRecords\":8,\"nullCount\":{\"x\":1}}","tags":{"INSERTION_TIME":"1684140581000000","MIN_INSERTION_TIME":"1684140581000000","MAX_INSERTION_TIME":"1684140581000000","OPTIMIZE_TARGET_SIZE":"268435456"}}}
Binary file not shown.
@@ -0,0 +1,20 @@
Data generated using Databricks 13.1:

```sql
CREATE TABLE default.test_timestamp_ntz_partition
(id int, part timestamp_ntz)
USING delta
PARTITIONED BY (part)
LOCATION 's3://bucket/table'
TBLPROPERTIES ('delta.feature.timestampNtz' = 'supported');

INSERT INTO default.test_timestamp_ntz_partition VALUES
(1, NULL),
(2, TIMESTAMP_NTZ '-9999-12-31T23:59:59.999999'),
(3, TIMESTAMP_NTZ '-0001-01-01T00:00:00.000000'),
(4, TIMESTAMP_NTZ '0000-01-01T00:00:00.000000'),
(5, TIMESTAMP_NTZ '1582-10-05T00:00:00.000000'),
(6, TIMESTAMP_NTZ '1582-10-14T23:59:59.999999'),
(7, TIMESTAMP_NTZ '2020-12-31T01:02:03.123456'),
(8, TIMESTAMP_NTZ '9999-12-31T23:59:59.999999');
```