Fixing Review Comments
- Unwrap a redundant else in ParquetEncoding.java
- Replace the regular switch statement with an enhanced switch expression in ParquetTypeUtils.java
- Handle BYTE_STREAM_SPLIT in ValueDecoders
- Reorder methods in AbstractTestParquetReader.java
- Remove debugging statements from AbstractTestParquetReader.java
- Replace printStackTrace with UncheckedIOException in AbstractTestParquetReader.java
- Introduce a Builder in CustomParquetWriter so that tests can enable withByteStreamSplitEncoding (usage sketch below)
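For context, the new builder is used the way the reworked getParquetWriter helper in AbstractTestParquetReader.java uses it. The following is a minimal, illustrative sketch only: the ByteStreamSplitWriterExample class is made up, and it assumes it lives in the io.trino.plugin.hive.parquet package because the Builder constructor is protected. withByteStreamSplitEncoding comes from the inherited org.apache.parquet.hadoop.ParquetWriter.Builder.

package io.trino.plugin.hive.parquet;

import org.apache.hadoop.fs.Path;
import org.apache.parquet.hadoop.ParquetWriter;
import org.apache.parquet.schema.MessageType;

import java.io.File;
import java.io.IOException;
import java.util.List;

import static org.apache.parquet.schema.MessageTypeParser.parseMessageType;

// Hypothetical helper class, shown only to illustrate the builder call chain.
final class ByteStreamSplitWriterExample
{
    static ParquetWriter<List<String>> createWriter(File outputFile)
            throws IOException
    {
        MessageType schema = parseMessageType(
                "message byte_stream_split_test { optional double byte_stream_split_sample; }");
        Path path = new Path(outputFile.toURI().toString());
        // withByteStreamSplitEncoding(true) asks parquet-mr to encode FLOAT/DOUBLE
        // columns with BYTE_STREAM_SPLIT instead of PLAIN.
        return new CustomParquetWriter.Builder(path)
                .withSchema(schema)
                .withByteStreamSplitEncoding(true)
                .build();
    }
}

The new testDoubleByteStreamSplitSingleColumn test below writes a DOUBLE column this way and reads it back through Trino's Parquet reader.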
manupatteri committed Jan 12, 2023
1 parent 7a8bec0 commit cc9e664
Showing 5 changed files with 80 additions and 54 deletions.
ParquetEncoding.java
@@ -190,9 +190,7 @@ public ValuesReader getValuesReader(ColumnDescriptor descriptor, ValuesType valu
if (typeName == FLOAT) {
return new ByteStreamSplitValuesReaderForFloat();
}
- else {
- return new ByteStreamSplitValuesReaderForDouble();
- }
+ return new ByteStreamSplitValuesReaderForDouble();
}
};

ParquetTypeUtils.java
@@ -159,27 +159,18 @@ private static int getPathIndex(List<PrimitiveColumnIO> columns, List<String> pa
@SuppressWarnings("deprecation")
public static ParquetEncoding getParquetEncoding(Encoding encoding)
{
- switch (encoding) {
- case PLAIN:
- return ParquetEncoding.PLAIN;
- case RLE:
- return ParquetEncoding.RLE;
- case BYTE_STREAM_SPLIT:
- return ParquetEncoding.BYTE_STREAM_SPLIT;
- case BIT_PACKED:
- return ParquetEncoding.BIT_PACKED;
- case PLAIN_DICTIONARY:
- return ParquetEncoding.PLAIN_DICTIONARY;
- case DELTA_BINARY_PACKED:
- return ParquetEncoding.DELTA_BINARY_PACKED;
- case DELTA_LENGTH_BYTE_ARRAY:
- return ParquetEncoding.DELTA_LENGTH_BYTE_ARRAY;
- case DELTA_BYTE_ARRAY:
- return ParquetEncoding.DELTA_BYTE_ARRAY;
- case RLE_DICTIONARY:
- return ParquetEncoding.RLE_DICTIONARY;
- }
- throw new ParquetDecodingException("Unsupported Parquet encoding: " + encoding);
+ return switch (encoding) {
+ case PLAIN -> ParquetEncoding.PLAIN;
+ case RLE -> ParquetEncoding.RLE;
+ case BYTE_STREAM_SPLIT -> ParquetEncoding.BYTE_STREAM_SPLIT;
+ case BIT_PACKED -> ParquetEncoding.BIT_PACKED;
+ case PLAIN_DICTIONARY -> ParquetEncoding.PLAIN_DICTIONARY;
+ case DELTA_BINARY_PACKED -> ParquetEncoding.DELTA_BINARY_PACKED;
+ case DELTA_LENGTH_BYTE_ARRAY -> ParquetEncoding.DELTA_LENGTH_BYTE_ARRAY;
+ case DELTA_BYTE_ARRAY -> ParquetEncoding.DELTA_BYTE_ARRAY;
+ case RLE_DICTIONARY -> ParquetEncoding.RLE_DICTIONARY;
+ default -> throw new ParquetDecodingException("Unsupported Parquet encoding: " + encoding);
+ };
}

public static org.apache.parquet.schema.Type getParquetTypeByName(String columnName, GroupType groupType)
ValueDecoders.java
@@ -27,6 +27,7 @@
import org.apache.parquet.column.values.ValuesReader;

import static com.google.common.base.Preconditions.checkArgument;
+ import static io.trino.parquet.ParquetEncoding.BYTE_STREAM_SPLIT;
import static io.trino.parquet.ParquetEncoding.PLAIN;
import static io.trino.parquet.ValuesType.VALUES;
import static io.trino.parquet.reader.decoders.ApacheParquetValueDecoders.BinaryApacheParquetValueDecoder;
@@ -72,6 +73,9 @@ public static ValueDecoder<long[]> getDoubleDecoder(ParquetEncoding encoding, Pr
if (PLAIN.equals(encoding)) {
return new LongPlainValueDecoder();
}
+ else if (BYTE_STREAM_SPLIT.equals(encoding)) {
+ return new LongApacheParquetValueDecoder(getApacheParquetReader(encoding, field));
+ }
throw wrongEncoding(encoding, field);
}

@@ -80,6 +84,9 @@ public static ValueDecoder<int[]> getRealDecoder(ParquetEncoding encoding, Primi
if (PLAIN.equals(encoding)) {
return new IntPlainValueDecoder();
}
+ else if (BYTE_STREAM_SPLIT.equals(encoding)) {
+ return new IntApacheParquetValueDecoder(getApacheParquetReader(encoding, field));
+ }
throw wrongEncoding(encoding, field);
}

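The BYTE_STREAM_SPLIT branches above hand the page over to the Apache Parquet ByteStreamSplitValuesReader through the Int/LongApacheParquetValueDecoder wrappers rather than adding a dedicated Trino decoder. For intuition about what that reader does, here is a minimal standalone sketch of the decoding step as the Parquet spec defines the encoding; the class and method names are made up for illustration and this is not Trino's implementation.

import java.nio.ByteBuffer;
import java.nio.ByteOrder;

// Hypothetical sketch: BYTE_STREAM_SPLIT stores the k-th byte of every value
// contiguously in the k-th of 8 streams for DOUBLE, so decoding is a byte
// transposition followed by a little-endian read.
final class ByteStreamSplitSketch
{
    static double[] decodeDoubles(byte[] encoded, int valueCount)
    {
        double[] result = new double[valueCount];
        byte[] scratch = new byte[Double.BYTES];
        for (int i = 0; i < valueCount; i++) {
            for (int stream = 0; stream < Double.BYTES; stream++) {
                // Stream k holds the k-th byte of each value; streams are laid out back to back.
                scratch[stream] = encoded[stream * valueCount + i];
            }
            result[i] = ByteBuffer.wrap(scratch).order(ByteOrder.LITTLE_ENDIAN).getDouble();
        }
        return result;
    }
}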
AbstractTestParquetReader.java
@@ -41,7 +41,7 @@
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.JavaHiveDecimalObjectInspector;
import org.apache.hadoop.hive.serde2.typeinfo.DecimalTypeInfo;
- import org.apache.parquet.hadoop.metadata.CompressionCodecName;
+ import org.apache.parquet.hadoop.ParquetWriter;
import org.apache.parquet.schema.MessageType;
import org.joda.time.DateTimeZone;
import org.testng.annotations.BeforeClass;
@@ -50,6 +50,7 @@

import java.io.File;
import java.io.IOException;
+ import java.io.UncheckedIOException;
import java.math.BigInteger;
import java.util.ArrayList;
import java.util.Arrays;
@@ -1901,6 +1902,33 @@ public void testMapMaxReadBytes()
tester.testMaxReadBytes(getStandardMapObjectInspector(javaStringObjectInspector, javaLongObjectInspector), values, values, mapType(VARCHAR, BIGINT), maxReadBlockSize);
}

+ @Test
+ public void testDoubleByteStreamSplitSingleColumn()
+ throws IOException
+ {
+ List<String> columnNames = ImmutableList.of("byte_stream_split_sample");
+ MessageType parquetSchema = parseMessageType("message byte_stream_split_test { optional double byte_stream_split_sample; }");
+ try (ParquetTester.TempFile tempFile = new ParquetTester.TempFile("test", "parquet")) {
+ ContiguousSet<Long> epochMillisValues = ContiguousSet.create(Range.closedOpen((long) -1_000, (long) 1_000), DiscreteDomain.longs());
+ List<List<String>> stringListOfLists = epochMillisValues.stream().map(n -> Arrays.asList(String.valueOf(n * 1.0))).collect(Collectors.toList());
+ List<Double> doubleList = epochMillisValues.stream().map(n -> n * 1.0).collect(Collectors.toList());
+
+ ParquetWriter writer = getParquetWriter(parquetSchema, tempFile.getFile());
+ stringListOfLists.stream().forEach(element -> {
+ try {
+ writer.write(element);
+ }
+ catch (IOException e) {
+ throw new UncheckedIOException(e);
+ }
+ });
+ writer.close();
+ ConnectorSession session = getHiveSession(new HiveConfig());
+ TestUtils.testReadingAs(DOUBLE, session, tempFile, columnNames, doubleList);
+ }
+ }


private static <T> Iterable<T> repeatEach(int n, Iterable<T> iterable)
{
return () -> new AbstractIterator<>()
@@ -2182,37 +2210,10 @@ private static SqlDate intToSqlDate(Integer input)
return new SqlDate(input);
}

- private static CustomParquetWriter getParquetWriter(MessageType schema, File outputFile)
+ private static ParquetWriter getParquetWriter(MessageType schema, File outputFile)
throws IOException
{
Path path = new Path(outputFile.toURI().toString());
- return new CustomParquetWriter(path, schema, false, CompressionCodecName.SNAPPY);
- }
-
- @Test
- public void testDoubleByteStreamSplitSingleColumn()
- throws IOException
- {
- List<String> columnNames = ImmutableList.of("byte_stream_split_sample");
- MessageType parquetSchema = parseMessageType("message byte_stream_split_test { optional double byte_stream_split_sample; }");
- try (ParquetTester.TempFile tempFile = new ParquetTester.TempFile("test", "parquet")) {
- ContiguousSet<Long> epochMillisValues = ContiguousSet.create(Range.closedOpen((long) -1_000, (long) 1_000), DiscreteDomain.longs());
- epochMillisValues.stream().forEach(System.out::println);
- List<List<String>> stringListOfLists = epochMillisValues.stream().map(n -> Arrays.asList(String.valueOf(n * 1.0))).collect(Collectors.toList());
- List<Double> doubleList = epochMillisValues.stream().map(n -> n * 1.0).collect(Collectors.toList());
-
- CustomParquetWriter writer = getParquetWriter(parquetSchema, tempFile.getFile());
- stringListOfLists.stream().forEach(element -> {
- try {
- writer.write(element);
- }
- catch (IOException e) {
- e.printStackTrace();
- }
- });
- writer.close();
- ConnectorSession session = getHiveSession(new HiveConfig());
- TestUtils.testReadingAs(DOUBLE, session, tempFile, columnNames, doubleList);
- }
+ return new CustomParquetWriter.Builder(path).withSchema(schema).withByteStreamSplitEncoding(true).build();
}
}
CustomParquetWriter.java
@@ -13,7 +13,10 @@
*/
package io.trino.plugin.hive.parquet;

+ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
+ import org.apache.parquet.hadoop.ParquetWriter;
+ import org.apache.parquet.hadoop.api.WriteSupport;
import org.apache.parquet.hadoop.metadata.CompressionCodecName;
import org.apache.parquet.schema.MessageType;

@@ -32,4 +35,30 @@ public CustomParquetWriter(
{
super(file, new CustomWriteSupport(schema), codecName, DEFAULT_BLOCK_SIZE, DEFAULT_PAGE_SIZE, enableDictionary, false);
}
+ public static class Builder extends ParquetWriter.Builder<List<String>, Builder>
+ {
+
+ MessageType schema;
+ protected Builder(Path path)
+ {
+ super(path);
+ }
+
+ @Override
+ protected Builder self()
+ {
+ return this;
+ }
+
+
+ public Builder withSchema(MessageType schema) {
+ this.schema = schema;
+ return this;
+ }
+ @Override
+ protected WriteSupport<List<String>> getWriteSupport(Configuration conf)
+ {
+ return new CustomWriteSupport(schema);
+ }
+ }
}
