Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use forced repartitioning for OPTIMIZE for Iceberg #10619

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -432,7 +432,7 @@ public Optional<ConnectorNewTableLayout> getNewTableLayout(ConnectorSession sess
{
Schema schema = toIcebergSchema(tableMetadata.getColumns());
PartitionSpec partitionSpec = parsePartitionFields(schema, getPartitioning(tableMetadata.getProperties()));
return getWriteLayout(schema, partitionSpec);
return getWriteLayout(schema, partitionSpec, false);
}

@Override
Expand Down Expand Up @@ -462,10 +462,10 @@ public Optional<ConnectorNewTableLayout> getInsertLayout(ConnectorSession sessio
{
IcebergTableHandle table = (IcebergTableHandle) tableHandle;
Table icebergTable = catalog.loadTable(session, table.getSchemaTableName());
return getWriteLayout(icebergTable.schema(), icebergTable.spec());
return getWriteLayout(icebergTable.schema(), icebergTable.spec(), false);
}

private Optional<ConnectorNewTableLayout> getWriteLayout(Schema tableSchema, PartitionSpec partitionSpec)
private Optional<ConnectorNewTableLayout> getWriteLayout(Schema tableSchema, PartitionSpec partitionSpec, boolean forceRepartitioning)
{
if (partitionSpec.isUnpartitioned()) {
return Optional.empty();
Expand All @@ -483,7 +483,7 @@ private Optional<ConnectorNewTableLayout> getWriteLayout(Schema tableSchema, Par
.map(IcebergColumnHandle::getName)
.collect(toImmutableList());

if (partitionSpec.fields().stream().allMatch(field -> field.transform().isIdentity())) {
if (!forceRepartitioning && partitionSpec.fields().stream().allMatch(field -> field.transform().isIdentity())) {
// Do not set partitioningHandle, to let the engine determine whether to repartition data or not, on a stats-driven basis.
return Optional.of(new ConnectorNewTableLayout(partitioningColumnNames));
}
Expand Down Expand Up @@ -615,7 +615,9 @@ public Optional<ConnectorNewTableLayout> getLayoutForTableExecute(ConnectorSessi
private Optional<ConnectorNewTableLayout> getLayoutForOptimize(ConnectorSession session, IcebergTableExecuteHandle executeHandle)
{
Table icebergTable = catalog.loadTable(session, executeHandle.getSchemaTableName());
return getWriteLayout(icebergTable.schema(), icebergTable.spec());
// from performance perspective it is better to have lower number of bigger files than other way around
// thus we force repartitioning for optimize to achieve this
return getWriteLayout(icebergTable.schema(), icebergTable.spec(), true);
findepi marked this conversation as resolved.
Show resolved Hide resolved
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,7 @@
import static io.trino.spi.type.VarcharType.VARCHAR;
import static io.trino.testing.MaterializedResult.resultBuilder;
import static io.trino.testing.QueryAssertions.assertEqualsIgnoreOrder;
import static io.trino.testing.TestingSession.testSessionBuilder;
import static io.trino.testing.assertions.Assert.assertEquals;
import static io.trino.testing.assertions.Assert.assertEventually;
import static io.trino.testing.sql.TestTable.randomTableSuffix;
Expand Down Expand Up @@ -2980,8 +2981,8 @@ public void testSplitPruningFromDataFileStatistics(DataMappingTestSetup testSetu
String tableName = table.getName();
String values =
Stream.concat(
nCopies(100, testSetup.getSampleValueLiteral()).stream(),
nCopies(100, testSetup.getHighValueLiteral()).stream())
nCopies(100, testSetup.getSampleValueLiteral()).stream(),
nCopies(100, testSetup.getHighValueLiteral()).stream())
findepi marked this conversation as resolved.
Show resolved Hide resolved
.map(value -> "(" + value + ", rand())")
.collect(Collectors.joining(", "));
assertUpdate(withSmallRowGroups(getSession()), "INSERT INTO " + tableName + " VALUES " + values, 200);
Expand Down Expand Up @@ -3327,8 +3328,9 @@ private List<String> getAllDataFilesFromTableDirectory(String tableName)
{
String schema = getSession().getSchema().orElseThrow();
Path tableDataDir = getDistributedQueryRunner().getCoordinator().getBaseDataDir().resolve("iceberg_data").resolve(schema).resolve(tableName).resolve("data");
try (Stream<Path> list = Files.list(tableDataDir)) {
return list
try (Stream<Path> walk = Files.walk(tableDataDir)) {
return walk
.filter(Files::isRegularFile)
.filter(path -> !path.getFileName().toString().matches("\\..*\\.crc"))
.map(Path::toString)
.collect(toImmutableList());
Expand All @@ -3348,4 +3350,46 @@ public void testOptimizeParameterValidation()
"ALTER TABLE nation EXECUTE OPTIMIZE (file_size_threshold => '33s')",
"\\QUnable to set procedure property 'file_size_threshold' to ['33s']: Unknown unit: s");
}

@Test
public void testOptimizeForPartitionedTable()
        throws IOException
{
    // Build a dedicated session so the test proves that OPTIMIZE forces repartitioning
    // by itself, and that the observed behavior is not inherited from the default
    // session configuration.
    Session session = testSessionBuilder()
            .setCatalog(getQueryRunner().getDefaultSession().getCatalog())
            .setSchema(getQueryRunner().getDefaultSession().getSchema())
            .setSystemProperty("use_preferred_write_partitioning", "true")
            .setSystemProperty("preferred_write_partitioning_min_number_of_partitions", "100")
            .build();
    // Fix typo in generated table-name prefix ("repartitiong" -> "repartitioning")
    String tableName = "test_repartitioning_during_optimize_" + randomTableSuffix();
    assertUpdate(session, "CREATE TABLE " + tableName + " (key varchar, value integer) WITH (partitioning = ARRAY['key'])");
    // OPTIMIZE on an empty table must succeed (no files to rewrite)
    assertQuerySucceeds(session, "ALTER TABLE " + tableName + " EXECUTE OPTIMIZE");

    // Each single-row INSERT produces its own data file: 10 small files spread over 3 partitions
    assertUpdate(session, "INSERT INTO " + tableName + " VALUES ('one', 1)", 1);
    assertUpdate(session, "INSERT INTO " + tableName + " VALUES ('one', 2)", 1);
    assertUpdate(session, "INSERT INTO " + tableName + " VALUES ('one', 3)", 1);
    assertUpdate(session, "INSERT INTO " + tableName + " VALUES ('one', 4)", 1);
    assertUpdate(session, "INSERT INTO " + tableName + " VALUES ('one', 5)", 1);
    assertUpdate(session, "INSERT INTO " + tableName + " VALUES ('one', 6)", 1);
    assertUpdate(session, "INSERT INTO " + tableName + " VALUES ('one', 7)", 1);
    assertUpdate(session, "INSERT INTO " + tableName + " VALUES ('two', 8)", 1);
    assertUpdate(session, "INSERT INTO " + tableName + " VALUES ('two', 9)", 1);
    assertUpdate(session, "INSERT INTO " + tableName + " VALUES ('three', 10)", 1);

    List<String> initialFiles = getActiveFiles(tableName);
    assertThat(initialFiles).hasSize(10);

    computeActual(session, "ALTER TABLE " + tableName + " EXECUTE OPTIMIZE");

    // Data must be intact after the rewrite
    assertThat(query(session, "SELECT sum(value), listagg(key, ' ') WITHIN GROUP (ORDER BY key) FROM " + tableName))
            .matches("VALUES (BIGINT '55', VARCHAR 'one one one one one one one three two two')");

    List<String> updatedFiles = getActiveFiles(tableName);
    // Repartitioning is forced, so the rewrite yields exactly one file per partition (3 files total)
    assertThat(updatedFiles).hasSize(3);
    // The old data files are not physically deleted by OPTIMIZE; both generations remain on disk
    assertThat(getAllDataFilesFromTableDirectory(tableName)).containsExactlyInAnyOrderElementsOf(concat(initialFiles, updatedFiles));
}
}