Support writing Parquet encoding stats
Encoding stats are used by the reader to check if the dictionary
pages can be used for predicate pushdown.
alexjo2144 authored and findepi committed Oct 13, 2021
1 parent 8fd28be commit c8b861c
Showing 3 changed files with 45 additions and 2 deletions.
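
To make the motivation concrete, here is a minimal reader-side sketch (illustrative only, not code from this commit or from Trino's reader) of how encoding stats enable dictionary-based predicate pushdown: a reader can consult a column chunk's page encoding statistics to confirm that every data page was dictionary-encoded, in which case the dictionary page lists all values in the chunk and a predicate can be evaluated against it without reading any data pages. The class and method names below are hypothetical; the Thrift types are the same org.apache.parquet.format classes used in the diff.

import java.util.List;

import org.apache.parquet.format.Encoding;
import org.apache.parquet.format.PageEncodingStats;
import org.apache.parquet.format.PageType;

final class DictionaryPushdownSketch
{
    // Returns true only when every data page in the column chunk used a
    // dictionary encoding, so the dictionary page alone describes all values.
    static boolean onlyDictionaryEncodedDataPages(List<PageEncodingStats> encodingStats)
    {
        boolean sawDataPage = false;
        for (PageEncodingStats stats : encodingStats) {
            if (stats.getPage_type() == PageType.DICTIONARY_PAGE) {
                continue; // the dictionary page itself is not a data page
            }
            sawDataPage = true;
            if (stats.getEncoding() != Encoding.PLAIN_DICTIONARY && stats.getEncoding() != Encoding.RLE_DICTIONARY) {
                return false; // at least one page fell back to a non-dictionary encoding
            }
        }
        return sawDataPage; // no stats at all: nothing can be concluded, do not push down
    }
}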
@@ -276,6 +276,7 @@ private List<ColumnMetaData> updateColumnMetadataOffset(List<ColumnMetaData> col
         for (ColumnMetaData column : columns) {
             ColumnMetaData columnMetaData = new ColumnMetaData(column.type, column.encodings, column.path_in_schema, column.codec, column.num_values, column.total_uncompressed_size, column.total_compressed_size, currentOffset);
             columnMetaData.setStatistics(column.getStatistics());
+            columnMetaData.setEncoding_stats(column.getEncoding_stats());
             builder.add(columnMetaData);
             currentOffset += column.getTotal_compressed_size();
         }
@@ -27,6 +27,8 @@
 import org.apache.parquet.column.statistics.Statistics;
 import org.apache.parquet.column.values.rle.RunLengthBitPackingHybridEncoder;
 import org.apache.parquet.format.ColumnMetaData;
+import org.apache.parquet.format.PageEncodingStats;
+import org.apache.parquet.format.PageType;
 import org.apache.parquet.format.converter.ParquetMetadataConverter;
 import org.apache.parquet.hadoop.metadata.CompressionCodecName;
 import org.openjdk.jol.info.ClassLayout;
@@ -36,9 +38,11 @@
 import java.io.ByteArrayOutputStream;
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
+import java.util.Map;
 import java.util.Set;

 import static com.google.common.base.Preconditions.checkState;
@@ -73,7 +77,9 @@ public class PrimitiveColumnWriter
     private int currentPageRowCount;

     // column meta data stats
-    private final Set<Encoding> encodings;
+    private final Set<Encoding> encodings = new HashSet<>();
+    private final Map<org.apache.parquet.format.Encoding, Integer> dataPagesWithEncoding = new HashMap<>();
+    private final Map<org.apache.parquet.format.Encoding, Integer> dictionaryPagesWithEncoding = new HashMap<>();
     private long totalCompressedSize;
     private long totalUnCompressedSize;
     private long totalRows;
@@ -96,7 +102,6 @@ public PrimitiveColumnWriter(ColumnDescriptor columnDescriptor, PrimitiveValueWr
         this.definitionLevelEncoder = requireNonNull(definitionLevelEncoder, "definitionLevelEncoder is null");
         this.repetitionLevelEncoder = requireNonNull(repetitionLevelEncoder, "repetitionLevelEncoder is null");
         this.primitiveValueWriter = requireNonNull(primitiveValueWriter, "primitiveValueWriter is null");
-        this.encodings = new HashSet<>();
         this.compressionCodec = requireNonNull(compressionCodecName, "compressionCodecName is null");
         this.compressor = getCompressor(compressionCodecName);
         this.pageSizeThreshold = pageSizeThreshold;
@@ -178,6 +183,14 @@ private ColumnMetaData getColumnMetaData()
                 totalCompressedSize,
                 -1);
         columnMetaData.setStatistics(ParquetMetadataConverter.toParquetStatistics(columnStatistics));
+        ImmutableList.Builder<PageEncodingStats> pageEncodingStats = ImmutableList.builder();
+        dataPagesWithEncoding.entrySet().stream()
+                .map(encodingAndCount -> new PageEncodingStats(PageType.DATA_PAGE_V2, encodingAndCount.getKey(), encodingAndCount.getValue()))
+                .forEach(pageEncodingStats::add);
+        dictionaryPagesWithEncoding.entrySet().stream()
+                .map(encodingAndCount -> new PageEncodingStats(PageType.DICTIONARY_PAGE, encodingAndCount.getKey(), encodingAndCount.getValue()))
+                .forEach(pageEncodingStats::add);
+        columnMetaData.setEncoding_stats(pageEncodingStats.build());
         return columnMetaData;
     }

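As a concrete illustration of what getColumnMetaData() now emits (hypothetical values, not taken from the commit): a column chunk written as one PLAIN-encoded dictionary page followed by three dictionary-encoded v2 data pages would carry encoding_stats equivalent to the following fragment, using the same types imported in the diff above.

List<PageEncodingStats> encodingStats = ImmutableList.of(
        new PageEncodingStats(PageType.DICTIONARY_PAGE, Encoding.PLAIN, 1),        // one dictionary page
        new PageEncodingStats(PageType.DATA_PAGE_V2, Encoding.RLE_DICTIONARY, 3)); // three data pages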
@@ -236,6 +249,8 @@ private void flushCurrentPageToBuffer()

         List<ParquetDataOutput> dataOutputs = outputDataStreams.build();

+        dataPagesWithEncoding.merge(new ParquetMetadataConverter().getEncoding(primitiveValueWriter.getEncoding()), 1, Integer::sum);
+
         // update total stats
         totalCompressedSize += pageHeader.size() + compressedSize;
         totalUnCompressedSize += pageHeader.size() + uncompressedSize;
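For readers unfamiliar with the idiom, the Map.merge call above is a compact per-encoding page counter. Standalone (a fragment, reusing the java.util and org.apache.parquet.format imports from the diff), it behaves like this:

Map<Encoding, Integer> dataPagesWithEncoding = new HashMap<>();
dataPagesWithEncoding.merge(Encoding.RLE_DICTIONARY, 1, Integer::sum); // key absent: inserts 1
dataPagesWithEncoding.merge(Encoding.RLE_DICTIONARY, 1, Integer::sum); // key present: 1 + 1 = 2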
@@ -283,6 +298,7 @@ private List<ParquetDataOutput> getDataStreams()
             dictPage.add(pageData);
             totalCompressedSize += pageHeader.size() + compressedSize;
             totalUnCompressedSize += pageHeader.size() + uncompressedSize;
+            dictionaryPagesWithEncoding.merge(new ParquetMetadataConverter().getEncoding(dictionaryPage.getEncoding()), 1, Integer::sum);

             primitiveValueWriter.resetDictionary();
         }
@@ -325,6 +341,8 @@ public void reset()
         totalUnCompressedSize = 0;
         totalRows = 0;
         encodings.clear();
+        dataPagesWithEncoding.clear();
+        dictionaryPagesWithEncoding.clear();
         this.columnStatistics = Statistics.createStats(columnDescriptor.getPrimitiveType());

         getDataStreamsCalled = false;
@@ -4625,6 +4625,30 @@ public void testParquetLongDecimalPredicatePushdown()
         assertNoDataRead("SELECT * FROM test_parquet_long_decimal_predicate_pushdown WHERE decimal_t != DECIMAL '12345678900000000.345'");
     }

+    @Test
+    public void testParquetDictionaryPredicatePushdown()
+    {
+        testParquetDictionaryPredicatePushdown(getSession());
+    }
+
+    @Test
+    public void testParquetDictionaryPredicatePushdownWithOptimizedWriter()
+    {
+        testParquetDictionaryPredicatePushdown(
+                Session.builder(getSession())
+                        .setCatalogSessionProperty("hive", "experimental_parquet_optimized_writer_enabled", "true")
+                        .build());
+    }
+
+    private void testParquetDictionaryPredicatePushdown(Session session)
+    {
+        String tableName = "test_parquet_dictionary_pushdown";
+        assertUpdate(session, "DROP TABLE IF EXISTS " + tableName);
+        assertUpdate(session, "CREATE TABLE " + tableName + " (n BIGINT) WITH (format = 'PARQUET')");
+        assertUpdate(session, "INSERT INTO " + tableName + " VALUES 1, 1, 2, 2, 4, 4, 5, 5", 8);
+        assertNoDataRead("SELECT * FROM " + tableName + " WHERE n = 3");
+    }
+
     private void assertNoDataRead(@Language("SQL") String sql)
     {
         assertQueryStats(
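Why the final assertion can expect no data to be read (a sketch of the mechanism, with values inferred from the INSERT above): the eight rows fit in a single row group whose dictionary for column n holds exactly {1, 2, 4, 5}; because the new encoding_stats shows that every data page is dictionary-encoded, the reader can evaluate n = 3 against the dictionary alone and skip the row group entirely. As a fragment:

// Illustrative only: what the reader can conclude from the dictionary page.
Set<Long> dictionary = Set.of(1L, 2L, 4L, 5L);
boolean rowGroupCanBeSkipped = !dictionary.contains(3L); // true: no data pages need to be read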
