Minor updates to duplicate removal (#570)
* Minor updates to duplicate removal

Signed-off-by: Sarah Yurick <[email protected]>

* change warning to print

Signed-off-by: Sarah Yurick <[email protected]>

---------

Signed-off-by: Sarah Yurick <[email protected]>
sarahyurick authored Feb 25, 2025
1 parent e1abd74 commit a080400
Showing 7 changed files with 98 additions and 16 deletions.
3 changes: 1 addition & 2 deletions examples/exact_deduplication.py
@@ -1,4 +1,4 @@
# Copyright (c) 2024, NVIDIA CORPORATION. All rights reserved.
# Copyright (c) 2025, NVIDIA CORPORATION. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@@ -64,7 +64,6 @@ def main(args):
if isinstance(duplicates, str):
duplicates = DocumentDataset.read_parquet(duplicates, backend=backend)

# It's easy to apply dataframe operations to the dataset by using the underlying df.
result = exact_dup.remove(input_dataset, duplicates)
write_to_disk(result, output_dir, output_type="parquet")
print(time.time() - t0)
2 changes: 1 addition & 1 deletion nemo_curator/modules/config.py
@@ -136,7 +136,7 @@ def __post_init__(self):

if not self.perform_removal:
warnings.warn(
"In future releases (starting with 0.8.0) the default will be True."
"In future NeMo Curator releases, the default value for perform_removal will be True."
)


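The reworded warning only fires when perform_removal is left at its current default of False, so setting the flag explicitly sidesteps it and makes the planned default change a no-op. A small sketch, assuming FuzzyDuplicatesConfig is exposed at the package top level with the field names shown in this diff (the cache path is illustrative):

from nemo_curator import FuzzyDuplicatesConfig

config = FuzzyDuplicatesConfig(
    cache_dir="./fuzzy_dedup_cache",  # assumed location for intermediate outputs
    id_field="id",
    text_field="text",
    perform_removal=True,  # explicit, so the future default change has no effect
)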
39 changes: 33 additions & 6 deletions nemo_curator/modules/exact_dedup.py
@@ -1,4 +1,4 @@
# Copyright (c) 2024, NVIDIA CORPORATION. All rights reserved.
# Copyright (c) 2025, NVIDIA CORPORATION. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@@ -72,16 +72,20 @@ def __init__(
self.id_field = id_field
self.text_field = text_field
self.perform_removal = perform_removal

if not self.perform_removal:
warnings.warn(
"In future releases (starting with 0.8.0) the default will be True."
"In future NeMo Curator releases, the default value for perform_removal will be True."
)

if self.perform_removal and cache_dir is None:
warnings.warn("cache_dir is recommended to remove duplicates.")

if cache_dir is None and profile_dir is not None:
warnings.warn(
"cache_dir for intermediate outputs is required to generate profiles"
)

self.cache_dir = cache_dir
self.profile_dir = profile_dir

@@ -94,7 +98,7 @@ def __init__(
else:
self._logger = logger

def _exact_dup_ids(self, df: dd.DataFrame):
def _exact_dup_ids(self, df: dd.DataFrame) -> dd.DataFrame:
"""
Get the id's for text/documents that are exact duplicates
Parameters
@@ -105,17 +109,20 @@ def _exact_dup_ids(self, df: dd.DataFrame):
* A unique ID column for each document
"""
hash_df = self._compute_hashes(df)

shuffle_context = (
config.set({"dataframe.shuffle.method": "tasks"})
if DASK_P2P_ERROR
else nullcontext()
)

with shuffle_context:
dup_ids = hash_df.shuffle(
on=["_hashes"],
ignore_index=True,
npartitions=max(1, (hash_df.npartitions // 3)),
).map_partitions(lambda x: x[x["_hashes"].duplicated(keep=False)])

return dup_ids

def _compute_hashes(
@@ -129,9 +136,11 @@ def _compute_hashes(
self._logger.info("Starting lazy hash generation")
res = df[[self.id_field]]
res["_hashes"] = df[self.text_field].map_partitions(self.hash_documents)

self._logger.info(
f"Lazy hash generation complete for {res.npartitions} partitions"
)

return res

def hash_documents(
@@ -142,20 +151,24 @@ def hash_documents(
"""
if is_cudf_type(df):
return df.hash_values(method=self.hash_method)

elif isinstance(df, pd.Series):
            # TODO: Generalize by using self.hash_method
return df.apply(lambda x: md5(x.encode()).hexdigest())

else:
raise ValueError(f"Unsupported type: {type(df)}")

def identify_duplicates(self, dataset: DocumentDataset) -> DocumentDataset:
"""
Find document ID's for exact duplicates in a given DocumentDataset
Find document IDs for exact duplicates in a given DocumentDataset
Parameters
----------
dataset: DocumentDataset
The input dataset to find exact duplicates
Returns
-------
DocumentDataset containing ID's and hashes of all duplicate documents
DocumentDataset containing IDs and hashes of all duplicate documents
"""
result = self._exact_dup_ids(df=dataset.df)

@@ -165,19 +178,24 @@ def identify_duplicates(self, dataset: DocumentDataset) -> DocumentDataset:
t0 = time.time()
self._logger.info("Starting execution for ExactDedup")
write_path = os.path.join(self.cache_dir, "_exact_duplicates.parquet")

if os.path.exists(write_path):
warnings.warn(
f"Output path f{write_path} already exists and will be overwritten"
)

with performance_report_if_with_ts_suffix(
self.profile_dir,
"exact-dedup-profile",
):
result.to_parquet(write_path, write_index=False, overwrite=True)

self._logger.info(
f"Time taken for Exact Dedup Computation = {time.time() - t0}s and output written at {write_path}"
)

backend = "cudf" if is_cudf_type(result) else "pandas"

return DocumentDataset.read_parquet(
write_path,
backend=backend,
@@ -194,11 +212,18 @@ def remove(
Parameters
----------
dataset: DocumentDataset
The input datset to remove exact duplicates
The input dataset from which to remove exact duplicates
duplicates_to_remove: DocumentDataset
The dataset containing IDs of the exact duplicates to remove
Returns
-------
DocumentDataset containing only non-duplicate documents
"""

if duplicates_to_remove is None:
print("No exact duplicates to remove, returning original dataset")
return dataset

result = remove_duplicates(
left=dataset.df,
duplicates=duplicates_to_remove.df,
@@ -209,6 +234,8 @@ def call(self, dataset: DocumentDataset) -> DocumentDataset:

def call(self, dataset: DocumentDataset) -> DocumentDataset:
duplicates = self.identify_duplicates(dataset)

if self.perform_removal:
return self.remove(dataset, duplicates)

return duplicates
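
Taken together, the new removal path can be exercised end to end. A minimal sketch with illustrative data, assuming ExactDuplicates and DocumentDataset are importable as shown below (paths and values are made up for illustration):

import dask.dataframe as dd
import pandas as pd

from nemo_curator import ExactDuplicates
from nemo_curator.datasets import DocumentDataset

df = pd.DataFrame({"id": [1, 2, 3], "text": ["foo", "foo", "bar"]})
dataset = DocumentDataset(dd.from_pandas(df, npartitions=2))

exact_dups = ExactDuplicates(
    id_field="id",
    text_field="text",
    hash_method="md5",
    perform_removal=True,  # remove duplicates rather than only identifying them
)

# With perform_removal=True the call returns the deduplicated dataset;
# with perform_removal=False it returns the duplicate IDs and hashes instead.
deduped = exact_dups(dataset)
print(deduped.df.compute())

identify_duplicates and remove can also be called separately; the new early return above means remove hands back the original dataset, with a printed notice, when no duplicates were found.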
17 changes: 12 additions & 5 deletions nemo_curator/modules/fuzzy_dedup/fuzzyduplicates.py
@@ -1,4 +1,4 @@
# Copyright (c) 2024, NVIDIA CORPORATION. All rights reserved.
# Copyright (c) 2025, NVIDIA CORPORATION. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@@ -257,19 +257,24 @@ identify_duplicates(

def remove(
self, dataset: DocumentDataset, duplicates_to_remove: Optional[DocumentDataset]
) -> Optional[DocumentDataset]:
) -> DocumentDataset:
"""
Remove exact duplicates from a given DocumentDataset
Remove fuzzy duplicates from a given DocumentDataset
Parameters
----------
dataset: DocumentDataset
The input datset to remove exact duplicates
The input dataset from which to remove fuzzy duplicates
duplicates_to_remove: DocumentDataset
The dataset containing IDs of the fuzzy duplicates to remove
Returns
-------
DocumentDataset containing only non-duplicate documents
"""

if not duplicates_to_remove:
return None
print("No fuzzy duplicates to remove, returning original dataset")
return dataset

result = remove_duplicates(
left=dataset.df,
duplicates=duplicates_to_remove.df,
@@ -282,6 +287,8 @@ def call(
self, dataset: DocumentDataset, perform_removal: bool = False
) -> DocumentDataset:
duplicates = self.identify_duplicates(dataset)

if perform_removal:
return self.remove(dataset, duplicates)

return duplicates
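
The fuzzy module exposes the same identify/remove split. A hedged sketch, assuming FuzzyDuplicates is constructed from a FuzzyDuplicatesConfig and that a GPU-backed (cudf) dataset is available; the paths are illustrative only:

from nemo_curator import FuzzyDuplicates, FuzzyDuplicatesConfig
from nemo_curator.datasets import DocumentDataset

config = FuzzyDuplicatesConfig(
    cache_dir="./fuzzy_dedup_cache",  # assumed path for intermediate outputs
    id_field="id",
    text_field="text",
    perform_removal=False,
)
fuzzy_dups = FuzzyDuplicates(config=config)

dataset = DocumentDataset.read_parquet("./input_parquet", backend="cudf")  # assumed input

duplicates = fuzzy_dups.identify_duplicates(dataset)
# remove() now returns the original dataset (with a printed notice) when no
# duplicates were found, instead of returning None as before this commit.
deduped = fuzzy_dups.remove(dataset, duplicates)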
17 changes: 16 additions & 1 deletion nemo_curator/utils/duplicates_removal.py
@@ -1,3 +1,17 @@
# Copyright (c) 2025, NVIDIA CORPORATION. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from typing import List, Union

import dask.dataframe as dd
@@ -27,8 +41,9 @@ def left_anti_join(
right: dd.DataFrame,
left_on: Union[str, List[str]],
right_on: Union[str, List[str]],
):
) -> dd.DataFrame:
assert left_on != right_on, "left_on and right_on cannot be the same"

merge = left.merge(
right=right,
how="left",
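For reference, a left anti-join keeps only the rows of the left frame whose key has no match on the right. The helper's body is truncated above, so this is only a self-contained sketch of the general merge-then-filter pattern, not its exact implementation:

import dask.dataframe as dd
import pandas as pd

left = dd.from_pandas(
    pd.DataFrame({"id": [1, 2, 3], "text": ["a", "b", "c"]}), npartitions=2
)
duplicates = dd.from_pandas(pd.DataFrame({"dup_id": [2]}), npartitions=1)

# Left join on the duplicate IDs, then keep only the rows that found no match.
merged = left.merge(duplicates, how="left", left_on="id", right_on="dup_id")
survivors = merged[merged["dup_id"].isna()].drop(columns="dup_id")

print(survivors.compute())  # rows with id 1 and 3 remain

The assertion that left_on and right_on differ fits this pattern: the right-hand key column has to survive the merge so that unmatched rows can be detected through its nulls before it is dropped.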
14 changes: 14 additions & 0 deletions tests/test_duplicates_removal.py
@@ -1,3 +1,17 @@
# Copyright (c) 2025, NVIDIA CORPORATION. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from typing import Literal

import pandas as pd
22 changes: 21 additions & 1 deletion tests/test_exact_dedup.py
@@ -1,4 +1,4 @@
# Copyright (c) 2024, NVIDIA CORPORATION. All rights reserved.
# Copyright (c) 2025, NVIDIA CORPORATION. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@@ -35,6 +35,14 @@ def exact_dedup_data(request):
return DocumentDataset(df)


@pytest.fixture
def exact_no_dedup_data(request):
# A dataset with no exact duplicates
df = pd.DataFrame({"id": [1, 2, 300], "text": ["abc", "aba", "abb"]})
df = dd.from_pandas(df, 2)
return DocumentDataset(df)


class TestExactDuplicates:
def test_unsupported_hash(self):
with pytest.raises(ValueError):
@@ -74,3 +82,15 @@ def test_dup(self, exact_dedup_data, cache_result, tmpdir):
}
).sort_values(by="id", ignore_index=True)
pd.testing.assert_frame_equal(duplicates_df, expected_df, check_like=True)

def test_no_dedup(self, exact_no_dedup_data):
exact_dups = ExactDuplicates(
id_field="id",
text_field="text",
hash_method="md5",
perform_removal=True,
)
result_df = exact_dups(exact_no_dedup_data).df.compute().reset_index(drop=True)
expected_df = exact_no_dedup_data.df.compute().reset_index(drop=True)

pd.testing.assert_frame_equal(result_df, expected_df)
