Fix for updating duplicate records in same/different files in same pa… #380
Conversation
Also @bvaradar.
@suniluber can you sign the CLA?
@vinothchandar signed the CLA.
// If a record being tagged exists in multiple files in the same partition and if the
// currentLocation has already been set, then create a new record with the same key
// and data, set the current path of the new record and return it.
if (record.getCurrentLocation() != null) {
What's the intent of this if-block? Seems you are re-initing the same object based on itself?
When a record exists in multiple files in the same partition, rowKeyRecordPairRDD will have 2 entries holding the exact same in-memory copy of the HoodieRecord, paired with the 2 separate filenames the record is found in. In this case, setCurrentLocation will fail for the second file, because we are setting the location on the same in-memory object, and HoodieRecord restricts setting the currentLocation if it is already set. So I am creating multiple copies of the HoodieRecord, one per file location.
I can create a new HoodieRecord here and set the current location. Thoughts?
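Roughly, something like this (a sketch only; `location` stands for the file location being tagged, and the constructor/accessors follow HoodieRecord's public API):

```java
// If this in-memory HoodieRecord was already tagged with a location (because the
// same key appeared in another file of this partition), clone it so each file
// gets its own copy with its own currentLocation, instead of mutating the shared
// object, which HoodieRecord forbids.
if (record.getCurrentLocation() != null) {
  HoodieRecord<T> newRecord = new HoodieRecord<>(record.getKey(), record.getData());
  newRecord.setCurrentLocation(location); // 'location' is a placeholder for the file being tagged
  return newRecord;
}
record.setCurrentLocation(location);
return record;
```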
I see. This is very hacky :) .. arghhh. :)
Can you explain this more clearly in the comments there? Especially this part: "when a record exists in multiple files in the same partition, rowKeyRecordPairRDD will have 2 entries holding the exact same in-memory copy of the HoodieRecord, paired with the 2 separate filenames the record is found in".
We can leave it as it is.
@@ -185,8 +191,7 @@ private boolean writeUpdateRecord(HoodieRecord<T> hoodieRecord,
  }

  /**
-  * Go through an old record. Here if we detect a newer version shows up, we write the new one to
-  * the file.
+  * Go through an old record. Here if we detect a newer version shows up, we write the new one to the file.
Can you please revert pure whitespace changes?
@@ -107,6 +107,10 @@ protected HoodieWriteConfig getConfig() {
        .withIndexConfig(HoodieIndexConfig.newBuilder().withIndexType(HoodieIndex.IndexType.BLOOM).build());
  }

+ HoodieWriteConfig.Builder getConfigWithBulkInsertParallelismBuilder() {
+   return getConfigBuilder().withBulkInsertParallelism(1);
1 is very restrictive.. Can we make it at least 2, so there is some notion of parallel compute?
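i.e., something like this (a sketch, using the method from the diff above):

```java
// Bump the parallelism to 2 so the test exercises at least some parallel compute.
HoodieWriteConfig.Builder getConfigWithBulkInsertParallelismBuilder() {
  return getConfigBuilder().withBulkInsertParallelism(2);
}
```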
@@ -52,6 +54,7 @@

  private WriteStatus writeStatus;
  private Map<String, HoodieRecord<T>> keyToNewRecords;
  private Set<String> existingRecords;
Rename to: writtenRecordKeys
@@ -581,6 +587,171 @@ public void testCommitWritesRelativePaths() throws Exception {
    }
  }

+ @Test
+ public void testUpsertsForMultipleRecordsInSameFile() throws Exception {
Can we just create an IOHandle-level test for this? I would like to leave this file for API-level tests.
@@ -247,6 +258,16 @@ public HoodieRecord generateUpdateRecord(HoodieKey key, String commitTime) throw
    return updates;
  }

+ /**
+  * Generates 1 new update, given the hoodie key, riderName and driverName
+  */
Can we call generateUpdates(commitTime, 1)? Please try to reuse the existing methods as much as possible.
/**
 * Generates a new avro record of the above schema format, with the given HoodieKey, riderName and driverName
 */
public static TestRawTripPayload generateGenericRecord(HoodieKey key, String commitTime,
This is returning a TestRawTripPayload.. Can we reuse generateRandomValue() above? Any specific reason you need rider and driver names passed in explicitly instead of using the defaults there?
Agree with Vinoth's comments.
In general, I feel we should just put the guard in all our public APIs (HoodieWriteClient) to prevent these cases. An extra reduceByKey on the incoming RDD would avoid the problem in the first place and keep the dataset uncorrupted.
Even with the fix in this PR, there is no guarantee that readers will not see duplicate records. @vinothchandar: Thoughts?
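The guard would look roughly like this (a sketch only; `incomingRecords` is a hypothetical input RDD, and keeping an arbitrary winner is a simplification — real code would merge the payloads):

```java
import org.apache.spark.api.java.JavaRDD;
import scala.Tuple2;

// Deduplicate the incoming batch by record key before writing, so a single
// upsert can never introduce two copies of the same key into the dataset.
// HoodieRecord and getRecordKey() come from the Hudi codebase.
JavaRDD<HoodieRecord> dedupedRecords = incomingRecords
    .mapToPair(record -> new Tuple2<>(record.getRecordKey(), record))
    .reduceByKey((first, second) -> first) // arbitrary winner, for illustration
    .values();
```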
Force-pushed from 1491dd1 to c5f63fa
Force-pushed from c5f63fa to bdd77dc
@vinothchandar I have made the discussed changes. Can you review when you get a chance?
@bvaradar I think Sunil does not want to pay the reduceByKey cost, hence these changes. I have clarified with them that they should have a unit test in the ingestion system to keep ensuring this behavior, and this just happens to work :) @suniluber will take a pass today.
}

@Test
public void testUpsertsForMultipleRecordsInSameFile() throws Exception {
@suniluber Like I mentioned before, can't we write an IOHandle-level test for this?
Is this test covering, in a guaranteed way, duplicates inside the same file and duplicates within the same partition but in different files?
Again, I feel a lower-level unit test would cover all that.. Does such a test exist elsewhere, e.g. in the ingestion code? I think those two are critical cases to cover to ensure this does not regress.
@vinothchandar yes, this test covers both duplicates within the same file in the same partition and duplicates across different files in the same partition.
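In spirit, the assertions are along these lines (a sketch; `dupKey` and `updatedRider` are placeholders for the duplicated record key and the updated rider name, and `_hoodie_record_key` is Hudi's record-key metadata column):

```java
// Read the table back and verify the update reached every copy of the duplicated
// key, whether the copies live in one file or in two files of the same partition.
Dataset<Row> rows = HoodieClientTestUtils.read(basePath, sqlContext, fs, fullPartitionPaths)
    .filter("_hoodie_record_key = '" + dupKey + "'");
assertEquals(2, rows.count()); // both copies survive the upsert
assertEquals(2, rows.filter("rider = '" + updatedRider + "'").count()); // both copies carry the update
```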
}

Dataset<Row> dataSet = HoodieClientTestUtils.read(basePath, sqlContext, fs, fullPartitionPaths);
nit: please remove extra blank line
*/
newCommitTime = "003";
client.startCommitWithTime(newCommitTime);
Can you please get rid of the extra newlines in this file?
Few comments. LG otherwise
Fix for #333
@ovj @vinothchandar @jianxu @n3nash