-
Notifications
You must be signed in to change notification settings - Fork 415
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Add Vacuum command in DeltaTable #207
Conversation
990a4b2
to
d4eb866
Compare
d4b25d5
to
6357570
Compare
6357570
to
2a3e3d7
Compare
6f0bdd1
to
fc5c3b6
Compare
fc5c3b6
to
fa6dc42
Compare
/// Names of the form partitionCol=[value] are partition directories, and should be | ||
/// deleted even if they'd normally be hidden. The _db_index directory contains (bloom filter) | ||
/// indexes and these must be deleted when the data they are tied to is deleted. | ||
fn is_hidden_directory(&self, path_name: &str) -> Result<bool, DeltaTableError> { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The lack of unit tests here make me uneasy, considering the importance not deleting data that shouldn't be deleted 😆
For the purposes of vacuum is it insufficient to just delete files ending in .parquet
🤔
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I agree! We should add integration tests to validate the different scenarios.
|
||
/// Run the Vacuum command on the Delta Table: delete files no longer referenced by a Delta table and are older than the retention threshold. | ||
/// We do not recommend that you set a retention interval shorter than 7 days, because old snapshots and uncommitted files can still be in use by concurrent readers or writers to the table. If vacuum cleans up active files, concurrent readers can fail or, worse, tables can be corrupted when vacuum deletes files that have not yet been committed. | ||
pub async fn vacuum( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This function is clean, nice work!
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I will file an issue for testing around vacuum
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
sorry for the late review, I added some performance related suggestions :)
I didn't know there is a _delta_index
directory, this is pretty cool.
let mut all_files = self.storage.list_objs(&self.table_path).await?; | ||
while let Some(obj_meta) = all_files.next().await { | ||
let obj_meta = obj_meta?; | ||
let is_not_valid_file = !self.get_file_paths().contains(&obj_meta.path); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@fvaleye this will result in performance issue right? we are recreating a new string vector on every listed s3 object here? would be better to construct a hashset based on the return of get_file_paths
before entering the while loop.
let is_not_valid_file = !self.get_file_paths().contains(&obj_meta.path); | ||
let is_valid_tombstone = tombstones_path.contains(&obj_meta.path); | ||
let is_not_hidden_directory = !self.is_hidden_directory(&obj_meta.path)?; | ||
if is_not_valid_file && is_valid_tombstone && is_not_hidden_directory { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
self.is_hidden_directory
is always invoked in this tight loop even when previous two conditions are not met, would be better to perform early exit of the loop using continue
so we don't call self.is_hidden_directory
when it's not needed. This method builds serval strings at runtime, which is an expensive routine.
let mut tombstones = vec![]; | ||
let mut all_files = self.storage.list_objs(&self.table_path).await?; | ||
while let Some(obj_meta) = all_files.next().await { | ||
let obj_meta = obj_meta?; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think it would be better to strip table path from obj_meta.path
right after here and only work with object path like how it's done in the scala implementation, this way, we can avoid all the self.storage.join_path
calls, which can get expensive if we are dealing with 100k+ objects.
retention_hours: u64, | ||
dry_run: bool, | ||
) -> Result<Vec<String>, DeltaTableError> { | ||
let tombstones_path = self.get_stale_files(retention_hours)?; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nitpick, i think expired_tombstones
would be a better name here so readers won't mix this up with valid tombstones that are not supposed to be deleted.
Description
Add Vacuum on the Delta table
Related Issue(s)
#97