diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergMetadata.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergMetadata.java index bd4b7c203919..cbe4dff24f7e 100644 --- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergMetadata.java +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergMetadata.java @@ -432,7 +432,7 @@ public Optional getNewTableLayout(ConnectorSession sess { Schema schema = toIcebergSchema(tableMetadata.getColumns()); PartitionSpec partitionSpec = parsePartitionFields(schema, getPartitioning(tableMetadata.getProperties())); - return getWriteLayout(schema, partitionSpec); + return getWriteLayout(schema, partitionSpec, false); } @Override @@ -462,10 +462,10 @@ public Optional getInsertLayout(ConnectorSession sessio { IcebergTableHandle table = (IcebergTableHandle) tableHandle; Table icebergTable = catalog.loadTable(session, table.getSchemaTableName()); - return getWriteLayout(icebergTable.schema(), icebergTable.spec()); + return getWriteLayout(icebergTable.schema(), icebergTable.spec(), false); } - private Optional getWriteLayout(Schema tableSchema, PartitionSpec partitionSpec) + private Optional getWriteLayout(Schema tableSchema, PartitionSpec partitionSpec, boolean forceRepartitioning) { if (partitionSpec.isUnpartitioned()) { return Optional.empty(); @@ -483,7 +483,7 @@ private Optional getWriteLayout(Schema tableSchema, Par .map(IcebergColumnHandle::getName) .collect(toImmutableList()); - if (partitionSpec.fields().stream().allMatch(field -> field.transform().isIdentity())) { + if (!forceRepartitioning && partitionSpec.fields().stream().allMatch(field -> field.transform().isIdentity())) { // Do not set partitioningHandle, to let engine determine whether to repartition data or not, on stat-based basis. return Optional.of(new ConnectorNewTableLayout(partitioningColumnNames)); } @@ -615,7 +615,9 @@ public Optional getLayoutForTableExecute(ConnectorSessi private Optional getLayoutForOptimize(ConnectorSession session, IcebergTableExecuteHandle executeHandle) { Table icebergTable = catalog.loadTable(session, executeHandle.getSchemaTableName()); - return getWriteLayout(icebergTable.schema(), icebergTable.spec()); + // from performance perspective it is better to have lower number of bigger files than other way around + // thus we force repartitioning for optimize to achieve this + return getWriteLayout(icebergTable.schema(), icebergTable.spec(), true); } @Override diff --git a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/BaseIcebergConnectorTest.java b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/BaseIcebergConnectorTest.java index e79bbfa3528c..1235407079df 100644 --- a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/BaseIcebergConnectorTest.java +++ b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/BaseIcebergConnectorTest.java @@ -80,7 +80,6 @@ import static com.google.common.base.Preconditions.checkArgument; import static com.google.common.collect.ImmutableList.toImmutableList; import static com.google.common.collect.ImmutableMap.toImmutableMap; -import static com.google.common.collect.ImmutableSet.toImmutableSet; import static com.google.common.collect.Iterables.concat; import static com.google.common.collect.Iterables.getOnlyElement; import static com.google.common.collect.MoreCollectors.onlyElement; @@ -99,6 +98,7 @@ import static io.trino.spi.type.VarcharType.VARCHAR; import static io.trino.testing.MaterializedResult.resultBuilder; import static io.trino.testing.QueryAssertions.assertEqualsIgnoreOrder; +import static io.trino.testing.TestingSession.testSessionBuilder; import static io.trino.testing.assertions.Assert.assertEquals; import static io.trino.testing.assertions.Assert.assertEventually; import static io.trino.testing.sql.TestTable.randomTableSuffix; @@ -3350,4 +3350,46 @@ public void testOptimizeParameterValidation() "ALTER TABLE nation EXECUTE OPTIMIZE (file_size_threshold => '33s')", "\\QUnable to set procedure property 'file_size_threshold' to ['33s']: Unknown unit: s"); } + + @Test + public void testOptimizeForPartitionedTable() + throws IOException + { + // This test will have its own session to make sure partitioning is indeed forced and is not a result + // of session configuration + Session session = testSessionBuilder() + .setCatalog(getQueryRunner().getDefaultSession().getCatalog()) + .setSchema(getQueryRunner().getDefaultSession().getSchema()) + .setSystemProperty("use_preferred_write_partitioning", "true") + .setSystemProperty("preferred_write_partitioning_min_number_of_partitions", "100") + .build(); + String tableName = "test_repartitiong_during_optimize_" + randomTableSuffix(); + assertUpdate(session, "CREATE TABLE " + tableName + " (key varchar, value integer) WITH (partitioning = ARRAY['key'])"); + // optimize an empty table + assertQuerySucceeds(session, "ALTER TABLE " + tableName + " EXECUTE OPTIMIZE"); + + assertUpdate(session, "INSERT INTO " + tableName + " VALUES ('one', 1)", 1); + assertUpdate(session, "INSERT INTO " + tableName + " VALUES ('one', 2)", 1); + assertUpdate(session, "INSERT INTO " + tableName + " VALUES ('one', 3)", 1); + assertUpdate(session, "INSERT INTO " + tableName + " VALUES ('one', 4)", 1); + assertUpdate(session, "INSERT INTO " + tableName + " VALUES ('one', 5)", 1); + assertUpdate(session, "INSERT INTO " + tableName + " VALUES ('one', 6)", 1); + assertUpdate(session, "INSERT INTO " + tableName + " VALUES ('one', 7)", 1); + assertUpdate(session, "INSERT INTO " + tableName + " VALUES ('two', 8)", 1); + assertUpdate(session, "INSERT INTO " + tableName + " VALUES ('two', 9)", 1); + assertUpdate(session, "INSERT INTO " + tableName + " VALUES ('three', 10)", 1); + + List initialFiles = getActiveFiles(tableName); + assertThat(initialFiles).hasSize(10); + + computeActual(session, "ALTER TABLE " + tableName + " EXECUTE OPTIMIZE"); + + assertThat(query(session, "SELECT sum(value), listagg(key, ' ') WITHIN GROUP (ORDER BY key) FROM " + tableName)) + .matches("VALUES (BIGINT '55', VARCHAR 'one one one one one one one three two two')"); + + List updatedFiles = getActiveFiles(tableName); + // as we force repartitioning there should be only 3 partitions + assertThat(updatedFiles).hasSize(3); + assertThat(getAllDataFilesFromTableDirectory(tableName)).containsExactlyInAnyOrderElementsOf(concat(initialFiles, updatedFiles)); + } }