diff --git a/lib/trino-parquet/src/main/java/io/trino/parquet/ParquetEncoding.java b/lib/trino-parquet/src/main/java/io/trino/parquet/ParquetEncoding.java index b547628e41e0..09d450a0a90f 100644 --- a/lib/trino-parquet/src/main/java/io/trino/parquet/ParquetEncoding.java +++ b/lib/trino-parquet/src/main/java/io/trino/parquet/ParquetEncoding.java @@ -23,6 +23,8 @@ import org.apache.parquet.column.ColumnDescriptor; import org.apache.parquet.column.values.ValuesReader; import org.apache.parquet.column.values.bitpacking.ByteBitPackingValuesReader; +import org.apache.parquet.column.values.bytestreamsplit.ByteStreamSplitValuesReaderForDouble; +import org.apache.parquet.column.values.bytestreamsplit.ByteStreamSplitValuesReaderForFloat; import org.apache.parquet.column.values.delta.DeltaBinaryPackingValuesReader; import org.apache.parquet.column.values.deltalengthbytearray.DeltaLengthByteArrayValuesReader; import org.apache.parquet.column.values.deltastrings.DeltaByteArrayReader; @@ -44,7 +46,9 @@ import static org.apache.parquet.column.values.bitpacking.Packer.BIG_ENDIAN; import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.BINARY; import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.BOOLEAN; +import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.DOUBLE; import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.FIXED_LEN_BYTE_ARRAY; +import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.FLOAT; import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.INT32; import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.INT64; @@ -149,6 +153,20 @@ public Dictionary initDictionary(ColumnDescriptor descriptor, DictionaryPage dic { return PLAIN.initDictionary(descriptor, dictionaryPage); } + }, + + BYTE_STREAM_SPLIT { + @Override + public ValuesReader getValuesReader(ColumnDescriptor descriptor, ValuesType valuesType) + { + PrimitiveTypeName typeName = descriptor.getPrimitiveType().getPrimitiveTypeName(); + checkArgument(typeName == FLOAT || typeName == DOUBLE, "Encoding BYTE_STREAM_SPLIT is only " + + "supported for type FLOAT and DOUBLE"); + if (typeName == FLOAT) { + return new ByteStreamSplitValuesReaderForFloat(); + } + return new ByteStreamSplitValuesReaderForDouble(); + } }; static final int INT96_TYPE_LENGTH = 12; diff --git a/lib/trino-parquet/src/main/java/io/trino/parquet/ParquetTypeUtils.java b/lib/trino-parquet/src/main/java/io/trino/parquet/ParquetTypeUtils.java index cc59706413ca..4a0e3d77d0d3 100644 --- a/lib/trino-parquet/src/main/java/io/trino/parquet/ParquetTypeUtils.java +++ b/lib/trino-parquet/src/main/java/io/trino/parquet/ParquetTypeUtils.java @@ -156,28 +156,18 @@ private static int getPathIndex(List columns, List pa @SuppressWarnings("deprecation") public static ParquetEncoding getParquetEncoding(Encoding encoding) { - switch (encoding) { - case PLAIN: - return ParquetEncoding.PLAIN; - case RLE: - return ParquetEncoding.RLE; - case BYTE_STREAM_SPLIT: - // TODO: https://github.com/trinodb/trino/issues/8357 - throw new ParquetDecodingException("Unsupported Parquet encoding: " + encoding); - case BIT_PACKED: - return ParquetEncoding.BIT_PACKED; - case PLAIN_DICTIONARY: - return ParquetEncoding.PLAIN_DICTIONARY; - case DELTA_BINARY_PACKED: - return ParquetEncoding.DELTA_BINARY_PACKED; - case DELTA_LENGTH_BYTE_ARRAY: - return ParquetEncoding.DELTA_LENGTH_BYTE_ARRAY; - case DELTA_BYTE_ARRAY: - return ParquetEncoding.DELTA_BYTE_ARRAY; - case RLE_DICTIONARY: - return ParquetEncoding.RLE_DICTIONARY; - } - throw new ParquetDecodingException("Unsupported Parquet encoding: " + encoding); + return switch (encoding) { + case PLAIN -> ParquetEncoding.PLAIN; + case RLE -> ParquetEncoding.RLE; + case BYTE_STREAM_SPLIT -> ParquetEncoding.BYTE_STREAM_SPLIT; + case BIT_PACKED -> ParquetEncoding.BIT_PACKED; + case PLAIN_DICTIONARY -> ParquetEncoding.PLAIN_DICTIONARY; + case DELTA_BINARY_PACKED -> ParquetEncoding.DELTA_BINARY_PACKED; + case DELTA_LENGTH_BYTE_ARRAY -> ParquetEncoding.DELTA_LENGTH_BYTE_ARRAY; + case DELTA_BYTE_ARRAY -> ParquetEncoding.DELTA_BYTE_ARRAY; + case RLE_DICTIONARY -> ParquetEncoding.RLE_DICTIONARY; + default -> throw new ParquetDecodingException("Unsupported Parquet encoding: " + encoding); + }; } public static org.apache.parquet.schema.Type getParquetTypeByName(String columnName, GroupType groupType) diff --git a/lib/trino-parquet/src/main/java/io/trino/parquet/reader/decoders/ApacheParquetValueDecoders.java b/lib/trino-parquet/src/main/java/io/trino/parquet/reader/decoders/ApacheParquetValueDecoders.java index abf647b9f719..56088adff86a 100644 --- a/lib/trino-parquet/src/main/java/io/trino/parquet/reader/decoders/ApacheParquetValueDecoders.java +++ b/lib/trino-parquet/src/main/java/io/trino/parquet/reader/decoders/ApacheParquetValueDecoders.java @@ -22,6 +22,8 @@ import java.nio.ByteBuffer; import static io.trino.parquet.ParquetReaderUtils.castToByte; +import static java.lang.Double.doubleToLongBits; +import static java.lang.Float.floatToIntBits; import static java.util.Objects.requireNonNull; /** @@ -70,4 +72,91 @@ public void skip(int n) delegate.skip(n); } } + + public static final class DoubleApacheParquetValueDecoder + implements ValueDecoder + { + private final ValuesReader delegate; + + public DoubleApacheParquetValueDecoder(ValuesReader delegate) + { + this.delegate = requireNonNull(delegate, "delegate is null"); + } + + @Override + public void init(SimpleSliceInputStream input) + { + byte[] buffer = input.readBytes(); + try { + int valueCount = buffer.length / Byte.SIZE; + delegate.initFromPage(valueCount, ByteBufferInputStream.wrap(ByteBuffer.wrap(buffer, 0, buffer.length))); + } + catch (IOException e) { + throw new UncheckedIOException(e); + } + } + + @Override + public void read(long[] values, int offset, int length) + { + for (int i = offset; i < offset + length; i++) { + values[i] = doubleToLongBits(delegate.readDouble()); + } + } + + @Override + public void skip(int n) + { + delegate.skip(n); + } + } + + public static final class FloatApacheParquetValueDecoder + implements ValueDecoder + { + private final ValuesReader delegate; + + public FloatApacheParquetValueDecoder(ValuesReader delegate) + { + this.delegate = requireNonNull(delegate, "delegate is null"); + } + + @Override + public void init(SimpleSliceInputStream input) + { + byte[] buffer = input.readBytes(); + try { + int valueCount = buffer.length / Float.BYTES; + delegate.initFromPage(valueCount, ByteBufferInputStream.wrap(ByteBuffer.wrap(buffer, 0, buffer.length))); + } + catch (IOException e) { + throw new UncheckedIOException(e); + } + } + + @Override + public void read(int[] values, int offset, int length) + { + for (int i = offset; i < offset + length; i++) { + values[i] = floatToIntBits(delegate.readFloat()); + } + } + + @Override + public void skip(int n) + { + delegate.skip(n); + } + } + + private static void initialize(SimpleSliceInputStream input, ValuesReader reader) + { + byte[] buffer = input.readBytes(); + try { + reader.initFromPage(0, ByteBufferInputStream.wrap(ByteBuffer.wrap(buffer, 0, buffer.length))); + } + catch (IOException e) { + throw new UncheckedIOException(e); + } + } } diff --git a/lib/trino-parquet/src/main/java/io/trino/parquet/reader/decoders/ValueDecoders.java b/lib/trino-parquet/src/main/java/io/trino/parquet/reader/decoders/ValueDecoders.java index 73d8b3c14502..759976a70f34 100644 --- a/lib/trino-parquet/src/main/java/io/trino/parquet/reader/decoders/ValueDecoders.java +++ b/lib/trino-parquet/src/main/java/io/trino/parquet/reader/decoders/ValueDecoders.java @@ -37,6 +37,7 @@ import org.joda.time.DateTimeZone; import static com.google.common.base.Preconditions.checkArgument; +import static io.trino.parquet.ParquetEncoding.BYTE_STREAM_SPLIT; import static io.trino.parquet.ParquetEncoding.DELTA_BYTE_ARRAY; import static io.trino.parquet.ParquetEncoding.PLAIN; import static io.trino.parquet.ParquetReaderUtils.toByteExact; @@ -45,6 +46,8 @@ import static io.trino.parquet.ParquetTypeUtils.getShortDecimalValue; import static io.trino.parquet.ValuesType.VALUES; import static io.trino.parquet.reader.decoders.ApacheParquetValueDecoders.BooleanApacheParquetValueDecoder; +import static io.trino.parquet.reader.decoders.ApacheParquetValueDecoders.DoubleApacheParquetValueDecoder; +import static io.trino.parquet.reader.decoders.ApacheParquetValueDecoders.FloatApacheParquetValueDecoder; import static io.trino.parquet.reader.decoders.DeltaBinaryPackedDecoders.DeltaBinaryPackedByteDecoder; import static io.trino.parquet.reader.decoders.DeltaBinaryPackedDecoders.DeltaBinaryPackedIntDecoder; import static io.trino.parquet.reader.decoders.DeltaBinaryPackedDecoders.DeltaBinaryPackedLongDecoder; @@ -112,6 +115,9 @@ public ValueDecoder getDoubleDecoder(ParquetEncoding encoding) if (PLAIN.equals(encoding)) { return new LongPlainValueDecoder(); } + else if (BYTE_STREAM_SPLIT.equals(encoding)) { + return new DoubleApacheParquetValueDecoder(getApacheParquetReader(encoding)); + } throw wrongEncoding(encoding); } @@ -120,6 +126,9 @@ public ValueDecoder getRealDecoder(ParquetEncoding encoding) if (PLAIN.equals(encoding)) { return new IntPlainValueDecoder(); } + else if (BYTE_STREAM_SPLIT.equals(encoding)) { + return new FloatApacheParquetValueDecoder(getApacheParquetReader(encoding)); + } throw wrongEncoding(encoding); } diff --git a/lib/trino-parquet/src/test/java/io/trino/parquet/reader/TestByteStreamSplitParquet.java b/lib/trino-parquet/src/test/java/io/trino/parquet/reader/TestByteStreamSplitParquet.java new file mode 100644 index 000000000000..3c7bcedaacf9 --- /dev/null +++ b/lib/trino-parquet/src/test/java/io/trino/parquet/reader/TestByteStreamSplitParquet.java @@ -0,0 +1,246 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.trino.parquet.reader; + +import com.google.common.collect.ImmutableList; +import com.google.common.io.Resources; +import io.airlift.log.Logger; +import io.trino.parquet.ParquetDataSource; +import io.trino.parquet.ParquetReaderOptions; +import io.trino.spi.Page; +import io.trino.spi.block.Block; +import io.trino.spi.block.IntArrayBlock; +import io.trino.spi.type.Type; +import org.apache.parquet.hadoop.metadata.ParquetMetadata; +import org.testng.annotations.Test; + +import java.io.File; +import java.io.IOException; +import java.net.URISyntaxException; +import java.util.ArrayList; +import java.util.List; +import java.util.Optional; +import java.util.stream.Collectors; +import java.util.stream.IntStream; + +import static io.trino.memory.context.AggregatedMemoryContext.newSimpleAggregatedMemoryContext; +import static io.trino.parquet.ParquetTestUtils.createParquetReader; +import static io.trino.spi.type.DoubleType.DOUBLE; +import static io.trino.spi.type.RealType.REAL; +import static io.trino.testing.TestingConnectorSession.SESSION; +import static org.testng.Assert.assertEquals; + +public class TestByteStreamSplitParquet +{ + private static final Logger log = Logger.get(TestByteStreamSplitParquet.class); + + @Test + public void testByteStramSplitReadDoubleDouble() + throws URISyntaxException, IOException + { + List columnNames = ImmutableList.of("columnA", "columnB"); + List types = ImmutableList.of(DOUBLE, DOUBLE); + + ParquetDataSource dataSource = new FileParquetDataSource( + new File(Resources.getResource("byte_stream_split_double_and_double.parquet").toURI()), + new ParquetReaderOptions()); + ParquetMetadata parquetMetadata = MetadataReader.readFooter(dataSource, Optional.empty()); + ParquetReader reader = createParquetReader(dataSource, parquetMetadata, newSimpleAggregatedMemoryContext(), types, columnNames); + + List> expected = getExpectedDoubleDoubleList(); + readAndCompare(reader, expected); + } + + private List> getExpectedDoubleDoubleList() + { + List> completeList = new ArrayList<>(); + List fullFloatList = new ArrayList<>(); + for (int i = 0; i < 10; i++) { + List floatList = IntStream.range(0, 10) + .mapToDouble(j -> j * 1.5) + .boxed() + .collect(Collectors.toList()); + fullFloatList.addAll(floatList); + } + List fullDoubleList = new ArrayList<>(); + for (int i = 0; i < 10; i++) { + List doubleList = IntStream.range(0, 10) + .mapToDouble(j -> j * 1.5) + .boxed() + .collect(Collectors.toList()); + fullDoubleList.addAll(doubleList); + } + completeList.add(fullFloatList); + completeList.add(fullDoubleList); + return completeList; + } + + @Test + public void testByteStramSplitReadFloatFloat() + throws URISyntaxException, IOException + { + List columnNames = ImmutableList.of("columnA", "columnB"); + List types = ImmutableList.of(REAL, REAL); + + ParquetDataSource dataSource = new FileParquetDataSource( + new File(Resources.getResource("byte_stream_split_float_and_float.parquet").toURI()), + new ParquetReaderOptions()); + ParquetMetadata parquetMetadata = MetadataReader.readFooter(dataSource, Optional.empty()); + ParquetReader reader = createParquetReader(dataSource, parquetMetadata, newSimpleAggregatedMemoryContext(), types, columnNames); + + List> expected = getExpectedFloatFloatList(); + readAndCompare(reader, expected); + } + + private List> getExpectedFloatFloatList() + { + List> completeList = new ArrayList<>(); + List fullFloatList = new ArrayList<>(); + for (int i = 0; i < 10; i++) { + List floatList = IntStream.range(0, 10) + .mapToDouble(j -> j * 1.3) + .boxed() + .collect(Collectors.toList()); + fullFloatList.addAll(floatList); + } + List fullDoubleList = new ArrayList<>(); + for (int i = 0; i < 10; i++) { + List doubleList = IntStream.range(0, 10) + .mapToDouble(j -> j * 1.3) + .boxed() + .collect(Collectors.toList()); + fullDoubleList.addAll(doubleList); + } + completeList.add(fullFloatList); + completeList.add(fullDoubleList); + return completeList; + } + + @Test + public void testByteStramSplitReadFloatDouble() + throws URISyntaxException, IOException + { + List columnNames = ImmutableList.of("columnA", "columnB"); + List types = ImmutableList.of(REAL, DOUBLE); + + ParquetDataSource dataSource = new FileParquetDataSource( + new File(Resources.getResource("byte_stream_split_float_and_double.parquet").toURI()), + new ParquetReaderOptions()); + ParquetMetadata parquetMetadata = MetadataReader.readFooter(dataSource, Optional.empty()); + ParquetReader reader = createParquetReader(dataSource, parquetMetadata, newSimpleAggregatedMemoryContext(), types, columnNames); + + List> expected = getExpectedFloatDoubleList(); + readAndCompare(reader, expected); + } + + private List> getExpectedFloatDoubleList() + { + List> completeList = new ArrayList<>(); + List fullFloatList = new ArrayList<>(); + for (int i = 0; i < 10; i++) { + List floatList = IntStream.range(0, 10) + .mapToDouble(j -> j * 1.3) + .boxed() + .collect(Collectors.toList()); + fullFloatList.addAll(floatList); + } + List fullDoubleList = new ArrayList<>(); + for (int i = 0; i < 10; i++) { + List doubleList = IntStream.range(0, 10) + .mapToDouble(j -> j * 1.5) + .boxed() + .collect(Collectors.toList()); + fullDoubleList.addAll(doubleList); + } + completeList.add(fullFloatList); + completeList.add(fullDoubleList); + return completeList; + } + + @Test + public void testByteStramSplitReadDoubleFloat() + throws URISyntaxException, IOException + { + List columnNames = ImmutableList.of("columnA", "columnB"); + List types = ImmutableList.of(DOUBLE, REAL); + + ParquetDataSource dataSource = new FileParquetDataSource( + new File(Resources.getResource("byte_stream_split_double_and_float.parquet").toURI()), + new ParquetReaderOptions()); + ParquetMetadata parquetMetadata = MetadataReader.readFooter(dataSource, Optional.empty()); + ParquetReader reader = createParquetReader(dataSource, parquetMetadata, newSimpleAggregatedMemoryContext(), types, columnNames); + + List> expected = getExpectedDoubleFloatList(); + readAndCompare(reader, expected); + } + + private List> getExpectedDoubleFloatList() + { + List> completeList = new ArrayList<>(); + List fullFloatList = new ArrayList<>(); + List fullDoubleList = new ArrayList<>(); + for (int i = 0; i < 10; i++) { + List doubleList = IntStream.range(0, 10) + .mapToDouble(j -> j * 1.5) + .boxed() + .collect(Collectors.toList()); + fullDoubleList.addAll(doubleList); + } + for (int i = 0; i < 10; i++) { + List floatList = IntStream.range(0, 10) + .mapToDouble(j -> j * 1.3) + .boxed() + .collect(Collectors.toList()); + fullFloatList.addAll(floatList); + } + + completeList.add(fullDoubleList); + completeList.add(fullFloatList); + return completeList; + } + + void readAndCompare(ParquetReader reader, List> expected) + throws IOException + { + long totalValueCounter = 0; + int rowCounter = 0; + Page page = reader.nextPage(); + while (page != null) { + if (page.getChannelCount() > 0) { + //Get the first Block + Block firstBlock = page.getBlock(0).getLoadedBlock(); + //Get positionCount from firstBlock. Every other block will have same positionCount. + for (int postition = 0; postition < firstBlock.getPositionCount(); postition++) { + assertEquals(2, page.getChannelCount()); + for (int k = 0; k < page.getChannelCount(); k++) { + Block block = page.getBlock(k).getLoadedBlock(); + + List expectedValues = expected.get(k); + + if (block instanceof IntArrayBlock) { + assertEquals(REAL.getObjectValue(SESSION, block, postition), expectedValues.get(rowCounter).floatValue()); + } + else { + assertEquals(DOUBLE.getObjectValue(SESSION, block, postition), expectedValues.get(rowCounter)); + } + totalValueCounter++; + } + rowCounter++; + } + } + page = reader.nextPage(); + } + log.info("Summary|rowCounter|" + rowCounter + "|totalValueCounter|" + totalValueCounter); + } +} diff --git a/lib/trino-parquet/src/test/resources/byte_stream_split_double_and_double.parquet b/lib/trino-parquet/src/test/resources/byte_stream_split_double_and_double.parquet new file mode 100644 index 000000000000..a7006bab5a92 Binary files /dev/null and b/lib/trino-parquet/src/test/resources/byte_stream_split_double_and_double.parquet differ diff --git a/lib/trino-parquet/src/test/resources/byte_stream_split_double_and_float.parquet b/lib/trino-parquet/src/test/resources/byte_stream_split_double_and_float.parquet new file mode 100644 index 000000000000..151d15074ba6 Binary files /dev/null and b/lib/trino-parquet/src/test/resources/byte_stream_split_double_and_float.parquet differ diff --git a/lib/trino-parquet/src/test/resources/byte_stream_split_float_and_double.parquet b/lib/trino-parquet/src/test/resources/byte_stream_split_float_and_double.parquet new file mode 100644 index 000000000000..0489235417d2 Binary files /dev/null and b/lib/trino-parquet/src/test/resources/byte_stream_split_float_and_double.parquet differ diff --git a/lib/trino-parquet/src/test/resources/byte_stream_split_float_and_float.parquet b/lib/trino-parquet/src/test/resources/byte_stream_split_float_and_float.parquet new file mode 100644 index 000000000000..46201dc8386f Binary files /dev/null and b/lib/trino-parquet/src/test/resources/byte_stream_split_float_and_float.parquet differ