Skip to content

Commit

Permalink
Fix failure to read nullable time(6) columns
Browse files Browse the repository at this point in the history
  • Loading branch information
joejensen authored and ebyhr committed Jul 17, 2023
1 parent f87aafe commit 77dc3fd
Show file tree
Hide file tree
Showing 8 changed files with 196 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
import io.trino.orc.metadata.statistics.StringStatistics;
import io.trino.orc.metadata.statistics.StringStatisticsBuilder;
import io.trino.orc.metadata.statistics.StripeStatistics;
import io.trino.orc.metadata.statistics.TimeMicrosStatisticsBuilder;
import io.trino.orc.metadata.statistics.TimestampStatisticsBuilder;
import io.trino.spi.Page;
import io.trino.spi.TrinoException;
Expand Down Expand Up @@ -93,6 +94,7 @@
import static io.trino.spi.type.IntegerType.INTEGER;
import static io.trino.spi.type.RealType.REAL;
import static io.trino.spi.type.SmallintType.SMALLINT;
import static io.trino.spi.type.TimeType.TIME_MICROS;
import static io.trino.spi.type.TimestampType.TIMESTAMP_MICROS;
import static io.trino.spi.type.TimestampType.TIMESTAMP_MILLIS;
import static io.trino.spi.type.TimestampType.TIMESTAMP_NANOS;
Expand Down Expand Up @@ -628,6 +630,11 @@ else if (VARBINARY.equals(type) || UUID.equals(type)) {
fieldExtractor = ignored -> ImmutableList.of();
fieldBuilders = ImmutableList.of();
}
else if (TIME_MICROS.equals(type)) {
statisticsBuilder = new TimeMicrosStatisticsBuilder(new NoOpBloomFilterBuilder());
fieldExtractor = ignored -> ImmutableList.of();
fieldBuilders = ImmutableList.of();
}
else if (DATE.equals(type)) {
statisticsBuilder = new DateStatisticsBuilder(new NoOpBloomFilterBuilder());
fieldExtractor = ignored -> ImmutableList.of();
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.trino.orc.metadata.statistics;

import io.trino.spi.block.Block;
import io.trino.spi.type.Type;

import java.util.Optional;

import static io.trino.orc.metadata.statistics.IntegerStatistics.INTEGER_VALUE_BYTES;
import static io.trino.spi.type.Timestamps.PICOSECONDS_PER_MICROSECOND;
import static java.lang.Math.addExact;
import static java.util.Objects.requireNonNull;

public class TimeMicrosStatisticsBuilder
implements LongValueStatisticsBuilder
{
private long nonNullValueCount;
private long minimum = Long.MAX_VALUE;
private long maximum = Long.MIN_VALUE;
private long sum;
private boolean overflow;

private final BloomFilterBuilder bloomFilterBuilder;

public TimeMicrosStatisticsBuilder(BloomFilterBuilder bloomFilterBuilder)
{
this.bloomFilterBuilder = requireNonNull(bloomFilterBuilder, "bloomFilterBuilder is null");
}

@Override
public long getValueFromBlock(Type type, Block block, int position)
{
return type.getLong(block, position) / PICOSECONDS_PER_MICROSECOND;
}

@Override
public void addValue(long value)
{
nonNullValueCount++;

minimum = Math.min(value, minimum);
maximum = Math.max(value, maximum);

if (!overflow) {
try {
sum = addExact(sum, value);
}
catch (ArithmeticException e) {
overflow = true;
}
}
bloomFilterBuilder.addLong(value);
}

private Optional<IntegerStatistics> buildIntegerStatistics()
{
if (nonNullValueCount == 0) {
return Optional.empty();
}
return Optional.of(new IntegerStatistics(minimum, maximum, overflow ? null : sum));
}

@Override
public ColumnStatistics buildColumnStatistics()
{
Optional<IntegerStatistics> integerStatistics = buildIntegerStatistics();
return new ColumnStatistics(
nonNullValueCount,
integerStatistics.map(s -> INTEGER_VALUE_BYTES).orElse(0L),
null,
integerStatistics.orElse(null),
null,
null,
null,
null,
null,
null,
null,
bloomFilterBuilder.buildBloomFilter());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -188,6 +188,9 @@ private Block readNullBlock(boolean[] isNull, int nonNullCount)
if (type instanceof BigintType) {
return longReadNullBlock(isNull, nonNullCount);
}
if (type instanceof TimeType) {
return longReadNullBlock(isNull, nonNullCount);
}
if (type instanceof IntegerType || type instanceof DateType) {
return intReadNullBlock(isNull, nonNullCount);
}
Expand All @@ -209,6 +212,7 @@ private Block longReadNullBlock(boolean[] isNull, int nonNullCount)

dataStream.next(longNonNullValueTemp, nonNullCount);

maybeTransformValues(longNonNullValueTemp, nonNullCount);
long[] result = unpackLongNulls(longNonNullValueTemp, isNull);

return new LongArrayBlock(nextBatchSize, Optional.of(isNull), result);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import io.trino.orc.metadata.statistics.DoubleStatisticsBuilder;
import io.trino.orc.metadata.statistics.IntegerStatisticsBuilder;
import io.trino.orc.metadata.statistics.StringStatisticsBuilder;
import io.trino.orc.metadata.statistics.TimeMicrosStatisticsBuilder;
import io.trino.orc.metadata.statistics.TimestampStatisticsBuilder;
import io.trino.spi.type.TimeType;
import io.trino.spi.type.Type;
Expand Down Expand Up @@ -57,7 +58,7 @@ public static ColumnWriter createColumnWriter(
checkArgument(timeType.getPrecision() == 6, "%s not supported for ORC writer", type);
checkArgument(orcType.getOrcTypeKind() == LONG, "wrong ORC type %s for type %s", orcType, type);
checkArgument("TIME".equals(orcType.getAttributes().get(ICEBERG_LONG_TYPE)), "wrong attributes %s for type %s", orcType.getAttributes(), type);
return new TimeColumnWriter(columnId, type, compression, bufferSize, () -> new IntegerStatisticsBuilder(bloomFilterBuilder.get()));
return new TimeColumnWriter(columnId, type, compression, bufferSize, () -> new TimeMicrosStatisticsBuilder(bloomFilterBuilder.get()));
}
switch (orcType.getOrcTypeKind()) {
case BOOLEAN:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import io.trino.spi.type.DecimalType;
import io.trino.spi.type.SqlDate;
import io.trino.spi.type.SqlDecimal;
import io.trino.spi.type.SqlTime;
import io.trino.spi.type.SqlTimestamp;
import io.trino.spi.type.SqlTimestampWithTimeZone;
import io.trino.spi.type.SqlVarbinary;
Expand Down Expand Up @@ -53,6 +54,7 @@
import static io.trino.spi.type.IntegerType.INTEGER;
import static io.trino.spi.type.RealType.REAL;
import static io.trino.spi.type.SmallintType.SMALLINT;
import static io.trino.spi.type.TimeType.TIME_MICROS;
import static io.trino.spi.type.TimeZoneKey.UTC_KEY;
import static io.trino.spi.type.TimestampType.TIMESTAMP_MICROS;
import static io.trino.spi.type.TimestampType.TIMESTAMP_MILLIS;
Expand Down Expand Up @@ -316,6 +318,25 @@ public void testTimestampMillis()
tester.testRoundTrip(TIMESTAMP_MILLIS, newArrayList(limit(cycle(map.values()), 30_000)));
}

@Test
public void testTimeMicros()
throws Exception
{
Map<String, SqlTime> map = ImmutableMap.<String, SqlTime>builder()
.put("00:00:00.000000", SqlTime.newInstance(6, 0L))
.put("12:05:19.257000", SqlTime.newInstance(6, 43519257000000000L))
.put("17:37:07.638000", SqlTime.newInstance(6, 63427638000000000L))
.put("05:17:37.346000", SqlTime.newInstance(6, 19057346000000000L))
.put("06:09:00.988000", SqlTime.newInstance(6, 22140988000000000L))
.put("13:31:34.185000", SqlTime.newInstance(6, 48694185000000000L))
.put("01:09:07.185000", SqlTime.newInstance(6, 4147185000000000L))
.put("20:43:39.822000", SqlTime.newInstance(6, 74619822000000000L))
.put("23:59:59.999000", SqlTime.newInstance(6, 86399999000000000L))
.buildOrThrow();
map.forEach((expected, value) -> assertEquals(value.toString(), expected));
tester.testRoundTrip(TIME_MICROS, newArrayList(limit(cycle(map.values()), 30_000)));
}

@Test
public void testTimestampMicros()
throws Exception
Expand Down
47 changes: 46 additions & 1 deletion lib/trino-orc/src/test/java/io/trino/orc/OrcTester.java
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@
import io.trino.spi.type.RowType;
import io.trino.spi.type.SqlDate;
import io.trino.spi.type.SqlDecimal;
import io.trino.spi.type.SqlTime;
import io.trino.spi.type.SqlTimestamp;
import io.trino.spi.type.SqlTimestampWithTimeZone;
import io.trino.spi.type.SqlVarbinary;
Expand Down Expand Up @@ -139,7 +140,9 @@
import static io.trino.orc.metadata.CompressionKind.ZLIB;
import static io.trino.orc.metadata.CompressionKind.ZSTD;
import static io.trino.orc.metadata.OrcType.OrcTypeKind.BINARY;
import static io.trino.orc.metadata.OrcType.OrcTypeKind.LONG;
import static io.trino.orc.reader.ColumnReaders.ICEBERG_BINARY_TYPE;
import static io.trino.orc.reader.ColumnReaders.ICEBERG_LONG_TYPE;
import static io.trino.spi.type.BigintType.BIGINT;
import static io.trino.spi.type.BooleanType.BOOLEAN;
import static io.trino.spi.type.Chars.truncateToLengthAndTrimSpaces;
Expand All @@ -150,6 +153,7 @@
import static io.trino.spi.type.IntegerType.INTEGER;
import static io.trino.spi.type.RealType.REAL;
import static io.trino.spi.type.SmallintType.SMALLINT;
import static io.trino.spi.type.TimeType.TIME_MICROS;
import static io.trino.spi.type.TimeZoneKey.UTC_KEY;
import static io.trino.spi.type.TimestampType.TIMESTAMP_MICROS;
import static io.trino.spi.type.TimestampType.TIMESTAMP_MILLIS;
Expand Down Expand Up @@ -443,7 +447,10 @@ private void assertRoundTrip(Type writeType, Type readType, List<?> writeValues,
{
OrcWriterStats stats = new OrcWriterStats();
for (CompressionKind compression : compressions) {
boolean hiveSupported = (compression != LZ4) && (compression != ZSTD) && !isTimestampTz(writeType) && !isTimestampTz(readType) && !isUuid(writeType) && !isUuid(readType);
boolean hiveSupported = (compression != LZ4) && (compression != ZSTD)
&& !containsTimeMicros(writeType) && !containsTimeMicros(readType)
&& !isTimestampTz(writeType) && !isTimestampTz(readType)
&& !isUuid(writeType) && !isUuid(readType);

for (Format format : formats) {
// write Hive, read Trino
Expand Down Expand Up @@ -669,6 +676,16 @@ public static void writeOrcColumnTrino(File outputFile, CompressionKind compress
Optional.empty(),
ImmutableMap.of(ICEBERG_BINARY_TYPE, "UUID")));
}
if (TIME_MICROS.equals(mappedType)) {
return Optional.of(new OrcType(
LONG,
ImmutableList.of(),
ImmutableList.of(),
Optional.empty(),
Optional.empty(),
Optional.empty(),
ImmutableMap.of(ICEBERG_LONG_TYPE, "TIME")));
}
return Optional.empty();
}));

Expand Down Expand Up @@ -740,6 +757,9 @@ else if (DATE.equals(type)) {
long days = ((SqlDate) value).getDays();
type.writeLong(blockBuilder, days);
}
else if (TIME_MICROS.equals(type)) {
type.writeLong(blockBuilder, ((SqlTime) value).getPicos());
}
else if (TIMESTAMP_MILLIS.equals(type)) {
type.writeLong(blockBuilder, ((SqlTimestamp) value).getEpochMicros());
}
Expand Down Expand Up @@ -1077,6 +1097,9 @@ private static ObjectInspector getJavaObjectInspector(Type type)
if (type.equals(DATE)) {
return javaDateObjectInspector;
}
if (type.equals(TIME_MICROS)) {
return javaLongObjectInspector;
}
if (type.equals(TIMESTAMP_MILLIS) || type.equals(TIMESTAMP_MICROS) || type.equals(TIMESTAMP_NANOS)) {
return javaTimestampObjectInspector;
}
Expand Down Expand Up @@ -1148,6 +1171,9 @@ private static Object preprocessWriteValueHive(Type type, Object value)
if (type.equals(DATE)) {
return Date.ofEpochDay(((SqlDate) value).getDays());
}
if (type.equals(TIME_MICROS)) {
return ((SqlTime) value).getPicos() / PICOSECONDS_PER_MICROSECOND;
}
if (type.equals(TIMESTAMP_MILLIS) || type.equals(TIMESTAMP_MICROS) || type.equals(TIMESTAMP_NANOS)) {
LocalDateTime dateTime = ((SqlTimestamp) value).toLocalDateTime();
return Timestamp.ofEpochSecond(dateTime.toEpochSecond(ZoneOffset.UTC), dateTime.getNano());
Expand Down Expand Up @@ -1351,6 +1377,25 @@ private static Type rowType(Type... fieldTypes)
return TESTING_TYPE_MANAGER.getParameterizedType(StandardTypes.ROW, typeSignatureParameters.build());
}

private static boolean containsTimeMicros(Type type)
{
if (type.equals(TIME_MICROS)) {
return true;
}
if (type instanceof ArrayType arrayType) {
return containsTimeMicros(arrayType.getElementType());
}
if (type instanceof MapType mapType) {
return containsTimeMicros(mapType.getKeyType()) || containsTimeMicros(mapType.getValueType());
}
if (type instanceof RowType rowType) {
return rowType.getFields().stream()
.map(RowType.Field::getType)
.anyMatch(OrcTester::containsTimeMicros);
}
return false;
}

private static boolean isTimestampTz(Type type)
{
if (type instanceof TimestampWithTimeZoneType) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import io.trino.spi.type.RowType;
import io.trino.spi.type.SqlDate;
import io.trino.spi.type.SqlDecimal;
import io.trino.spi.type.SqlTime;
import io.trino.spi.type.SqlTimestamp;
import io.trino.spi.type.SqlTimestampWithTimeZone;
import io.trino.spi.type.Type;
Expand All @@ -51,13 +52,15 @@
import static io.trino.spi.type.IntegerType.INTEGER;
import static io.trino.spi.type.RealType.REAL;
import static io.trino.spi.type.SmallintType.SMALLINT;
import static io.trino.spi.type.TimeType.TIME_MICROS;
import static io.trino.spi.type.TimeZoneKey.UTC_KEY;
import static io.trino.spi.type.TimestampType.TIMESTAMP_MICROS;
import static io.trino.spi.type.TimestampType.TIMESTAMP_MILLIS;
import static io.trino.spi.type.TimestampType.TIMESTAMP_NANOS;
import static io.trino.spi.type.TimestampWithTimeZoneType.TIMESTAMP_TZ_MICROS;
import static io.trino.spi.type.TimestampWithTimeZoneType.TIMESTAMP_TZ_MILLIS;
import static io.trino.spi.type.TimestampWithTimeZoneType.TIMESTAMP_TZ_NANOS;
import static io.trino.spi.type.Timestamps.PICOSECONDS_PER_MICROSECOND;
import static io.trino.spi.type.TinyintType.TINYINT;
import static io.trino.spi.type.UuidType.UUID;
import static java.util.stream.Collectors.toList;
Expand Down Expand Up @@ -103,6 +106,9 @@ public static OrcPredicate createOrcPredicate(Type type, Iterable<?> values)
return new DecimalOrcPredicate(expectedValues);
}

if (TIME_MICROS.equals(type)) {
return new LongOrcPredicate(false, transform(expectedValues, value -> ((SqlTime) value).getPicos() / PICOSECONDS_PER_MICROSECOND));
}
if (TIMESTAMP_MILLIS.equals(type)) {
return new LongOrcPredicate(false, transform(expectedValues, value -> ((SqlTimestamp) value).getMillis()));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,23 @@ private void testReadSingleIntegerColumnOrcFile(String orcFileResourceName, int
}
}

@Test
public void testTimeType()
{
// Regression test for https://github.com/trinodb/trino/issues/15603
try (TestTable table = new TestTable(getQueryRunner()::execute, "test_time", "(col time(6))")) {
assertUpdate("INSERT INTO " + table.getName() + " VALUES (TIME '13:30:00'), (TIME '14:30:00'), (NULL)", 3);
assertQuery("SELECT * FROM " + table.getName(), "VALUES '13:30:00', '14:30:00', NULL");
assertQuery(
"SHOW STATS FOR " + table.getName(),
"""
VALUES
('col', null, 2.0, 0.33333333333, null, null, null),
(null, null, null, null, 3, null, null)
""");
}
}

@Override
public void testDropAmbiguousRowFieldCaseSensitivity()
{
Expand Down

0 comments on commit 77dc3fd

Please sign in to comment.