prevent adding duplicate files #1036
Conversation
Thanks for contributing this! A few minor comments
pyiceberg/table/__init__.py
Outdated
```python
unique_files = set(file_paths)
return len(unique_files) != len(file_paths)


def find_referenced_files(self, file_paths: List[str]) -> list[str]:
```
FYI there's also the `table.inspect.files()` API (https://py.iceberg.apache.org/api/#files), which returns all data files in the current snapshot.
I guess we only need to worry about the current snapshot. If a data file existed in previous snapshots, but not in the current one, we can still add the file.
pyiceberg/table/__init__.py
Outdated
```diff
@@ -621,6 +621,24 @@ def delete(self, delete_filter: Union[str, BooleanExpression], snapshot_properti
     if not delete_snapshot.files_affected and not delete_snapshot.rewrites_needed:
         warnings.warn("Delete operation did not match any records")

     def has_duplicates(self, file_paths: List[str]) -> bool:
```
nit: this is a static method; it may be better inlined.
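The inlining idea can be sketched as follows; this is a hypothetical standalone fragment, not the actual pyiceberg method:

```python
from typing import List


def add_files(file_paths: List[str]) -> None:
    # Duplicate check inlined instead of living in a has_duplicates helper:
    # a set drops repeated paths, so a length mismatch means duplicates.
    if len(set(file_paths)) != len(file_paths):
        raise ValueError("File paths must be unique")


add_files(["a.parquet", "b.parquet"])  # passes silently
```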
pyiceberg/table/__init__.py
Outdated
```diff
@@ -630,7 +648,15 @@ def add_files(self, file_paths: List[str], snapshot_properties: Dict[str, str] =

         Raises:
             FileNotFoundError: If the file does not exist.
             ValueError: Raises a ValueError in case file_paths is not unique
```
nit: "given file_paths contains duplicate files"
pyiceberg/table/__init__.py
Outdated
```diff
@@ -630,7 +648,15 @@ def add_files(self, file_paths: List[str], snapshot_properties: Dict[str, str] =

         Raises:
             FileNotFoundError: If the file does not exist.
             ValueError: Raises a ValueError in case file_paths is not unique
             ValueError: Raises a ValueError in case file already referenced in table
```
nit: "given file_paths already referenced by table"
Hi @amitgilad3 thanks again for putting together this PR! This will be a stellar safeguard to have on this new API.
I left a comment regarding the method definition, but the functionality looks 💯
pyiceberg/table/__init__.py
Outdated
```diff
@@ -621,6 +621,13 @@ def delete(self, delete_filter: Union[str, BooleanExpression], snapshot_properti
     if not delete_snapshot.files_affected and not delete_snapshot.rewrites_needed:
         warnings.warn("Delete operation did not match any records")

     def find_referenced_files(self, file_paths: List[str]) -> list[str]:
```
We want to be intentional about introducing public methods. Since this is just used by `add_files`, could we make this method private? The type notation of `list` in the return type also needs to be `typing.List` until we deprecate Python 3.8 support:

```diff
-    def find_referenced_files(self, file_paths: List[str]) -> list[str]:
+    def _find_referenced_files(self, file_paths: List[str]) -> List[str]:
```

An alternative is to just keep this logic within `add_files`, since this method isn't reused and is just 2 or 3 lines of code.
You are right, it makes more sense to move the logic into `add_files` since no one else uses it and it's only 3 lines of code.
pyiceberg/table/__init__.py
Outdated
```python
expr = pc.field("file_path").isin(file_paths)
referenced_files = [file["file_path"] for file in self._table.inspect.files().filter(expr).to_pylist()]

if referenced_files:
    raise ValueError(f"Cannot add files that are already referenced by table, files: {', '.join(referenced_files)}")
```
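The same check can be mimicked without a live table. In this pure-Python sketch, `existing` stands in for the file paths that `inspect.files()` would return; it is an assumed stand-in, not a real pyiceberg call:

```python
from typing import List


def find_referenced(file_paths: List[str], existing: List[str]) -> List[str]:
    # Plays the role of the pc.field("file_path").isin(...) filter above:
    # keep only the candidate paths the table already references.
    existing_set = set(existing)
    return [p for p in file_paths if p in existing_set]


referenced = find_referenced(
    ["s3://bucket/a.parquet", "s3://bucket/new.parquet"],
    ["s3://bucket/a.parquet", "s3://bucket/b.parquet"],
)
print(referenced)  # ['s3://bucket/a.parquet']
```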
Just one more suggestion here @amitgilad3 - should we make this behavior the default, but have a boolean flag available to disable it in the `add_files` API? A similar Spark procedure has an equivalent flag, `check_duplicate_files`, and I feel it could be useful because inspecting the files table requires us to read all of the active manifest files. If the user already knows that they won't be running into this issue, I think it would be useful for them to be able to disable this check.
So my suggestion is something like:

```python
def add_files(self, file_paths: List[str], snapshot_properties: Dict[str, str] = EMPTY_DICT, check_duplicate_files: bool = True) -> None:
```
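A hypothetical sketch of how the flag could gate the check; the append logic is elided and the function is standalone, not the real pyiceberg implementation:

```python
from typing import Dict, List

EMPTY_DICT: Dict[str, str] = {}


def add_files(
    file_paths: List[str],
    snapshot_properties: Dict[str, str] = EMPTY_DICT,
    check_duplicate_files: bool = True,
) -> List[str]:
    # With the default True, duplicate inputs are rejected up front;
    # passing False skips the check, mirroring Spark's
    # check_duplicate_files procedure argument.
    if check_duplicate_files and len(set(file_paths)) != len(file_paths):
        raise ValueError("File paths must be unique")
    return file_paths  # stand-in for the real append logic


add_files(["a.parquet", "a.parquet"], check_duplicate_files=False)  # allowed
```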
I also added the flag to the table API and added tests to make sure the flag works with `False`.
Thanks for the PR, I added a few comments.
tests/integration/test_add_files.py
Outdated
```python
with pytest.raises(ValueError) as exc_info:
    tbl.add_files(file_paths=[referenced_file])
assert f"Cannot add files that are already referenced by table, files: {referenced_file}" in str(exc_info.value)
```
nit: it's not clear to me how `referenced_file` is already "referenced".
```python
@pytest.mark.integration
def test_add_files_that_referenced_by_current_snapshot_with_check_duplicate_files_false(
```
nit: split this into 2 tests: one for the happy path, another for `check_duplicate_files=False`.

What happens when `check_duplicate_files=False` is set and we add files that are already referenced? Does this leave the table in an inconsistent (bad) state?
So when you set `check_duplicate_files` to `False` you are essentially taking responsibility for scenarios where duplicate files can be added; the default is to validate.
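A toy model of that trade-off (a stand-in class, not pyiceberg): with the check disabled, an already-referenced file is appended again, so its rows would effectively be counted twice:

```python
from typing import List


class FakeTable:
    # Toy stand-in for a table's referenced-file list.
    def __init__(self) -> None:
        self.files: List[str] = []

    def add_files(self, file_paths: List[str], check_duplicate_files: bool = True) -> None:
        if check_duplicate_files:
            dup = [p for p in file_paths if p in self.files]
            if dup:
                raise ValueError(
                    f"Cannot add files that are already referenced by table, files: {', '.join(dup)}"
                )
        self.files.extend(file_paths)


t = FakeTable()
t.add_files(["a.parquet"])
t.add_files(["a.parquet"], check_duplicate_files=False)
print(t.files)  # ['a.parquet', 'a.parquet']
```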
tests/integration/test_add_files.py
Outdated
Suggested change:

```diff
 @pytest.mark.integration
+@pytest.mark.parametrize("format_version", [1, 2])
 def test_add_files_with_duplicate_files_in_file_paths(spark: SparkSession, session_catalog: Catalog, format_version: int) -> None:
```
@amitgilad3 - this looks almost ready to merge. @kevinjqliu's made some great suggestions here, so I'm thinking that we can take another round of reviews once we take a pass through adopting those review comments.
Hey @sungwy + @kevinjqliu, again thanks for all the help and guidance, I went over all the comments and fixed them.

Hey @sungwy, fixed all the comments and tests pass. Is there anything else or can we merge?

I will take another pass at it today. Thank you @amitgilad3!
* prevent add_files from adding a file that's already referenced by the iceberg table
* fix method that searches files that are already referenced + docs
* move function to locate duplicate files into add_files
* add check_duplicate_files flag to add_files api to make the behaviour match the java api
* add check_duplicate_files flag to table level api and add tests
* add check_duplicate_files flag to table level api and add tests
* fix tests to check the newly added check_duplicate_files flag and fix checks
* fix linting
This resolves #998, where duplicate files are added with the add_files method. It handles 2 cases: