From c8b861c7ca7a0ea13db4d5db97f06a3e23b8be62 Mon Sep 17 00:00:00 2001
From: Alex
Date: Fri, 8 Oct 2021 14:58:18 -0400
Subject: [PATCH] Support writing Parquet encoding stats

Encoding stats are used by the reader to check if the dictionary pages
can be used for predicate pushdown.
---
 .../trino/parquet/writer/ParquetWriter.java   |  1 +
 .../parquet/writer/PrimitiveColumnWriter.java | 22 ++++++++++++++++++++--
 .../plugin/hive/TestHiveConnectorTest.java    | 24 ++++++++++++++++++++++++
 3 files changed, 45 insertions(+), 2 deletions(-)

diff --git a/lib/trino-parquet/src/main/java/io/trino/parquet/writer/ParquetWriter.java b/lib/trino-parquet/src/main/java/io/trino/parquet/writer/ParquetWriter.java
index 53502e40f89e..101879833880 100644
--- a/lib/trino-parquet/src/main/java/io/trino/parquet/writer/ParquetWriter.java
+++ b/lib/trino-parquet/src/main/java/io/trino/parquet/writer/ParquetWriter.java
@@ -276,6 +276,7 @@ private List<ColumnMetaData> updateColumnMetadataOffset(List<ColumnMetaData> col
         for (ColumnMetaData column : columns) {
             ColumnMetaData columnMetaData = new ColumnMetaData(column.type, column.encodings, column.path_in_schema, column.codec, column.num_values, column.total_uncompressed_size, column.total_compressed_size, currentOffset);
             columnMetaData.setStatistics(column.getStatistics());
+            columnMetaData.setEncoding_stats(column.getEncoding_stats());
             builder.add(columnMetaData);
             currentOffset += column.getTotal_compressed_size();
         }
diff --git a/lib/trino-parquet/src/main/java/io/trino/parquet/writer/PrimitiveColumnWriter.java b/lib/trino-parquet/src/main/java/io/trino/parquet/writer/PrimitiveColumnWriter.java
index cd4617bded44..f9abaaeaf3c0 100644
--- a/lib/trino-parquet/src/main/java/io/trino/parquet/writer/PrimitiveColumnWriter.java
+++ b/lib/trino-parquet/src/main/java/io/trino/parquet/writer/PrimitiveColumnWriter.java
@@ -27,6 +27,8 @@
 import org.apache.parquet.column.statistics.Statistics;
 import org.apache.parquet.column.values.rle.RunLengthBitPackingHybridEncoder;
 import org.apache.parquet.format.ColumnMetaData;
+import org.apache.parquet.format.PageEncodingStats;
+import org.apache.parquet.format.PageType;
 import org.apache.parquet.format.converter.ParquetMetadataConverter;
 import org.apache.parquet.hadoop.metadata.CompressionCodecName;
 import org.openjdk.jol.info.ClassLayout;
@@ -36,9 +38,11 @@
 import java.io.ByteArrayOutputStream;
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
+import java.util.Map;
 import java.util.Set;
 
 import static com.google.common.base.Preconditions.checkState;
@@ -73,7 +77,9 @@ public class PrimitiveColumnWriter
     private int currentPageRowCount;
 
     // column meta data stats
-    private final Set<Encoding> encodings;
+    private final Set<Encoding> encodings = new HashSet<>();
+    private final Map<Encoding, Integer> dataPagesWithEncoding = new HashMap<>();
+    private final Map<Encoding, Integer> dictionaryPagesWithEncoding = new HashMap<>();
     private long totalCompressedSize;
     private long totalUnCompressedSize;
     private long totalRows;
@@ -96,7 +102,6 @@ public PrimitiveColumnWriter(ColumnDescriptor columnDescriptor, PrimitiveValueWr
         this.definitionLevelEncoder = requireNonNull(definitionLevelEncoder, "definitionLevelEncoder is null");
         this.repetitionLevelEncoder = requireNonNull(repetitionLevelEncoder, "repetitionLevelEncoder is null");
         this.primitiveValueWriter = requireNonNull(primitiveValueWriter, "primitiveValueWriter is null");
-        this.encodings = new HashSet<>();
         this.compressionCodec = requireNonNull(compressionCodecName, "compressionCodecName is null");
         this.compressor = getCompressor(compressionCodecName);
         this.pageSizeThreshold = pageSizeThreshold;
@@ -178,6 +183,14 @@ private ColumnMetaData getColumnMetaData()
                 totalCompressedSize,
                 -1);
         columnMetaData.setStatistics(ParquetMetadataConverter.toParquetStatistics(columnStatistics));
+        ImmutableList.Builder<PageEncodingStats> pageEncodingStats = ImmutableList.builder();
+        dataPagesWithEncoding.entrySet().stream()
+                .map(encodingAndCount -> new PageEncodingStats(PageType.DATA_PAGE_V2, encodingAndCount.getKey(), encodingAndCount.getValue()))
+                .forEach(pageEncodingStats::add);
+        dictionaryPagesWithEncoding.entrySet().stream()
+                .map(encodingAndCount -> new PageEncodingStats(PageType.DICTIONARY_PAGE, encodingAndCount.getKey(), encodingAndCount.getValue()))
+                .forEach(pageEncodingStats::add);
+        columnMetaData.setEncoding_stats(pageEncodingStats.build());
         return columnMetaData;
     }
 
