Skip to content

Commit

Permalink
Support reading time millis columns in optimized parquet
Browse files Browse the repository at this point in the history
  • Loading branch information
raunaqmorarka committed Aug 4, 2023
1 parent 4749601 commit 657bbfc
Show file tree
Hide file tree
Showing 6 changed files with 168 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -141,10 +141,16 @@ public ColumnReader create(PrimitiveField field, AggregatedMemoryContext aggrega
}
return createColumnReader(field, valueDecoders::getIntDecoder, INT_ADAPTER, memoryContext);
}
if (type instanceof TimeType && primitiveType == INT64) {
if (annotation instanceof TimeLogicalTypeAnnotation timeAnnotation && timeAnnotation.getUnit() == MICROS) {
if (type instanceof TimeType) {
if (!(annotation instanceof TimeLogicalTypeAnnotation timeAnnotation)) {
throw unsupportedException(type, field);
}
if (primitiveType == INT64 && timeAnnotation.getUnit() == MICROS) {
return createColumnReader(field, valueDecoders::getTimeMicrosDecoder, LONG_ADAPTER, memoryContext);
}
if (primitiveType == INT32 && timeAnnotation.getUnit() == MILLIS) {
return createColumnReader(field, valueDecoders::getTimeMillisDecoder, LONG_ADAPTER, memoryContext);
}
throw unsupportedException(type, field);
}
if (BIGINT.equals(type) && primitiveType == INT64
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import io.trino.spi.type.DecimalType;
import io.trino.spi.type.Decimals;
import io.trino.spi.type.Int128;
import io.trino.spi.type.TimeType;
import io.trino.spi.type.TimestampType;
import io.trino.spi.type.TimestampWithTimeZoneType;
import io.trino.spi.type.Type;
Expand Down Expand Up @@ -80,7 +81,9 @@
import static io.trino.spi.type.Timestamps.MILLISECONDS_PER_SECOND;
import static io.trino.spi.type.Timestamps.NANOSECONDS_PER_MICROSECOND;
import static io.trino.spi.type.Timestamps.NANOSECONDS_PER_MILLISECOND;
import static io.trino.spi.type.Timestamps.PICOSECONDS_PER_DAY;
import static io.trino.spi.type.Timestamps.PICOSECONDS_PER_MICROSECOND;
import static io.trino.spi.type.Timestamps.PICOSECONDS_PER_MILLISECOND;
import static io.trino.spi.type.Timestamps.PICOSECONDS_PER_NANOSECOND;
import static io.trino.spi.type.Timestamps.round;
import static java.lang.Math.floorDiv;
Expand Down Expand Up @@ -318,6 +321,29 @@ public ValueDecoder<long[]> getTimeMicrosDecoder(ParquetEncoding encoding)
});
}

/**
 * Creates a decoder for TIME values that are stored as INT32 milliseconds.
 * <p>
 * Values are decoded into a {@code long[]} and then rewritten in place as picoseconds.
 * When the target {@link TimeType} precision is below 3, each value is first rounded
 * to that precision.
 */
public ValueDecoder<long[]> getTimeMillisDecoder(ParquetEncoding encoding)
{
    int precision = ((TimeType) field.getType()).getPrecision();
    if (precision >= 3) {
        // Target precision can hold every millisecond value, so only a unit conversion is needed
        return new InlineTransformDecoder<>(
                getInt32ToLongDecoder(encoding),
                (values, offset, length) -> {
                    for (int position = offset; position < offset + length; position++) {
                        values[position] *= PICOSECONDS_PER_MILLISECOND;
                    }
                });
    }

    // Number of trailing millisecond digits that the target precision cannot represent
    int roundingMagnitude = 3 - precision;
    return new InlineTransformDecoder<>(
            getInt32ToLongDecoder(encoding),
            (values, offset, length) -> {
                for (int position = offset; position < offset + length; position++) {
                    // Decoded value is in millis: round it to the lower precision, then convert to picos
                    long roundedMillis = round(values[position], roundingMagnitude);
                    // The modulo wraps a value that was rounded up to exactly PICOSECONDS_PER_DAY back to 0
                    values[position] = (roundedMillis * PICOSECONDS_PER_MILLISECOND) % PICOSECONDS_PER_DAY;
                }
            });
}

