-
Notifications
You must be signed in to change notification settings - Fork 2.5k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Enabling global index for MOR #389
Conversation
d1e9542
to
984079a
Compare
@n3nash: We should address the redundant fs.listStatus calls so that we no longer have to worry about working around it. As we have MVCC, can we construct FileSystemView only once in the driver and make it available as a spark broadcast read-only object. For FileSystemView lookup, this view should be the correct one. For the Reader, it should only be the initial file-system-view. For Async ingestion, this is similar to that of ingestion (merged view of this broadcast variable (initial-view) and a delta-view which the compactor itself caused by compacting part of latest file-slice). |
@bvaradar Thanks for your comments. I agree with you that we have to address fs.listStatus once so we don't have to worry about it all the time. This has been a topic of discussion for a very long time, you can also see a PR made along these lines here : #278. There are different possible approaches and trade-offs which we have discussed internally. Happy to share the approaches and trade-offs offline if you're interested. We also want to add this : #209 when we make that change. |
@@ -195,17 +195,24 @@ public Partitioner getUpsertPartitioner(WorkloadProfile profile) { | |||
|
|||
// append rollback blocks for updates | |||
if (commitMetadata.getPartitionToWriteStats().containsKey(partitionPath)) { | |||
// This needs to be done since GlobalIndex at the moment does not store the latest commit time |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
From the diff, I think the goal for this change is to correctly identify the base-commit so that we can append a rollback block. In that case, Can't we just get the base-commit from the writeStat all the time (global index or not ) ? The AppendHandle sets the base-commit as the previous commit in the write-stat. I may be missing some context here.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is the scenario where the write failed, we record old commitTimes in the inflight file.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is specific to the HBaseIndex implementation and not global indexing in general actually..
can you expand on why this is fixed specifically for rollbacks alone? how does the happy path know the base commit time to append to, if HBaseIndex won't look it up? |
For the happy path, we do a listStatus to find the latest valid file : https://github.com/uber/hudi/blob/master/hoodie-client/src/main/java/com/uber/hoodie/io/HoodieAppendHandle.java#L97. |
Long startTime = System.currentTimeMillis(); | ||
|
||
HoodieIndex hoodieIndex = HoodieIndex.createIndex(config, jsc); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
please leave a TODO here to clean this up later once table has reference to the index ..
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done
@@ -195,17 +195,24 @@ public Partitioner getUpsertPartitioner(WorkloadProfile profile) { | |||
|
|||
// append rollback blocks for updates | |||
if (commitMetadata.getPartitionToWriteStats().containsKey(partitionPath)) { | |||
// This needs to be done since GlobalIndex at the moment does not store the latest commit time |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is specific to the HBaseIndex implementation and not global indexing in general actually..
@vinothchandar Yes, it is specific to HbaseIndex but isGlobal() seemed cleaner so kept that. We can do index instanceOf HbaseIndex but that's again specific to how clients want to use Hbase, may be they allow for updates too. WDYT ? |
984079a
to
6a06bda
Compare
Here is a summary from my investigation. We dont use the commitTime in HoodieRecordLocation in a way that matters. That's why it works now . HBaseIndex could in theory list and find out the commitTime once and we can annotate stuff nicely. But given fast path is already not relying on this. I am willing to concede. Filed an issue to track this |
@vinothchandar @bvaradar Kept this change in HoodieMergeOnReadTable rollback and not HbaseIndex to avoid multiple fs.listStatus() calls. Will try to add a test case for this.