From 9b40ead952fee4a10b089638ff56b44916306aaa Mon Sep 17 00:00:00 2001 From: Piotr Findeisen Date: Tue, 9 Nov 2021 15:28:22 +0100 Subject: [PATCH] Add stats-based writer partitioning for identity-partitioned Iceberg table When Iceberg table is partitioned on identity transforms, `ConnectorNewTableLayout` returned from `ConnectorMetadata.getInsertLayout` or `ConnectorMetadata.getNewTableLayout` doesn't have to have `ConnectorNewTableLayout.partitioning` handle set. This enables the engine to choose whether writers should be repartitioned or not ("preferred write partitioning"). This improves write speed when writing small number of partitions. --- .../trino/plugin/iceberg/IcebergMetadata.java | 14 ++-- .../iceberg/BaseIcebergConnectorTest.java | 67 ++++++++++++++++--- 2 files changed, 67 insertions(+), 14 deletions(-) diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergMetadata.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergMetadata.java index 7385206dbea6..bd0a0a0b25c1 100644 --- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergMetadata.java +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergMetadata.java @@ -451,12 +451,16 @@ private Optional getWriteLayout(Schema tableSchema, Par .map(field -> requireNonNull(columnById.get(field.sourceId()), () -> "Cannot find source column for partitioning field " + field)) .distinct() .collect(toImmutableList()); + List partitioningColumnNames = partitioningColumns.stream() + .map(IcebergColumnHandle::getName) + .collect(toImmutableList()); - return Optional.of(new ConnectorNewTableLayout( - new IcebergPartitioningHandle(toPartitionFields(partitionSpec), partitioningColumns), - partitioningColumns.stream() - .map(IcebergColumnHandle::getName) - .collect(toImmutableList()))); + if (partitionSpec.fields().stream().allMatch(field -> field.transform().isIdentity())) { + // Do not set partitioningHandle, to let engine determine whether to repartition data or not, on stat-based basis. + return Optional.of(new ConnectorNewTableLayout(partitioningColumnNames)); + } + IcebergPartitioningHandle partitioningHandle = new IcebergPartitioningHandle(toPartitionFields(partitionSpec), partitioningColumns); + return Optional.of(new ConnectorNewTableLayout(partitioningHandle, partitioningColumnNames)); } @Override diff --git a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/BaseIcebergConnectorTest.java b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/BaseIcebergConnectorTest.java index 63b5beb92671..f7745affd23c 100644 --- a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/BaseIcebergConnectorTest.java +++ b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/BaseIcebergConnectorTest.java @@ -83,6 +83,7 @@ import static com.google.common.collect.MoreCollectors.onlyElement; import static com.google.common.collect.MoreCollectors.toOptional; import static io.trino.SystemSessionProperties.JOIN_DISTRIBUTION_TYPE; +import static io.trino.SystemSessionProperties.PREFERRED_WRITE_PARTITIONING_MIN_NUMBER_OF_PARTITIONS; import static io.trino.plugin.hive.HdfsEnvironment.HdfsContext; import static io.trino.plugin.hive.HiveTestUtils.HDFS_ENVIRONMENT; import static io.trino.plugin.iceberg.IcebergQueryRunner.createIcebergQueryRunner; @@ -2729,34 +2730,82 @@ public void testLocalDynamicFilteringWithSelectiveBuildSizeJoin() } @Test(dataProvider = "repartitioningDataProvider") - public void testRepartitionDataOnCtas(String partitioning, int expectedFiles) + public void testRepartitionDataOnCtas(Session session, String partitioning, int expectedFiles) { - testRepartitionData(getSession(), "tpch.tiny.orders", true, partitioning, expectedFiles); + testRepartitionData(session, "tpch.tiny.orders", true, partitioning, expectedFiles); } @Test(dataProvider = "repartitioningDataProvider") - public void testRepartitionDataOnInsert(String partitioning, int expectedFiles) + public void testRepartitionDataOnInsert(Session session, String partitioning, int expectedFiles) { - testRepartitionData(getSession(), "tpch.tiny.orders", false, partitioning, expectedFiles); + testRepartitionData(session, "tpch.tiny.orders", false, partitioning, expectedFiles); } @DataProvider public Object[][] repartitioningDataProvider() { + Session defaultSession = getSession(); + // For identity-only partitioning, Iceberg connector returns ConnectorNewTableLayout with partitionColumns set, but without partitioning. + // This is treated by engine as "preferred", but not mandatory partitioning, and gets ignored if stats suggest number of partitions + // written is low. Without partitioning, number of files created is nondeterministic, as a writer (worker node) may or may not receive data. + Session obeyConnectorPartitioning = Session.builder(defaultSession) + .setSystemProperty(PREFERRED_WRITE_PARTITIONING_MIN_NUMBER_OF_PARTITIONS, "1") + .build(); + return new Object[][] { // identity partitioning column - {"'orderstatus'", 3}, + {obeyConnectorPartitioning, "'orderstatus'", 3}, // bucketing - {"'bucket(custkey, 13)'", 13}, + {defaultSession, "'bucket(custkey, 13)'", 13}, // varchar-based - {"'truncate(comment, 1)'", 35}, + {defaultSession, "'truncate(comment, 1)'", 35}, // complex; would exceed 100 open writers limit in IcebergPageSink without write repartitioning - {"'bucket(custkey, 4)', 'truncate(comment, 1)'", 131}, + {defaultSession, "'bucket(custkey, 4)', 'truncate(comment, 1)'", 131}, // same column multiple times - {"'truncate(comment, 1)', 'orderstatus', 'bucket(comment, 2)'", 180}, + {defaultSession, "'truncate(comment, 1)', 'orderstatus', 'bucket(comment, 2)'", 180}, }; } + @Test + public void testStatsBasedRepartitionDataOnCtas() + { + testStatsBasedRepartitionData(true); + } + + @Test + public void testStatsBasedRepartitionDataOnInsert() + { + testStatsBasedRepartitionData(false); + } + + private void testStatsBasedRepartitionData(boolean ctas) + { + Session sessionRepartitionSmall = Session.builder(getSession()) + .setSystemProperty(PREFERRED_WRITE_PARTITIONING_MIN_NUMBER_OF_PARTITIONS, "2") + .build(); + Session sessionRepartitionMany = Session.builder(getSession()) + .setSystemProperty(PREFERRED_WRITE_PARTITIONING_MIN_NUMBER_OF_PARTITIONS, "5") + .build(); + // Use DISTINCT to add data redistribution between source table and the writer. This makes it more likely that all writers get some data. + String sourceRelation = "(SELECT DISTINCT orderkey, custkey, orderstatus FROM tpch.tiny.orders)"; + testRepartitionData( + sessionRepartitionSmall, + sourceRelation, + ctas, + "'orderstatus'", + 3); + // Test uses relatively small table (60K rows). When engine doesn't redistribute data for writes, + // occasionally a worker node doesn't get any data and fewer files get created. + assertEventually(() -> { + testRepartitionData( + sessionRepartitionMany, + sourceRelation, + ctas, + "'orderstatus'", + 9); + }); + } + private void testRepartitionData(Session session, String sourceRelation, boolean ctas, String partitioning, int expectedFiles) { String tableName = "repartition" +