Skip to content

Commit

Permalink
Add stats-based writer partitioning for identity-partitioned Iceberg …
Browse files Browse the repository at this point in the history
…table

When Iceberg table is partitioned on identity transforms,
`ConnectorNewTableLayout` returned from
`ConnectorMetadata.getInsertLayout` or
`ConnectorMetadata.getNewTableLayout` doesn't have to have
`ConnectorNewTableLayout.partitioning` handle set. This enables the
engine to choose whether writers should be repartitioned or not
("preferred write partitioning"). This improves write speed when writing
small number of partitions.
  • Loading branch information
findepi committed Nov 11, 2021
1 parent dd286ed commit 9b40ead
Show file tree
Hide file tree
Showing 2 changed files with 67 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -451,12 +451,16 @@ private Optional<ConnectorNewTableLayout> getWriteLayout(Schema tableSchema, Par
.map(field -> requireNonNull(columnById.get(field.sourceId()), () -> "Cannot find source column for partitioning field " + field))
.distinct()
.collect(toImmutableList());
List<String> partitioningColumnNames = partitioningColumns.stream()
.map(IcebergColumnHandle::getName)
.collect(toImmutableList());

return Optional.of(new ConnectorNewTableLayout(
new IcebergPartitioningHandle(toPartitionFields(partitionSpec), partitioningColumns),
partitioningColumns.stream()
.map(IcebergColumnHandle::getName)
.collect(toImmutableList())));
if (partitionSpec.fields().stream().allMatch(field -> field.transform().isIdentity())) {
// Do not set partitioningHandle, to let engine determine whether to repartition data or not, on stat-based basis.
return Optional.of(new ConnectorNewTableLayout(partitioningColumnNames));
}
IcebergPartitioningHandle partitioningHandle = new IcebergPartitioningHandle(toPartitionFields(partitionSpec), partitioningColumns);
return Optional.of(new ConnectorNewTableLayout(partitioningHandle, partitioningColumnNames));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,7 @@
import static com.google.common.collect.MoreCollectors.onlyElement;
import static com.google.common.collect.MoreCollectors.toOptional;
import static io.trino.SystemSessionProperties.JOIN_DISTRIBUTION_TYPE;
import static io.trino.SystemSessionProperties.PREFERRED_WRITE_PARTITIONING_MIN_NUMBER_OF_PARTITIONS;
import static io.trino.plugin.hive.HdfsEnvironment.HdfsContext;
import static io.trino.plugin.hive.HiveTestUtils.HDFS_ENVIRONMENT;
import static io.trino.plugin.iceberg.IcebergQueryRunner.createIcebergQueryRunner;
Expand Down Expand Up @@ -2729,34 +2730,82 @@ public void testLocalDynamicFilteringWithSelectiveBuildSizeJoin()
}

@Test(dataProvider = "repartitioningDataProvider")
public void testRepartitionDataOnCtas(String partitioning, int expectedFiles)
public void testRepartitionDataOnCtas(Session session, String partitioning, int expectedFiles)
{
testRepartitionData(getSession(), "tpch.tiny.orders", true, partitioning, expectedFiles);
testRepartitionData(session, "tpch.tiny.orders", true, partitioning, expectedFiles);
}

@Test(dataProvider = "repartitioningDataProvider")
public void testRepartitionDataOnInsert(String partitioning, int expectedFiles)
public void testRepartitionDataOnInsert(Session session, String partitioning, int expectedFiles)
{
testRepartitionData(getSession(), "tpch.tiny.orders", false, partitioning, expectedFiles);
testRepartitionData(session, "tpch.tiny.orders", false, partitioning, expectedFiles);
}

@DataProvider
public Object[][] repartitioningDataProvider()
{
Session defaultSession = getSession();
// For identity-only partitioning, Iceberg connector returns ConnectorNewTableLayout with partitionColumns set, but without partitioning.
// This is treated by engine as "preferred", but not mandatory partitioning, and gets ignored if stats suggest number of partitions
// written is low. Without partitioning, number of files created is nondeterministic, as a writer (worker node) may or may not receive data.
Session obeyConnectorPartitioning = Session.builder(defaultSession)
.setSystemProperty(PREFERRED_WRITE_PARTITIONING_MIN_NUMBER_OF_PARTITIONS, "1")
.build();

return new Object[][] {
// identity partitioning column
{"'orderstatus'", 3},
{obeyConnectorPartitioning, "'orderstatus'", 3},
// bucketing
{"'bucket(custkey, 13)'", 13},
{defaultSession, "'bucket(custkey, 13)'", 13},
// varchar-based
{"'truncate(comment, 1)'", 35},
{defaultSession, "'truncate(comment, 1)'", 35},
// complex; would exceed 100 open writers limit in IcebergPageSink without write repartitioning
{"'bucket(custkey, 4)', 'truncate(comment, 1)'", 131},
{defaultSession, "'bucket(custkey, 4)', 'truncate(comment, 1)'", 131},
// same column multiple times
{"'truncate(comment, 1)', 'orderstatus', 'bucket(comment, 2)'", 180},
{defaultSession, "'truncate(comment, 1)', 'orderstatus', 'bucket(comment, 2)'", 180},
};
}

@Test
public void testStatsBasedRepartitionDataOnCtas()
{
testStatsBasedRepartitionData(true);
}

@Test
public void testStatsBasedRepartitionDataOnInsert()
{
testStatsBasedRepartitionData(false);
}

private void testStatsBasedRepartitionData(boolean ctas)
{
Session sessionRepartitionSmall = Session.builder(getSession())
.setSystemProperty(PREFERRED_WRITE_PARTITIONING_MIN_NUMBER_OF_PARTITIONS, "2")
.build();
Session sessionRepartitionMany = Session.builder(getSession())
.setSystemProperty(PREFERRED_WRITE_PARTITIONING_MIN_NUMBER_OF_PARTITIONS, "5")
.build();
// Use DISTINCT to add data redistribution between source table and the writer. This makes it more likely that all writers get some data.
String sourceRelation = "(SELECT DISTINCT orderkey, custkey, orderstatus FROM tpch.tiny.orders)";
testRepartitionData(
sessionRepartitionSmall,
sourceRelation,
ctas,
"'orderstatus'",
3);
// Test uses relatively small table (60K rows). When engine doesn't redistribute data for writes,
// occasionally a worker node doesn't get any data and fewer files get created.
assertEventually(() -> {
testRepartitionData(
sessionRepartitionMany,
sourceRelation,
ctas,
"'orderstatus'",
9);
});
}

private void testRepartitionData(Session session, String sourceRelation, boolean ctas, String partitioning, int expectedFiles)
{
String tableName = "repartition" +
Expand Down

0 comments on commit 9b40ead

Please sign in to comment.