From 0acd50314ed1cd5ca9197c5dd6cf9232a5e5658a Mon Sep 17 00:00:00 2001
From: Yuya Ebihara
Date: Wed, 29 May 2024 12:20:38 +0900
Subject: [PATCH 1/3] Skip getting stats on varbinary type in Delta

---
 .../transactionlog/DeltaLakeParquetStatisticsUtils.java | 9 +++++----
 .../plugin/deltalake/TestDeltaLakeSystemTables.java     | 2 +-
 2 files changed, 6 insertions(+), 5 deletions(-)

diff --git a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/transactionlog/DeltaLakeParquetStatisticsUtils.java b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/transactionlog/DeltaLakeParquetStatisticsUtils.java
index 76c105436cfb..ba91eb1d5364 100644
--- a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/transactionlog/DeltaLakeParquetStatisticsUtils.java
+++ b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/transactionlog/DeltaLakeParquetStatisticsUtils.java
@@ -70,6 +70,7 @@
 import static io.trino.spi.type.TinyintType.TINYINT;
 import static io.trino.spi.type.TypeUtils.readNativeValue;
 import static io.trino.spi.type.TypeUtils.writeNativeValue;
+import static io.trino.spi.type.VarbinaryType.VARBINARY;
 import static java.lang.Float.floatToRawIntBits;
 import static java.lang.Float.intBitsToFloat;
 import static java.lang.Math.toIntExact;
@@ -344,8 +345,8 @@ private static Optional getMin(Type type, Statistics statistics)
             return Optional.of(new String(((BinaryStatistics) statistics).genericGetMin().getBytes(), UTF_8));
         }

-        if (type.equals(BOOLEAN)) {
-            // Boolean columns do not collect min/max stats
+        if (type.equals(BOOLEAN) || type.equals(VARBINARY)) {
+            // Boolean and varbinary columns do not collect min/max stats
             return Optional.empty();
         }

@@ -424,8 +425,8 @@ private static Optional getMax(Type type, Statistics statistics)
             return Optional.of(new String(((BinaryStatistics) statistics).genericGetMax().getBytes(), UTF_8));
         }

-        if (type.equals(BOOLEAN)) {
-            // Boolean columns do not collect min/max stats
+        if (type.equals(BOOLEAN) || type.equals(VARBINARY)) {
+            // Boolean and varbinary columns do not collect min/max stats
             return Optional.empty();
         }

diff --git a/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestDeltaLakeSystemTables.java b/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestDeltaLakeSystemTables.java
index 9975bafb0ea4..f690f3498ad0 100644
--- a/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestDeltaLakeSystemTables.java
+++ b/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestDeltaLakeSystemTables.java
@@ -359,7 +359,7 @@ public void testPartitionsTableDifferentOrderFromDefinitionMultipleColumns()
     @Test
     public void testPartitionsTableColumnTypes()
     {
-        // TODO: add support for BOOLEAN, TIMESTAMP, and VARBINARY column types https://github.com/trinodb/trino/issues/21878
+        // TODO: add support for TIMESTAMP column types https://github.com/trinodb/trino/issues/21878
         testPartitionsTableColumnTypes("BOOLEAN", "VALUES (true, 'a'), (false, 'a'), (false, 'b'), (false, 'b')", 4,
                 """
                 VALUES
                 ROW(ROW(CAST(NULL AS BOOLEAN), CAST(NULL AS BOOLEAN), BIGINT '0')),
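Patch 1 stops deriving min/max statistics for varbinary columns from the Parquet BinaryStatistics; only boolean had been excluded before. A plausible motivation, visible in the surrounding context, is that the varchar branch just above decodes the raw statistics bytes as UTF-8 text, and that decoding is lossy for arbitrary binary values, so the recorded min/max would not reflect the actual column contents. The following standalone sketch is not part of the patch (the class name and sample bytes are made up for illustration); it only demonstrates that arbitrary bytes do not survive a UTF-8 decode/encode round trip:

import java.nio.charset.StandardCharsets;
import java.util.Arrays;

public class VarbinaryUtf8RoundTrip
{
    public static void main(String[] args)
    {
        // Arbitrary bytes, such as a Parquet min/max value of a varbinary column might hold
        byte[] original = {(byte) 0xC0, (byte) 0xFF, 0x00, 0x7F};

        // Decoding as UTF-8 and re-encoding (what the varchar statistics path effectively does)
        // replaces invalid sequences with U+FFFD, so the original bytes are not preserved
        byte[] roundTripped = new String(original, StandardCharsets.UTF_8).getBytes(StandardCharsets.UTF_8);

        System.out.println(Arrays.equals(original, roundTripped)); // prints: false
    }
}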
From da8457c386bbf6cfc761ae8c1362ada9dee37a8b Mon Sep 17 00:00:00 2001
From: Yuya Ebihara
Date: Wed, 29 May 2024 13:42:55 +0900
Subject: [PATCH 2/3] Rename tests in TestDeltaLakeParquetStatisticsUtils

---
 .../transactionlog/TestDeltaLakeParquetStatisticsUtils.java | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/transactionlog/TestDeltaLakeParquetStatisticsUtils.java b/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/transactionlog/TestDeltaLakeParquetStatisticsUtils.java
index b88d427a1e82..8f59d179ea9f 100644
--- a/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/transactionlog/TestDeltaLakeParquetStatisticsUtils.java
+++ b/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/transactionlog/TestDeltaLakeParquetStatisticsUtils.java
@@ -114,7 +114,7 @@ public void testDateStatistics()
     }

     @Test
-    public void testTimestampStatisticsHighPrecision()
+    public void testTimestampWithTimeZoneStatisticsHighPrecision()
     {
         String columnName = "t_timestamp";
         PrimitiveType type = new PrimitiveType(Type.Repetition.REQUIRED, PrimitiveType.PrimitiveTypeName.INT96, columnName);
@@ -129,7 +129,7 @@ public void testTimestampStatisticsHighPrecision()
     }

     @Test
-    public void testTimestampStatisticsMillisPrecision()
+    public void testTimestampWithTimeZoneStatisticsMillisPrecision()
     {
         String columnName = "t_timestamp";
         PrimitiveType type = new PrimitiveType(Type.Repetition.REQUIRED, PrimitiveType.PrimitiveTypeName.INT96, columnName);
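Patch 3 below derives Delta Lake JSON statistics for timestamp (timestamp_ntz) columns from the microsecond- or nanosecond-precision values stored in Parquet, truncating them to milliseconds as Spark does (see the comments in the patch). Note the asymmetry: the minimum is truncated down, while the maximum is bumped up by one millisecond whenever truncation dropped anything, so the recorded range still brackets the actual values; that is why the new tests expect pairs such as 01:02:03.123Z/01:02:03.124Z and 01:02:03.123Z/01:02:03.988Z. A minimal standalone java.time sketch of that rounding convention follows (the class name and sample value are illustrative; this is not code from the patch):

import java.time.Instant;
import java.time.format.DateTimeFormatter;

import static java.time.temporal.ChronoUnit.MILLIS;

public class TimestampStatsRounding
{
    public static void main(String[] args)
    {
        // A microsecond-precision value, as read from Parquet column statistics
        Instant value = Instant.parse("2020-08-26T01:02:03.123456Z");

        // Min: truncate down to millisecond precision, so min <= actual value
        Instant min = value.truncatedTo(MILLIS);

        // Max: truncate, then add one millisecond if anything was cut off, so max >= actual value
        Instant max = value.truncatedTo(MILLIS);
        if (max.isBefore(value)) {
            max = max.plusMillis(1);
        }

        DateTimeFormatter formatter = DateTimeFormatter.ISO_INSTANT;
        System.out.println(formatter.format(min)); // 2020-08-26T01:02:03.123Z
        System.out.println(formatter.format(max)); // 2020-08-26T01:02:03.124Z
    }
}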
From 24b56ccb37d9156b94f3599a8ac9427c4cc170b5 Mon Sep 17 00:00:00 2001
From: Yuya Ebihara
Date: Mon, 27 May 2024 14:54:47 +0900
Subject: [PATCH 3/3] Add support for stats on timestamp type in Delta Lake

---
 .../DeltaLakeParquetStatisticsUtils.java      | 59 ++++++++++++++++++
 .../checkpoint/CheckpointWriter.java          |  9 ++-
 .../plugin/deltalake/TestDeltaLakeBasic.java  |  4 +-
 .../deltalake/TestDeltaLakeSystemTables.java  |  5 +-
 .../TestDeltaLakeParquetStatisticsUtils.java  | 61 +++++++++++++++++++
 .../checkpoint/TestCheckpointWriter.java      | 18 ++++++
 6 files changed, 149 insertions(+), 7 deletions(-)

diff --git a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/transactionlog/DeltaLakeParquetStatisticsUtils.java b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/transactionlog/DeltaLakeParquetStatisticsUtils.java
index ba91eb1d5364..2ece368760ab 100644
--- a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/transactionlog/DeltaLakeParquetStatisticsUtils.java
+++ b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/transactionlog/DeltaLakeParquetStatisticsUtils.java
@@ -27,6 +27,7 @@
 import io.trino.spi.type.Int128;
 import io.trino.spi.type.MapType;
 import io.trino.spi.type.RowType;
+import io.trino.spi.type.TimestampType;
 import io.trino.spi.type.TimestampWithTimeZoneType;
 import io.trino.spi.type.Type;
 import io.trino.spi.type.VarcharType;
@@ -64,15 +65,20 @@
 import static io.trino.spi.type.IntegerType.INTEGER;
 import static io.trino.spi.type.RealType.REAL;
 import static io.trino.spi.type.SmallintType.SMALLINT;
+import static io.trino.spi.type.TimestampType.TIMESTAMP_MICROS;
 import static io.trino.spi.type.TimestampType.TIMESTAMP_MILLIS;
 import static io.trino.spi.type.TimestampWithTimeZoneType.TIMESTAMP_TZ_MILLIS;
 import static io.trino.spi.type.Timestamps.MICROSECONDS_PER_MILLISECOND;
+import static io.trino.spi.type.Timestamps.MICROSECONDS_PER_SECOND;
+import static io.trino.spi.type.Timestamps.NANOSECONDS_PER_MICROSECOND;
 import static io.trino.spi.type.TinyintType.TINYINT;
 import static io.trino.spi.type.TypeUtils.readNativeValue;
 import static io.trino.spi.type.TypeUtils.writeNativeValue;
 import static io.trino.spi.type.VarbinaryType.VARBINARY;
 import static java.lang.Float.floatToRawIntBits;
 import static java.lang.Float.intBitsToFloat;
+import static java.lang.Math.floorDiv;
+import static java.lang.Math.floorMod;
 import static java.lang.Math.toIntExact;
 import static java.nio.charset.StandardCharsets.UTF_8;
 import static java.time.ZoneOffset.UTC;
@@ -141,6 +147,10 @@ public static Object jsonValueToTrinoValue(Type type, @Nullable Object jsonValue
         if (type == TIMESTAMP_MILLIS) {
             return Instant.parse((String) jsonValue).toEpochMilli() * MICROSECONDS_PER_MILLISECOND;
         }
+        if (type == TIMESTAMP_MICROS) {
+            Instant instant = Instant.parse((String) jsonValue);
+            return (instant.getEpochSecond() * MICROSECONDS_PER_SECOND) + (instant.getNano() / NANOSECONDS_PER_MICROSECOND);
+        }
         if (type instanceof RowType rowType) {
             Map<String, Object> values = (Map<String, Object>) jsonValue;
             List<Type> fieldTypes = rowType.getTypeParameters();
@@ -200,6 +210,13 @@ public static Object toJsonValue(Type type, @Nullable Object value)
         if (type == DateType.DATE) {
             return LocalDate.ofEpochDay((long) value).format(ISO_LOCAL_DATE);
         }
+        if (type == TIMESTAMP_MICROS) {
+            long epochMicros = (long) value;
+            long epochSeconds = floorDiv(epochMicros, MICROSECONDS_PER_SECOND);
+            int nanoAdjustment = floorMod(epochMicros, MICROSECONDS_PER_SECOND) * NANOSECONDS_PER_MICROSECOND;
+            Instant instant = Instant.ofEpochSecond(epochSeconds, nanoAdjustment);
+            return ISO_INSTANT.format(ZonedDateTime.ofInstant(instant.truncatedTo(MILLIS), UTC));
+        }
         if (type == TIMESTAMP_TZ_MILLIS) {
             Instant ts = Instant.ofEpochMilli(unpackMillisUtc((long) value));
             return ISO_INSTANT.format(ZonedDateTime.ofInstant(ts, UTC));
@@ -290,6 +307,22 @@ private static Optional getMin(Type type, Statistics statistics)
             return Optional.of(date.format(ISO_LOCAL_DATE));
         }

+        if (type instanceof TimestampType) {
+            // Spark truncates the timestamp to milliseconds. The connector follows the same behavior.
+            if (statistics instanceof LongStatistics longStatistics) {
+                long epochMicros = longStatistics.genericGetMin();
+                long epochSeconds = floorDiv(epochMicros, MICROSECONDS_PER_SECOND);
+                int nanoAdjustment = floorMod(epochMicros, MICROSECONDS_PER_SECOND) * NANOSECONDS_PER_MICROSECOND;
+                Instant instant = Instant.ofEpochSecond(epochSeconds, nanoAdjustment);
+                return Optional.of(ISO_INSTANT.format(ZonedDateTime.ofInstant(instant, UTC).truncatedTo(MILLIS)));
+            }
+            if (statistics instanceof BinaryStatistics binaryStatistics) {
+                DecodedTimestamp decodedTimestamp = decodeInt96Timestamp(binaryStatistics.genericGetMin());
+                Instant instant = Instant.ofEpochSecond(decodedTimestamp.epochSeconds(), decodedTimestamp.nanosOfSecond());
+                return Optional.of(ISO_INSTANT.format(ZonedDateTime.ofInstant(instant, UTC).truncatedTo(MILLIS)));
+            }
+        }
+
         if (type instanceof TimestampWithTimeZoneType) {
             if (statistics instanceof LongStatistics) {
                 Instant ts = Instant.ofEpochMilli(((LongStatistics) statistics).genericGetMin());
@@ -367,6 +400,32 @@ private static Optional getMax(Type type, Statistics statistics)
             return Optional.of(date.format(ISO_LOCAL_DATE));
         }

+        if (type instanceof TimestampType timestampType) {
+            // Spark truncates the timestamp to milliseconds. The connector follows the same behavior.
+            checkArgument(timestampType.getPrecision() >= TIMESTAMP_MILLIS.getPrecision(), "type precision must be at least milliseconds");
+
+            if (statistics instanceof LongStatistics longStatistics) {
+                long epochMicros = longStatistics.genericGetMax();
+                long epochSeconds = floorDiv(epochMicros, MICROSECONDS_PER_SECOND);
+                int nanoAdjustment = floorMod(epochMicros, MICROSECONDS_PER_SECOND) * NANOSECONDS_PER_MICROSECOND;
+                Instant instant = Instant.ofEpochSecond(epochSeconds, nanoAdjustment);
+                Instant truncatedToMillis = instant.truncatedTo(MILLIS);
+                if (truncatedToMillis.isBefore(instant)) {
+                    truncatedToMillis = truncatedToMillis.plusMillis(1);
+                }
+                return Optional.of(ISO_INSTANT.format(truncatedToMillis));
+            }
+            if (statistics instanceof BinaryStatistics binaryStatistics) {
+                DecodedTimestamp decodedTimestamp = decodeInt96Timestamp(binaryStatistics.genericGetMax());
+                Instant instant = Instant.ofEpochSecond(decodedTimestamp.epochSeconds(), decodedTimestamp.nanosOfSecond());
+                Instant truncatedToMillis = instant.truncatedTo(MILLIS);
+                if (truncatedToMillis.isBefore(instant)) {
+                    truncatedToMillis = truncatedToMillis.plusMillis(1);
+                }
+                return Optional.of(ISO_INSTANT.format(truncatedToMillis));
+            }
+        }
+
         if (type instanceof TimestampWithTimeZoneType) {
             if (statistics instanceof LongStatistics) {
                 Instant ts = Instant.ofEpochMilli(((LongStatistics) statistics).genericGetMax());
diff --git a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/transactionlog/checkpoint/CheckpointWriter.java b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/transactionlog/checkpoint/CheckpointWriter.java
index 09afa0202afc..b908520dbd1b 100644
--- a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/transactionlog/checkpoint/CheckpointWriter.java
+++ b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/transactionlog/checkpoint/CheckpointWriter.java
@@ -40,7 +40,6 @@
 import io.trino.spi.type.MapType;
 import io.trino.spi.type.RowType;
 import io.trino.spi.type.RowType.Field;
-import io.trino.spi.type.TimestampType;
 import io.trino.spi.type.Type;
 import io.trino.spi.type.TypeManager;
 import jakarta.annotation.Nullable;
@@ -66,6 +65,8 @@
 import static io.trino.plugin.deltalake.transactionlog.MetadataEntry.DELTA_CHECKPOINT_WRITE_STATS_AS_JSON_PROPERTY;
 import static io.trino.plugin.deltalake.transactionlog.MetadataEntry.DELTA_CHECKPOINT_WRITE_STATS_AS_STRUCT_PROPERTY;
 import static io.trino.plugin.deltalake.transactionlog.TransactionLogParser.deserializePartitionValue;
+import static io.trino.spi.type.TimestampType.TIMESTAMP_MICROS;
+import static io.trino.spi.type.TimestampType.TIMESTAMP_MILLIS;
 import static io.trino.spi.type.Timestamps.MICROSECONDS_PER_MILLISECOND;
 import static io.trino.spi.type.TypeUtils.writeNativeValue;
 import static java.lang.Math.multiplyExact;
@@ -432,11 +433,15 @@ private Optional<Map<String, Object>> preprocessMinMaxValues(RowType valuesType,
                 if (isJson) {
                     return jsonValueToTrinoValue(type, value);
                 }
-                if (type instanceof TimestampType) {
+                if (type == TIMESTAMP_MILLIS) {
                     // We need to remap TIMESTAMP WITH TIME ZONE -> TIMESTAMP here because of
                     // inconsistency in what type is used for DL "timestamp" type in data processing and in min/max statistics map.
                     value = multiplyExact(DateTimeEncoding.unpackMillisUtc((long) value), MICROSECONDS_PER_MILLISECOND);
                 }
+                if (type == TIMESTAMP_MICROS) {
+                    // This is TIMESTAMP_NTZ type in Delta Lake
+                    return value;
+                }
                 return value;
             }));
         });
diff --git a/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestDeltaLakeBasic.java b/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestDeltaLakeBasic.java
index 55a612fd737b..6998ee2ff64c 100644
--- a/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestDeltaLakeBasic.java
+++ b/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestDeltaLakeBasic.java
@@ -617,7 +617,7 @@ private void testDeltaTimestampNtz(ZoneId sessionZone)
                 "SHOW STATS FOR " + tableName,
                 """
                 VALUES
-                ('x', null, 1.0, 0.1111111111111111, null, null, null),
+                ('x', null, 1.0, 0.1111111111111111, null, '2023-01-02 03:04:05.123000', '2023-01-02 03:04:05.124000'),
                 (null, null, null, null, 9.0, null, null)
                 """);
@@ -722,7 +722,7 @@ private void testTrinoCreateTableWithTimestampNtz(ZoneId sessionZone, Consumer
+        Statistics<?> stats = Statistics.getBuilderForReading(type)
+                .withMin(timestampToBytes(LocalDateTime.parse("2020-08-26T01:02:03.123456")))
+                .withMax(timestampToBytes(LocalDateTime.parse("2020-08-26T01:02:03.987654")))
+                .withNumNulls(2)
+                .build();
+
+        assertThat(DeltaLakeParquetStatisticsUtils.jsonEncodeMin(ImmutableMap.of(columnName, Optional.of(stats)), ImmutableMap.of(columnName, TIMESTAMP_MICROS)))
+                .isEqualTo(ImmutableMap.of(columnName, "2020-08-26T01:02:03.123Z"));
+        assertThat(DeltaLakeParquetStatisticsUtils.jsonEncodeMax(ImmutableMap.of(columnName, Optional.of(stats)), ImmutableMap.of(columnName, TIMESTAMP_MICROS)))
+                .isEqualTo(ImmutableMap.of(columnName, "2020-08-26T01:02:03.988Z"));
+    }
+
+    private static byte[] timestampToBytes(LocalDateTime localDateTime)
+    {
+        long epochMicros = localDateTime.toEpochSecond(UTC) * MICROSECONDS_PER_SECOND
+                + localDateTime.getNano() / NANOSECONDS_PER_MICROSECOND;
+
+        Slice slice = Slices.allocate(8);
+        slice.setLong(0, epochMicros);
+        return slice.byteArray();
+    }
+
+    @Test
+    public void testTimestampStatisticsHighPrecision()
+    {
+        String columnName = "t_timestamp";
+        PrimitiveType type = new PrimitiveType(Type.Repetition.REQUIRED, PrimitiveType.PrimitiveTypeName.INT96, columnName);
+        Statistics<?> stats = Statistics.getBuilderForReading(type)
+                .withMin(toParquetEncoding(LocalDateTime.parse("2020-08-26T01:02:03.123456789")))
+                .withMax(toParquetEncoding(LocalDateTime.parse("2020-08-26T01:02:03.123987654")))
+                .withNumNulls(2)
+                .build();
+
+        assertThat(DeltaLakeParquetStatisticsUtils.jsonEncodeMin(ImmutableMap.of(columnName, Optional.of(stats)), ImmutableMap.of(columnName, TIMESTAMP_MICROS)))
+                .isEqualTo(ImmutableMap.of(columnName, "2020-08-26T01:02:03.123Z"));
+        assertThat(DeltaLakeParquetStatisticsUtils.jsonEncodeMax(ImmutableMap.of(columnName, Optional.of(stats)), ImmutableMap.of(columnName, TIMESTAMP_MICROS)))
+                .isEqualTo(ImmutableMap.of(columnName, "2020-08-26T01:02:03.124Z"));
+    }
+
     @Test
     public void testTimestampWithTimeZoneStatisticsHighPrecision()
     {
diff --git a/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/transactionlog/checkpoint/TestCheckpointWriter.java b/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/transactionlog/checkpoint/TestCheckpointWriter.java
index ef7333cf7132..410e5686cad1 100644
--- a/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/transactionlog/checkpoint/TestCheckpointWriter.java
+++ b/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/transactionlog/checkpoint/TestCheckpointWriter.java
@@ -46,6 +46,7 @@
 import java.io.File;
 import java.io.IOException;
+import java.time.LocalDateTime;
 import java.util.Iterator;
 import java.util.Map;
 import java.util.Optional;
@@ -65,9 +66,12 @@
 import static io.trino.plugin.hive.HiveTestUtils.HDFS_FILE_SYSTEM_STATS;
 import static io.trino.spi.predicate.Utils.nativeValueToBlock;
 import static io.trino.spi.type.TimeZoneKey.UTC_KEY;
+import static io.trino.spi.type.Timestamps.MICROSECONDS_PER_SECOND;
+import static io.trino.spi.type.Timestamps.NANOSECONDS_PER_MICROSECOND;
 import static io.trino.spi.type.VarcharType.createUnboundedVarcharType;
 import static io.trino.type.InternalTypeManager.TESTING_TYPE_MANAGER;
 import static io.trino.util.DateTimeUtils.parseDate;
+import static java.time.ZoneOffset.UTC;
 import static org.assertj.core.api.Assertions.assertThat;

 public class TestCheckpointWriter
@@ -91,6 +95,7 @@ public void testCheckpointWriteReadJsonRoundtrip()
                 "{\"type\":\"struct\",\"fields\":" +
                 "[{\"name\":\"part_key\",\"type\":\"double\",\"nullable\":true,\"metadata\":{}}," +
                 "{\"name\":\"ts\",\"type\":\"timestamp\",\"nullable\":true,\"metadata\":{}}," +
+                "{\"name\":\"ts_ntz\",\"type\":\"timestamp_ntz\",\"nullable\":true,\"metadata\":{}}," +
                 "{\"name\":\"str\",\"type\":\"string\",\"nullable\":true,\"metadata\":{}}," +
                 "{\"name\":\"dec_short\",\"type\":\"decimal(5,1)\",\"nullable\":true,\"metadata\":{}}," +
                 "{\"name\":\"dec_long\",\"type\":\"decimal(25,3)\",\"nullable\":true,\"metadata\":{}}," +
@@ -125,6 +130,7 @@
                 "\"numRecords\":20," +
                 "\"minValues\":{" +
                 "\"ts\":\"2960-10-31T01:00:00.000Z\"," +
+                "\"ts_ntz\":\"2020-01-01T01:02:03.123\"," +
                 "\"str\":\"a\"," +
                 "\"dec_short\":10.1," +
                 "\"dec_long\":111111111111.123," +
@@ -139,6 +145,7 @@
                 "}," +
                 "\"maxValues\":{" +
                 "\"ts\":\"2960-10-31T02:00:00.000Z\"," +
+                "\"ts_ntz\":\"3000-01-01T01:02:03.123\"," +
                 "\"str\":\"z\"," +
                 "\"dec_short\":20.1," +
                 "\"dec_long\":222222222222.123," +
@@ -219,6 +226,7 @@ public void testCheckpointWriteReadParquetStatisticsRoundtrip()
                 "{\"type\":\"struct\",\"fields\":" +
                 "[{\"name\":\"part_key\",\"type\":\"double\",\"nullable\":true,\"metadata\":{}}," +
                 "{\"name\":\"ts\",\"type\":\"timestamp\",\"nullable\":true,\"metadata\":{}}," +
+                "{\"name\":\"ts_ntz\",\"type\":\"timestamp_ntz\",\"nullable\":true,\"metadata\":{}}," +
                 "{\"name\":\"str\",\"type\":\"string\",\"nullable\":true,\"metadata\":{}}," +
                 "{\"name\":\"dec_short\",\"type\":\"decimal(5,1)\",\"nullable\":true,\"metadata\":{}}," +
                 "{\"name\":\"dec_long\",\"type\":\"decimal(25,3)\",\"nullable\":true,\"metadata\":{}}," +
@@ -262,6 +270,7 @@
                 Optional.of(5L),
                 Optional.of(ImmutableMap.<String, Object>builder()
                         .put("ts", DateTimeUtils.convertToTimestampWithTimeZone(UTC_KEY, "2060-10-31 01:00:00"))
+                        .put("ts_ntz", convertToTimestamp("2060-10-31T01:00:00.123"))
                         .put("str", utf8Slice("a"))
                         .put("dec_short", 101L)
                         .put("dec_long", Int128.valueOf(111111111111123L))
@@ -276,6 +285,7 @@
                         .buildOrThrow()),
                 Optional.of(ImmutableMap.<String, Object>builder()
                         .put("ts", DateTimeUtils.convertToTimestampWithTimeZone(UTC_KEY, "2060-10-31 02:00:00"))
+                        .put("ts_ntz", convertToTimestamp("2060-10-31T02:00:00.123"))
                         .put("str", utf8Slice("a"))
                         .put("dec_short", 201L)
                         .put("dec_long", Int128.valueOf(222222222222123L))
@@ -290,6 +300,7 @@ public void testCheckpointWriteReadParquetStatisticsRoundtrip()
                         .buildOrThrow()),
                 Optional.of(ImmutableMap.<String, Object>builder()
                         .put("ts", 1L)
+                        .put("ts_ntz", 16L)
                         .put("str", 2L)
                         .put("dec_short", 3L)
                         .put("dec_long", 4L)
@@ -340,6 +351,13 @@ public void testCheckpointWriteReadParquetStatisticsRoundtrip()
         assertThat(readEntries.addFileEntries().stream().map(this::makeComparable).collect(toImmutableSet())).isEqualTo(entries.addFileEntries().stream().map(this::makeComparable).collect(toImmutableSet()));
     }

+    private static long convertToTimestamp(String value)
+    {
+        LocalDateTime localDateTime = LocalDateTime.parse(value);
+        return localDateTime.toEpochSecond(UTC) * MICROSECONDS_PER_SECOND
+                + localDateTime.getNano() / NANOSECONDS_PER_MICROSECOND;
+    }
+
     @Test
     public void testDisablingRowStatistics()
             throws IOException