Support reading time millis columns in optimized parquet #18535

Merged · 2 commits · Aug 4, 2023
@@ -141,10 +141,16 @@ public ColumnReader create(PrimitiveField field, AggregatedMemoryContext aggrega
         }
         return createColumnReader(field, valueDecoders::getIntDecoder, INT_ADAPTER, memoryContext);
     }
-    if (type instanceof TimeType && primitiveType == INT64) {
-        if (annotation instanceof TimeLogicalTypeAnnotation timeAnnotation && timeAnnotation.getUnit() == MICROS) {
+    if (type instanceof TimeType) {
+        if (!(annotation instanceof TimeLogicalTypeAnnotation timeAnnotation)) {
+            throw unsupportedException(type, field);
+        }
+        if (primitiveType == INT64 && timeAnnotation.getUnit() == MICROS) {
             return createColumnReader(field, valueDecoders::getTimeMicrosDecoder, LONG_ADAPTER, memoryContext);
         }
+        if (primitiveType == INT32 && timeAnnotation.getUnit() == MILLIS) {
+            return createColumnReader(field, valueDecoders::getTimeMillisDecoder, LONG_ADAPTER, memoryContext);
+        }
         throw unsupportedException(type, field);
     }
     if (BIGINT.equals(type) && primitiveType == INT64
@@ -24,6 +24,7 @@
 import io.trino.spi.type.DecimalType;
 import io.trino.spi.type.Decimals;
 import io.trino.spi.type.Int128;
+import io.trino.spi.type.TimeType;
 import io.trino.spi.type.TimestampType;
 import io.trino.spi.type.TimestampWithTimeZoneType;
 import io.trino.spi.type.Type;
@@ -80,7 +81,9 @@
 import static io.trino.spi.type.Timestamps.MILLISECONDS_PER_SECOND;
 import static io.trino.spi.type.Timestamps.NANOSECONDS_PER_MICROSECOND;
 import static io.trino.spi.type.Timestamps.NANOSECONDS_PER_MILLISECOND;
+import static io.trino.spi.type.Timestamps.PICOSECONDS_PER_DAY;
 import static io.trino.spi.type.Timestamps.PICOSECONDS_PER_MICROSECOND;
+import static io.trino.spi.type.Timestamps.PICOSECONDS_PER_MILLISECOND;
 import static io.trino.spi.type.Timestamps.PICOSECONDS_PER_NANOSECOND;
 import static io.trino.spi.type.Timestamps.round;
 import static java.lang.Math.floorDiv;

@@ -318,6 +321,29 @@ public ValueDecoder<long[]> getTimeMicrosDecoder(ParquetEncoding encoding)
         });
     }

+    public ValueDecoder<long[]> getTimeMillisDecoder(ParquetEncoding encoding)
+    {
+        int precision = ((TimeType) field.getType()).getPrecision();
+        if (precision < 3) {
+            return new InlineTransformDecoder<>(
+                    getInt32ToLongDecoder(encoding),
+                    (values, offset, length) -> {
+                        // decoded values are millis, round to lower precision and convert to picos
+                        // modulo PICOSECONDS_PER_DAY is applied for the case when a value is rounded up to PICOSECONDS_PER_DAY

Review thread on lines +331 to +332:

Member: Should we have similar rounding for getTimeMicrosDecoder where the Trino type precision is lower than 6? (Or an assertion that the precision is 6, if only time(6) is supported.)

Member (Author): Yes, I'm planning to tackle that separately by adding rounding on lower precision. In practice we only support time(6) in Iceberg right now, so it's not a problem yet. But from the parquet reader's perspective it is better to assume such a scenario might arise in the future and support it correctly.

+                        for (int i = offset; i < offset + length; i++) {
+                            values[i] = (round(values[i], 3 - precision) * PICOSECONDS_PER_MILLISECOND) % PICOSECONDS_PER_DAY;
+                        }
+                    });
+        }
+        return new InlineTransformDecoder<>(
+                getInt32ToLongDecoder(encoding),
+                (values, offset, length) -> {
+                    for (int i = offset; i < offset + length; i++) {
+                        values[i] = values[i] * PICOSECONDS_PER_MILLISECOND;
+                    }
+                });
+    }
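Following up on that thread, a minimal sketch of what the analogous rounding in getTimeMicrosDecoder could look like — not part of this PR; the name of the underlying INT64 decoder is an assumption, while round() and the constants are the same io.trino.spi.type.Timestamps imports used above:

    int precision = ((TimeType) field.getType()).getPrecision();
    if (precision < 6) {
        return new InlineTransformDecoder<>(
                getLongDecoder(encoding), // assumption: the plain INT64 decoder
                (values, offset, length) -> {
                    for (int i = offset; i < offset + length; i++) {
                        // round micros to the Trino precision, convert to picos, wrap at a full day
                        values[i] = (round(values[i], 6 - precision) * PICOSECONDS_PER_MICROSECOND) % PICOSECONDS_PER_DAY;
                    }
                });
    }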

    public ValueDecoder<long[]> getInt96ToShortTimestampDecoder(ParquetEncoding encoding, DateTimeZone timeZone)
    {
        checkArgument(
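To see why the modulo in getTimeMillisDecoder is needed, here is a self-contained sketch (a hypothetical class; the constants mirror the io.trino.spi.type.Timestamps definitions and this round() is a simplified stand-in) of the last millisecond of the day read as TIME(0):

    public class TimeMillisRoundingExample
    {
        private static final long PICOSECONDS_PER_MILLISECOND = 1_000_000_000L;
        private static final long PICOSECONDS_PER_DAY = 86_400_000L * PICOSECONDS_PER_MILLISECOND;

        // simplified stand-in for Timestamps.round: nearest multiple of 10^magnitude, half up
        private static long round(long value, int magnitude)
        {
            long factor = (long) Math.pow(10, magnitude);
            return Math.floorDiv(value + factor / 2, factor) * factor;
        }

        public static void main(String[] args)
        {
            long millisOfDay = 86_399_999L; // TIME '23:59:59.999' stored as INT32 millis
            int precision = 0;              // Trino TIME(0)
            // 86_399_999 ms rounds up to 86_400_000 ms, exactly one day in picos;
            // without the modulo the result would fall outside [0, PICOSECONDS_PER_DAY)
            long picos = (round(millisOfDay, 3 - precision) * PICOSECONDS_PER_MILLISECOND) % PICOSECONDS_PER_DAY;
            System.out.println(picos); // prints 0
        }
    }

This matches the TestTimeMillis expectation below that TIME '23:59:59.999' reads back as 0 for time(0).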
@@ -17,6 +17,8 @@
 import com.google.common.primitives.Booleans;
 import io.airlift.slice.Slice;
 import io.airlift.slice.Slices;
+import io.trino.memory.context.AggregatedMemoryContext;
+import io.trino.parquet.reader.ParquetReader;
 import io.trino.parquet.writer.ParquetSchemaConverter;
 import io.trino.parquet.writer.ParquetWriter;
 import io.trino.parquet.writer.ParquetWriterOptions;

@@ -28,6 +30,9 @@
 import io.trino.spi.type.Type;
 import io.trino.spi.type.TypeOperators;
 import org.apache.parquet.format.CompressionCodec;
+import org.apache.parquet.hadoop.metadata.BlockMetaData;
+import org.apache.parquet.hadoop.metadata.ParquetMetadata;
+import org.apache.parquet.io.MessageColumnIO;
 import org.joda.time.DateTimeZone;

 import java.io.ByteArrayOutputStream;

@@ -38,13 +43,19 @@
 import java.util.Random;

 import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Throwables.throwIfUnchecked;
 import static com.google.common.collect.ImmutableList.toImmutableList;
+import static io.trino.parquet.ParquetTypeUtils.constructField;
+import static io.trino.parquet.ParquetTypeUtils.getColumnIO;
+import static io.trino.parquet.ParquetTypeUtils.lookupColumnByName;
 import static io.trino.spi.block.ArrayBlock.fromElementBlock;
 import static io.trino.spi.block.MapBlock.fromKeyValueBlock;
 import static io.trino.spi.block.RowBlock.fromFieldBlocks;
 import static io.trino.spi.type.BigintType.BIGINT;
 import static io.trino.spi.type.TypeUtils.writeNativeValue;
 import static java.lang.Math.toIntExact;
+import static java.util.Collections.nCopies;
+import static org.joda.time.DateTimeZone.UTC;

 public class ParquetTestUtils
 {

@@ -78,6 +89,48 @@ public static Slice writeParquetFile(ParquetWriterOptions writerOptions, List<Ty
        return Slices.wrappedBuffer(outputStream.toByteArray());
    }

+    public static ParquetReader createParquetReader(
+            ParquetDataSource input,
+            ParquetMetadata parquetMetadata,
+            AggregatedMemoryContext memoryContext,
+            List<Type> types,
+            List<String> columnNames)
+            throws IOException
+    {
+        org.apache.parquet.hadoop.metadata.FileMetaData fileMetaData = parquetMetadata.getFileMetaData();
+        MessageColumnIO messageColumnIO = getColumnIO(fileMetaData.getSchema(), fileMetaData.getSchema());
+        ImmutableList.Builder<Field> columnFields = ImmutableList.builder();
+        for (int i = 0; i < types.size(); i++) {
+            columnFields.add(constructField(
+                    types.get(i),
+                    lookupColumnByName(messageColumnIO, columnNames.get(i)))
+                    .orElseThrow());
+        }
+        long nextStart = 0;
+        ImmutableList.Builder<Long> blockStartsBuilder = ImmutableList.builder();
+        for (BlockMetaData block : parquetMetadata.getBlocks()) {
+            blockStartsBuilder.add(nextStart);
+            nextStart += block.getRowCount();
+        }
+        List<Long> blockStarts = blockStartsBuilder.build();
+        return new ParquetReader(
+                Optional.ofNullable(fileMetaData.getCreatedBy()),
+                columnFields.build(),
+                parquetMetadata.getBlocks(),
+                blockStarts,
+                input,
+                UTC,
+                memoryContext,
+                new ParquetReaderOptions(),
+                exception -> {
+                    throwIfUnchecked(exception);
+                    return new RuntimeException(exception);
+                },
+                Optional.empty(),
+                nCopies(blockStarts.size(), Optional.empty()),
+                Optional.empty());
+    }

    public static List<io.trino.spi.Page> generateInputPages(List<Type> types, int positionsPerPage, int pageCount)
    {
        ImmutableList.Builder<io.trino.spi.Page> pagesBuilder = ImmutableList.builder();
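For reference, a hypothetical caller of the extracted createParquetReader helper (the file name and column name are made up; TestTimeMillis below exercises it the same way):

    ParquetDataSource dataSource = new FileParquetDataSource(new File("example.parquet"), new ParquetReaderOptions());
    ParquetMetadata parquetMetadata = MetadataReader.readFooter(dataSource, Optional.empty());
    ParquetReader reader = createParquetReader(
            dataSource, parquetMetadata, newSimpleAggregatedMemoryContext(), ImmutableList.of(BIGINT), ImmutableList.of("column1"));
    for (Page page = reader.nextPage(); page != null; page = reader.nextPage()) {
        Block block = page.getBlock(0).getLoadedBlock(); // materialize the lazy block
    }
    reader.close();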
@@ -0,0 +1,52 @@
/*
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package io.trino.parquet.reader;

import io.trino.parquet.AbstractParquetDataSource;
import io.trino.parquet.ParquetDataSourceId;
import io.trino.parquet.ParquetReaderOptions;

import java.io.File;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.RandomAccessFile;

public class FileParquetDataSource
        extends AbstractParquetDataSource
{
    private final RandomAccessFile input;

    public FileParquetDataSource(File path, ParquetReaderOptions options)
            throws FileNotFoundException
    {
        super(new ParquetDataSourceId(path.getPath()), path.length(), options);
        this.input = new RandomAccessFile(path, "r");
    }

    @Override
    public void close()
            throws IOException
    {
        super.close();
        input.close();
    }

    @Override
    protected void readInternal(long position, byte[] buffer, int bufferOffset, int bufferLength)
            throws IOException
    {
        input.seek(position);
        input.readFully(buffer, bufferOffset, bufferLength);
    }
}
@@ -16,34 +16,26 @@
 import com.google.common.collect.ImmutableList;
 import io.airlift.units.DataSize;
 import io.trino.memory.context.AggregatedMemoryContext;
-import io.trino.parquet.Field;
 import io.trino.parquet.ParquetDataSource;
 import io.trino.parquet.ParquetReaderOptions;
 import io.trino.parquet.writer.ParquetWriterOptions;
 import io.trino.spi.Page;
 import io.trino.spi.block.LazyBlock;
 import io.trino.spi.type.Type;
-import org.apache.parquet.hadoop.metadata.BlockMetaData;
 import org.apache.parquet.hadoop.metadata.ParquetMetadata;
-import org.apache.parquet.io.MessageColumnIO;
 import org.testng.annotations.Test;

 import java.io.IOException;
 import java.util.List;
 import java.util.Optional;

-import static com.google.common.base.Throwables.throwIfUnchecked;
 import static io.trino.memory.context.AggregatedMemoryContext.newSimpleAggregatedMemoryContext;
+import static io.trino.parquet.ParquetTestUtils.createParquetReader;
 import static io.trino.parquet.ParquetTestUtils.generateInputPages;
 import static io.trino.parquet.ParquetTestUtils.writeParquetFile;
-import static io.trino.parquet.ParquetTypeUtils.constructField;
-import static io.trino.parquet.ParquetTypeUtils.getColumnIO;
-import static io.trino.parquet.ParquetTypeUtils.lookupColumnByName;
 import static io.trino.spi.type.BigintType.BIGINT;
 import static io.trino.spi.type.IntegerType.INTEGER;
-import static java.util.Collections.nCopies;
 import static org.assertj.core.api.AssertionsForClassTypes.assertThat;
-import static org.joda.time.DateTimeZone.UTC;

public class TestParquetReaderMemoryUsage
{
@@ -106,46 +98,4 @@ public void testColumnReaderMemoryUsage()
        reader.close();
        assertThat(memoryContext.getBytes()).isEqualTo(0);
    }

-    private static ParquetReader createParquetReader(
-            ParquetDataSource input,
-            ParquetMetadata parquetMetadata,
-            AggregatedMemoryContext memoryContext,
-            List<Type> types,
-            List<String> columnNames)
-            throws IOException
-    {
-        org.apache.parquet.hadoop.metadata.FileMetaData fileMetaData = parquetMetadata.getFileMetaData();
-        MessageColumnIO messageColumnIO = getColumnIO(fileMetaData.getSchema(), fileMetaData.getSchema());
-        ImmutableList.Builder<Field> columnFields = ImmutableList.builder();
-        for (int i = 0; i < types.size(); i++) {
-            columnFields.add(constructField(
-                    types.get(i),
-                    lookupColumnByName(messageColumnIO, columnNames.get(i)))
-                    .orElseThrow());
-        }
-        long nextStart = 0;
-        ImmutableList.Builder<Long> blockStartsBuilder = ImmutableList.builder();
-        for (BlockMetaData block : parquetMetadata.getBlocks()) {
-            blockStartsBuilder.add(nextStart);
-            nextStart += block.getRowCount();
-        }
-        List<Long> blockStarts = blockStartsBuilder.build();
-        return new ParquetReader(
-                Optional.ofNullable(fileMetaData.getCreatedBy()),
-                columnFields.build(),
-                parquetMetadata.getBlocks(),
-                blockStarts,
-                input,
-                UTC,
-                memoryContext,
-                new ParquetReaderOptions(),
-                exception -> {
-                    throwIfUnchecked(exception);
-                    return new RuntimeException(exception);
-                },
-                Optional.empty(),
-                nCopies(blockStarts.size(), Optional.empty()),
-                Optional.empty());
-    }
}
@@ -0,0 +1,81 @@
/*
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package io.trino.parquet.reader;

import com.google.common.collect.ImmutableList;
import com.google.common.io.Resources;
import io.trino.parquet.ParquetDataSource;
import io.trino.parquet.ParquetReaderOptions;
import io.trino.spi.Page;
import io.trino.spi.block.Block;
import io.trino.spi.type.SqlTime;
import io.trino.spi.type.TimeType;
import io.trino.spi.type.Type;
import org.apache.parquet.hadoop.metadata.ParquetMetadata;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;

import java.io.File;
import java.util.List;
import java.util.Optional;
import java.util.stream.Stream;

import static io.trino.memory.context.AggregatedMemoryContext.newSimpleAggregatedMemoryContext;
import static io.trino.parquet.ParquetTestUtils.createParquetReader;
import static io.trino.spi.type.TimeType.TIME_MICROS;
import static io.trino.spi.type.TimeType.TIME_MILLIS;
import static io.trino.spi.type.TimeType.TIME_NANOS;
import static io.trino.spi.type.TimeType.TIME_SECONDS;
import static io.trino.testing.DataProviders.toDataProvider;
import static io.trino.testing.TestingConnectorSession.SESSION;
import static org.assertj.core.api.Assertions.assertThat;

public class TestTimeMillis
{
    @Test(dataProvider = "timeTypeProvider")
    public void testTimeMillsInt32(TimeType timeType)
            throws Exception
    {
        List<String> columnNames = ImmutableList.of("COLUMN1", "COLUMN2");
        List<Type> types = ImmutableList.of(timeType, timeType);
        int precision = timeType.getPrecision();

        ParquetDataSource dataSource = new FileParquetDataSource(
                new File(Resources.getResource("time_millis_int32.snappy.parquet").toURI()),
                new ParquetReaderOptions());
        ParquetMetadata parquetMetadata = MetadataReader.readFooter(dataSource, Optional.empty());
        ParquetReader reader = createParquetReader(dataSource, parquetMetadata, newSimpleAggregatedMemoryContext(), types, columnNames);

        Page page = reader.nextPage();
        Block block = page.getBlock(0).getLoadedBlock();
        assertThat(block.getPositionCount()).isEqualTo(1);
        // TIME '15:03:00'
        assertThat(timeType.getObjectValue(SESSION, block, 0))
                .isEqualTo(SqlTime.newInstance(precision, 54180000000000000L));

        // TIME '23:59:59.999'
        block = page.getBlock(1).getLoadedBlock();
        assertThat(block.getPositionCount()).isEqualTo(1);
        // Rounded up to 0 if precision < 3
        assertThat(timeType.getObjectValue(SESSION, block, 0))
                .isEqualTo(SqlTime.newInstance(precision, timeType == TIME_SECONDS ? 0L : 86399999000000000L));
    }

    @DataProvider
    public static Object[][] timeTypeProvider()
    {
        return Stream.of(TIME_SECONDS, TIME_MILLIS, TIME_MICROS, TIME_NANOS)
                .collect(toDataProvider());
    }
}
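As a quick cross-check of the expected constants in the assertions above (picoseconds-of-day arithmetic, not from the PR):

    long picos1503 = (15L * 3600 + 3 * 60) * 1_000_000_000_000L; // 54_180_000_000_000_000L, the TIME '15:03:00' constant
    long picosLastMilli = 86_399_999L * 1_000_000_000L;          // 86_399_999_000_000_000L, the TIME '23:59:59.999' constant (precision >= 3)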
@@ -668,6 +668,7 @@ private static ColumnReaderFormat<?>[] columnReaders()
         new ColumnReaderFormat<>(FIXED_LEN_BYTE_ARRAY, 16, uuidType(), UUID, FIXED_LENGTH_WRITER, DICTIONARY_FIXED_LENGTH_WRITER, WRITE_UUID, ASSERT_INT_128),
         new ColumnReaderFormat<>(FIXED_LEN_BYTE_ARRAY, 16, null, UUID, FIXED_LENGTH_WRITER, DICTIONARY_FIXED_LENGTH_WRITER, WRITE_UUID, ASSERT_INT_128),
         // Trino type precision is irrelevant since the data is always stored as picoseconds
+        new ColumnReaderFormat<>(INT32, timeType(false, MILLIS), TimeType.TIME_MILLIS, PLAIN_WRITER, DICTIONARY_INT_WRITER, WRITE_INT, assertTime(9)),
         new ColumnReaderFormat<>(INT64, timeType(false, MICROS), TimeType.TIME_MICROS, PLAIN_WRITER, DICTIONARY_LONG_WRITER, WRITE_LONG, assertTime(6)),
         // Reading a column TimeLogicalTypeAnnotation as a BIGINT
         new ColumnReaderFormat<>(INT64, timeType(false, MICROS), BIGINT, PLAIN_WRITER, DICTIONARY_LONG_WRITER, WRITE_LONG, ASSERT_LONG),
Binary file not shown (new test resource: time_millis_int32.snappy.parquet).