public ValueDecoder<long[]> getInt96ToShortTimestampDecoder(ParquetEncoding encoding, DateTimeZone timeZone)
{
checkArgument(
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.trino.parquet.reader;

import io.trino.parquet.AbstractParquetDataSource;
import io.trino.parquet.ParquetDataSourceId;
import io.trino.parquet.ParquetReaderOptions;

import java.io.File;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.RandomAccessFile;

/**
 * {@link AbstractParquetDataSource} that reads from a local file opened in read-only mode
 * through a {@link RandomAccessFile}.
 * <p>
 * NOTE(review): {@link #readInternal} performs a seek followed by a read on the single shared
 * file handle, so concurrent callers would presumably need external synchronization - confirm
 * against how the parent class invokes it.
 */
public class FileParquetDataSource
        extends AbstractParquetDataSource
{
    private final RandomAccessFile input;

    /**
     * Opens {@code path} for reading.
     *
     * @param path local file to read; its path string is used to build the data source id
     *         and its current length is passed to the parent constructor
     * @param options reader options forwarded to the parent constructor
     * @throws FileNotFoundException if the file cannot be opened for reading
     */
    public FileParquetDataSource(File path, ParquetReaderOptions options)
            throws FileNotFoundException
    {
        super(new ParquetDataSourceId(path.getPath()), path.length(), options);
        this.input = new RandomAccessFile(path, "r");
    }

    /**
     * Closes the parent data source and then the underlying file handle.
     *
     * @throws IOException if either close operation fails
     */
    @Override
    public void close()
            throws IOException
    {
        try {
            super.close();
        }
        finally {
            // Release the file handle even when the parent close fails, so it is never leaked
            input.close();
        }
    }

    /**
     * Fills {@code buffer[bufferOffset .. bufferOffset + bufferLength)} with the bytes that
     * start at {@code position} in the file.
     *
     * @throws IOException if seeking fails or the file ends before {@code bufferLength} bytes are read
     */
    @Override
    protected void readInternal(long position, byte[] buffer, int bufferOffset, int bufferLength)
            throws IOException
    {
        input.seek(position);
        input.readFully(buffer, bufferOffset, bufferLength);
    }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.trino.parquet.reader;

import com.google.common.collect.ImmutableList;
import com.google.common.io.Resources;
import io.trino.parquet.ParquetDataSource;
import io.trino.parquet.ParquetReaderOptions;
import io.trino.spi.Page;
import io.trino.spi.block.Block;
import io.trino.spi.type.SqlTime;
import io.trino.spi.type.TimeType;
import io.trino.spi.type.Type;
import org.apache.parquet.hadoop.metadata.ParquetMetadata;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;

import java.io.File;
import java.util.List;
import java.util.Optional;
import java.util.stream.Stream;

import static io.trino.memory.context.AggregatedMemoryContext.newSimpleAggregatedMemoryContext;
import static io.trino.parquet.ParquetTestUtils.createParquetReader;
import static io.trino.spi.type.TimeType.TIME_MICROS;
import static io.trino.spi.type.TimeType.TIME_MILLIS;
import static io.trino.spi.type.TimeType.TIME_NANOS;
import static io.trino.spi.type.TimeType.TIME_SECONDS;
import static io.trino.testing.DataProviders.toDataProvider;
import static io.trino.testing.TestingConnectorSession.SESSION;
import static org.assertj.core.api.Assertions.assertThat;

/**
 * Reads the {@code time_millis_int32.snappy.parquet} resource with several {@link TimeType}
 * precisions and checks the decoded values.
 */
public class TestTimeMillis
{
    @Test(dataProvider = "timeTypeProvider")
    public void testTimeMillsInt32(TimeType timeType)
            throws Exception
    {
        File parquetFile = new File(Resources.getResource("time_millis_int32.snappy.parquet").toURI());
        ParquetDataSource dataSource = new FileParquetDataSource(parquetFile, new ParquetReaderOptions());
        ParquetMetadata parquetMetadata = MetadataReader.readFooter(dataSource, Optional.empty());

        // Both columns are read with the same Trino time type
        List<String> columnNames = ImmutableList.of("COLUMN1", "COLUMN2");
        List<Type> types = ImmutableList.of(timeType, timeType);
        ParquetReader reader = createParquetReader(dataSource, parquetMetadata, newSimpleAggregatedMemoryContext(), types, columnNames);
        Page page = reader.nextPage();

        // First column: TIME '15:03:00'
        assertSingleTime(timeType, page.getBlock(0).getLoadedBlock(), 54180000000000000L);

        // Second column: TIME '23:59:59.999'
        long expectedPicos;
        if (timeType == TIME_SECONDS) {
            // Rounding to whole seconds reaches a full day, which wraps around to 0
            expectedPicos = 0L;
        }
        else {
            expectedPicos = 86399999000000000L;
        }
        assertSingleTime(timeType, page.getBlock(1).getLoadedBlock(), expectedPicos);
    }

    @DataProvider
    public static Object[][] timeTypeProvider()
    {
        return Stream.of(TIME_SECONDS, TIME_MILLIS, TIME_MICROS, TIME_NANOS)
                .collect(toDataProvider());
    }

    // Checks that the block has exactly one position and that it holds the expected picosecond-of-day value
    private static void assertSingleTime(TimeType timeType, Block block, long expectedPicos)
    {
        assertThat(block.getPositionCount()).isEqualTo(1);
        assertThat(timeType.getObjectValue(SESSION, block, 0))
                .isEqualTo(SqlTime.newInstance(timeType.getPrecision(), expectedPicos));
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -668,6 +668,7 @@ private static ColumnReaderFormat<?>[] columnReaders()
new ColumnReaderFormat<>(FIXED_LEN_BYTE_ARRAY, 16, uuidType(), UUID, FIXED_LENGTH_WRITER, DICTIONARY_FIXED_LENGTH_WRITER, WRITE_UUID, ASSERT_INT_128),
new ColumnReaderFormat<>(FIXED_LEN_BYTE_ARRAY, 16, null, UUID, FIXED_LENGTH_WRITER, DICTIONARY_FIXED_LENGTH_WRITER, WRITE_UUID, ASSERT_INT_128),
// Trino type precision is irrelevant since the data is always stored as picoseconds
new ColumnReaderFormat<>(INT32, timeType(false, MILLIS), TimeType.TIME_MILLIS, PLAIN_WRITER, DICTIONARY_INT_WRITER, WRITE_INT, assertTime(9)),
new ColumnReaderFormat<>(INT64, timeType(false, MICROS), TimeType.TIME_MICROS, PLAIN_WRITER, DICTIONARY_LONG_WRITER, WRITE_LONG, assertTime(6)),
// Reading a column TimeLogicalTypeAnnotation as a BIGINT
new ColumnReaderFormat<>(INT64, timeType(false, MICROS), BIGINT, PLAIN_WRITER, DICTIONARY_LONG_WRITER, WRITE_LONG, ASSERT_LONG),
Expand Down
Binary file not shown.

0 comments on commit 657bbfc

Please sign in to comment.