Add support for stats on timestamp type in Delta Lake #22159

Merged · 3 commits · Jun 12, 2024
@@ -27,6 +27,7 @@
import io.trino.spi.type.Int128;
import io.trino.spi.type.MapType;
import io.trino.spi.type.RowType;
import io.trino.spi.type.TimestampType;
import io.trino.spi.type.TimestampWithTimeZoneType;
import io.trino.spi.type.Type;
import io.trino.spi.type.VarcharType;
@@ -64,14 +65,20 @@
import static io.trino.spi.type.IntegerType.INTEGER;
import static io.trino.spi.type.RealType.REAL;
import static io.trino.spi.type.SmallintType.SMALLINT;
import static io.trino.spi.type.TimestampType.TIMESTAMP_MICROS;
import static io.trino.spi.type.TimestampType.TIMESTAMP_MILLIS;
import static io.trino.spi.type.TimestampWithTimeZoneType.TIMESTAMP_TZ_MILLIS;
import static io.trino.spi.type.Timestamps.MICROSECONDS_PER_MILLISECOND;
import static io.trino.spi.type.Timestamps.MICROSECONDS_PER_SECOND;
import static io.trino.spi.type.Timestamps.NANOSECONDS_PER_MICROSECOND;
import static io.trino.spi.type.TinyintType.TINYINT;
import static io.trino.spi.type.TypeUtils.readNativeValue;
import static io.trino.spi.type.TypeUtils.writeNativeValue;
import static io.trino.spi.type.VarbinaryType.VARBINARY;
import static java.lang.Float.floatToRawIntBits;
import static java.lang.Float.intBitsToFloat;
import static java.lang.Math.floorDiv;
import static java.lang.Math.floorMod;
import static java.lang.Math.toIntExact;
import static java.nio.charset.StandardCharsets.UTF_8;
import static java.time.ZoneOffset.UTC;
@@ -140,6 +147,10 @@ public static Object jsonValueToTrinoValue(Type type, @Nullable Object jsonValue
if (type == TIMESTAMP_MILLIS) {
return Instant.parse((String) jsonValue).toEpochMilli() * MICROSECONDS_PER_MILLISECOND;
}
if (type == TIMESTAMP_MICROS) {
Instant instant = Instant.parse((String) jsonValue);
return (instant.getEpochSecond() * MICROSECONDS_PER_SECOND) + (instant.getNano() / NANOSECONDS_PER_MICROSECOND);
}
if (type instanceof RowType rowType) {
Map<?, ?> values = (Map<?, ?>) jsonValue;
List<Type> fieldTypes = rowType.getTypeParameters();
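
As a side note on the TIMESTAMP_MICROS branch above: a minimal standalone sketch of the JSON-to-native conversion, using plain JDK time APIs and writing the Trino constants (MICROSECONDS_PER_SECOND, NANOSECONDS_PER_MICROSECOND) out as literals.

import java.time.Instant;

class TimestampStatJsonToMicros
{
    // Delta Lake JSON stats carry an ISO-8601 instant; the Trino native value for TIMESTAMP(6) is epoch microseconds
    static long toEpochMicros(String jsonValue)
    {
        Instant instant = Instant.parse(jsonValue);
        return instant.getEpochSecond() * 1_000_000L + instant.getNano() / 1_000L;
    }

    public static void main(String[] args)
    {
        System.out.println(toEpochMicros("2020-08-26T01:02:03.123456Z")); // prints 1598403723123456
    }
}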
@@ -199,6 +210,13 @@ public static Object toJsonValue(Type type, @Nullable Object value)
if (type == DateType.DATE) {
return LocalDate.ofEpochDay((long) value).format(ISO_LOCAL_DATE);
}
if (type == TIMESTAMP_MICROS) {
long epochMicros = (long) value;
long epochSeconds = floorDiv(epochMicros, MICROSECONDS_PER_SECOND);
int nanoAdjustment = floorMod(epochMicros, MICROSECONDS_PER_SECOND) * NANOSECONDS_PER_MICROSECOND;
Instant instant = Instant.ofEpochSecond(epochSeconds, nanoAdjustment);
return ISO_INSTANT.format(ZonedDateTime.ofInstant(instant.truncatedTo(MILLIS), UTC));
}
if (type == TIMESTAMP_TZ_MILLIS) {
Instant ts = Instant.ofEpochMilli(unpackMillisUtc((long) value));
return ISO_INSTANT.format(ZonedDateTime.ofInstant(ts, UTC));
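
A matching sketch of the reverse direction added in toJsonValue above: the epoch-microsecond value is floored to millisecond precision before being rendered as an ISO-8601 instant (again plain JDK, with literal constants).

import java.time.Instant;
import java.time.ZoneOffset;
import java.time.ZonedDateTime;
import java.time.format.DateTimeFormatter;
import java.time.temporal.ChronoUnit;

class TimestampMicrosToJson
{
    static String toJsonTimestamp(long epochMicros)
    {
        long epochSeconds = Math.floorDiv(epochMicros, 1_000_000L);
        long nanoAdjustment = Math.floorMod(epochMicros, 1_000_000L) * 1_000L;
        Instant instant = Instant.ofEpochSecond(epochSeconds, nanoAdjustment);
        // Truncate to milliseconds to match what Spark writes for Delta Lake statistics
        return DateTimeFormatter.ISO_INSTANT.format(
                ZonedDateTime.ofInstant(instant.truncatedTo(ChronoUnit.MILLIS), ZoneOffset.UTC));
    }

    public static void main(String[] args)
    {
        System.out.println(toJsonTimestamp(1_598_403_723_123_456L)); // prints 2020-08-26T01:02:03.123Z
    }
}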
@@ -289,6 +307,22 @@ private static Optional<Object> getMin(Type type, Statistics<?> statistics)
return Optional.of(date.format(ISO_LOCAL_DATE));
}

if (type instanceof TimestampType) {
// Spark truncates the timestamp to milliseconds. The connector follows the same behavior.
if (statistics instanceof LongStatistics longStatistics) {
long epochMicros = longStatistics.genericGetMin();
long epochSeconds = floorDiv(epochMicros, MICROSECONDS_PER_SECOND);
int nanoAdjustment = floorMod(epochMicros, MICROSECONDS_PER_SECOND) * NANOSECONDS_PER_MICROSECOND;
Instant instant = Instant.ofEpochSecond(epochSeconds, nanoAdjustment);
return Optional.of(ISO_INSTANT.format(ZonedDateTime.ofInstant(instant, UTC).truncatedTo(MILLIS)));
}
if (statistics instanceof BinaryStatistics binaryStatistics) {
DecodedTimestamp decodedTimestamp = decodeInt96Timestamp(binaryStatistics.genericGetMin());
Instant instant = Instant.ofEpochSecond(decodedTimestamp.epochSeconds(), decodedTimestamp.nanosOfSecond());
return Optional.of(ISO_INSTANT.format(ZonedDateTime.ofInstant(instant, UTC).truncatedTo(MILLIS)));
}
}

if (type instanceof TimestampWithTimeZoneType) {
if (statistics instanceof LongStatistics) {
Instant ts = Instant.ofEpochMilli(((LongStatistics) statistics).genericGetMin());
@@ -344,8 +378,8 @@ private static Optional<Object> getMin(Type type, Statistics<?> statistics)
return Optional.of(new String(((BinaryStatistics) statistics).genericGetMin().getBytes(), UTF_8));
}

- if (type.equals(BOOLEAN)) {
- // Boolean columns do not collect min/max stats
+ if (type.equals(BOOLEAN) || type.equals(VARBINARY)) {
+ // Boolean and varbinary columns do not collect min/max stats
return Optional.empty();
}

@@ -366,6 +400,32 @@ private static Optional<Object> getMax(Type type, Statistics<?> statistics)
return Optional.of(date.format(ISO_LOCAL_DATE));
}

if (type instanceof TimestampType timestampType) {
// Spark truncates the timestamp to milliseconds. The connector follows the same behavior.
checkArgument(timestampType.getPrecision() >= TIMESTAMP_MILLIS.getPrecision(), "type precision must be at least milliseconds");

if (statistics instanceof LongStatistics longStatistics) {
long epochMicros = longStatistics.genericGetMax();
long epochSeconds = floorDiv(epochMicros, MICROSECONDS_PER_SECOND);
int nanoAdjustment = floorMod(epochMicros, MICROSECONDS_PER_SECOND) * NANOSECONDS_PER_MICROSECOND;
Instant instant = Instant.ofEpochSecond(epochSeconds, nanoAdjustment);
Instant truncatedToMillis = instant.truncatedTo(MILLIS);
if (truncatedToMillis.isBefore(instant)) {
truncatedToMillis = truncatedToMillis.plusMillis(1);
}
return Optional.of(ISO_INSTANT.format(truncatedToMillis));
}
if (statistics instanceof BinaryStatistics binaryStatistics) {
DecodedTimestamp decodedTimestamp = decodeInt96Timestamp(binaryStatistics.genericGetMax());
Instant instant = Instant.ofEpochSecond(decodedTimestamp.epochSeconds(), decodedTimestamp.nanosOfSecond());
Instant truncatedToMillis = instant.truncatedTo(MILLIS);
if (truncatedToMillis.isBefore(instant)) {
truncatedToMillis = truncatedToMillis.plusMillis(1);
}
return Optional.of(ISO_INSTANT.format(truncatedToMillis));
}
}

if (type instanceof TimestampWithTimeZoneType) {
if (statistics instanceof LongStatistics) {
Instant ts = Instant.ofEpochMilli(((LongStatistics) statistics).genericGetMax());
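
Note the asymmetry between getMin and getMax above: both truncate to millisecond precision, but the max is bumped up by one millisecond whenever truncation drops sub-millisecond digits, so the recorded maximum never understates the true value (e.g. 01:02:03.987654 becomes 01:02:03.988, matching the new test expectations). A minimal sketch of that ceiling step:

import java.time.Instant;
import java.time.temporal.ChronoUnit;

class MaxTimestampCeiling
{
    static Instant ceilToMillis(Instant instant)
    {
        Instant truncated = instant.truncatedTo(ChronoUnit.MILLIS);
        if (truncated.isBefore(instant)) {
            // Truncation lost precision; round up so the stored max stays an upper bound
            truncated = truncated.plusMillis(1);
        }
        return truncated;
    }

    public static void main(String[] args)
    {
        System.out.println(ceilToMillis(Instant.parse("2020-08-26T01:02:03.987654Z"))); // 2020-08-26T01:02:03.988Z
    }
}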
@@ -424,8 +484,8 @@ private static Optional<Object> getMax(Type type, Statistics<?> statistics)
return Optional.of(new String(((BinaryStatistics) statistics).genericGetMax().getBytes(), UTF_8));
}

- if (type.equals(BOOLEAN)) {
- // Boolean columns do not collect min/max stats
+ if (type.equals(BOOLEAN) || type.equals(VARBINARY)) {
+ // Boolean and varbinary columns do not collect min/max stats
return Optional.empty();
}

@@ -40,7 +40,6 @@
import io.trino.spi.type.MapType;
import io.trino.spi.type.RowType;
import io.trino.spi.type.RowType.Field;
- import io.trino.spi.type.TimestampType;
import io.trino.spi.type.Type;
import io.trino.spi.type.TypeManager;
import jakarta.annotation.Nullable;
@@ -66,6 +65,8 @@
import static io.trino.plugin.deltalake.transactionlog.MetadataEntry.DELTA_CHECKPOINT_WRITE_STATS_AS_JSON_PROPERTY;
import static io.trino.plugin.deltalake.transactionlog.MetadataEntry.DELTA_CHECKPOINT_WRITE_STATS_AS_STRUCT_PROPERTY;
import static io.trino.plugin.deltalake.transactionlog.TransactionLogParser.deserializePartitionValue;
import static io.trino.spi.type.TimestampType.TIMESTAMP_MICROS;
import static io.trino.spi.type.TimestampType.TIMESTAMP_MILLIS;
import static io.trino.spi.type.Timestamps.MICROSECONDS_PER_MILLISECOND;
import static io.trino.spi.type.TypeUtils.writeNativeValue;
import static java.lang.Math.multiplyExact;
@@ -432,11 +433,15 @@ private Optional<Map<String, Object>> preprocessMinMaxValues(RowType valuesType,
if (isJson) {
return jsonValueToTrinoValue(type, value);
}
- if (type instanceof TimestampType) {
+ if (type == TIMESTAMP_MILLIS) {
// We need to remap TIMESTAMP WITH TIME ZONE -> TIMESTAMP here because of an
// inconsistency in the type used for the Delta Lake "timestamp" type in data processing and in the min/max statistics map.
value = multiplyExact(DateTimeEncoding.unpackMillisUtc((long) value), MICROSECONDS_PER_MILLISECOND);
}
if (type == TIMESTAMP_MICROS) {
// This is TIMESTAMP_NTZ type in Delta Lake
return value;
}
return value;
}));
});
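
A minimal sketch of the remapping described in the comment above, assuming the Trino SPI is on the classpath; the packed input value below is hypothetical, not taken from this change:

import static io.trino.spi.type.DateTimeEncoding.packDateTimeWithZone;
import static io.trino.spi.type.DateTimeEncoding.unpackMillisUtc;
import static io.trino.spi.type.TimeZoneKey.UTC_KEY;
import static java.lang.Math.multiplyExact;

class TimestampStatsRemapSketch
{
    public static void main(String[] args)
    {
        // During processing the Delta Lake "timestamp" type is TIMESTAMP WITH TIME ZONE (packed millis UTC + zone key),
        // while the min/max statistics map stores TIMESTAMP values as epoch microseconds
        long packed = packDateTimeWithZone(1_598_403_723_123L, UTC_KEY); // 2020-08-26T01:02:03.123Z, hypothetical value
        long epochMicros = multiplyExact(unpackMillisUtc(packed), 1_000L);
        System.out.println(epochMicros); // prints 1598403723123000
    }
}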
@@ -617,7 +617,7 @@ private void testDeltaTimestampNtz(ZoneId sessionZone)
"SHOW STATS FOR " + tableName,
"""
VALUES
- ('x', null, 1.0, 0.1111111111111111, null, null, null),
+ ('x', null, 1.0, 0.1111111111111111, null, '2023-01-02 03:04:05.123000', '2023-01-02 03:04:05.124000'),
(null, null, null, null, 9.0, null, null)
""");

@@ -722,7 +722,7 @@ private void testTrinoCreateTableWithTimestampNtz(ZoneId sessionZone, Consumer<S
"SHOW STATS FOR " + tableName,
"""
VALUES
- ('x', null, 8.0, 0.1111111111111111, null, null, null),
+ ('x', null, 8.0, 0.1111111111111111, null, '2023-01-02 03:04:05.123000', '+10000-01-01 00:00:00.000000'),
(null, null, null, null, 9.0, null, null)
""");

@@ -359,7 +359,6 @@ public void testPartitionsTableDifferentOrderFromDefinitionMultipleColumns()
@Test
public void testPartitionsTableColumnTypes()
{
- // TODO: add support for BOOLEAN, TIMESTAMP, and VARBINARY column types https://github.com/trinodb/trino/issues/21878
testPartitionsTableColumnTypes("BOOLEAN", "VALUES (true, 'a'), (false, 'a'), (false, 'b'), (false, 'b')", 4, """
VALUES
ROW(ROW(CAST(NULL AS BOOLEAN), CAST(NULL AS BOOLEAN), BIGINT '0')),
@@ -412,8 +411,8 @@ public void testPartitionsTableColumnTypes()
""");
testPartitionsTableColumnTypes("TIMESTAMP(6)", "VALUES (TIMESTAMP '2001-05-06 12:34:56.123456', 'a'), (TIMESTAMP '2001-05-06 12:34:56.567890', 'a'), (TIMESTAMP '2001-05-06 12:34:56.123456', 'b'), (TIMESTAMP '2001-05-06 12:34:56.123457', 'b')", 4, """
VALUES
- ROW(ROW(CAST(NULL AS TIMESTAMP(6)), CAST(NULL AS TIMESTAMP(6)), BIGINT '0')),
- ROW(ROW(CAST(NULL AS TIMESTAMP(6)), CAST(NULL AS TIMESTAMP(6)), BIGINT '0'))
+ ROW(ROW(TIMESTAMP '2001-05-06 12:34:56.123000', TIMESTAMP '2001-05-06 12:34:56.568000', BIGINT '0')),
+ ROW(ROW(TIMESTAMP '2001-05-06 12:34:56.123000', TIMESTAMP '2001-05-06 12:34:56.124000', BIGINT '0'))
""");
testPartitionsTableColumnTypes("TIMESTAMP(3) WITH TIME ZONE", "VALUES (TIMESTAMP '2001-05-06 12:34:56.123 UTC', 'a'), (TIMESTAMP '2001-05-06 12:34:56.234 -08:30', 'a'), (TIMESTAMP '2001-05-06 12:34:56.567 GMT-08:30', 'b'), (TIMESTAMP '2001-05-06 12:34:56.789 America/New_York', 'b')", 4, """
VALUES
@@ -24,13 +24,19 @@
import org.junit.jupiter.api.Test;

import java.nio.ByteBuffer;
import java.time.Instant;
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.util.Optional;

import static io.trino.plugin.deltalake.transactionlog.DeltaLakeParquetStatisticsUtils.jsonValueToTrinoValue;
import static io.trino.spi.type.DateType.DATE;
import static io.trino.spi.type.RealType.REAL;
import static io.trino.spi.type.TimestampType.TIMESTAMP_MICROS;
import static io.trino.spi.type.TimestampWithTimeZoneType.TIMESTAMP_TZ_MILLIS;
import static io.trino.spi.type.Timestamps.MICROSECONDS_PER_MILLISECOND;
import static io.trino.spi.type.Timestamps.MICROSECONDS_PER_SECOND;
import static io.trino.spi.type.Timestamps.NANOSECONDS_PER_MICROSECOND;
import static io.trino.spi.type.VarcharType.createUnboundedVarcharType;
import static java.lang.Math.toIntExact;
import static java.nio.ByteOrder.LITTLE_ENDIAN;
@@ -113,8 +119,63 @@ public void testDateStatistics()
assertThat(DeltaLakeParquetStatisticsUtils.jsonEncodeMax(ImmutableMap.of(columnName, Optional.of(stats)), ImmutableMap.of(columnName, DATE))).isEqualTo(ImmutableMap.of(columnName, "2020-09-17"));
}

@Test
public void testTimestampJsonValueToTrinoValue()
{
assertThat(jsonValueToTrinoValue(TIMESTAMP_MICROS, "2020-08-26T01:02:03.123Z"))
.isEqualTo(Instant.parse("2020-08-26T01:02:03.123Z").toEpochMilli() * MICROSECONDS_PER_MILLISECOND);
assertThat(jsonValueToTrinoValue(TIMESTAMP_MICROS, "2020-08-26T01:02:03.123111Z"))
.isEqualTo(Instant.parse("2020-08-26T01:02:03Z").getEpochSecond() * MICROSECONDS_PER_SECOND + 123111);
assertThat(jsonValueToTrinoValue(TIMESTAMP_MICROS, "2020-08-26T01:02:03.123999Z"))
.isEqualTo(Instant.parse("2020-08-26T01:02:03Z").getEpochSecond() * MICROSECONDS_PER_SECOND + 123999);
}

@Test
public void testTimestampStatisticsMillisPrecision()
{
String columnName = "t_timestamp";
PrimitiveType type = new PrimitiveType(Type.Repetition.REQUIRED, PrimitiveType.PrimitiveTypeName.INT64, columnName);
Statistics<?> stats = Statistics.getBuilderForReading(type)
.withMin(timestampToBytes(LocalDateTime.parse("2020-08-26T01:02:03.123456")))
.withMax(timestampToBytes(LocalDateTime.parse("2020-08-26T01:02:03.987654")))
.withNumNulls(2)
.build();

assertThat(DeltaLakeParquetStatisticsUtils.jsonEncodeMin(ImmutableMap.of(columnName, Optional.of(stats)), ImmutableMap.of(columnName, TIMESTAMP_MICROS)))
.isEqualTo(ImmutableMap.of(columnName, "2020-08-26T01:02:03.123Z"));
assertThat(DeltaLakeParquetStatisticsUtils.jsonEncodeMax(ImmutableMap.of(columnName, Optional.of(stats)), ImmutableMap.of(columnName, TIMESTAMP_MICROS)))
.isEqualTo(ImmutableMap.of(columnName, "2020-08-26T01:02:03.988Z"));
}

private static byte[] timestampToBytes(LocalDateTime localDateTime)
{
long epochMicros = localDateTime.toEpochSecond(UTC) * MICROSECONDS_PER_SECOND
+ localDateTime.getNano() / NANOSECONDS_PER_MICROSECOND;

Slice slice = Slices.allocate(8);
slice.setLong(0, epochMicros);
return slice.byteArray();
}

@Test
public void testTimestampStatisticsHighPrecision()
{
String columnName = "t_timestamp";
PrimitiveType type = new PrimitiveType(Type.Repetition.REQUIRED, PrimitiveType.PrimitiveTypeName.INT96, columnName);
Statistics<?> stats = Statistics.getBuilderForReading(type)
.withMin(toParquetEncoding(LocalDateTime.parse("2020-08-26T01:02:03.123456789")))
.withMax(toParquetEncoding(LocalDateTime.parse("2020-08-26T01:02:03.123987654")))
.withNumNulls(2)
.build();

assertThat(DeltaLakeParquetStatisticsUtils.jsonEncodeMin(ImmutableMap.of(columnName, Optional.of(stats)), ImmutableMap.of(columnName, TIMESTAMP_MICROS)))
.isEqualTo(ImmutableMap.of(columnName, "2020-08-26T01:02:03.123Z"));
assertThat(DeltaLakeParquetStatisticsUtils.jsonEncodeMax(ImmutableMap.of(columnName, Optional.of(stats)), ImmutableMap.of(columnName, TIMESTAMP_MICROS)))
.isEqualTo(ImmutableMap.of(columnName, "2020-08-26T01:02:03.124Z"));
}

@Test
public void testTimestampWithTimeZoneStatisticsHighPrecision()
{
String columnName = "t_timestamp";
PrimitiveType type = new PrimitiveType(Type.Repetition.REQUIRED, PrimitiveType.PrimitiveTypeName.INT96, columnName);
@@ -129,7 +190,7 @@ public void testTimestampStatisticsHighPrecision()
}

@Test
- public void testTimestampStatisticsMillisPrecision()
+ public void testTimestampWithTimeZoneStatisticsMillisPrecision()
{
String columnName = "t_timestamp";
PrimitiveType type = new PrimitiveType(Type.Repetition.REQUIRED, PrimitiveType.PrimitiveTypeName.INT96, columnName);
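
The INT96 tests above rely on a toParquetEncoding helper defined elsewhere in the test class. As background, a hypothetical sketch of the Parquet INT96 timestamp layout such a helper would produce (12 little-endian bytes: 8 bytes of nanoseconds within the day followed by a 4-byte Julian day number); this is an assumption about that helper, not code from this pull request.

import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.time.LocalDateTime;

class Int96TimestampSketch
{
    // 2440588 is the Julian day number of 1970-01-01
    private static final long JULIAN_DAY_OF_EPOCH = 2_440_588L;

    static byte[] encode(LocalDateTime localDateTime)
    {
        long julianDay = localDateTime.toLocalDate().toEpochDay() + JULIAN_DAY_OF_EPOCH;
        long nanosOfDay = localDateTime.toLocalTime().toNanoOfDay();
        return ByteBuffer.allocate(12)
                .order(ByteOrder.LITTLE_ENDIAN)
                .putLong(nanosOfDay)
                .putInt((int) julianDay)
                .array();
    }

    public static void main(String[] args)
    {
        System.out.println(encode(LocalDateTime.parse("2020-08-26T01:02:03.123456789")).length); // prints 12
    }
}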