Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Optimize writing RLE runs in parquet column descriptors #22089

Merged
merged 4 commits into from
May 28, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@
import io.trino.spi.Page;
import io.trino.spi.type.Type;
import jakarta.annotation.Nullable;
import org.apache.parquet.column.ParquetProperties;
import org.apache.parquet.column.values.bloomfilter.BloomFilter;
import org.apache.parquet.format.BloomFilterAlgorithm;
import org.apache.parquet.format.BloomFilterCompression;
Expand Down Expand Up @@ -88,7 +87,6 @@
import static java.lang.Math.min;
import static java.nio.charset.StandardCharsets.US_ASCII;
import static java.util.Objects.requireNonNull;
import static org.apache.parquet.column.ParquetProperties.WriterVersion.PARQUET_1_0;

public class ParquetWriter
implements Closeable
Expand Down Expand Up @@ -439,15 +437,9 @@ static String formatCreatedBy(String trinoVersion)

private void initColumnWriters()
{
ParquetProperties parquetProperties = ParquetProperties.builder()
.withWriterVersion(PARQUET_1_0)
.withPageSize(writerOption.getMaxPageSize())
.build();

this.columnWriters = ParquetWriters.getColumnWriters(
messageType,
primitiveTypes,
parquetProperties,
compressionCodec,
writerOption,
parquetTimeZone);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,6 @@
import io.trino.spi.type.VarbinaryType;
import io.trino.spi.type.VarcharType;
import org.apache.parquet.column.ColumnDescriptor;
import org.apache.parquet.column.ParquetProperties;
import org.apache.parquet.column.values.ValuesWriter;
import org.apache.parquet.column.values.bloomfilter.BlockSplitBloomFilter;
import org.apache.parquet.column.values.bloomfilter.BloomFilter;
Expand All @@ -66,6 +65,8 @@

import static com.google.common.base.Preconditions.checkArgument;
import static io.trino.parquet.writer.ParquetWriter.SUPPORTED_BLOOM_FILTER_TYPES;
import static io.trino.parquet.writer.valuewriter.ColumnDescriptorValuesWriter.newDefinitionLevelWriter;
import static io.trino.parquet.writer.valuewriter.ColumnDescriptorValuesWriter.newRepetitionLevelWriter;
import static io.trino.spi.StandardErrorCode.NOT_SUPPORTED;
import static io.trino.spi.type.BigintType.BIGINT;
import static io.trino.spi.type.BooleanType.BOOLEAN;
Expand All @@ -88,6 +89,7 @@

final class ParquetWriters
{
private static final int DEFAULT_DICTIONARY_PAGE_SIZE = 1024 * 1024;
static final int BLOOM_FILTER_EXPECTED_ENTRIES = 100_000;

private ParquetWriters() {}
Expand Down Expand Up @@ -166,16 +168,14 @@ static PrimitiveValueWriter getValueWriter(ValuesWriter valuesWriter, Type type,
static List<ColumnWriter> getColumnWriters(
MessageType messageType,
Map<List<String>, Type> trinoTypes,
ParquetProperties parquetProperties,
CompressionCodec compressionCodec,
ParquetWriterOptions writerOptions,
Optional<DateTimeZone> parquetTimeZone)
{
TrinoValuesWriterFactory valuesWriterFactory = new TrinoValuesWriterFactory(parquetProperties);
TrinoValuesWriterFactory valuesWriterFactory = new TrinoValuesWriterFactory(writerOptions.getMaxPageSize(), DEFAULT_DICTIONARY_PAGE_SIZE);
WriteBuilder writeBuilder = new WriteBuilder(
messageType,
trinoTypes,
parquetProperties,
valuesWriterFactory,
compressionCodec,
writerOptions,
Expand All @@ -189,9 +189,9 @@ private static class WriteBuilder
{
private final MessageType type;
private final Map<List<String>, Type> trinoTypes;
private final ParquetProperties parquetProperties;
private final TrinoValuesWriterFactory valuesWriterFactory;
private final CompressionCodec compressionCodec;
private final int maxPageSize;
private final int pageValueCountLimit;
private final Set<String> bloomFilterColumns;
private final Optional<DateTimeZone> parquetTimeZone;
Expand All @@ -202,18 +202,17 @@ private static class WriteBuilder
WriteBuilder(
MessageType messageType,
Map<List<String>, Type> trinoTypes,
ParquetProperties parquetProperties,
TrinoValuesWriterFactory valuesWriterFactory,
CompressionCodec compressionCodec,
ParquetWriterOptions writerOptions,
Optional<DateTimeZone> parquetTimeZone)
{
this.type = requireNonNull(messageType, "messageType is null");
this.trinoTypes = requireNonNull(trinoTypes, "trinoTypes is null");
this.parquetProperties = requireNonNull(parquetProperties, "parquetProperties is null");
this.valuesWriterFactory = requireNonNull(valuesWriterFactory, "valuesWriterFactory is null");
this.compressionCodec = requireNonNull(compressionCodec, "compressionCodec is null");
this.pageValueCountLimit = requireNonNull(writerOptions, "writerOptions is null").getMaxPageValueCount();
this.maxPageSize = writerOptions.getMaxPageSize();
this.pageValueCountLimit = writerOptions.getMaxPageValueCount();
this.maxBloomFilterSize = writerOptions.getMaxBloomFilterSize();
this.bloomFilterColumns = requireNonNull(writerOptions.getBloomFilterColumns(), "bloomFilterColumns is null");
this.bloomFilterFpp = writerOptions.getBLoomFilterFpp();
Expand Down Expand Up @@ -270,10 +269,10 @@ public ColumnWriter primitive(PrimitiveType primitive)
return new PrimitiveColumnWriter(
columnDescriptor,
getValueWriter(valuesWriterFactory.newValuesWriter(columnDescriptor, bloomFilter), trinoType, columnDescriptor.getPrimitiveType(), parquetTimeZone),
parquetProperties.newDefinitionLevelWriter(columnDescriptor),
parquetProperties.newRepetitionLevelWriter(columnDescriptor),
newDefinitionLevelWriter(columnDescriptor, maxPageSize),
newRepetitionLevelWriter(columnDescriptor, maxPageSize),
compressionCodec,
parquetProperties.getPageSizeThreshold(),
maxPageSize,
pageValueCountLimit,
bloomFilter);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import io.trino.parquet.writer.repdef.DefLevelWriterProviders;
import io.trino.parquet.writer.repdef.RepLevelWriterProvider;
import io.trino.parquet.writer.repdef.RepLevelWriterProviders;
import io.trino.parquet.writer.valuewriter.ColumnDescriptorValuesWriter;
import io.trino.parquet.writer.valuewriter.PrimitiveValueWriter;
import io.trino.plugin.base.io.ChunkedSliceOutput;
import jakarta.annotation.Nullable;
Expand All @@ -28,7 +29,6 @@
import org.apache.parquet.column.Encoding;
import org.apache.parquet.column.page.DictionaryPage;
import org.apache.parquet.column.statistics.Statistics;
import org.apache.parquet.column.values.ValuesWriter;
import org.apache.parquet.column.values.bloomfilter.BloomFilter;
import org.apache.parquet.format.ColumnMetaData;
import org.apache.parquet.format.CompressionCodec;
Expand Down Expand Up @@ -75,8 +75,8 @@ public class PrimitiveColumnWriter
private final CompressionCodec compressionCodec;

private final PrimitiveValueWriter primitiveValueWriter;
private final ValuesWriter definitionLevelWriter;
private final ValuesWriter repetitionLevelWriter;
private final ColumnDescriptorValuesWriter definitionLevelWriter;
private final ColumnDescriptorValuesWriter repetitionLevelWriter;

private boolean closed;
private boolean getDataStreamsCalled;
Expand Down Expand Up @@ -113,8 +113,8 @@ public class PrimitiveColumnWriter
public PrimitiveColumnWriter(
ColumnDescriptor columnDescriptor,
PrimitiveValueWriter primitiveValueWriter,
ValuesWriter definitionLevelWriter,
ValuesWriter repetitionLevelWriter,
ColumnDescriptorValuesWriter definitionLevelWriter,
ColumnDescriptorValuesWriter repetitionLevelWriter,
CompressionCodec compressionCodec,
int pageSizeThreshold,
int pageValueCountLimit,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,14 +14,14 @@
package io.trino.parquet.writer.repdef;

import com.google.common.collect.Iterables;
import org.apache.parquet.column.values.ValuesWriter;
import io.trino.parquet.writer.valuewriter.ColumnDescriptorValuesWriter;

import java.util.List;
import java.util.Optional;

public interface DefLevelWriterProvider
{
DefinitionLevelWriter getDefinitionLevelWriter(Optional<DefinitionLevelWriter> nestedWriter, ValuesWriter encoder);
DefinitionLevelWriter getDefinitionLevelWriter(Optional<DefinitionLevelWriter> nestedWriter, ColumnDescriptorValuesWriter encoder);

interface DefinitionLevelWriter
{
Expand All @@ -34,7 +34,7 @@ record ValuesCount(int totalValuesCount, int maxDefinitionLevelValuesCount)
{
}

static DefinitionLevelWriter getRootDefinitionLevelWriter(List<DefLevelWriterProvider> defLevelWriterProviders, ValuesWriter encoder)
static DefinitionLevelWriter getRootDefinitionLevelWriter(List<DefLevelWriterProvider> defLevelWriterProviders, ColumnDescriptorValuesWriter encoder)
{
// Constructs hierarchy of DefinitionLevelWriter from leaf to root
DefinitionLevelWriter rootDefinitionLevelWriter = Iterables.getLast(defLevelWriterProviders)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,13 +13,13 @@
*/
package io.trino.parquet.writer.repdef;

import io.trino.parquet.writer.valuewriter.ColumnDescriptorValuesWriter;
import io.trino.spi.block.ArrayBlock;
import io.trino.spi.block.Block;
import io.trino.spi.block.ColumnarArray;
import io.trino.spi.block.ColumnarMap;
import io.trino.spi.block.MapBlock;
import io.trino.spi.block.RowBlock;
import org.apache.parquet.column.values.ValuesWriter;

import java.util.Optional;

Expand Down Expand Up @@ -65,7 +65,7 @@ static class PrimitiveDefLevelWriterProvider
}

@Override
public DefinitionLevelWriter getDefinitionLevelWriter(Optional<DefinitionLevelWriter> nestedWriter, ValuesWriter encoder)
public DefinitionLevelWriter getDefinitionLevelWriter(Optional<DefinitionLevelWriter> nestedWriter, ColumnDescriptorValuesWriter encoder)
{
checkArgument(nestedWriter.isEmpty(), "nestedWriter should be empty for primitive definition level writer");
return new DefinitionLevelWriter()
Expand All @@ -84,9 +84,7 @@ public ValuesCount writeDefinitionLevels(int positionsCount)
checkValidPosition(offset, positionsCount, block.getPositionCount());
int nonNullsCount = 0;
if (!block.mayHaveNull()) {
for (int position = offset; position < offset + positionsCount; position++) {
encoder.writeInteger(maxDefinitionLevel);
}
encoder.writeRepeatInteger(maxDefinitionLevel, positionsCount);
nonNullsCount = positionsCount;
}
else {
Expand Down Expand Up @@ -117,7 +115,7 @@ static class RowDefLevelWriterProvider
}

@Override
public DefinitionLevelWriter getDefinitionLevelWriter(Optional<DefinitionLevelWriter> nestedWriterOptional, ValuesWriter encoder)
public DefinitionLevelWriter getDefinitionLevelWriter(Optional<DefinitionLevelWriter> nestedWriterOptional, ColumnDescriptorValuesWriter encoder)
{
checkArgument(nestedWriterOptional.isPresent(), "nestedWriter should be present for column row definition level writer");
return new DefinitionLevelWriter()
Expand Down Expand Up @@ -180,7 +178,7 @@ static class ColumnMapDefLevelWriterProvider
}

@Override
public DefinitionLevelWriter getDefinitionLevelWriter(Optional<DefinitionLevelWriter> nestedWriterOptional, ValuesWriter encoder)
public DefinitionLevelWriter getDefinitionLevelWriter(Optional<DefinitionLevelWriter> nestedWriterOptional, ColumnDescriptorValuesWriter encoder)
{
checkArgument(nestedWriterOptional.isPresent(), "nestedWriter should be present for column map definition level writer");
return new DefinitionLevelWriter()
Expand Down Expand Up @@ -265,7 +263,7 @@ static class ColumnArrayDefLevelWriterProvider
}

@Override
public DefinitionLevelWriter getDefinitionLevelWriter(Optional<DefinitionLevelWriter> nestedWriterOptional, ValuesWriter encoder)
public DefinitionLevelWriter getDefinitionLevelWriter(Optional<DefinitionLevelWriter> nestedWriterOptional, ColumnDescriptorValuesWriter encoder)
{
checkArgument(nestedWriterOptional.isPresent(), "nestedWriter should be present for column map definition level writer");
return new DefinitionLevelWriter()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,14 +14,14 @@
package io.trino.parquet.writer.repdef;

import com.google.common.collect.Iterables;
import org.apache.parquet.column.values.ValuesWriter;
import io.trino.parquet.writer.valuewriter.ColumnDescriptorValuesWriter;

import java.util.List;
import java.util.Optional;

public interface RepLevelWriterProvider
{
RepetitionLevelWriter getRepetitionLevelWriter(Optional<RepetitionLevelWriter> nestedWriter, ValuesWriter encoder);
RepetitionLevelWriter getRepetitionLevelWriter(Optional<RepetitionLevelWriter> nestedWriter, ColumnDescriptorValuesWriter encoder);

/**
* Parent repetition level marks at which level either:
Expand All @@ -36,7 +36,7 @@ interface RepetitionLevelWriter
void writeRepetitionLevels(int parentLevel);
}

static RepetitionLevelWriter getRootRepetitionLevelWriter(List<RepLevelWriterProvider> repLevelWriterProviders, ValuesWriter encoder)
static RepetitionLevelWriter getRootRepetitionLevelWriter(List<RepLevelWriterProvider> repLevelWriterProviders, ColumnDescriptorValuesWriter encoder)
{
// Constructs hierarchy of RepetitionLevelWriter from leaf to root
RepetitionLevelWriter rootRepetitionLevelWriter = Iterables.getLast(repLevelWriterProviders)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,13 +13,13 @@
*/
package io.trino.parquet.writer.repdef;

import io.trino.parquet.writer.valuewriter.ColumnDescriptorValuesWriter;
import io.trino.spi.block.ArrayBlock;
import io.trino.spi.block.Block;
import io.trino.spi.block.ColumnarArray;
import io.trino.spi.block.ColumnarMap;
import io.trino.spi.block.MapBlock;
import io.trino.spi.block.RowBlock;
import org.apache.parquet.column.values.ValuesWriter;

import java.util.Optional;

Expand Down Expand Up @@ -63,7 +63,7 @@ static class PrimitiveRepLevelWriterProvider
}

@Override
public RepetitionLevelWriter getRepetitionLevelWriter(Optional<RepetitionLevelWriter> nestedWriter, ValuesWriter encoder)
public RepetitionLevelWriter getRepetitionLevelWriter(Optional<RepetitionLevelWriter> nestedWriter, ColumnDescriptorValuesWriter encoder)
{
checkArgument(nestedWriter.isEmpty(), "nestedWriter should be empty for primitive repetition level writer");
return new RepetitionLevelWriter()
Expand All @@ -80,9 +80,7 @@ public void writeRepetitionLevels(int parentLevel)
public void writeRepetitionLevels(int parentLevel, int positionsCount)
{
checkValidPosition(offset, positionsCount, block.getPositionCount());
for (int i = 0; i < positionsCount; i++) {
encoder.writeInteger(parentLevel);
}
encoder.writeRepeatInteger(parentLevel, positionsCount);
offset += positionsCount;
}
};
Expand All @@ -101,7 +99,7 @@ static class RowRepLevelWriterProvider
}

@Override
public RepetitionLevelWriter getRepetitionLevelWriter(Optional<RepetitionLevelWriter> nestedWriterOptional, ValuesWriter encoder)
public RepetitionLevelWriter getRepetitionLevelWriter(Optional<RepetitionLevelWriter> nestedWriterOptional, ColumnDescriptorValuesWriter encoder)
{
checkArgument(nestedWriterOptional.isPresent(), "nestedWriter should be present for column row repetition level writer");
return new RepetitionLevelWriter()
Expand Down Expand Up @@ -160,7 +158,7 @@ static class ColumnMapRepLevelWriterProvider
}

@Override
public RepetitionLevelWriter getRepetitionLevelWriter(Optional<RepetitionLevelWriter> nestedWriterOptional, ValuesWriter encoder)
public RepetitionLevelWriter getRepetitionLevelWriter(Optional<RepetitionLevelWriter> nestedWriterOptional, ColumnDescriptorValuesWriter encoder)
{
checkArgument(nestedWriterOptional.isPresent(), "nestedWriter should be present for column map repetition level writer");
return new RepetitionLevelWriter()
Expand Down Expand Up @@ -224,7 +222,7 @@ static class ColumnArrayRepLevelWriterProvider
}

@Override
public RepetitionLevelWriter getRepetitionLevelWriter(Optional<RepetitionLevelWriter> nestedWriterOptional, ValuesWriter encoder)
public RepetitionLevelWriter getRepetitionLevelWriter(Optional<RepetitionLevelWriter> nestedWriterOptional, ColumnDescriptorValuesWriter encoder)
{
checkArgument(nestedWriterOptional.isPresent(), "nestedWriter should be present for column map repetition level writer");
return new RepetitionLevelWriter()
Expand Down
Loading