Skip to content

Commit

Permalink
Add support for stats on timestamp type in Delta Lake
Browse files Browse the repository at this point in the history
  • Loading branch information
ebyhr committed Jun 12, 2024
1 parent da8457c commit 24b56cc
Show file tree
Hide file tree
Showing 6 changed files with 149 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import io.trino.spi.type.Int128;
import io.trino.spi.type.MapType;
import io.trino.spi.type.RowType;
import io.trino.spi.type.TimestampType;
import io.trino.spi.type.TimestampWithTimeZoneType;
import io.trino.spi.type.Type;
import io.trino.spi.type.VarcharType;
Expand Down Expand Up @@ -64,15 +65,20 @@
import static io.trino.spi.type.IntegerType.INTEGER;
import static io.trino.spi.type.RealType.REAL;
import static io.trino.spi.type.SmallintType.SMALLINT;
import static io.trino.spi.type.TimestampType.TIMESTAMP_MICROS;
import static io.trino.spi.type.TimestampType.TIMESTAMP_MILLIS;
import static io.trino.spi.type.TimestampWithTimeZoneType.TIMESTAMP_TZ_MILLIS;
import static io.trino.spi.type.Timestamps.MICROSECONDS_PER_MILLISECOND;
import static io.trino.spi.type.Timestamps.MICROSECONDS_PER_SECOND;
import static io.trino.spi.type.Timestamps.NANOSECONDS_PER_MICROSECOND;
import static io.trino.spi.type.TinyintType.TINYINT;
import static io.trino.spi.type.TypeUtils.readNativeValue;
import static io.trino.spi.type.TypeUtils.writeNativeValue;
import static io.trino.spi.type.VarbinaryType.VARBINARY;
import static java.lang.Float.floatToRawIntBits;
import static java.lang.Float.intBitsToFloat;
import static java.lang.Math.floorDiv;
import static java.lang.Math.floorMod;
import static java.lang.Math.toIntExact;
import static java.nio.charset.StandardCharsets.UTF_8;
import static java.time.ZoneOffset.UTC;
Expand Down Expand Up @@ -141,6 +147,10 @@ public static Object jsonValueToTrinoValue(Type type, @Nullable Object jsonValue
if (type == TIMESTAMP_MILLIS) {
return Instant.parse((String) jsonValue).toEpochMilli() * MICROSECONDS_PER_MILLISECOND;
}
if (type == TIMESTAMP_MICROS) {
Instant instant = Instant.parse((String) jsonValue);
return (instant.getEpochSecond() * MICROSECONDS_PER_SECOND) + (instant.getNano() / NANOSECONDS_PER_MICROSECOND);
}
if (type instanceof RowType rowType) {
Map<?, ?> values = (Map<?, ?>) jsonValue;
List<Type> fieldTypes = rowType.getTypeParameters();
Expand Down Expand Up @@ -200,6 +210,13 @@ public static Object toJsonValue(Type type, @Nullable Object value)
if (type == DateType.DATE) {
return LocalDate.ofEpochDay((long) value).format(ISO_LOCAL_DATE);
}
if (type == TIMESTAMP_MICROS) {
long epochMicros = (long) value;
long epochSeconds = floorDiv(epochMicros, MICROSECONDS_PER_SECOND);
int nanoAdjustment = floorMod(epochMicros, MICROSECONDS_PER_SECOND) * NANOSECONDS_PER_MICROSECOND;
Instant instant = Instant.ofEpochSecond(epochSeconds, nanoAdjustment);
return ISO_INSTANT.format(ZonedDateTime.ofInstant(instant.truncatedTo(MILLIS), UTC));
}
if (type == TIMESTAMP_TZ_MILLIS) {
Instant ts = Instant.ofEpochMilli(unpackMillisUtc((long) value));
return ISO_INSTANT.format(ZonedDateTime.ofInstant(ts, UTC));
Expand Down Expand Up @@ -290,6 +307,22 @@ private static Optional<Object> getMin(Type type, Statistics<?> statistics)
return Optional.of(date.format(ISO_LOCAL_DATE));
}

if (type instanceof TimestampType) {
// Spark truncates the timestamp to milliseconds. The connector follows the same behavior.
if (statistics instanceof LongStatistics longStatistics) {
long epochMicros = longStatistics.genericGetMin();
long epochSeconds = floorDiv(epochMicros, MICROSECONDS_PER_SECOND);
int nanoAdjustment = floorMod(epochMicros, MICROSECONDS_PER_SECOND) * NANOSECONDS_PER_MICROSECOND;
Instant instant = Instant.ofEpochSecond(epochSeconds, nanoAdjustment);
return Optional.of(ISO_INSTANT.format(ZonedDateTime.ofInstant(instant, UTC).truncatedTo(MILLIS)));
}
if (statistics instanceof BinaryStatistics binaryStatistics) {
DecodedTimestamp decodedTimestamp = decodeInt96Timestamp(binaryStatistics.genericGetMin());
Instant instant = Instant.ofEpochSecond(decodedTimestamp.epochSeconds(), decodedTimestamp.nanosOfSecond());
return Optional.of(ISO_INSTANT.format(ZonedDateTime.ofInstant(instant, UTC).truncatedTo(MILLIS)));
}
}

if (type instanceof TimestampWithTimeZoneType) {
if (statistics instanceof LongStatistics) {
Instant ts = Instant.ofEpochMilli(((LongStatistics) statistics).genericGetMin());
Expand Down Expand Up @@ -367,6 +400,32 @@ private static Optional<Object> getMax(Type type, Statistics<?> statistics)
return Optional.of(date.format(ISO_LOCAL_DATE));
}

if (type instanceof TimestampType timestampType) {
// Spark truncates the timestamp to milliseconds. The connector follows the same behavior.
checkArgument(timestampType.getPrecision() >= TIMESTAMP_MILLIS.getPrecision(), "type precision must be at least milliseconds");

if (statistics instanceof LongStatistics longStatistics) {
long epochMicros = longStatistics.genericGetMax();
long epochSeconds = floorDiv(epochMicros, MICROSECONDS_PER_SECOND);
int nanoAdjustment = floorMod(epochMicros, MICROSECONDS_PER_SECOND) * NANOSECONDS_PER_MICROSECOND;
Instant instant = Instant.ofEpochSecond(epochSeconds, nanoAdjustment);
Instant truncatedToMillis = instant.truncatedTo(MILLIS);
if (truncatedToMillis.isBefore(instant)) {
truncatedToMillis = truncatedToMillis.plusMillis(1);
}
return Optional.of(ISO_INSTANT.format(truncatedToMillis));
}
if (statistics instanceof BinaryStatistics binaryStatistics) {
DecodedTimestamp decodedTimestamp = decodeInt96Timestamp(binaryStatistics.genericGetMax());
Instant instant = Instant.ofEpochSecond(decodedTimestamp.epochSeconds(), decodedTimestamp.nanosOfSecond());
Instant truncatedToMillis = instant.truncatedTo(MILLIS);
if (truncatedToMillis.isBefore(instant)) {
truncatedToMillis = truncatedToMillis.plusMillis(1);
}
return Optional.of(ISO_INSTANT.format(truncatedToMillis));
}
}

if (type instanceof TimestampWithTimeZoneType) {
if (statistics instanceof LongStatistics) {
Instant ts = Instant.ofEpochMilli(((LongStatistics) statistics).genericGetMax());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,6 @@
import io.trino.spi.type.MapType;
import io.trino.spi.type.RowType;
import io.trino.spi.type.RowType.Field;
import io.trino.spi.type.TimestampType;
import io.trino.spi.type.Type;
import io.trino.spi.type.TypeManager;
import jakarta.annotation.Nullable;
Expand All @@ -66,6 +65,8 @@
import static io.trino.plugin.deltalake.transactionlog.MetadataEntry.DELTA_CHECKPOINT_WRITE_STATS_AS_JSON_PROPERTY;
import static io.trino.plugin.deltalake.transactionlog.MetadataEntry.DELTA_CHECKPOINT_WRITE_STATS_AS_STRUCT_PROPERTY;
import static io.trino.plugin.deltalake.transactionlog.TransactionLogParser.deserializePartitionValue;
import static io.trino.spi.type.TimestampType.TIMESTAMP_MICROS;
import static io.trino.spi.type.TimestampType.TIMESTAMP_MILLIS;
import static io.trino.spi.type.Timestamps.MICROSECONDS_PER_MILLISECOND;
import static io.trino.spi.type.TypeUtils.writeNativeValue;
import static java.lang.Math.multiplyExact;
Expand Down Expand Up @@ -432,11 +433,15 @@ private Optional<Map<String, Object>> preprocessMinMaxValues(RowType valuesType,
if (isJson) {
return jsonValueToTrinoValue(type, value);
}
if (type instanceof TimestampType) {
if (type == TIMESTAMP_MILLIS) {
// We need to remap TIMESTAMP WITH TIME ZONE -> TIMESTAMP here because of
// inconsistency in what type is used for DL "timestamp" type in data processing and in min/max statistics map.
value = multiplyExact(DateTimeEncoding.unpackMillisUtc((long) value), MICROSECONDS_PER_MILLISECOND);
}
if (type == TIMESTAMP_MICROS) {
// This is TIMESTAMP_NTZ type in Delta Lake
return value;
}
return value;
}));
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -617,7 +617,7 @@ private void testDeltaTimestampNtz(ZoneId sessionZone)
"SHOW STATS FOR " + tableName,
"""
VALUES
('x', null, 1.0, 0.1111111111111111, null, null, null),
('x', null, 1.0, 0.1111111111111111, null, '2023-01-02 03:04:05.123000', '2023-01-02 03:04:05.124000'),
(null, null, null, null, 9.0, null, null)
""");

Expand Down Expand Up @@ -722,7 +722,7 @@ private void testTrinoCreateTableWithTimestampNtz(ZoneId sessionZone, Consumer<S
"SHOW STATS FOR " + tableName,
"""
VALUES
('x', null, 8.0, 0.1111111111111111, null, null, null),
('x', null, 8.0, 0.1111111111111111, null, '2023-01-02 03:04:05.123000', '+10000-01-01 00:00:00.000000'),
(null, null, null, null, 9.0, null, null)
""");

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -359,7 +359,6 @@ public void testPartitionsTableDifferentOrderFromDefinitionMultipleColumns()
@Test
public void testPartitionsTableColumnTypes()
{
// TODO: add support for TIMESTAMP column types https://github.com/trinodb/trino/issues/21878
testPartitionsTableColumnTypes("BOOLEAN", "VALUES (true, 'a'), (false, 'a'), (false, 'b'), (false, 'b')", 4, """
VALUES
ROW(ROW(CAST(NULL AS BOOLEAN), CAST(NULL AS BOOLEAN), BIGINT '0')),
Expand Down Expand Up @@ -412,8 +411,8 @@ public void testPartitionsTableColumnTypes()
""");
testPartitionsTableColumnTypes("TIMESTAMP(6)", "VALUES (TIMESTAMP '2001-05-06 12:34:56.123456', 'a'), (TIMESTAMP '2001-05-06 12:34:56.567890', 'a'), (TIMESTAMP '2001-05-06 12:34:56.123456', 'b'), (TIMESTAMP '2001-05-06 12:34:56.123457', 'b')", 4, """
VALUES
ROW(ROW(CAST(NULL AS TIMESTAMP(6)), CAST(NULL AS TIMESTAMP(6)), BIGINT '0')),
ROW(ROW(CAST(NULL AS TIMESTAMP(6)), CAST(NULL AS TIMESTAMP(6)), BIGINT '0'))
ROW(ROW(TIMESTAMP '2001-05-06 12:34:56.123000', TIMESTAMP '2001-05-06 12:34:56.568000', BIGINT '0')),
ROW(ROW(TIMESTAMP '2001-05-06 12:34:56.123000', TIMESTAMP '2001-05-06 12:34:56.124000', BIGINT '0'))
""");
testPartitionsTableColumnTypes("TIMESTAMP(3) WITH TIME ZONE", "VALUES (TIMESTAMP '2001-05-06 12:34:56.123 UTC', 'a'), (TIMESTAMP '2001-05-06 12:34:56.234 -08:30', 'a'), (TIMESTAMP '2001-05-06 12:34:56.567 GMT-08:30', 'b'), (TIMESTAMP '2001-05-06 12:34:56.789 America/New_York', 'b')", 4, """
VALUES
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,13 +24,19 @@
import org.junit.jupiter.api.Test;

import java.nio.ByteBuffer;
import java.time.Instant;
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.util.Optional;

import static io.trino.plugin.deltalake.transactionlog.DeltaLakeParquetStatisticsUtils.jsonValueToTrinoValue;
import static io.trino.spi.type.DateType.DATE;
import static io.trino.spi.type.RealType.REAL;
import static io.trino.spi.type.TimestampType.TIMESTAMP_MICROS;
import static io.trino.spi.type.TimestampWithTimeZoneType.TIMESTAMP_TZ_MILLIS;
import static io.trino.spi.type.Timestamps.MICROSECONDS_PER_MILLISECOND;
import static io.trino.spi.type.Timestamps.MICROSECONDS_PER_SECOND;
import static io.trino.spi.type.Timestamps.NANOSECONDS_PER_MICROSECOND;
import static io.trino.spi.type.VarcharType.createUnboundedVarcharType;
import static java.lang.Math.toIntExact;
import static java.nio.ByteOrder.LITTLE_ENDIAN;
Expand Down Expand Up @@ -113,6 +119,61 @@ public void testDateStatistics()
assertThat(DeltaLakeParquetStatisticsUtils.jsonEncodeMax(ImmutableMap.of(columnName, Optional.of(stats)), ImmutableMap.of(columnName, DATE))).isEqualTo(ImmutableMap.of(columnName, "2020-09-17"));
}

@Test
public void testTimestampJsonValueToTrinoValue()
{
assertThat(jsonValueToTrinoValue(TIMESTAMP_MICROS, "2020-08-26T01:02:03.123Z"))
.isEqualTo(Instant.parse("2020-08-26T01:02:03.123Z").toEpochMilli() * MICROSECONDS_PER_MILLISECOND);
assertThat(jsonValueToTrinoValue(TIMESTAMP_MICROS, "2020-08-26T01:02:03.123111Z"))
.isEqualTo(Instant.parse("2020-08-26T01:02:03Z").getEpochSecond() * MICROSECONDS_PER_SECOND + 123111);
assertThat(jsonValueToTrinoValue(TIMESTAMP_MICROS, "2020-08-26T01:02:03.123999Z"))
.isEqualTo(Instant.parse("2020-08-26T01:02:03Z").getEpochSecond() * MICROSECONDS_PER_SECOND + 123999);
}

@Test
public void testTimestampStatisticsMillisPrecision()
{
String columnName = "t_timestamp";
PrimitiveType type = new PrimitiveType(Type.Repetition.REQUIRED, PrimitiveType.PrimitiveTypeName.INT64, columnName);
Statistics<?> stats = Statistics.getBuilderForReading(type)
.withMin(timestampToBytes(LocalDateTime.parse("2020-08-26T01:02:03.123456")))
.withMax(timestampToBytes(LocalDateTime.parse("2020-08-26T01:02:03.987654")))
.withNumNulls(2)
.build();

assertThat(DeltaLakeParquetStatisticsUtils.jsonEncodeMin(ImmutableMap.of(columnName, Optional.of(stats)), ImmutableMap.of(columnName, TIMESTAMP_MICROS)))
.isEqualTo(ImmutableMap.of(columnName, "2020-08-26T01:02:03.123Z"));
assertThat(DeltaLakeParquetStatisticsUtils.jsonEncodeMax(ImmutableMap.of(columnName, Optional.of(stats)), ImmutableMap.of(columnName, TIMESTAMP_MICROS)))
.isEqualTo(ImmutableMap.of(columnName, "2020-08-26T01:02:03.988Z"));
}

private static byte[] timestampToBytes(LocalDateTime localDateTime)
{
long epochMicros = localDateTime.toEpochSecond(UTC) * MICROSECONDS_PER_SECOND
+ localDateTime.getNano() / NANOSECONDS_PER_MICROSECOND;

Slice slice = Slices.allocate(8);
slice.setLong(0, epochMicros);
return slice.byteArray();
}

@Test
public void testTimestampStatisticsHighPrecision()
{
String columnName = "t_timestamp";
PrimitiveType type = new PrimitiveType(Type.Repetition.REQUIRED, PrimitiveType.PrimitiveTypeName.INT96, columnName);
Statistics<?> stats = Statistics.getBuilderForReading(type)
.withMin(toParquetEncoding(LocalDateTime.parse("2020-08-26T01:02:03.123456789")))
.withMax(toParquetEncoding(LocalDateTime.parse("2020-08-26T01:02:03.123987654")))
.withNumNulls(2)
.build();

assertThat(DeltaLakeParquetStatisticsUtils.jsonEncodeMin(ImmutableMap.of(columnName, Optional.of(stats)), ImmutableMap.of(columnName, TIMESTAMP_MICROS)))
.isEqualTo(ImmutableMap.of(columnName, "2020-08-26T01:02:03.123Z"));
assertThat(DeltaLakeParquetStatisticsUtils.jsonEncodeMax(ImmutableMap.of(columnName, Optional.of(stats)), ImmutableMap.of(columnName, TIMESTAMP_MICROS)))
.isEqualTo(ImmutableMap.of(columnName, "2020-08-26T01:02:03.124Z"));
}

@Test
public void testTimestampWithTimeZoneStatisticsHighPrecision()
{
Expand Down
Loading

0 comments on commit 24b56cc

Please sign in to comment.