@@ -236,6 +249,8 @@ private void flushCurrentPageToBuffer()
 
         List<ParquetDataOutput> dataOutputs = outputDataStreams.build();
 
+        dataPagesWithEncoding.merge(new ParquetMetadataConverter().getEncoding(primitiveValueWriter.getEncoding()), 1, Integer::sum);
+
         // update total stats
         totalCompressedSize += pageHeader.size() + compressedSize;
         totalUnCompressedSize += pageHeader.size() + uncompressedSize;
@@ -283,6 +298,7 @@ private List<ParquetDataOutput> getDataStreams()
             dictPage.add(pageData);
             totalCompressedSize += pageHeader.size() + compressedSize;
             totalUnCompressedSize += pageHeader.size() + uncompressedSize;
+            dictionaryPagesWithEncoding.merge(new ParquetMetadataConverter().getEncoding(dictionaryPage.getEncoding()), 1, Integer::sum);
 
             primitiveValueWriter.resetDictionary();
         }
@@ -325,6 +341,8 @@ public void reset()
         totalUnCompressedSize = 0;
         totalRows = 0;
         encodings.clear();
+        dataPagesWithEncoding.clear();
+        dictionaryPagesWithEncoding.clear();
 
         this.columnStatistics = Statistics.createStats(columnDescriptor.getPrimitiveType());
         getDataStreamsCalled = false;
diff --git a/plugin/trino-hive/src/test/java/io/trino/plugin/hive/TestHiveConnectorTest.java b/plugin/trino-hive/src/test/java/io/trino/plugin/hive/TestHiveConnectorTest.java
index 4455acd5e714..b1b43d096945 100644
--- a/plugin/trino-hive/src/test/java/io/trino/plugin/hive/TestHiveConnectorTest.java
+++ b/plugin/trino-hive/src/test/java/io/trino/plugin/hive/TestHiveConnectorTest.java
@@ -4625,6 +4625,30 @@ public void testParquetLongDecimalPredicatePushdown()
         assertNoDataRead("SELECT * FROM test_parquet_long_decimal_predicate_pushdown WHERE decimal_t != DECIMAL '12345678900000000.345'");
     }
 
+    @Test
+    public void testParquetDictionaryPredicatePushdown()
+    {
+        testParquetDictionaryPredicatePushdown(getSession());
+    }
+
+    @Test
+    public void testParquetDictionaryPredicatePushdownWithOptimizedWriter()
+    {
+        testParquetDictionaryPredicatePushdown(
+                Session.builder(getSession())
+                        .setCatalogSessionProperty("hive", "experimental_parquet_optimized_writer_enabled", "true")
+                        .build());
+    }
+
+    private void testParquetDictionaryPredicatePushdown(Session session)
+    {
+        String tableName = "test_parquet_dictionary_pushdown";
+        assertUpdate(session, "DROP TABLE IF EXISTS " + tableName);
+        assertUpdate(session, "CREATE TABLE " + tableName + " (n BIGINT) WITH (format = 'PARQUET')");
+        assertUpdate(session, "INSERT INTO " + tableName + " VALUES 1, 1, 2, 2, 4, 4, 5, 5", 8);
+        assertNoDataRead("SELECT * FROM " + tableName + " WHERE n = 3");
+    }
+
     private void assertNoDataRead(@Language("SQL") String sql)
     {
         assertQueryStats(