
Enable Hive optimized Parquet writer by default #17393

Merged (2 commits) on May 12, 2023
Changes shown are from 1 of the 2 commits.
ParquetWriterOptions.java

@@ -13,11 +13,10 @@
  */
 package io.trino.parquet.writer;

+import com.google.common.primitives.Ints;
 import io.airlift.units.DataSize;
 import org.apache.parquet.hadoop.ParquetWriter;

-import static java.lang.Math.toIntExact;
-
 public class ParquetWriterOptions
 {
     private static final DataSize DEFAULT_MAX_ROW_GROUP_SIZE = DataSize.ofBytes(ParquetWriter.DEFAULT_BLOCK_SIZE);
@@ -35,8 +34,8 @@ public static ParquetWriterOptions.Builder builder()

     private ParquetWriterOptions(DataSize maxBlockSize, DataSize maxPageSize, int batchSize)
     {
-        this.maxRowGroupSize = toIntExact(maxBlockSize.toBytes());
-        this.maxPageSize = toIntExact(maxPageSize.toBytes());
+        this.maxRowGroupSize = Ints.saturatedCast(maxBlockSize.toBytes());
+        this.maxPageSize = Ints.saturatedCast(maxPageSize.toBytes());
         this.batchSize = batchSize;
     }

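For context on the cast change: toIntExact throws an ArithmeticException when the value does not fit in an int, while Guava's Ints.saturatedCast clamps it to the nearest representable value, so an oversized block size degrades to Integer.MAX_VALUE (about 2GB) instead of failing the writer. A minimal standalone sketch of the difference (plain Java plus Guava, not Trino code):

import com.google.common.primitives.Ints;

import static java.lang.Math.toIntExact;

public class SaturatedCastDemo
{
    public static void main(String[] args)
    {
        long oversized = 100L * 1024 * 1024 * 1024; // 100GB in bytes, exceeds Integer.MAX_VALUE

        // Clamps to the largest int value instead of failing
        System.out.println(Ints.saturatedCast(oversized)); // prints 2147483647

        // Throws ArithmeticException: integer overflow
        System.out.println(toIntExact(oversized));
    }
}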
PropertyMetadataUtil.java

@@ -15,10 +15,12 @@

 import io.airlift.units.DataSize;
 import io.airlift.units.Duration;
+import io.trino.spi.TrinoException;
 import io.trino.spi.session.PropertyMetadata;

 import java.util.function.Consumer;

+import static io.trino.spi.StandardErrorCode.INVALID_SESSION_PROPERTY;
 import static io.trino.spi.type.VarcharType.VARCHAR;

 public final class PropertyMetadataUtil
@@ -47,6 +49,20 @@ public static PropertyMetadata<DataSize> dataSizeProperty(String name, String de
                 DataSize::toString);
     }

+    public static void validateMinDataSize(String name, DataSize value, DataSize min)
+    {
+        if (value.compareTo(min) < 0) {
+            throw new TrinoException(INVALID_SESSION_PROPERTY, "%s must be at least %s: %s".formatted(name, min, value));
+        }
+    }
+
+    public static void validateMaxDataSize(String name, DataSize value, DataSize max)
+    {
+        if (value.compareTo(max) > 0) {
+            throw new TrinoException(INVALID_SESSION_PROPERTY, "%s must be at most %s: %s".formatted(name, max, value));
+        }
+    }
+
     public static PropertyMetadata<Duration> durationProperty(String name, String description, Duration defaultValue, boolean hidden)
     {
         return durationProperty(name, description, defaultValue, value -> {}, hidden);
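As a usage sketch: wired into the dataSizeProperty overload that takes a validation callback, the helpers read like this. The property name, default, and bounds are illustrative (not from the PR), and the fragment assumes static imports of the helpers above and of io.airlift.units.DataSize.Unit:

// Hypothetical property registration with bounds enforcement
PropertyMetadata<DataSize> property = dataSizeProperty(
        "example_buffer_size",                      // illustrative name
        "Example buffer size with enforced bounds",
        DataSize.of(64, Unit.MEGABYTE),             // illustrative default
        value -> {
            validateMinDataSize("example_buffer_size", value, DataSize.of(4, Unit.MEGABYTE));
            validateMaxDataSize("example_buffer_size", value, DataSize.of(2, Unit.GIGABYTE));
        },
        false);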
HiveSessionProperties.java

@@ -40,6 +40,12 @@
 import static com.google.common.collect.ImmutableSet.toImmutableSet;
 import static io.trino.plugin.base.session.PropertyMetadataUtil.dataSizeProperty;
 import static io.trino.plugin.base.session.PropertyMetadataUtil.durationProperty;
+import static io.trino.plugin.base.session.PropertyMetadataUtil.validateMaxDataSize;
+import static io.trino.plugin.base.session.PropertyMetadataUtil.validateMinDataSize;
+import static io.trino.plugin.hive.parquet.ParquetWriterConfig.PARQUET_WRITER_MAX_BLOCK_SIZE;
+import static io.trino.plugin.hive.parquet.ParquetWriterConfig.PARQUET_WRITER_MAX_PAGE_SIZE;
+import static io.trino.plugin.hive.parquet.ParquetWriterConfig.PARQUET_WRITER_MIN_BLOCK_SIZE;
+import static io.trino.plugin.hive.parquet.ParquetWriterConfig.PARQUET_WRITER_MIN_PAGE_SIZE;
 import static io.trino.spi.StandardErrorCode.INVALID_SESSION_PROPERTY;
 import static io.trino.spi.session.PropertyMetadata.booleanProperty;
 import static io.trino.spi.session.PropertyMetadata.doubleProperty;
@@ -427,11 +433,19 @@ public HiveSessionProperties(
                         PARQUET_WRITER_BLOCK_SIZE,
                         "Parquet: Writer block size",
                         parquetWriterConfig.getBlockSize(),
+                        value -> {
+                            validateMinDataSize(PARQUET_WRITER_BLOCK_SIZE, value, DataSize.valueOf(PARQUET_WRITER_MIN_BLOCK_SIZE));
+                            validateMaxDataSize(PARQUET_WRITER_BLOCK_SIZE, value, DataSize.valueOf(PARQUET_WRITER_MAX_BLOCK_SIZE));
+                        },
                         false),
                 dataSizeProperty(
                         PARQUET_WRITER_PAGE_SIZE,
                         "Parquet: Writer page size",
                         parquetWriterConfig.getPageSize(),
+                        value -> {
+                            validateMinDataSize(PARQUET_WRITER_PAGE_SIZE, value, DataSize.valueOf(PARQUET_WRITER_MIN_PAGE_SIZE));
+                            validateMaxDataSize(PARQUET_WRITER_PAGE_SIZE, value, DataSize.valueOf(PARQUET_WRITER_MAX_PAGE_SIZE));
+                        },
                         false),
                 integerProperty(
                         PARQUET_WRITER_BATCH_SIZE,

Review comment (on the page-size validation): Same in DeltaLakeSessionProperties and IcebergSessionProperties
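The effect is that an out-of-range session value for parquet_writer_block_size or parquet_writer_page_size is now rejected with INVALID_SESSION_PROPERTY when the property value is decoded, rather than being silently clamped by the writer. A hedged sketch of what the new callback does, with an illustrative out-of-range value:

// A page size above the maximum trips the new validator:
validateMaxDataSize("parquet_writer_page_size", DataSize.valueOf("16MB"), DataSize.valueOf("8MB"));
// throws TrinoException(INVALID_SESSION_PROPERTY,
//         "parquet_writer_page_size must be at most 8MB: 16MB")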
ParquetWriterConfig.java

@@ -17,6 +17,8 @@
 import io.airlift.configuration.ConfigDescription;
 import io.airlift.configuration.LegacyConfig;
 import io.airlift.units.DataSize;
+import io.airlift.units.MaxDataSize;
+import io.airlift.units.MinDataSize;
 import io.trino.parquet.writer.ParquetWriterOptions;
 import org.apache.parquet.hadoop.ParquetWriter;

@@ -25,13 +27,20 @@

 public class ParquetWriterConfig
 {
+    public static final String PARQUET_WRITER_MIN_BLOCK_SIZE = "4MB";
+    public static final String PARQUET_WRITER_MAX_BLOCK_SIZE = "2GB";
+    public static final String PARQUET_WRITER_MIN_PAGE_SIZE = "8kB";
+    public static final String PARQUET_WRITER_MAX_PAGE_SIZE = "8MB";
+
     private boolean parquetOptimizedWriterEnabled;

     private DataSize blockSize = DataSize.ofBytes(ParquetWriter.DEFAULT_BLOCK_SIZE);
     private DataSize pageSize = DataSize.ofBytes(ParquetWriter.DEFAULT_PAGE_SIZE);
     private int batchSize = ParquetWriterOptions.DEFAULT_BATCH_SIZE;
     private double validationPercentage = 5;

+    @MinDataSize(PARQUET_WRITER_MIN_BLOCK_SIZE)
+    @MaxDataSize(PARQUET_WRITER_MAX_BLOCK_SIZE)
     public DataSize getBlockSize()
     {
         return blockSize;
@@ -45,6 +54,8 @@ public ParquetWriterConfig setBlockSize(DataSize blockSize)
         return this;
     }

+    @MinDataSize(PARQUET_WRITER_MIN_PAGE_SIZE)
+    @MaxDataSize(PARQUET_WRITER_MAX_PAGE_SIZE)
     public DataSize getPageSize()
     {
         return pageSize;
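The @MinDataSize/@MaxDataSize annotations cover the config-file path (hive.parquet.writer.block-size and friends), complementing the session property validators above: they are bean-validation constraints that airlift checks when it binds the config object, typically at server startup. A minimal sketch of the same pattern; the class and config key here are illustrative, not Trino code:

import io.airlift.configuration.Config;
import io.airlift.units.DataSize;
import io.airlift.units.MaxDataSize;
import io.airlift.units.MinDataSize;

public class ExampleConfig
{
    private DataSize bufferSize = DataSize.valueOf("64MB");

    // Constraints checked when airlift binds and validates the config;
    // out-of-range values fail fast rather than surfacing at write time.
    @MinDataSize("4MB")
    @MaxDataSize("2GB")
    public DataSize getBufferSize()
    {
        return bufferSize;
    }

    @Config("example.buffer-size") // hypothetical config key
    public ExampleConfig setBufferSize(DataSize bufferSize)
    {
        this.bufferSize = bufferSize;
        return this;
    }
}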
@@ -70,7 +70,7 @@ private void buildSortedTables(String tableName, String sortByColumnName, String
         assertUpdate(
                 Session.builder(getSession())
                         .setCatalogSessionProperty(catalog, "parquet_writer_page_size", "10000B")
-                        .setCatalogSessionProperty(catalog, "parquet_writer_block_size", "100GB")
+                        .setCatalogSessionProperty(catalog, "parquet_writer_block_size", "2GB")
                         .build(),
                 format("INSERT INTO %s SELECT *, ARRAY[rand(), rand(), rand()] FROM tpch.tiny.orders", tableName),
                 15000);
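This test previously relied on an oversized block size being tolerated; with the new bounds, "100GB" would be rejected, so the test moves to the largest accepted value. Roughly what would now happen with the old value, reusing the PR's own helper:

// "100GB" trips the upper bound introduced in this PR:
validateMaxDataSize("parquet_writer_block_size",
        DataSize.valueOf("100GB"),
        DataSize.valueOf(PARQUET_WRITER_MAX_BLOCK_SIZE)); // "2GB"
// -> TrinoException: parquet_writer_block_size must be at most 2GB: 100GB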
@@ -46,16 +46,16 @@ public void testLegacyProperties()
                 ParquetWriterConfig.class,
                 Map.of(
                         "parquet.optimized-writer.enabled", "true",
-                        "parquet.writer.block-size", "2PB",
-                        "parquet.writer.page-size", "3PB"),
+                        "parquet.writer.block-size", "33MB",
+                        "parquet.writer.page-size", "7MB"),
                 Map.of(
                         "parquet.experimental-optimized-writer.enabled", "true",
-                        "hive.parquet.writer.block-size", "2PB",
-                        "hive.parquet.writer.page-size", "3PB"),
+                        "hive.parquet.writer.block-size", "33MB",
+                        "hive.parquet.writer.page-size", "7MB"),
                 Map.of(
                         "hive.parquet.optimized-writer.enabled", "true",
-                        "hive.parquet.writer.block-size", "2PB",
-                        "hive.parquet.writer.page-size", "3PB"));
+                        "hive.parquet.writer.block-size", "33MB",
+                        "hive.parquet.writer.page-size", "7MB"));
     }

     @Test
@@ -64,14 +64,14 @@ public void testExplicitPropertyMappings()
         Map<String, String> properties = Map.of(
                 "parquet.optimized-writer.enabled", "true",
                 "parquet.writer.block-size", "234MB",
-                "parquet.writer.page-size", "11MB",
+                "parquet.writer.page-size", "6MB",
                 "parquet.writer.batch-size", "100",
                 "parquet.optimized-writer.validation-percentage", "10");

         ParquetWriterConfig expected = new ParquetWriterConfig()
                 .setParquetOptimizedWriterEnabled(true)
                 .setBlockSize(DataSize.of(234, MEGABYTE))
-                .setPageSize(DataSize.of(11, MEGABYTE))
+                .setPageSize(DataSize.of(6, MEGABYTE))
                 .setBatchSize(100)
                 .setValidationPercentage(10);
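The old test values ("2PB", "3PB", "11MB") all fall outside the new @MinDataSize/@MaxDataSize bounds, which is why the test data changed; the replacements sit comfortably inside them. A quick illustrative check (plain Java against airlift's DataSize, run with assertions enabled):

import io.airlift.units.DataSize;

// Bounds from this PR: block size 4MB..2GB, page size 8kB..8MB
assert DataSize.valueOf("33MB").compareTo(DataSize.valueOf("2GB")) <= 0;  // ok: new block size
assert DataSize.valueOf("7MB").compareTo(DataSize.valueOf("8MB")) <= 0;   // ok: new page size
assert DataSize.valueOf("11MB").compareTo(DataSize.valueOf("8MB")) <= 0;  // fails: old page size exceeded the max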