Skip to content

Commit

Permalink
Fix deprecated Parquet API usage
Browse files Browse the repository at this point in the history
Update usage of the following APIs in trino-parquet
- Binary
- ColumnChunkMetaData
- ColumnDescriptor
- Statistics
  • Loading branch information
nevillelyh committed Aug 19, 2022
1 parent 508d5e2 commit 5a5e2d3
Show file tree
Hide file tree
Showing 5 changed files with 67 additions and 51 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ public void testInvalidBinaryLength()
{
try {
byte[] invalidLengthBinaryTimestamp = new byte[8];
decodeInt96Timestamp(Binary.fromByteArray(invalidLengthBinaryTimestamp));
decodeInt96Timestamp(Binary.fromConstantByteArray(invalidLengthBinaryTimestamp));
}
catch (TrinoException e) {
assertEquals(e.getErrorCode(), NOT_SUPPORTED.toErrorCode());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,6 @@
import org.apache.parquet.internal.column.columnindex.BoundaryOrder;
import org.apache.parquet.internal.column.columnindex.ColumnIndex;
import org.apache.parquet.internal.column.columnindex.ColumnIndexBuilder;
import org.apache.parquet.io.api.Binary;
import org.apache.parquet.schema.LogicalTypeAnnotation;
import org.apache.parquet.schema.LogicalTypeAnnotation.TimeUnit;
import org.apache.parquet.schema.PrimitiveType;
Expand Down Expand Up @@ -100,7 +99,6 @@
import static java.util.Collections.singletonList;
import static java.util.Collections.singletonMap;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static org.apache.parquet.column.statistics.Statistics.getStatsBasedOnType;
import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName;
import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.BINARY;
import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.FIXED_LEN_BYTE_ARRAY;
Expand Down Expand Up @@ -136,9 +134,11 @@ public void testBoolean()

private static BooleanStatistics booleanColumnStats(boolean minimum, boolean maximum)
{
BooleanStatistics statistics = new BooleanStatistics();
statistics.setMinMax(minimum, maximum);
return statistics;
return (BooleanStatistics) Statistics.getBuilderForReading(Types.optional(PrimitiveTypeName.BOOLEAN).named("BooleanColumn"))
.withMin(BytesUtils.booleanToBytes(minimum))
.withMax(BytesUtils.booleanToBytes(maximum))
.withNumNulls(0)
.build();
}

@Test
Expand Down Expand Up @@ -350,9 +350,11 @@ public void testString()

private static BinaryStatistics stringColumnStats(String minimum, String maximum)
{
BinaryStatistics statistics = new BinaryStatistics();
statistics.setMinMax(Binary.fromString(minimum), Binary.fromString(maximum));
return statistics;
return (BinaryStatistics) Statistics.getBuilderForReading(Types.optional(BINARY).named("StringColumn"))
.withMin(minimum.getBytes(UTF_8))
.withMax(maximum.getBytes(UTF_8))
.withNumNulls(0)
.build();
}

@Test
Expand Down Expand Up @@ -514,9 +516,11 @@ private static long toEpochWithPrecision(LocalDateTime time, int precision)

private static BinaryStatistics timestampColumnStats(LocalDateTime minimum, LocalDateTime maximum)
{
BinaryStatistics statistics = new BinaryStatistics();
statistics.setMinMax(Binary.fromConstantByteArray(toParquetEncoding(minimum)), Binary.fromConstantByteArray(toParquetEncoding(maximum)));
return statistics;
return (BinaryStatistics) Statistics.getBuilderForReading(Types.optional(BINARY).named("TimestampColumn"))
.withMin(toParquetEncoding(minimum))
.withMax(toParquetEncoding(maximum))
.withNumNulls(0)
.build();
}

private static byte[] toParquetEncoding(LocalDateTime timestamp)
Expand Down Expand Up @@ -544,9 +548,12 @@ public void testVarcharMatchesWithStatistics()
RichColumnDescriptor column = new RichColumnDescriptor(columnDescriptor, new PrimitiveType(OPTIONAL, BINARY, "Test column"));
TupleDomain<ColumnDescriptor> effectivePredicate = getEffectivePredicate(column, createVarcharType(255), utf8Slice(value));
TupleDomainParquetPredicate parquetPredicate = new TupleDomainParquetPredicate(effectivePredicate, singletonList(column), UTC);
Statistics<?> stats = getStatsBasedOnType(column.getPrimitiveType().getPrimitiveTypeName());
stats.setNumNulls(1L);
stats.setMinMaxFromBytes(value.getBytes(UTF_8), value.getBytes(UTF_8));
PrimitiveType type = column.getPrimitiveType();
Statistics<?> stats = Statistics.getBuilderForReading(type)
.withMin(value.getBytes(UTF_8))
.withMax(value.getBytes(UTF_8))
.withNumNulls(1L)
.build();
assertTrue(parquetPredicate.matches(2, ImmutableMap.of(column, stats), ID));
}

Expand Down Expand Up @@ -582,7 +589,7 @@ public void testBigintMatchesWithStatistics()
throws ParquetCorruptionException
{
RichColumnDescriptor column = new RichColumnDescriptor(
new ColumnDescriptor(new String[] {"path"}, INT64, 0, 0),
new ColumnDescriptor(new String[] {"path"}, Types.optional(INT64).named("Test column"), 0, 0),
new PrimitiveType(OPTIONAL, INT64, "Test column"));
TupleDomain<ColumnDescriptor> effectivePredicate = TupleDomain.withColumnDomains(ImmutableMap.of(
column,
Expand All @@ -597,7 +604,7 @@ public void testBigintMatchesWithStatistics()
@Test
public void testVarcharMatchesWithDictionaryDescriptor()
{
ColumnDescriptor columnDescriptor = new ColumnDescriptor(new String[] {"path"}, BINARY, 0, 0);
ColumnDescriptor columnDescriptor = new ColumnDescriptor(new String[] {"path"}, Types.optional(BINARY).named("Test column"), 0, 0);
RichColumnDescriptor column = new RichColumnDescriptor(columnDescriptor, new PrimitiveType(OPTIONAL, BINARY, "Test column"));
TupleDomain<ColumnDescriptor> effectivePredicate = getEffectivePredicate(column, createVarcharType(255), EMPTY_SLICE);
TupleDomainParquetPredicate parquetPredicate = new TupleDomainParquetPredicate(effectivePredicate, singletonList(column), UTC);
Expand All @@ -616,7 +623,7 @@ public void testColumnIndexWithNullPages()
asList(1L, 2L, 3L, 4L, 5L, 6L),
toByteBufferList(null, 2L, null, 4L, null, 9L),
toByteBufferList(null, 3L, null, 15L, null, 10L));
ColumnDescriptor columnDescriptor = new ColumnDescriptor(new String[] {"path"}, INT64, 0, 0);
ColumnDescriptor columnDescriptor = new ColumnDescriptor(new String[] {"path"}, Types.optional(INT64).named("Test column"), 0, 0);
RichColumnDescriptor column = new RichColumnDescriptor(columnDescriptor, new PrimitiveType(OPTIONAL, INT64, "Test column"));
assertThat(getDomain(BIGINT, 200, columnIndex, new ParquetDataSourceId("test"), column, UTC))
.isEqualTo(Domain.create(
Expand All @@ -634,7 +641,7 @@ private ColumnDescriptor createColumnDescriptor(PrimitiveTypeName typeName, Stri

private TupleDomain<ColumnDescriptor> getEffectivePredicate(RichColumnDescriptor column, VarcharType type, Slice value)
{
ColumnDescriptor predicateColumn = new ColumnDescriptor(column.getPath(), column.getPrimitiveType().getPrimitiveTypeName(), 0, 0);
ColumnDescriptor predicateColumn = new ColumnDescriptor(column.getPath(), column.getPrimitiveType(), 0, 0);
Domain predicateDomain = singleValue(type, value);
Map<ColumnDescriptor, Domain> predicateColumns = singletonMap(predicateColumn, predicateDomain);
return withColumnDomains(predicateColumns);
Expand All @@ -647,12 +654,11 @@ private static FloatStatistics floatColumnStats(float minimum, float maximum)

private static FloatStatistics floatColumnStats(float minimum, float maximum, boolean hasNulls)
{
FloatStatistics statistics = new FloatStatistics();
statistics.setMinMax(minimum, maximum);
if (hasNulls) {
statistics.setNumNulls(1);
}
return statistics;
return (FloatStatistics) Statistics.getBuilderForReading(Types.optional(FLOAT).named("FloatColumn"))
.withMin(BytesUtils.longToBytes(Float.floatToRawIntBits(minimum)))
.withMax(BytesUtils.longToBytes(Float.floatToRawIntBits(maximum)))
.withNumNulls(hasNulls ? 1 : 0)
.build();
}

private static DoubleStatistics doubleColumnStats(double minimum, double maximum)
Expand All @@ -662,26 +668,29 @@ private static DoubleStatistics doubleColumnStats(double minimum, double maximum

private static DoubleStatistics doubleColumnStats(double minimum, double maximum, boolean hasNulls)
{
DoubleStatistics statistics = new DoubleStatistics();
statistics.setMinMax(minimum, maximum);
if (hasNulls) {
statistics.setNumNulls(1);
}
return statistics;
return (DoubleStatistics) Statistics.getBuilderForReading(Types.optional(PrimitiveTypeName.DOUBLE).named("DoubleColumn"))
.withMin(BytesUtils.longToBytes(Double.doubleToLongBits(minimum)))
.withMax(BytesUtils.longToBytes(Double.doubleToLongBits(maximum)))
.withNumNulls(hasNulls ? 1 : 0)
.build();
}

private static IntStatistics intColumnStats(int minimum, int maximum)
{
IntStatistics statistics = new IntStatistics();
statistics.setMinMax(minimum, maximum);
return statistics;
return (IntStatistics) Statistics.getBuilderForReading(Types.optional(INT32).named("IntColumn"))
.withMin(BytesUtils.intToBytes(minimum))
.withMax(BytesUtils.intToBytes(maximum))
.withNumNulls(0)
.build();
}

private static LongStatistics longColumnStats(long minimum, long maximum)
{
LongStatistics statistics = new LongStatistics();
statistics.setMinMax(minimum, maximum);
return statistics;
return (LongStatistics) Statistics.getBuilderForReading(Types.optional(INT64).named("LongColumn"))
.withMin(BytesUtils.longToBytes(minimum))
.withMax(BytesUtils.longToBytes(maximum))
.withNumNulls(0)
.build();
}

private static BinaryStatistics binaryColumnStats(long minimum, long maximum)
Expand All @@ -691,18 +700,18 @@ private static BinaryStatistics binaryColumnStats(long minimum, long maximum)

private static BinaryStatistics binaryColumnStats(BigInteger minimum, BigInteger maximum)
{
BinaryStatistics statistics = new BinaryStatistics();
statistics.setMinMax(
Binary.fromConstantByteArray(minimum.toByteArray()),
Binary.fromConstantByteArray(maximum.toByteArray()));
return statistics;
return (BinaryStatistics) Statistics.getBuilderForReading(Types.optional(BINARY).named("BinaryColumn"))
.withMin(minimum.toByteArray())
.withMax(maximum.toByteArray())
.withNumNulls(0)
.build();
}

private static LongStatistics longOnlyNullsStats(long numNulls)
{
LongStatistics statistics = new LongStatistics();
statistics.setNumNulls(numNulls);
return statistics;
return (LongStatistics) Statistics.getBuilderForReading(Types.optional(INT64).named("LongColumn"))
.withNumNulls(numNulls)
.build();
}

private DictionaryDescriptor floatDictionaryDescriptor(float... values)
Expand All @@ -715,7 +724,7 @@ private DictionaryDescriptor floatDictionaryDescriptor(float... values)
}
}
return new DictionaryDescriptor(
new ColumnDescriptor(new String[] {"dummy"}, new PrimitiveType(OPTIONAL, PrimitiveType.PrimitiveTypeName.FLOAT, 0, ""), 1, 1),
new ColumnDescriptor(new String[] {"dummy"}, new PrimitiveType(OPTIONAL, FLOAT, 0, "FloatColumn"), 1, 1),
Optional.of(new DictionaryPage(Slices.wrappedBuffer(buf.toByteArray()), values.length, PLAIN_DICTIONARY)));
}

Expand All @@ -729,7 +738,7 @@ private DictionaryDescriptor doubleDictionaryDescriptor(double... values)
}
}
return new DictionaryDescriptor(
new ColumnDescriptor(new String[] {"dummy"}, new PrimitiveType(OPTIONAL, PrimitiveType.PrimitiveTypeName.DOUBLE, 0, ""), 1, 1),
new ColumnDescriptor(new String[] {"dummy"}, new PrimitiveType(OPTIONAL, PrimitiveTypeName.DOUBLE, 0, "DoubleColumn"), 1, 1),
Optional.of(new DictionaryPage(Slices.wrappedBuffer(buf.toByteArray()), values.length, PLAIN_DICTIONARY)));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import io.trino.parquet.ParquetEncoding;
import io.trino.spi.predicate.Domain;
import org.apache.parquet.column.ColumnDescriptor;
import org.apache.parquet.schema.Types;
import org.openjdk.jmh.annotations.Benchmark;
import org.openjdk.jmh.annotations.BenchmarkMode;
import org.openjdk.jmh.annotations.Fork;
Expand Down Expand Up @@ -91,7 +92,7 @@ private DictionaryDescriptor createBigintDictionary()
}

return new DictionaryDescriptor(
new ColumnDescriptor(new String[] {"path"}, INT64, 0, 0),
new ColumnDescriptor(new String[] {"path"}, Types.optional(INT64).named("Test column"), 0, 0),
Optional.of(
new DictionaryPage(
slice,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,10 @@
import org.apache.parquet.column.Encoding;
import org.apache.parquet.column.EncodingStats;
import org.apache.parquet.column.statistics.BinaryStatistics;
import org.apache.parquet.column.statistics.Statistics;
import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData;
import org.apache.parquet.schema.PrimitiveType;
import org.apache.parquet.schema.Types;
import org.testng.annotations.Test;

import java.util.Set;
Expand Down Expand Up @@ -108,7 +111,9 @@ private ColumnChunkMetaData createColumnMetaDataV2(Encoding... dataEncodings)
.addDictEncoding(PLAIN)
.addDataEncodings(ImmutableSet.copyOf(dataEncodings)).build();

return ColumnChunkMetaData.get(fromDotString("column"), BINARY, UNCOMPRESSED, encodingStats, encodingStats.getDataEncodings(), new BinaryStatistics(), 0, 0, 1, 1, 1);
PrimitiveType type = Types.optional(BINARY).named("");
Statistics<?> stats = Statistics.createStats(type);
return ColumnChunkMetaData.get(fromDotString("column"), type, UNCOMPRESSED, encodingStats, encodingStats.getDataEncodings(), stats, 0, 0, 1, 1, 1);
}

@SuppressWarnings("deprecation")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,12 @@
import io.trino.spi.block.Block;
import org.apache.parquet.bytes.HeapByteBufferAllocator;
import org.apache.parquet.column.ColumnDescriptor;
import org.apache.parquet.column.statistics.IntStatistics;
import org.apache.parquet.column.statistics.Statistics;
import org.apache.parquet.column.values.plain.PlainValuesWriter;
import org.apache.parquet.column.values.rle.RunLengthBitPackingHybridEncoder;
import org.apache.parquet.internal.filter2.columnindex.RowRanges;
import org.apache.parquet.schema.PrimitiveType;
import org.apache.parquet.schema.Types;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;

Expand Down Expand Up @@ -248,7 +249,7 @@ private static DataPage createDataPage(RowRange pageRowRange)
Slices.wrappedBuffer(encodePlainValues(values)),
valueCount * 4,
OptionalLong.of(start),
new IntStatistics(),
Statistics.createStats(Types.optional(INT32).named("TestColumn")),
false);
}

Expand Down

0 comments on commit 5a5e2d3

Please sign in to comment.