Fix delta lake CDF entries for MERGE queries #16127
Conversation
Good catch
LongBitmapDataProvider rowsToDelete = deletion.rowsToDelete;
LongBitmapDataProvider rowsDeletedBecauseOfUpdate = deletion.rowsDeletedBecauseOfUpdate();
Can we avoid marking update preimage rows in both Bitmaps?
I don't think so; we need to carry information about whether a particular position deletion came from DELETE or from UPDATE. Instead of a bitmap, a normal Map could be used to store the reason for the deletion, but I assumed it is more efficient to use two bitmaps.
I can change to one Map if you think that is better.
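For illustration, here is a minimal sketch of the two-bitmap idea, assuming the RoaringBitmap Roaring64NavigableMap implementation behind LongBitmapDataProvider; the holder class and method names are hypothetical, not the PR's actual FileDeletion API:

import org.roaringbitmap.longlong.LongBitmapDataProvider;
import org.roaringbitmap.longlong.Roaring64NavigableMap;

// Hypothetical holder: one bitmap per deletion cause, so a file position can be
// classified as "deleted by DELETE" or "deleted because of UPDATE" without a Map.
class FileDeletionSketch
{
    private final LongBitmapDataProvider rowsToDelete = new Roaring64NavigableMap();
    private final LongBitmapDataProvider rowsDeletedBecauseOfUpdate = new Roaring64NavigableMap();

    void markDeletedByDelete(long filePosition)
    {
        rowsToDelete.addLong(filePosition);
    }

    void markDeletedBecauseOfUpdate(long filePosition)
    {
        rowsDeletedBecauseOfUpdate.addLong(filePosition);
    }

    boolean deletedByDelete(long filePosition)
    {
        return rowsToDelete.contains(filePosition);
    }

    boolean deletedBecauseOfUpdate(long filePosition)
    {
        return rowsDeletedBecauseOfUpdate.contains(filePosition);
    }
}

Compared with a Map keyed by position, the two bitmaps avoid boxing every row position and stay compact for large files, which matches the efficiency argument above.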
Two bitmaps are good, but you have redundant information for updated rows: the positions in both rowsToDelete and rowsDeletedBecauseOfUpdate are set. I'd expect you to set one or the other, not both.
OK, I think I figured out your point, and now the second if would probably also work the way you wrote it. I will give it a shot, thanks.
int retainedCount = 0;
int deletedCount = 0;
int deletedBecauseOfUpdateCount = 0;
int deletedBecauseOfDeleteCount = 0;
for (int position = 0; position < positionCount; position++) {
    if (!rowsToDelete.contains(filePosition)) {
I'd rephrase these as:
if (rowsToDelete.contains(filePosition)) {
    ...
}
else if (rowsDeletedBecauseOfUpdate.contains(filePosition)) {
    ...
}
else {
    // retain ...
}
But that would be incorrect. I am interested in rows that will be deleted; I just need to split them into two types.
OK, my bad; I figured it out and it works. Thanks.
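To make the agreed shape concrete, here is a hedged sketch of how the position loop could split rows into three buckets once each bitmap holds only its own cause. Variable names follow the snippets quoted above, but the surrounding method and the file-position mapping are simplifying assumptions, not the PR's exact code:

import org.roaringbitmap.longlong.LongBitmapDataProvider;

final class PositionClassifier
{
    private PositionClassifier() {}

    // Splits a page's positions into retained rows, rows deleted by DELETE,
    // and rows deleted because of UPDATE (the future update_preimage entries).
    // Returns {retainedCount, deletedCount, deletedBecauseOfUpdateCount}.
    static int[] classify(
            LongBitmapDataProvider rowsToDelete,
            LongBitmapDataProvider rowsDeletedBecauseOfUpdate,
            long firstFilePosition,
            int positionCount,
            int[] retained,
            int[] deleted,
            int[] deletedBecauseOfUpdate)
    {
        int retainedCount = 0;
        int deletedCount = 0;
        int deletedBecauseOfUpdateCount = 0;
        for (int position = 0; position < positionCount; position++) {
            long filePosition = firstFilePosition + position; // simplifying assumption
            if (rowsToDelete.contains(filePosition)) {
                deleted[deletedCount++] = position;
            }
            else if (rowsDeletedBecauseOfUpdate.contains(filePosition)) {
                deletedBecauseOfUpdate[deletedBecauseOfUpdateCount++] = position;
            }
            else {
                retained[retainedCount++] = position;
            }
        }
        return new int[] {retainedCount, deletedCount, deletedBecauseOfUpdateCount};
    }
}

With each position in exactly one bitmap, the if / else if / else chain never double-counts an updated row.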
Force-pushed from 881b5b2 to 1b534fb
Build is red (error-prone-checks)
onDelta().executeQuery("INSERT INTO default." + tableName2 + " VALUES (4, 'nation4000', 4000)"); | ||
|
||
onTrino().executeQuery("MERGE INTO delta.default." + tableName1 + " cdf USING delta.default." + tableName2 + " n " + | ||
"ON (cdf.nationkey = n.nationkey) " + |
nit: Use text blocks
Hmm, OK, but I am not a big fan; the fact that they don't support interpolation is a no-go for me.
Per #16127 (comment), we will stick to strings, sorry.
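For context on the trade-off being discussed, here is a small hypothetical comparison: Java text blocks (JDK 15+) give nicer multi-line layout but have no interpolation, so dynamic table names still have to be spliced in with formatted() or kept as plain concatenation. The query and names are illustrative, not the test's actual SQL:

// randomNameSuffix() is the helper used by the quoted test code.
String tableName = "test_cdf_" + randomNameSuffix();

// Plain concatenation, as the tests do today:
String concatenated = "MERGE INTO delta.default." + tableName + " t USING delta.default.source s " +
        "ON (t.nationkey = s.nationkey) " +
        "WHEN MATCHED THEN DELETE";

// Text block: better layout, but values still go through formatted():
String textBlock = """
        MERGE INTO delta.default.%s t USING delta.default.source s
        ON (t.nationkey = s.nationkey)
        WHEN MATCHED THEN DELETE
        """.formatted(tableName);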
@@ -410,21 +416,30 @@ private Optional<DataFileInfo> rewriteParquetFile(Path path, FileDeletion deleti
int positionCount = page.getPositionCount();
int[] retained = new int[positionCount];
int[] deleted = new int[positionCount];
int[] deletedBecauseOfUpdate = new int[positionCount];
Before this change we created one array with positionCount items; now we create two arrays with positionCount items. Is this valid?
We created two arrays with positionCount items before:
int[] retained = new int[positionCount];
int[] deleted = new int[positionCount];
Now we create three. This is valid; these arrays are only used to get specific positions from the page.
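For readers unfamiliar with the pattern: the int[] buffers are only candidate position lists that are later handed back to the page to build per-category sub-pages. A minimal sketch, assuming Trino's io.trino.spi.Page#getPositions(int[], int, int); the wrapper class and method names here are illustrative:

import io.trino.spi.Page;

final class SubPages
{
    private SubPages() {}

    // Only the first 'count' slots of 'positions' are meaningful; each array is
    // sized for the worst case (positionCount), which is why three of them are cheap.
    static Page subPage(Page page, int[] positions, int count)
    {
        return page.getPositions(positions, 0, count);
    }
}

Presumably the retained sub-page goes into the rewritten data file, while the other two feed CDF entries with the matching change type.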
Why not use rowsDeletedBecauseOfUpdate.getLongCardinality() instead?
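The question above refers to RoaringBitmap's cardinality accessor: the bitmap already knows how many positions it holds, so in some places a separately maintained counter may be redundant (subject to whether the count is needed per page or per file). A tiny standalone example, using the same Roaring64NavigableMap assumed earlier:

import org.roaringbitmap.longlong.LongBitmapDataProvider;
import org.roaringbitmap.longlong.Roaring64NavigableMap;

final class CardinalityExample
{
    public static void main(String[] args)
    {
        LongBitmapDataProvider rowsDeletedBecauseOfUpdate = new Roaring64NavigableMap();
        rowsDeletedBecauseOfUpdate.addLong(7);
        rowsDeletedBecauseOfUpdate.addLong(11);

        // Count of update-preimage rows taken directly from the bitmap.
        long updatePreimageRows = rowsDeletedBecauseOfUpdate.getLongCardinality();
        System.out.println(updatePreimageRows); // prints 2
    }
}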
"USING DELTA " + | ||
"LOCATION 's3://" + bucketName + "/databricks-compatibility-test-" + tableName1 + "'" + | ||
"TBLPROPERTIES (delta.enableChangeDataFeed = true)"); | ||
onDelta().executeQuery("CREATE TABLE default." + tableName2 + " (nationkey INT, name STRING, regionkey INT) " + |
This table does not need to have CDF enabled.
@Test(groups = {DELTA_LAKE_DATABRICKS, DELTA_LAKE_EXCLUDE_73, PROFILE_SPECIFIC_TESTS})
public void testMergeMixedDeleteAndUpdateIntoTableWithCdfEnabled()
{
    String tableName1 = "test_merge_mixed_delete_and_update_into_table_with_cdf_" + randomNameSuffix();
tableName1 -> targetTableName
tableName2 -> sourceTableName
as in the syntax of MERGE: https://trino.io/docs/current/sql/merge.html
"WHEN MATCHED AND cdf.nationkey = 2 " + | ||
"THEN DELETE " + | ||
"WHEN MATCHED AND cdf.nationkey > 2 " + | ||
"THEN UPDATE SET nationkey = (cdf.nationkey + n.nationkey + n.regionkey) " + |
Optional nit to make the test easier to read: rather than updating nationkey, let's update the field you currently have named regionkey, keeping nationkey as the key, so it is easier to follow what gets changed. I'd go with a page_views (pagekey, url, views) sample, which is easier to follow (when merging a page with the same key, the views get summed); see the sketch below.
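To make the page_views suggestion concrete, a hypothetical sketch in the style of the product-test snippets quoted above; the table names, inserted values, and the reuse of helpers such as onDelta, onTrino, bucketName, and randomNameSuffix are all assumptions for illustration, not the PR's actual test:

String targetTableName = "test_merge_page_views_target_" + randomNameSuffix();
String sourceTableName = "test_merge_page_views_source_" + randomNameSuffix();

// Target table has CDF enabled; the source table does not need it.
onDelta().executeQuery("CREATE TABLE default." + targetTableName + " (pagekey INT, url STRING, views INT) " +
        "USING DELTA " +
        "LOCATION 's3://" + bucketName + "/databricks-compatibility-test-" + targetTableName + "' " +
        "TBLPROPERTIES (delta.enableChangeDataFeed = true)");
onDelta().executeQuery("CREATE TABLE default." + sourceTableName + " (pagekey INT, url STRING, views INT) " +
        "USING DELTA " +
        "LOCATION 's3://" + bucketName + "/databricks-compatibility-test-" + sourceTableName + "'");

onDelta().executeQuery("INSERT INTO default." + targetTableName + " VALUES (1, 'trino.io', 10), (2, 'trino.io/docs', 5)");
onDelta().executeQuery("INSERT INTO default." + sourceTableName + " VALUES (2, 'trino.io/docs', 7), (3, 'trino.io/blog', 1)");

// Merging a page with the same key sums its views; unmatched source pages are inserted.
onTrino().executeQuery("MERGE INTO delta.default." + targetTableName + " t " +
        "USING delta.default." + sourceTableName + " s " +
        "ON (t.pagekey = s.pagekey) " +
        "WHEN MATCHED THEN UPDATE SET views = t.views + s.views " +
        "WHEN NOT MATCHED THEN INSERT (pagekey, url, views) VALUES (s.pagekey, s.url, s.views)");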
Force-pushed from 1b534fb to 982fc2d
plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeMergeSink.java
...ava/io/trino/tests/product/deltalake/TestDeltaLakeDatabricksChangeDataFeedCompatibility.java
Force-pushed from 982fc2d to f1c89aa
@ebyhr could you please run this one with secrets?
/test-with-secrets sha=f1c89aadc1ab6472d4c292a04e55daa2c8a511bb
The CI workflow run with tests that require additional secrets finished as failure: https://github.com/trinodb/trino/actions/runs/4201339088
Before this change, a MERGE that performed both UPDATE and DELETE could create CDF entries with rows marked as delete when in fact they were update_preimage, and the other way around.
Force-pushed from f1c89aa to ea53061
Description
Before this change, a MERGE that performed both UPDATE and DELETE could create CDF entries with rows marked as delete when in fact they were update_preimage, and the other way around.
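To illustrate what the mislabelling means in practice, here is a hypothetical check in the style of the product tests quoted above, reading the change feed on the Delta side after the MERGE (tableName1 refers to the quoted test's target table; the expected values are illustrative):

// Databricks/Delta SQL: read all CDF entries starting from table version 0.
onDelta().executeQuery("SELECT nationkey, _change_type FROM table_changes('default." + tableName1 + "', 0)");

// After the fix, rows hit by the DELETE branch (nationkey = 2) should appear as 'delete',
// while rows hit by the UPDATE branch (nationkey > 2) should appear as 'update_preimage'
// plus 'update_postimage'. Before the fix, the two causes could be swapped.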
Additional context and related issues
Release notes
(x) Release notes are required, with the following suggested text: