Skip to content

Commit

Permalink
Use force repartitioning for OPTIMIZE for iceberg
Browse files Browse the repository at this point in the history
For all partitioned tables it makes sense to force repartitiong
while performing OPTIMIZE. Previously it was forced only if all fields
in partition spec had identinty transform.
  • Loading branch information
homar committed Jan 20, 2022
1 parent ae5c8a0 commit 01b02cf
Show file tree
Hide file tree
Showing 2 changed files with 50 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -432,7 +432,7 @@ public Optional<ConnectorNewTableLayout> getNewTableLayout(ConnectorSession sess
{
Schema schema = toIcebergSchema(tableMetadata.getColumns());
PartitionSpec partitionSpec = parsePartitionFields(schema, getPartitioning(tableMetadata.getProperties()));
return getWriteLayout(schema, partitionSpec);
return getWriteLayout(schema, partitionSpec, false);
}

@Override
Expand Down Expand Up @@ -462,10 +462,10 @@ public Optional<ConnectorNewTableLayout> getInsertLayout(ConnectorSession sessio
{
IcebergTableHandle table = (IcebergTableHandle) tableHandle;
Table icebergTable = catalog.loadTable(session, table.getSchemaTableName());
return getWriteLayout(icebergTable.schema(), icebergTable.spec());
return getWriteLayout(icebergTable.schema(), icebergTable.spec(), false);
}

private Optional<ConnectorNewTableLayout> getWriteLayout(Schema tableSchema, PartitionSpec partitionSpec)
private Optional<ConnectorNewTableLayout> getWriteLayout(Schema tableSchema, PartitionSpec partitionSpec, boolean forceRepartitioning)
{
if (partitionSpec.isUnpartitioned()) {
return Optional.empty();
Expand All @@ -483,7 +483,7 @@ private Optional<ConnectorNewTableLayout> getWriteLayout(Schema tableSchema, Par
.map(IcebergColumnHandle::getName)
.collect(toImmutableList());

if (partitionSpec.fields().stream().allMatch(field -> field.transform().isIdentity())) {
if (!forceRepartitioning && partitionSpec.fields().stream().allMatch(field -> field.transform().isIdentity())) {
// Do not set partitioningHandle, to let engine determine whether to repartition data or not, on stat-based basis.
return Optional.of(new ConnectorNewTableLayout(partitioningColumnNames));
}
Expand Down Expand Up @@ -615,7 +615,9 @@ public Optional<ConnectorNewTableLayout> getLayoutForTableExecute(ConnectorSessi
private Optional<ConnectorNewTableLayout> getLayoutForOptimize(ConnectorSession session, IcebergTableExecuteHandle executeHandle)
{
Table icebergTable = catalog.loadTable(session, executeHandle.getSchemaTableName());
return getWriteLayout(icebergTable.schema(), icebergTable.spec());
// from performance perspective it is better to have lower number of bigger files than other way around
// thus we force repartitioning for optimize to achieve this
return getWriteLayout(icebergTable.schema(), icebergTable.spec(), true);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,6 @@
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.collect.ImmutableList.toImmutableList;
import static com.google.common.collect.ImmutableMap.toImmutableMap;
import static com.google.common.collect.ImmutableSet.toImmutableSet;
import static com.google.common.collect.Iterables.concat;
import static com.google.common.collect.Iterables.getOnlyElement;
import static com.google.common.collect.MoreCollectors.onlyElement;
Expand All @@ -99,6 +98,7 @@
import static io.trino.spi.type.VarcharType.VARCHAR;
import static io.trino.testing.MaterializedResult.resultBuilder;
import static io.trino.testing.QueryAssertions.assertEqualsIgnoreOrder;
import static io.trino.testing.TestingSession.testSessionBuilder;
import static io.trino.testing.assertions.Assert.assertEquals;
import static io.trino.testing.assertions.Assert.assertEventually;
import static io.trino.testing.sql.TestTable.randomTableSuffix;
Expand Down Expand Up @@ -3350,4 +3350,46 @@ public void testOptimizeParameterValidation()
"ALTER TABLE nation EXECUTE OPTIMIZE (file_size_threshold => '33s')",
"\\QUnable to set procedure property 'file_size_threshold' to ['33s']: Unknown unit: s");
}

@Test
public void testOptimizeForPartitionedTable()
throws IOException
{
// This test will have its own session to make sure partitioning is indeed forced and is not a result
// of session configuration
Session session = testSessionBuilder()
.setCatalog(getQueryRunner().getDefaultSession().getCatalog())
.setSchema(getQueryRunner().getDefaultSession().getSchema())
.setSystemProperty("use_preferred_write_partitioning", "true")
.setSystemProperty("preferred_write_partitioning_min_number_of_partitions", "100")
.build();
String tableName = "test_repartitiong_during_optimize_" + randomTableSuffix();
assertUpdate(session, "CREATE TABLE " + tableName + " (key varchar, value integer) WITH (partitioning = ARRAY['key'])");
// optimize an empty table
assertQuerySucceeds(session, "ALTER TABLE " + tableName + " EXECUTE OPTIMIZE");

assertUpdate(session, "INSERT INTO " + tableName + " VALUES ('one', 1)", 1);
assertUpdate(session, "INSERT INTO " + tableName + " VALUES ('one', 2)", 1);
assertUpdate(session, "INSERT INTO " + tableName + " VALUES ('one', 3)", 1);
assertUpdate(session, "INSERT INTO " + tableName + " VALUES ('one', 4)", 1);
assertUpdate(session, "INSERT INTO " + tableName + " VALUES ('one', 5)", 1);
assertUpdate(session, "INSERT INTO " + tableName + " VALUES ('one', 6)", 1);
assertUpdate(session, "INSERT INTO " + tableName + " VALUES ('one', 7)", 1);
assertUpdate(session, "INSERT INTO " + tableName + " VALUES ('two', 8)", 1);
assertUpdate(session, "INSERT INTO " + tableName + " VALUES ('two', 9)", 1);
assertUpdate(session, "INSERT INTO " + tableName + " VALUES ('three', 10)", 1);

List<String> initialFiles = getActiveFiles(tableName);
assertThat(initialFiles).hasSize(10);

computeActual(session, "ALTER TABLE " + tableName + " EXECUTE OPTIMIZE");

assertThat(query(session, "SELECT sum(value), listagg(key, ' ') WITHIN GROUP (ORDER BY key) FROM " + tableName))
.matches("VALUES (BIGINT '55', VARCHAR 'one one one one one one one three two two')");

List<String> updatedFiles = getActiveFiles(tableName);
// as we force repartitioning there should be only 3 partitions
assertThat(updatedFiles).hasSize(3);
assertThat(getAllDataFilesFromTableDirectory(tableName)).containsExactlyInAnyOrderElementsOf(concat(initialFiles, updatedFiles));
}
}

0 comments on commit 01b02cf

Please sign in to comment.