-
Notifications
You must be signed in to change notification settings - Fork 3k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Use force repartitioning for OPTIMIZE for iceberg #10619
Use force repartitioning for OPTIMIZE for iceberg #10619
Conversation
@@ -481,7 +481,7 @@ public ConnectorOutputTableHandle beginCreateTable(ConnectorSession session, Con | |||
.map(IcebergColumnHandle::getName) | |||
.collect(toImmutableList()); | |||
|
|||
if (partitionSpec.fields().stream().allMatch(field -> field.transform().isIdentity())) { | |||
if (partitionSpec.fields().stream().allMatch(field -> field.transform().isIdentity()) && !forceRepartitioning) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Flip &&
so the simple boolean comes first
throws IOException | ||
{ | ||
String schema = getSession().getSchema().orElseThrow(); | ||
Path tableDataDir = getDistributedQueryRunner().getCoordinator().getBaseDataDir().resolve("iceberg_data").resolve(schema).resolve(tableName).resolve("data"); | ||
try (Stream<Path> list = Files.list(tableDataDir)) { | ||
return list | ||
.flatMap(path -> { | ||
try { | ||
return Files.walk(path).filter(Files::isRegularFile); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Change Files.list()
above to use Files.walk()
try { | ||
return Files.walk(path).filter(Files::isRegularFile); | ||
} | ||
catch (IOException e) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This should not occur. If it does, something is wrong and we should fail
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
yes I know but as IOException is a checked one and it was inside the flatMap I had to do something with it..
8171387
to
22bb2ee
Compare
plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergMetadata.java
Show resolved
Hide resolved
plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/BaseIcebergConnectorTest.java
Show resolved
Hide resolved
{ | ||
return computeActual(format("SELECT file_path FROM \"%s$files\"", tableName)).getOnlyColumn() | ||
.map(String.class::cast) | ||
.collect(toImmutableList()); | ||
.collect(toImmutableSet()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
i guess @findepi used List here intentionally. Why the change?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I changed it to have the same way as in delta lake but if it was intentional I will revert my change
.map(Path::toString) | ||
.collect(toImmutableList()); | ||
} | ||
return Files.walk(tableDataDir) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
from Files.walk
docs
This method must be used within a try-with-resources statement or similar control structure to ensure that the stream's open directories are closed promptly after the stream's operations have completed.
Anyway, the changes here look as not related to the main part of the commit, while they actually are.
I'd extract this to a prep commit like "Let getAllDataFilesFromTableDirectory recurse directories"
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ok
// optimize an empty table | ||
assertQuerySucceeds("ALTER TABLE " + tableName + " EXECUTE OPTIMIZE"); | ||
assertThat(getActiveFiles(tableName)).isEmpty(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Redundant.
throws IOException | ||
{ | ||
String tableName = "test_repartitiong_during_optimize_" + randomTableSuffix(); | ||
assertUpdate("CREATE TABLE " + tableName + " (key integer, value varchar) WITH (partitioning = ARRAY['value'])"); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Partitioning on "value" is not natural.
Also it's more natural for partitioning column to come first in the table schema: pk integer, value varchar) ...
@@ -3339,4 +3340,43 @@ public void testOptimizeParameterValidation() | |||
"ALTER TABLE nation EXECUTE OPTIMIZE (file_size_threshold => '33s')", | |||
"\\QUnable to set procedure property 'file_size_threshold' to ['33s']: Unknown unit: s"); | |||
} | |||
|
|||
@Test | |||
public void testRepartitioningIsForcedDuringOptimize() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We should have tests for OPTIMIZE with partitioned tables.
Actually, this may be the test. Just rename it.
The "forced repartitioning" will then be an inline comment where the assertion is done
Set<String> initialFiles = getActiveFiles(tableName); | ||
assertThat(initialFiles).hasSize(10); | ||
|
||
computeActual("ALTER TABLE " + tableName + " EXECUTE OPTIMIZE"); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Use session with high preferred_write_partitioning_min_number_of_partitions
to verify repartitioning is indeed forced, as the test class could have it artificially lowered.
assertThat(updatedFiles).hasSize(3); | ||
assertThat(getAllDataFilesFromTableDirectory(tableName)).isEqualTo(union(initialFiles, updatedFiles)); | ||
} | ||
finally { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
the try finally
is not needed here, as the test runs on isolated resources (a temp folder).
you can omit it for improved readability
22bb2ee
to
c7b9667
Compare
@findepi do you mind taking another look ? |
c7b9667
to
645a418
Compare
One unrelated failure, I created an issue for it #10693 |
{ | ||
// This test will have its own session to make sure partitioning is indeed forced and is not a result | ||
// of session configuration | ||
Session currentSession = testSessionBuilder() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
call it session
return getActiveFiles(tableName, getQueryRunner().getDefaultSession()); | ||
} | ||
|
||
private List<String> getActiveFiles(String tableName, Session session) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
getActiveFiles
result doesn't depend on the session; the fact it uses SELECT
should be viewed as impl detail, it could go directly to storage, or use Iceberg APIs directly instead
no need to pass session arg here.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
hmm ok
.setSystemProperty("use_preferred_write_partitioning", "false") | ||
.setSystemProperty("preferred_write_partitioning_min_number_of_partitions", "100") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
use_preferred_write_partitioning=false cancels preferred_write_partitioning_min_number_of_partitions
i think use_preferred_write_partitioning should be true
For all partitioned tables it makes sense to force repartitiong while performing OPTIMIZE. Previously it was forced only if all fields in partition spec had identinty transform.
645a418
to
01b02cf
Compare
For all partitioned tables it makes sense to force repartitiong
while performing OPTIMIZE. Previously it was forced only if all fields
in partition spec had identinty transform.