diff --git a/lib/trino-parquet/src/main/java/io/trino/parquet/ParquetEncoding.java b/lib/trino-parquet/src/main/java/io/trino/parquet/ParquetEncoding.java index b547628e41e0..09d450a0a90f 100644 --- a/lib/trino-parquet/src/main/java/io/trino/parquet/ParquetEncoding.java +++ b/lib/trino-parquet/src/main/java/io/trino/parquet/ParquetEncoding.java @@ -23,6 +23,8 @@ import org.apache.parquet.column.ColumnDescriptor; import org.apache.parquet.column.values.ValuesReader; import org.apache.parquet.column.values.bitpacking.ByteBitPackingValuesReader; +import org.apache.parquet.column.values.bytestreamsplit.ByteStreamSplitValuesReaderForDouble; +import org.apache.parquet.column.values.bytestreamsplit.ByteStreamSplitValuesReaderForFloat; import org.apache.parquet.column.values.delta.DeltaBinaryPackingValuesReader; import org.apache.parquet.column.values.deltalengthbytearray.DeltaLengthByteArrayValuesReader; import org.apache.parquet.column.values.deltastrings.DeltaByteArrayReader; @@ -44,7 +46,9 @@ import static org.apache.parquet.column.values.bitpacking.Packer.BIG_ENDIAN; import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.BINARY; import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.BOOLEAN; +import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.DOUBLE; import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.FIXED_LEN_BYTE_ARRAY; +import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.FLOAT; import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.INT32; import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.INT64; @@ -149,6 +153,20 @@ public Dictionary initDictionary(ColumnDescriptor descriptor, DictionaryPage dic { return PLAIN.initDictionary(descriptor, dictionaryPage); } + }, + + BYTE_STREAM_SPLIT { + @Override + public ValuesReader getValuesReader(ColumnDescriptor descriptor, ValuesType valuesType) + { + PrimitiveTypeName typeName = descriptor.getPrimitiveType().getPrimitiveTypeName(); + checkArgument(typeName == FLOAT || typeName == DOUBLE, "Encoding BYTE_STREAM_SPLIT is only " + + "supported for type FLOAT and DOUBLE"); + if (typeName == FLOAT) { + return new ByteStreamSplitValuesReaderForFloat(); + } + return new ByteStreamSplitValuesReaderForDouble(); + } }; static final int INT96_TYPE_LENGTH = 12; diff --git a/lib/trino-parquet/src/main/java/io/trino/parquet/ParquetTypeUtils.java b/lib/trino-parquet/src/main/java/io/trino/parquet/ParquetTypeUtils.java index 7232300814ba..e6d86e174a42 100644 --- a/lib/trino-parquet/src/main/java/io/trino/parquet/ParquetTypeUtils.java +++ b/lib/trino-parquet/src/main/java/io/trino/parquet/ParquetTypeUtils.java @@ -27,7 +27,6 @@ import org.apache.parquet.io.ColumnIOFactory; import org.apache.parquet.io.GroupColumnIO; import org.apache.parquet.io.MessageColumnIO; -import org.apache.parquet.io.ParquetDecodingException; import org.apache.parquet.io.PrimitiveColumnIO; import org.apache.parquet.schema.GroupType; import org.apache.parquet.schema.LogicalTypeAnnotation.DecimalLogicalTypeAnnotation; @@ -156,28 +155,17 @@ private static int getPathIndex(List columns, List pa @SuppressWarnings("deprecation") public static ParquetEncoding getParquetEncoding(Encoding encoding) { - switch (encoding) { - case PLAIN: - return ParquetEncoding.PLAIN; - case RLE: - return ParquetEncoding.RLE; - case BYTE_STREAM_SPLIT: - // TODO: https://github.com/trinodb/trino/issues/8357 - throw new ParquetDecodingException("Unsupported Parquet encoding: " + encoding); - case BIT_PACKED: - return ParquetEncoding.BIT_PACKED; - case PLAIN_DICTIONARY: - return ParquetEncoding.PLAIN_DICTIONARY; - case DELTA_BINARY_PACKED: - return ParquetEncoding.DELTA_BINARY_PACKED; - case DELTA_LENGTH_BYTE_ARRAY: - return ParquetEncoding.DELTA_LENGTH_BYTE_ARRAY; - case DELTA_BYTE_ARRAY: - return ParquetEncoding.DELTA_BYTE_ARRAY; - case RLE_DICTIONARY: - return ParquetEncoding.RLE_DICTIONARY; - } - throw new ParquetDecodingException("Unsupported Parquet encoding: " + encoding); + return switch (encoding) { + case PLAIN -> ParquetEncoding.PLAIN; + case RLE -> ParquetEncoding.RLE; + case BYTE_STREAM_SPLIT -> ParquetEncoding.BYTE_STREAM_SPLIT; + case BIT_PACKED -> ParquetEncoding.BIT_PACKED; + case PLAIN_DICTIONARY -> ParquetEncoding.PLAIN_DICTIONARY; + case DELTA_BINARY_PACKED -> ParquetEncoding.DELTA_BINARY_PACKED; + case DELTA_LENGTH_BYTE_ARRAY -> ParquetEncoding.DELTA_LENGTH_BYTE_ARRAY; + case DELTA_BYTE_ARRAY -> ParquetEncoding.DELTA_BYTE_ARRAY; + case RLE_DICTIONARY -> ParquetEncoding.RLE_DICTIONARY; + }; } public static org.apache.parquet.schema.Type getParquetTypeByName(String columnName, GroupType groupType) diff --git a/lib/trino-parquet/src/main/java/io/trino/parquet/reader/decoders/ApacheParquetValueDecoders.java b/lib/trino-parquet/src/main/java/io/trino/parquet/reader/decoders/ApacheParquetValueDecoders.java index abf647b9f719..417b1422f6a3 100644 --- a/lib/trino-parquet/src/main/java/io/trino/parquet/reader/decoders/ApacheParquetValueDecoders.java +++ b/lib/trino-parquet/src/main/java/io/trino/parquet/reader/decoders/ApacheParquetValueDecoders.java @@ -22,6 +22,8 @@ import java.nio.ByteBuffer; import static io.trino.parquet.ParquetReaderUtils.castToByte; +import static java.lang.Double.doubleToLongBits; +import static java.lang.Float.floatToIntBits; import static java.util.Objects.requireNonNull; /** @@ -70,4 +72,78 @@ public void skip(int n) delegate.skip(n); } } + + public static final class DoubleApacheParquetValueDecoder + implements ValueDecoder + { + private final ValuesReader delegate; + + public DoubleApacheParquetValueDecoder(ValuesReader delegate) + { + this.delegate = requireNonNull(delegate, "delegate is null"); + } + + @Override + public void init(SimpleSliceInputStream input) + { + initialize(input, delegate, Double.BYTES); + } + + @Override + public void read(long[] values, int offset, int length) + { + for (int i = offset; i < offset + length; i++) { + values[i] = doubleToLongBits(delegate.readDouble()); + } + } + + @Override + public void skip(int n) + { + delegate.skip(n); + } + } + + public static final class FloatApacheParquetValueDecoder + implements ValueDecoder + { + private final ValuesReader delegate; + + public FloatApacheParquetValueDecoder(ValuesReader delegate) + { + this.delegate = requireNonNull(delegate, "delegate is null"); + } + + @Override + public void init(SimpleSliceInputStream input) + { + initialize(input, delegate, Float.BYTES); + } + + @Override + public void read(int[] values, int offset, int length) + { + for (int i = offset; i < offset + length; i++) { + values[i] = floatToIntBits(delegate.readFloat()); + } + } + + @Override + public void skip(int n) + { + delegate.skip(n); + } + } + + private static void initialize(SimpleSliceInputStream input, ValuesReader reader, int elementSizeInBytes) + { + byte[] buffer = input.readBytes(); + try { + int valueCount = buffer.length / elementSizeInBytes; + reader.initFromPage(valueCount, ByteBufferInputStream.wrap(ByteBuffer.wrap(buffer, 0, buffer.length))); + } + catch (IOException e) { + throw new UncheckedIOException(e); + } + } } diff --git a/lib/trino-parquet/src/main/java/io/trino/parquet/reader/decoders/ValueDecoders.java b/lib/trino-parquet/src/main/java/io/trino/parquet/reader/decoders/ValueDecoders.java index 73d8b3c14502..759976a70f34 100644 --- a/lib/trino-parquet/src/main/java/io/trino/parquet/reader/decoders/ValueDecoders.java +++ b/lib/trino-parquet/src/main/java/io/trino/parquet/reader/decoders/ValueDecoders.java @@ -37,6 +37,7 @@ import org.joda.time.DateTimeZone; import static com.google.common.base.Preconditions.checkArgument; +import static io.trino.parquet.ParquetEncoding.BYTE_STREAM_SPLIT; import static io.trino.parquet.ParquetEncoding.DELTA_BYTE_ARRAY; import static io.trino.parquet.ParquetEncoding.PLAIN; import static io.trino.parquet.ParquetReaderUtils.toByteExact; @@ -45,6 +46,8 @@ import static io.trino.parquet.ParquetTypeUtils.getShortDecimalValue; import static io.trino.parquet.ValuesType.VALUES; import static io.trino.parquet.reader.decoders.ApacheParquetValueDecoders.BooleanApacheParquetValueDecoder; +import static io.trino.parquet.reader.decoders.ApacheParquetValueDecoders.DoubleApacheParquetValueDecoder; +import static io.trino.parquet.reader.decoders.ApacheParquetValueDecoders.FloatApacheParquetValueDecoder; import static io.trino.parquet.reader.decoders.DeltaBinaryPackedDecoders.DeltaBinaryPackedByteDecoder; import static io.trino.parquet.reader.decoders.DeltaBinaryPackedDecoders.DeltaBinaryPackedIntDecoder; import static io.trino.parquet.reader.decoders.DeltaBinaryPackedDecoders.DeltaBinaryPackedLongDecoder; @@ -112,6 +115,9 @@ public ValueDecoder getDoubleDecoder(ParquetEncoding encoding) if (PLAIN.equals(encoding)) { return new LongPlainValueDecoder(); } + else if (BYTE_STREAM_SPLIT.equals(encoding)) { + return new DoubleApacheParquetValueDecoder(getApacheParquetReader(encoding)); + } throw wrongEncoding(encoding); } @@ -120,6 +126,9 @@ public ValueDecoder getRealDecoder(ParquetEncoding encoding) if (PLAIN.equals(encoding)) { return new IntPlainValueDecoder(); } + else if (BYTE_STREAM_SPLIT.equals(encoding)) { + return new FloatApacheParquetValueDecoder(getApacheParquetReader(encoding)); + } throw wrongEncoding(encoding); } diff --git a/lib/trino-parquet/src/test/java/io/trino/parquet/reader/TestByteStreamSplitEncoding.java b/lib/trino-parquet/src/test/java/io/trino/parquet/reader/TestByteStreamSplitEncoding.java new file mode 100644 index 000000000000..d7b293ada3b1 --- /dev/null +++ b/lib/trino-parquet/src/test/java/io/trino/parquet/reader/TestByteStreamSplitEncoding.java @@ -0,0 +1,106 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.trino.parquet.reader; + +import com.google.common.collect.ImmutableList; +import com.google.common.io.Resources; +import io.trino.parquet.ParquetDataSource; +import io.trino.parquet.ParquetReaderOptions; +import io.trino.spi.Page; +import io.trino.spi.block.Block; +import io.trino.spi.block.IntArrayBlock; +import io.trino.spi.type.Type; +import org.apache.parquet.hadoop.metadata.ParquetMetadata; +import org.testng.annotations.Test; + +import java.io.File; +import java.io.IOException; +import java.net.URISyntaxException; +import java.util.List; +import java.util.Optional; +import java.util.stream.IntStream; + +import static io.trino.memory.context.AggregatedMemoryContext.newSimpleAggregatedMemoryContext; +import static io.trino.parquet.ParquetTestUtils.createParquetReader; +import static io.trino.spi.type.DoubleType.DOUBLE; +import static io.trino.spi.type.RealType.REAL; +import static io.trino.testing.TestingConnectorSession.SESSION; +import static org.assertj.core.api.Assertions.assertThat; +import static org.testng.Assert.assertEquals; + +public class TestByteStreamSplitEncoding +{ + @Test + public void testReadFloatDouble() + throws URISyntaxException, IOException + { + List columnNames = ImmutableList.of("columnA", "columnB"); + List types = ImmutableList.of(REAL, DOUBLE); + + ParquetDataSource dataSource = new FileParquetDataSource( + new File(Resources.getResource("byte_stream_split_float_and_double.parquet").toURI()), + new ParquetReaderOptions()); + ParquetMetadata parquetMetadata = MetadataReader.readFooter(dataSource, Optional.empty()); + ParquetReader reader = createParquetReader(dataSource, parquetMetadata, newSimpleAggregatedMemoryContext(), types, columnNames); + + readAndCompare(reader, getExpectedValues()); + } + + private static List> getExpectedValues() + { + ImmutableList.Builder floatsBuilder = ImmutableList.builder(); + for (int i = 0; i < 10; i++) { + IntStream.range(0, 10) + .mapToDouble(j -> j * 1.3) + .forEach(floatsBuilder::add); + } + + ImmutableList.Builder doublesBuilder = ImmutableList.builder(); + for (int i = 0; i < 10; i++) { + IntStream.range(0, 10) + .mapToDouble(j -> j * 1.5) + .forEach(doublesBuilder::add); + } + return ImmutableList.of(floatsBuilder.build(), doublesBuilder.build()); + } + + private static void readAndCompare(ParquetReader reader, List> expected) + throws IOException + { + int rowCount = 0; + int pageCount = 0; + Page page = reader.nextPage(); + while (page != null) { + assertThat(page.getChannelCount()).isEqualTo(2); + if (pageCount % 2 == 1) { // Skip loading every alternative page + for (int channel = 0; channel < page.getChannelCount(); channel++) { + Block block = page.getBlock(channel).getLoadedBlock(); + List expectedValues = expected.get(channel); + for (int postition = 0; postition < block.getPositionCount(); postition++) { + if (block instanceof IntArrayBlock) { + assertEquals(REAL.getObjectValue(SESSION, block, postition), expectedValues.get(rowCount + postition).floatValue()); + } + else { + assertEquals(DOUBLE.getObjectValue(SESSION, block, postition), expectedValues.get(rowCount + postition)); + } + } + } + } + rowCount += page.getPositionCount(); + pageCount++; + page = reader.nextPage(); + } + assertThat(rowCount).isEqualTo(100); + } +} diff --git a/lib/trino-parquet/src/test/java/io/trino/parquet/reader/decoders/TestDoubleValueDecoders.java b/lib/trino-parquet/src/test/java/io/trino/parquet/reader/decoders/TestDoubleValueDecoders.java index bd709e790738..50d972389776 100644 --- a/lib/trino-parquet/src/test/java/io/trino/parquet/reader/decoders/TestDoubleValueDecoders.java +++ b/lib/trino-parquet/src/test/java/io/trino/parquet/reader/decoders/TestDoubleValueDecoders.java @@ -15,9 +15,7 @@ import com.google.common.collect.ImmutableList; import io.trino.parquet.PrimitiveField; -import io.trino.parquet.reader.SimpleSliceInputStream; import io.trino.spi.type.DoubleType; -import org.apache.parquet.column.values.ValuesReader; import org.apache.parquet.column.values.ValuesWriter; import java.util.OptionalInt; @@ -25,8 +23,8 @@ import static io.trino.parquet.ParquetEncoding.PLAIN; import static io.trino.parquet.ParquetEncoding.RLE_DICTIONARY; +import static io.trino.parquet.reader.decoders.ApacheParquetValueDecoders.DoubleApacheParquetValueDecoder; import static io.trino.parquet.reader.flat.LongColumnAdapter.LONG_ADAPTER; -import static java.util.Objects.requireNonNull; import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.DOUBLE; import static org.assertj.core.api.Assertions.assertThat; @@ -108,35 +106,4 @@ private static DataBuffer writeDoubles(ValuesWriter valuesWriter, double[] input return getWrittenBuffer(valuesWriter); } - - private static final class DoubleApacheParquetValueDecoder - implements ValueDecoder - { - private final ValuesReader delegate; - - public DoubleApacheParquetValueDecoder(ValuesReader delegate) - { - this.delegate = requireNonNull(delegate, "delegate is null"); - } - - @Override - public void init(SimpleSliceInputStream input) - { - initialize(input, delegate); - } - - @Override - public void read(long[] values, int offset, int length) - { - for (int i = offset; i < offset + length; i++) { - values[i] = Double.doubleToLongBits(delegate.readDouble()); - } - } - - @Override - public void skip(int n) - { - delegate.skip(n); - } - } } diff --git a/lib/trino-parquet/src/test/java/io/trino/parquet/reader/decoders/TestFloatValueDecoders.java b/lib/trino-parquet/src/test/java/io/trino/parquet/reader/decoders/TestFloatValueDecoders.java index 40b08d55cb3c..65a77d018fa3 100644 --- a/lib/trino-parquet/src/test/java/io/trino/parquet/reader/decoders/TestFloatValueDecoders.java +++ b/lib/trino-parquet/src/test/java/io/trino/parquet/reader/decoders/TestFloatValueDecoders.java @@ -15,8 +15,6 @@ import com.google.common.collect.ImmutableList; import io.trino.parquet.PrimitiveField; -import io.trino.parquet.reader.SimpleSliceInputStream; -import org.apache.parquet.column.values.ValuesReader; import org.apache.parquet.column.values.ValuesWriter; import java.util.OptionalInt; @@ -24,9 +22,9 @@ import static io.trino.parquet.ParquetEncoding.PLAIN; import static io.trino.parquet.ParquetEncoding.RLE_DICTIONARY; +import static io.trino.parquet.reader.decoders.ApacheParquetValueDecoders.FloatApacheParquetValueDecoder; import static io.trino.parquet.reader.flat.IntColumnAdapter.INT_ADAPTER; import static io.trino.spi.type.RealType.REAL; -import static java.util.Objects.requireNonNull; import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.FLOAT; import static org.assertj.core.api.Assertions.assertThat; @@ -108,35 +106,4 @@ private static DataBuffer writeFloats(ValuesWriter valuesWriter, float[] input) return getWrittenBuffer(valuesWriter); } - - private static final class FloatApacheParquetValueDecoder - implements ValueDecoder - { - private final ValuesReader delegate; - - public FloatApacheParquetValueDecoder(ValuesReader delegate) - { - this.delegate = requireNonNull(delegate, "delegate is null"); - } - - @Override - public void init(SimpleSliceInputStream input) - { - initialize(input, delegate); - } - - @Override - public void read(int[] values, int offset, int length) - { - for (int i = offset; i < offset + length; i++) { - values[i] = Float.floatToIntBits(delegate.readFloat()); - } - } - - @Override - public void skip(int n) - { - delegate.skip(n); - } - } } diff --git a/lib/trino-parquet/src/test/resources/byte_stream_split_float_and_double.parquet b/lib/trino-parquet/src/test/resources/byte_stream_split_float_and_double.parquet new file mode 100644 index 000000000000..ded660b3f503 Binary files /dev/null and b/lib/trino-parquet/src/test/resources/byte_stream_split_float_and_double.parquet differ