Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix failure to read nullable time(6) columns #15606

Merged
merged 2 commits into from
Jul 17, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
import io.trino.orc.metadata.statistics.StringStatistics;
import io.trino.orc.metadata.statistics.StringStatisticsBuilder;
import io.trino.orc.metadata.statistics.StripeStatistics;
import io.trino.orc.metadata.statistics.TimeMicrosStatisticsBuilder;
import io.trino.orc.metadata.statistics.TimestampStatisticsBuilder;
import io.trino.spi.Page;
import io.trino.spi.TrinoException;
Expand Down Expand Up @@ -93,6 +94,7 @@
import static io.trino.spi.type.IntegerType.INTEGER;
import static io.trino.spi.type.RealType.REAL;
import static io.trino.spi.type.SmallintType.SMALLINT;
import static io.trino.spi.type.TimeType.TIME_MICROS;
import static io.trino.spi.type.TimestampType.TIMESTAMP_MICROS;
import static io.trino.spi.type.TimestampType.TIMESTAMP_MILLIS;
import static io.trino.spi.type.TimestampType.TIMESTAMP_NANOS;
Expand Down Expand Up @@ -628,6 +630,11 @@ else if (VARBINARY.equals(type) || UUID.equals(type)) {
fieldExtractor = ignored -> ImmutableList.of();
fieldBuilders = ImmutableList.of();
}
else if (TIME_MICROS.equals(type)) {
statisticsBuilder = new TimeMicrosStatisticsBuilder(new NoOpBloomFilterBuilder());
fieldExtractor = ignored -> ImmutableList.of();
fieldBuilders = ImmutableList.of();
}
else if (DATE.equals(type)) {
statisticsBuilder = new DateStatisticsBuilder(new NoOpBloomFilterBuilder());
fieldExtractor = ignored -> ImmutableList.of();
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.trino.orc.metadata.statistics;

import io.trino.spi.block.Block;
import io.trino.spi.type.Type;

import java.util.Optional;

import static io.trino.orc.metadata.statistics.IntegerStatistics.INTEGER_VALUE_BYTES;
import static io.trino.spi.type.Timestamps.PICOSECONDS_PER_MICROSECOND;
import static java.lang.Math.addExact;
import static java.util.Objects.requireNonNull;

/**
 * Collects column statistics for long-encoded time values: the count of
 * non-null values, the minimum, the maximum and, unless it overflows, the sum.
 * Every added value is also forwarded to the supplied {@link BloomFilterBuilder}.
 */
public class TimeMicrosStatisticsBuilder
        implements LongValueStatisticsBuilder
{
    private final BloomFilterBuilder bloomFilterBuilder;

    private long nonNullValueCount;
    // Start from the extreme opposite bounds so the first added value replaces both.
    private long minimum = Long.MAX_VALUE;
    private long maximum = Long.MIN_VALUE;
    private long sum;
    // Once set, the sum is no longer maintained and is reported as absent.
    private boolean overflow;

    /**
     * @param bloomFilterBuilder receives every added value; must not be null
     */
    public TimeMicrosStatisticsBuilder(BloomFilterBuilder bloomFilterBuilder)
    {
        this.bloomFilterBuilder = requireNonNull(bloomFilterBuilder, "bloomFilterBuilder is null");
    }

    /**
     * Reads the long stored at {@code position} and scales it down by
     * {@code PICOSECONDS_PER_MICROSECOND}, so the statistics are kept in the
     * coarser unit (presumably microseconds, assuming the block stores picoseconds).
     */
    @Override
    public long getValueFromBlock(Type type, Block block, int position)
    {
        long rawValue = type.getLong(block, position);
        return rawValue / PICOSECONDS_PER_MICROSECOND;
    }

    /**
     * Folds one non-null value into the running count, bounds, sum and bloom filter.
     */
    @Override
    public void addValue(long value)
    {
        nonNullValueCount++;

        if (value < minimum) {
            minimum = value;
        }
        if (value > maximum) {
            maximum = value;
        }

        accumulateSum(value);
        bloomFilterBuilder.addLong(value);
    }

    /**
     * Adds {@code value} to the running sum, latching the overflow flag the
     * first time the exact addition does not fit in a long.
     */
    private void accumulateSum(long value)
    {
        if (overflow) {
            return;
        }
        try {
            sum = addExact(sum, value);
        }
        catch (ArithmeticException ignored) {
            // The sum can no longer be represented; report it as absent from now on.
            overflow = true;
        }
    }

    /**
     * Returns the integer statistics, or empty when no value has been added.
     */
    private Optional<IntegerStatistics> buildIntegerStatistics()
    {
        return nonNullValueCount == 0
                ? Optional.empty()
                : Optional.of(new IntegerStatistics(minimum, maximum, overflow ? null : sum));
    }

    /**
     * Builds the column statistics: only the integer statistics slot and the
     * bloom filter are populated, all other typed slots are passed as null.
     */
    @Override
    public ColumnStatistics buildColumnStatistics()
    {
        Optional<IntegerStatistics> statistics = buildIntegerStatistics();
        // A per-value byte size is reported only when there is at least one value.
        long valueBytes = statistics.isPresent() ? INTEGER_VALUE_BYTES : 0L;
        return new ColumnStatistics(
                nonNullValueCount,
                valueBytes,
                null,
                statistics.orElse(null),
                null,
                null,
                null,
                null,
                null,
                null,
                null,
                bloomFilterBuilder.buildBloomFilter());
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
public final class ColumnReaders
{
public static final String ICEBERG_BINARY_TYPE = "iceberg.binary-type";
public static final String ICEBERG_LONG_TYPE = "iceberg.long-type";

private ColumnReaders() {}

Expand All @@ -47,7 +48,7 @@ public static ColumnReader createColumnReader(
{
if (type instanceof TimeType) {
if (!type.equals(TIME_MICROS) || column.getColumnType() != LONG ||
!"TIME".equals(column.getAttributes().get("iceberg.long-type"))) {
!"TIME".equals(column.getAttributes().get(ICEBERG_LONG_TYPE))) {
throw invalidStreamType(column, type);
}
return new TimeColumnReader(type, column, memoryContext.newLocalMemoryContext(ColumnReaders.class.getSimpleName()));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -188,6 +188,9 @@ private Block readNullBlock(boolean[] isNull, int nonNullCount)
if (type instanceof BigintType) {
return longReadNullBlock(isNull, nonNullCount);
}
if (type instanceof TimeType) {
return longReadNullBlock(isNull, nonNullCount);
}
if (type instanceof IntegerType || type instanceof DateType) {
return intReadNullBlock(isNull, nonNullCount);
}
Expand All @@ -209,6 +212,7 @@ private Block longReadNullBlock(boolean[] isNull, int nonNullCount)

dataStream.next(longNonNullValueTemp, nonNullCount);

maybeTransformValues(longNonNullValueTemp, nonNullCount);
long[] result = unpackLongNulls(longNonNullValueTemp, isNull);

return new LongArrayBlock(nextBatchSize, Optional.of(isNull), result);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import io.trino.orc.metadata.statistics.DoubleStatisticsBuilder;
import io.trino.orc.metadata.statistics.IntegerStatisticsBuilder;
import io.trino.orc.metadata.statistics.StringStatisticsBuilder;
import io.trino.orc.metadata.statistics.TimeMicrosStatisticsBuilder;
import io.trino.orc.metadata.statistics.TimestampStatisticsBuilder;
import io.trino.spi.type.TimeType;
import io.trino.spi.type.Type;
Expand All @@ -33,6 +34,7 @@

import static com.google.common.base.Preconditions.checkArgument;
import static io.trino.orc.metadata.OrcType.OrcTypeKind.LONG;
import static io.trino.orc.reader.ColumnReaders.ICEBERG_LONG_TYPE;
import static java.lang.Math.toIntExact;
import static java.util.Objects.requireNonNull;

Expand All @@ -55,8 +57,8 @@ public static ColumnWriter createColumnWriter(
if (type instanceof TimeType timeType) {
checkArgument(timeType.getPrecision() == 6, "%s not supported for ORC writer", type);
checkArgument(orcType.getOrcTypeKind() == LONG, "wrong ORC type %s for type %s", orcType, type);
checkArgument("TIME".equals(orcType.getAttributes().get("iceberg.long-type")), "wrong attributes %s for type %s", orcType.getAttributes(), type);
return new TimeColumnWriter(columnId, type, compression, bufferSize, () -> new IntegerStatisticsBuilder(bloomFilterBuilder.get()));
checkArgument("TIME".equals(orcType.getAttributes().get(ICEBERG_LONG_TYPE)), "wrong attributes %s for type %s", orcType.getAttributes(), type);
return new TimeColumnWriter(columnId, type, compression, bufferSize, () -> new TimeMicrosStatisticsBuilder(bloomFilterBuilder.get()));
}
switch (orcType.getOrcTypeKind()) {
case BOOLEAN:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import io.trino.spi.type.DecimalType;
import io.trino.spi.type.SqlDate;
import io.trino.spi.type.SqlDecimal;
import io.trino.spi.type.SqlTime;
import io.trino.spi.type.SqlTimestamp;
import io.trino.spi.type.SqlTimestampWithTimeZone;
import io.trino.spi.type.SqlVarbinary;
Expand Down Expand Up @@ -53,6 +54,7 @@
import static io.trino.spi.type.IntegerType.INTEGER;
import static io.trino.spi.type.RealType.REAL;
import static io.trino.spi.type.SmallintType.SMALLINT;
import static io.trino.spi.type.TimeType.TIME_MICROS;
import static io.trino.spi.type.TimeZoneKey.UTC_KEY;
import static io.trino.spi.type.TimestampType.TIMESTAMP_MICROS;
import static io.trino.spi.type.TimestampType.TIMESTAMP_MILLIS;
Expand Down Expand Up @@ -316,6 +318,25 @@ public void testTimestampMillis()
tester.testRoundTrip(TIMESTAMP_MILLIS, newArrayList(limit(cycle(map.values()), 30_000)));
}

/**
 * Checks that each sample time(6) value renders as its expected string, then
 * round-trips 30,000 values produced by cycling through the samples.
 */
@Test
public void testTimeMicros()
        throws Exception
{
    // Key: expected rendering; value: the time built with precision 6 from a raw long.
    ImmutableMap.Builder<String, SqlTime> samples = ImmutableMap.builder();
    samples.put("00:00:00.000000", SqlTime.newInstance(6, 0L));
    samples.put("12:05:19.257000", SqlTime.newInstance(6, 43519257000000000L));
    samples.put("17:37:07.638000", SqlTime.newInstance(6, 63427638000000000L));
    samples.put("05:17:37.346000", SqlTime.newInstance(6, 19057346000000000L));
    samples.put("06:09:00.988000", SqlTime.newInstance(6, 22140988000000000L));
    samples.put("13:31:34.185000", SqlTime.newInstance(6, 48694185000000000L));
    samples.put("01:09:07.185000", SqlTime.newInstance(6, 4147185000000000L));
    samples.put("20:43:39.822000", SqlTime.newInstance(6, 74619822000000000L));
    samples.put("23:59:59.999000", SqlTime.newInstance(6, 86399999000000000L));
    Map<String, SqlTime> timesByRendering = samples.buildOrThrow();

    // Sanity-check the fixtures before using them in the round trip.
    for (Map.Entry<String, SqlTime> sample : timesByRendering.entrySet()) {
        assertEquals(sample.getValue().toString(), sample.getKey());
    }

    tester.testRoundTrip(TIME_MICROS, newArrayList(limit(cycle(timesByRendering.values()), 30_000)));
}

@Test
public void testTimestampMicros()
throws Exception
Expand Down
47 changes: 46 additions & 1 deletion lib/trino-orc/src/test/java/io/trino/orc/OrcTester.java
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@
import io.trino.spi.type.RowType;
import io.trino.spi.type.SqlDate;
import io.trino.spi.type.SqlDecimal;
import io.trino.spi.type.SqlTime;
import io.trino.spi.type.SqlTimestamp;
import io.trino.spi.type.SqlTimestampWithTimeZone;
import io.trino.spi.type.SqlVarbinary;
Expand Down Expand Up @@ -139,7 +140,9 @@
import static io.trino.orc.metadata.CompressionKind.ZLIB;
import static io.trino.orc.metadata.CompressionKind.ZSTD;
import static io.trino.orc.metadata.OrcType.OrcTypeKind.BINARY;
import static io.trino.orc.metadata.OrcType.OrcTypeKind.LONG;
import static io.trino.orc.reader.ColumnReaders.ICEBERG_BINARY_TYPE;
import static io.trino.orc.reader.ColumnReaders.ICEBERG_LONG_TYPE;
import static io.trino.spi.type.BigintType.BIGINT;
import static io.trino.spi.type.BooleanType.BOOLEAN;
import static io.trino.spi.type.Chars.truncateToLengthAndTrimSpaces;
Expand All @@ -150,6 +153,7 @@
import static io.trino.spi.type.IntegerType.INTEGER;
import static io.trino.spi.type.RealType.REAL;
import static io.trino.spi.type.SmallintType.SMALLINT;
import static io.trino.spi.type.TimeType.TIME_MICROS;
import static io.trino.spi.type.TimeZoneKey.UTC_KEY;
import static io.trino.spi.type.TimestampType.TIMESTAMP_MICROS;
import static io.trino.spi.type.TimestampType.TIMESTAMP_MILLIS;
Expand Down Expand Up @@ -443,7 +447,10 @@ private void assertRoundTrip(Type writeType, Type readType, List<?> writeValues,
{
OrcWriterStats stats = new OrcWriterStats();
for (CompressionKind compression : compressions) {
boolean hiveSupported = (compression != LZ4) && (compression != ZSTD) && !isTimestampTz(writeType) && !isTimestampTz(readType) && !isUuid(writeType) && !isUuid(readType);
boolean hiveSupported = (compression != LZ4) && (compression != ZSTD)
&& !containsTimeMicros(writeType) && !containsTimeMicros(readType)
&& !isTimestampTz(writeType) && !isTimestampTz(readType)
&& !isUuid(writeType) && !isUuid(readType);

for (Format format : formats) {
// write Hive, read Trino
Expand Down Expand Up @@ -669,6 +676,16 @@ public static void writeOrcColumnTrino(File outputFile, CompressionKind compress
Optional.empty(),
ImmutableMap.of(ICEBERG_BINARY_TYPE, "UUID")));
}
if (TIME_MICROS.equals(mappedType)) {
return Optional.of(new OrcType(
LONG,
ImmutableList.of(),
ImmutableList.of(),
Optional.empty(),
Optional.empty(),
Optional.empty(),
ImmutableMap.of(ICEBERG_LONG_TYPE, "TIME")));
}
return Optional.empty();
}));

Expand Down Expand Up @@ -740,6 +757,9 @@ else if (DATE.equals(type)) {
long days = ((SqlDate) value).getDays();
type.writeLong(blockBuilder, days);
}
else if (TIME_MICROS.equals(type)) {
type.writeLong(blockBuilder, ((SqlTime) value).getPicos());
}
else if (TIMESTAMP_MILLIS.equals(type)) {
type.writeLong(blockBuilder, ((SqlTimestamp) value).getEpochMicros());
}
Expand Down Expand Up @@ -1077,6 +1097,9 @@ private static ObjectInspector getJavaObjectInspector(Type type)
if (type.equals(DATE)) {
return javaDateObjectInspector;
}
if (type.equals(TIME_MICROS)) {
return javaLongObjectInspector;
}
if (type.equals(TIMESTAMP_MILLIS) || type.equals(TIMESTAMP_MICROS) || type.equals(TIMESTAMP_NANOS)) {
return javaTimestampObjectInspector;
}
Expand Down Expand Up @@ -1148,6 +1171,9 @@ private static Object preprocessWriteValueHive(Type type, Object value)
if (type.equals(DATE)) {
return Date.ofEpochDay(((SqlDate) value).getDays());
}
if (type.equals(TIME_MICROS)) {
return ((SqlTime) value).getPicos() / PICOSECONDS_PER_MICROSECOND;
}
if (type.equals(TIMESTAMP_MILLIS) || type.equals(TIMESTAMP_MICROS) || type.equals(TIMESTAMP_NANOS)) {
LocalDateTime dateTime = ((SqlTimestamp) value).toLocalDateTime();
return Timestamp.ofEpochSecond(dateTime.toEpochSecond(ZoneOffset.UTC), dateTime.getNano());
Expand Down Expand Up @@ -1351,6 +1377,25 @@ private static Type rowType(Type... fieldTypes)
return TESTING_TYPE_MANAGER.getParameterizedType(StandardTypes.ROW, typeSignatureParameters.build());
}

/**
 * Reports whether {@code type} is {@code TIME_MICROS} or is an array, map or
 * row type that contains it at any nesting depth.
 */
private static boolean containsTimeMicros(Type type)
{
    if (type instanceof ArrayType arrayType) {
        return containsTimeMicros(arrayType.getElementType());
    }
    if (type instanceof MapType mapType) {
        // Either side of the map may carry the time type.
        return containsTimeMicros(mapType.getKeyType()) || containsTimeMicros(mapType.getValueType());
    }
    if (type instanceof RowType rowType) {
        for (RowType.Field field : rowType.getFields()) {
            if (containsTimeMicros(field.getType())) {
                return true;
            }
        }
        return false;
    }
    // Not a container type: it matches only if it is the time type itself.
    return type.equals(TIME_MICROS);
}

private static boolean isTimestampTz(Type type)
{
if (type instanceof TimestampWithTimeZoneType) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import io.trino.spi.type.RowType;
import io.trino.spi.type.SqlDate;
import io.trino.spi.type.SqlDecimal;
import io.trino.spi.type.SqlTime;
import io.trino.spi.type.SqlTimestamp;
import io.trino.spi.type.SqlTimestampWithTimeZone;
import io.trino.spi.type.Type;
Expand All @@ -51,13 +52,15 @@
import static io.trino.spi.type.IntegerType.INTEGER;
import static io.trino.spi.type.RealType.REAL;
import static io.trino.spi.type.SmallintType.SMALLINT;
import static io.trino.spi.type.TimeType.TIME_MICROS;
import static io.trino.spi.type.TimeZoneKey.UTC_KEY;
import static io.trino.spi.type.TimestampType.TIMESTAMP_MICROS;
import static io.trino.spi.type.TimestampType.TIMESTAMP_MILLIS;
import static io.trino.spi.type.TimestampType.TIMESTAMP_NANOS;
import static io.trino.spi.type.TimestampWithTimeZoneType.TIMESTAMP_TZ_MICROS;
import static io.trino.spi.type.TimestampWithTimeZoneType.TIMESTAMP_TZ_MILLIS;
import static io.trino.spi.type.TimestampWithTimeZoneType.TIMESTAMP_TZ_NANOS;
import static io.trino.spi.type.Timestamps.PICOSECONDS_PER_MICROSECOND;
import static io.trino.spi.type.TinyintType.TINYINT;
import static io.trino.spi.type.UuidType.UUID;
import static java.util.stream.Collectors.toList;
Expand Down Expand Up @@ -103,6 +106,9 @@ public static OrcPredicate createOrcPredicate(Type type, Iterable<?> values)
return new DecimalOrcPredicate(expectedValues);
}

if (TIME_MICROS.equals(type)) {
return new LongOrcPredicate(false, transform(expectedValues, value -> ((SqlTime) value).getPicos() / PICOSECONDS_PER_MICROSECOND));
}
if (TIMESTAMP_MILLIS.equals(type)) {
return new LongOrcPredicate(false, transform(expectedValues, value -> ((SqlTimestamp) value).getMillis()));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,23 @@ private void testReadSingleIntegerColumnOrcFile(String orcFileResourceName, int
}
}

/**
 * Writes two time values and a NULL into a time(6) column, then verifies both
 * the data read back and the table statistics reported by SHOW STATS.
 */
@Test
public void testTimeType()
{
    // Regression test for https://github.com/trinodb/trino/issues/15603
    try (TestTable table = new TestTable(getQueryRunner()::execute, "test_time", "(col time(6))")) {
        String tableName = table.getName();

        assertUpdate("INSERT INTO " + tableName + " VALUES (TIME '13:30:00'), (TIME '14:30:00'), (NULL)", 3);

        // The NULL row must be readable alongside the non-null values.
        assertQuery("SELECT * FROM " + tableName, "VALUES '13:30:00', '14:30:00', NULL");

        // Expect 2 distinct values, a 1/3 null fraction and a row count of 3.
        assertQuery(
                "SHOW STATS FOR " + tableName,
                """
                VALUES
                ('col', null, 2.0, 0.33333333333, null, null, null),
                (null, null, null, null, 3, null, null)
                """);
    }
}

@Override
public void testDropAmbiguousRowFieldCaseSensitivity()
{
Expand Down