From e04bd4d2bf764e834533882085303dc59610ecf4 Mon Sep 17 00:00:00 2001 From: Konrad Dziedzic Date: Wed, 19 Jan 2022 10:31:49 +0100 Subject: [PATCH 1/3] Fix indentation in BaseIcebergConnectorTest --- .../io/trino/plugin/iceberg/BaseIcebergConnectorTest.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/BaseIcebergConnectorTest.java b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/BaseIcebergConnectorTest.java index c7620657401a..1e6ab2006fd6 100644 --- a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/BaseIcebergConnectorTest.java +++ b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/BaseIcebergConnectorTest.java @@ -2980,8 +2980,8 @@ public void testSplitPruningFromDataFileStatistics(DataMappingTestSetup testSetu String tableName = table.getName(); String values = Stream.concat( - nCopies(100, testSetup.getSampleValueLiteral()).stream(), - nCopies(100, testSetup.getHighValueLiteral()).stream()) + nCopies(100, testSetup.getSampleValueLiteral()).stream(), + nCopies(100, testSetup.getHighValueLiteral()).stream()) .map(value -> "(" + value + ", rand())") .collect(Collectors.joining(", ")); assertUpdate(withSmallRowGroups(getSession()), "INSERT INTO " + tableName + " VALUES " + values, 200); From ae5c8a03de2643d25cd41cc899ac7cfc3f1114aa Mon Sep 17 00:00:00 2001 From: Konrad Dziedzic Date: Wed, 19 Jan 2022 10:49:16 +0100 Subject: [PATCH 2/3] Let getAllDataFilesFromTableDirectory recurse directories --- .../io/trino/plugin/iceberg/BaseIcebergConnectorTest.java | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/BaseIcebergConnectorTest.java b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/BaseIcebergConnectorTest.java index 1e6ab2006fd6..e79bbfa3528c 100644 --- a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/BaseIcebergConnectorTest.java +++ 
b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/BaseIcebergConnectorTest.java @@ -80,6 +80,7 @@ import static com.google.common.base.Preconditions.checkArgument; import static com.google.common.collect.ImmutableList.toImmutableList; import static com.google.common.collect.ImmutableMap.toImmutableMap; +import static com.google.common.collect.ImmutableSet.toImmutableSet; import static com.google.common.collect.Iterables.concat; import static com.google.common.collect.Iterables.getOnlyElement; import static com.google.common.collect.MoreCollectors.onlyElement; @@ -3327,8 +3328,9 @@ private List getAllDataFilesFromTableDirectory(String tableName) { String schema = getSession().getSchema().orElseThrow(); Path tableDataDir = getDistributedQueryRunner().getCoordinator().getBaseDataDir().resolve("iceberg_data").resolve(schema).resolve(tableName).resolve("data"); - try (Stream list = Files.list(tableDataDir)) { - return list + try (Stream walk = Files.walk(tableDataDir)) { + return walk + .filter(Files::isRegularFile) .filter(path -> !path.getFileName().toString().matches("\..*\.crc")) .map(Path::toString) .collect(toImmutableList()); From 01b02cfbae8db352029b49848f436f6107434654 Mon Sep 17 00:00:00 2001 From: Konrad Dziedzic Date: Wed, 19 Jan 2022 10:54:37 +0100 Subject: [PATCH 3/3] Use force repartitioning for OPTIMIZE for iceberg For all partitioned tables it makes sense to force repartitioning while performing OPTIMIZE. Previously it was forced only if all fields in partition spec had an identity transform. 
--- .../trino/plugin/iceberg/IcebergMetadata.java | 12 ++--- .../iceberg/BaseIcebergConnectorTest.java | 44 ++++++++++++++++++- 2 files changed, 50 insertions(+), 6 deletions(-) diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergMetadata.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergMetadata.java index bd4b7c203919..cbe4dff24f7e 100644 --- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergMetadata.java +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergMetadata.java @@ -432,7 +432,7 @@ public Optional getNewTableLayout(ConnectorSession sess { Schema schema = toIcebergSchema(tableMetadata.getColumns()); PartitionSpec partitionSpec = parsePartitionFields(schema, getPartitioning(tableMetadata.getProperties())); - return getWriteLayout(schema, partitionSpec); + return getWriteLayout(schema, partitionSpec, false); } @Override @@ -462,10 +462,10 @@ public Optional getInsertLayout(ConnectorSession sessio { IcebergTableHandle table = (IcebergTableHandle) tableHandle; Table icebergTable = catalog.loadTable(session, table.getSchemaTableName()); - return getWriteLayout(icebergTable.schema(), icebergTable.spec()); + return getWriteLayout(icebergTable.schema(), icebergTable.spec(), false); } - private Optional getWriteLayout(Schema tableSchema, PartitionSpec partitionSpec) + private Optional getWriteLayout(Schema tableSchema, PartitionSpec partitionSpec, boolean forceRepartitioning) { if (partitionSpec.isUnpartitioned()) { return Optional.empty(); @@ -483,7 +483,7 @@ private Optional getWriteLayout(Schema tableSchema, Par .map(IcebergColumnHandle::getName) .collect(toImmutableList()); - if (partitionSpec.fields().stream().allMatch(field -> field.transform().isIdentity())) { + if (!forceRepartitioning && partitionSpec.fields().stream().allMatch(field -> field.transform().isIdentity())) { // Do not set partitioningHandle, to let engine determine whether to repartition data or not, on 
stat-based basis. return Optional.of(new ConnectorNewTableLayout(partitioningColumnNames)); } @@ -615,7 +615,9 @@ public Optional getLayoutForTableExecute(ConnectorSessi private Optional getLayoutForOptimize(ConnectorSession session, IcebergTableExecuteHandle executeHandle) { Table icebergTable = catalog.loadTable(session, executeHandle.getSchemaTableName()); - return getWriteLayout(icebergTable.schema(), icebergTable.spec()); + // from performance perspective it is better to have lower number of bigger files than other way around + // thus we force repartitioning for optimize to achieve this + return getWriteLayout(icebergTable.schema(), icebergTable.spec(), true); } @Override diff --git a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/BaseIcebergConnectorTest.java b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/BaseIcebergConnectorTest.java index e79bbfa3528c..1235407079df 100644 --- a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/BaseIcebergConnectorTest.java +++ b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/BaseIcebergConnectorTest.java @@ -80,7 +80,6 @@ import static com.google.common.base.Preconditions.checkArgument; import static com.google.common.collect.ImmutableList.toImmutableList; import static com.google.common.collect.ImmutableMap.toImmutableMap; -import static com.google.common.collect.ImmutableSet.toImmutableSet; import static com.google.common.collect.Iterables.concat; import static com.google.common.collect.Iterables.getOnlyElement; import static com.google.common.collect.MoreCollectors.onlyElement; @@ -99,6 +98,7 @@ import static io.trino.spi.type.VarcharType.VARCHAR; import static io.trino.testing.MaterializedResult.resultBuilder; import static io.trino.testing.QueryAssertions.assertEqualsIgnoreOrder; +import static io.trino.testing.TestingSession.testSessionBuilder; import static io.trino.testing.assertions.Assert.assertEquals; import static 
io.trino.testing.assertions.Assert.assertEventually; import static io.trino.testing.sql.TestTable.randomTableSuffix; @@ -3350,4 +3350,46 @@ public void testOptimizeParameterValidation() "ALTER TABLE nation EXECUTE OPTIMIZE (file_size_threshold => '33s')", "\\QUnable to set procedure property 'file_size_threshold' to ['33s']: Unknown unit: s"); } + + @Test + public void testOptimizeForPartitionedTable() + throws IOException + { + // This test will have its own session to make sure partitioning is indeed forced and is not a result + // of session configuration + Session session = testSessionBuilder() + .setCatalog(getQueryRunner().getDefaultSession().getCatalog()) + .setSchema(getQueryRunner().getDefaultSession().getSchema()) + .setSystemProperty("use_preferred_write_partitioning", "true") + .setSystemProperty("preferred_write_partitioning_min_number_of_partitions", "100") + .build(); + String tableName = "test_repartitiong_during_optimize_" + randomTableSuffix(); + assertUpdate(session, "CREATE TABLE " + tableName + " (key varchar, value integer) WITH (partitioning = ARRAY['key'])"); + // optimize an empty table + assertQuerySucceeds(session, "ALTER TABLE " + tableName + " EXECUTE OPTIMIZE"); + + assertUpdate(session, "INSERT INTO " + tableName + " VALUES ('one', 1)", 1); + assertUpdate(session, "INSERT INTO " + tableName + " VALUES ('one', 2)", 1); + assertUpdate(session, "INSERT INTO " + tableName + " VALUES ('one', 3)", 1); + assertUpdate(session, "INSERT INTO " + tableName + " VALUES ('one', 4)", 1); + assertUpdate(session, "INSERT INTO " + tableName + " VALUES ('one', 5)", 1); + assertUpdate(session, "INSERT INTO " + tableName + " VALUES ('one', 6)", 1); + assertUpdate(session, "INSERT INTO " + tableName + " VALUES ('one', 7)", 1); + assertUpdate(session, "INSERT INTO " + tableName + " VALUES ('two', 8)", 1); + assertUpdate(session, "INSERT INTO " + tableName + " VALUES ('two', 9)", 1); + assertUpdate(session, "INSERT INTO " + tableName + " VALUES ('three', 
10)", 1); + + List initialFiles = getActiveFiles(tableName); + assertThat(initialFiles).hasSize(10); + + computeActual(session, "ALTER TABLE " + tableName + " EXECUTE OPTIMIZE"); + + assertThat(query(session, "SELECT sum(value), listagg(key, ' ') WITHIN GROUP (ORDER BY key) FROM " + tableName)) + .matches("VALUES (BIGINT '55', VARCHAR 'one one one one one one one three two two')"); + + List updatedFiles = getActiveFiles(tableName); + // as we force repartitioning there should be only 3 partitions + assertThat(updatedFiles).hasSize(3); + assertThat(getAllDataFilesFromTableDirectory(tableName)).containsExactlyInAnyOrderElementsOf(concat(initialFiles, updatedFiles)); + } }