Create Cache class for exact, fuzzy, and semantic deduplication #384

Open

sarahyurick wants to merge 32 commits into main from global_cache_dir

Changes from 9 commits

Commits (32)
769e2ea
add global cache variable and use it for exact dedup
sarahyurick Nov 19, 2024
b77139c
global cache for semdedup
sarahyurick Nov 19, 2024
337cec8
run black and modify pytest
sarahyurick Nov 19, 2024
6d55d8c
update image notebook
sarahyurick Nov 20, 2024
622912b
Merge branch 'main' into global_cache_dir
sarahyurick Nov 20, 2024
4cb26d5
save fuzzy dedup progress
sarahyurick Nov 20, 2024
b001622
save progress
sarahyurick Nov 20, 2024
0c14626
update remaining docs
sarahyurick Nov 20, 2024
7486459
run black
sarahyurick Nov 20, 2024
053f312
Merge branch 'main' into global_cache_dir
sarahyurick Dec 6, 2024
1b1ba30
Merge branch 'main' into global_cache_dir
sarahyurick Dec 11, 2024
4b12651
Merge branch 'main' into global_cache_dir
sarahyurick Dec 17, 2024
4160471
Merge branch 'main' into global_cache_dir
sarahyurick Dec 20, 2024
8a22ace
Merge branch 'main' into global_cache_dir
sarahyurick Dec 23, 2024
5e9bef1
Merge branch 'main' into global_cache_dir
sarahyurick Jan 3, 2025
d823a0b
Merge remote-tracking branch 'upstream/main' into global_cache_dir
sarahyurick Jan 21, 2025
0890fb0
re-add get_cache_directory changes
sarahyurick Jan 21, 2025
8fd79fb
create Cache singleton class
sarahyurick Jan 21, 2025
0d7b969
update exact_dedup
sarahyurick Jan 22, 2025
2c1a435
add semdedup functionality with Cache
sarahyurick Jan 22, 2025
f0ff2ce
add semdedup_example script
sarahyurick Jan 22, 2025
a379893
Cache singleton option for fuzzy dedup
sarahyurick Jan 23, 2025
67f609c
run black
sarahyurick Jan 23, 2025
8693177
fix tutorials
sarahyurick Jan 23, 2025
c296cc7
Merge branch 'main' into global_cache_dir
sarahyurick Jan 29, 2025
510347c
Merge branch 'main' into global_cache_dir
sarahyurick Feb 18, 2025
0635ebf
run black
sarahyurick Feb 18, 2025
a229857
import assert_eq
sarahyurick Feb 18, 2025
30ec409
fix semdedup test
sarahyurick Feb 19, 2025
1a63468
Merge branch 'main' into global_cache_dir
sarahyurick Feb 20, 2025
2075588
Merge branch 'main' into global_cache_dir
sarahyurick Feb 25, 2025
a6c5de3
remove repeating param
sarahyurick Feb 25, 2025
1 change: 0 additions & 1 deletion config/fuzzy_dedup_config.yaml
@@ -1,4 +1,3 @@
cache_dir: "./fuzzy_dedup_cache"
# Optional Params below with default values
# profile_dir: null
# id_field: "id"
3 changes: 0 additions & 3 deletions config/sem_dedup_config.yaml
@@ -1,14 +1,11 @@
# Configuration file for semantic dedup
cache_dir: "semdedup_cache"
num_files: 16

# Embeddings configuration
embeddings_save_loc: "embeddings"
embedding_model_name_or_path: "sentence-transformers/all-MiniLM-L6-v2"
embedding_batch_size: 128

# Clustering configuration
clustering_save_loc: "clustering_results"
n_clusters: 1000
seed: 1234
max_iter: 100
9 changes: 5 additions & 4 deletions docs/user-guide/gpudeduplication.rst
@@ -184,7 +184,6 @@ Python API
from nemo_curator import FuzzyDuplicatesConfig

config = FuzzyDuplicatesConfig(
cache_dir="/path/to/dedup_outputs", # must be cleared between runs
id_field="my_id",
text_field="text",
seed=42,
@@ -200,7 +199,6 @@ Python API

.. code-block:: yaml

cache_dir: /path/to/dedup_outputs
id_field: my_id
text_field: text
seed: 42
@@ -223,8 +221,12 @@ Python API
.. code-block:: python

from nemo_curator import FuzzyDuplicates
from nemo_curator.cache import initialize_cache_directory
from nemo_curator.datasets import DocumentDataset

# Initialize cache directory where intermediate results are stored
initialize_cache_directory("/path/to/dedup_outputs")

# Initialize the deduplication object
FuzzyDups = FuzzyDuplicates(config=config, logger="./")

@@ -251,7 +253,7 @@ Python API
- The default values of ``num_buckets`` and ``hashes_per_bucket`` are set to find documents with an approximately Jaccard similarity of 0.8 or above.
- Higher ``buckets_per_shuffle`` values can improve performance but may cause out-of-memory errors.
- Setting the ``false_positive_check`` flag to ``False`` gives the best performance.
- When setting the ``false_positive_check`` flag to ``True`` ensure ``cache_dir`` between runs is emptied to avoid data from previous runs interfering with the current run's results.
- When setting the ``false_positive_check`` flag to ``True``, ensure the cache directory is emptied between runs to avoid data from previous runs interfering with the current run's results (see the sketch below).
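
A minimal sketch of what emptying the cache between runs can look like; the ``shutil`` cleanup and the ``cache_path`` value are illustrative and not part of this PR:

.. code-block:: python

    import shutil

    from nemo_curator.cache import initialize_cache_directory

    cache_path = "/path/to/dedup_outputs"

    # Stale intermediates from a previous run would interfere with the
    # false positive check, so clear the directory before registering it.
    shutil.rmtree(cache_path, ignore_errors=True)

    # Recreates the (now empty) directory and stores it globally.
    initialize_cache_directory(cache_path)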

""""""""""""
CLI Utility
@@ -392,7 +394,6 @@ steps (all scripts are included in the `nemo_curator/scripts/fuzzy_deduplication

# same as `python connected_components.py`
gpu_connected_component \
--jaccard-pairs-path /path/to/dedup_output/jaccard_similarity_results.parquet `#Or /path/to/dedup_output/_edges.parquet` \
--output-dir /path/to/dedup_output \
--cache-dir /path/to/cc_cache \
--jaccard-threshold 0.8 \
17 changes: 9 additions & 8 deletions docs/user-guide/semdedup.rst
@@ -38,16 +38,13 @@ Semantic deduplication in NeMo Curator can be configured using a YAML file. Here
.. code-block:: yaml

# Configuration file for semantic dedup
cache_dir: "semdedup_cache"
num_files: -1

# Embeddings configuration
embeddings_save_loc: "embeddings"
embedding_model_name_or_path: "sentence-transformers/all-MiniLM-L6-v2"
embedding_batch_size: 128

# Clustering configuration
clustering_save_loc: "clustering_results"
n_clusters: 1000
seed: 1234
max_iter: 100
@@ -154,6 +151,15 @@ You can use the ``add_id`` module from NeMo Curator if needed:
id_dataset.to_json("output_file_path", write_to_filename=True)


You also need to initialize the global cache directory where the outputs are written:

.. code-block:: python

from nemo_curator.cache import initialize_cache_directory

initialize_cache_directory("cache_dir")


To perform semantic deduplication, you can either use individual components or the SemDedup class with a configuration file.

Use Individual Components
@@ -169,7 +175,6 @@
embedding_creator = EmbeddingCreator(
embedding_model_name_or_path="path/to/pretrained/model",
embedding_batch_size=128,
embedding_output_dir="path/to/output/embeddings",
input_column="text",
logger="path/to/log/dir",
)
@@ -187,7 +192,6 @@
id_column="doc_id",
max_iter=100,
n_clusters=50000,
clustering_output_dir="path/to/output/clusters",
logger="path/to/log/dir"
)
clustered_dataset = clustering_model(embeddings_dataset)
@@ -201,12 +205,9 @@
# Step 3: Semantic Deduplication
semantic_dedup = SemanticClusterLevelDedup(
n_clusters=50000,
emb_by_clust_dir="path/to/embeddings/by/cluster",
sorted_clusters_dir="path/to/sorted/clusters",
id_column="doc_id",
id_column_type="str",
which_to_keep="hard",
output_dir="path/to/output/deduped",
logger="path/to/log/dir"
)
semantic_dedup.compute_semantic_match_dfs()
4 changes: 1 addition & 3 deletions examples/exact_deduplication.py
@@ -17,8 +17,7 @@

from nemo_curator.datasets import DocumentDataset
from nemo_curator.modules import ExactDuplicates
from nemo_curator.utils.distributed_utils import get_client, read_data, write_to_disk
from nemo_curator.utils.file_utils import get_all_files_paths_under
from nemo_curator.utils.distributed_utils import get_client, write_to_disk
from nemo_curator.utils.script_utils import ArgumentHelper


@@ -46,7 +45,6 @@ def main(args):
logger=log_dir,
id_field=dataset_id_field,
text_field=dataset_text_field,
# cache_dir=output_dir # Optionally write the output to disk
)

duplicates = exact_dup(dataset=input_dataset)
4 changes: 2 additions & 2 deletions examples/fuzzy_deduplication.py
@@ -18,6 +18,7 @@
import dask

from nemo_curator import FuzzyDuplicates, FuzzyDuplicatesConfig
from nemo_curator.cache import initialize_cache_directory
from nemo_curator.datasets import DocumentDataset
from nemo_curator.utils.distributed_utils import get_client, write_to_disk
from nemo_curator.utils.script_utils import ArgumentHelper
@@ -31,7 +32,7 @@ def main(args):

dataset_dir = "/path/to/dataset"
log_dir = "./"
cache_dir = "./fuzzy_cache" # must be cleared between runs
initialize_cache_directory("./fuzzy_cache") # must be cleared between runs
output_dir = "./output"
dataset_id_field = "id"
dataset_text_field = "text"
@@ -65,7 +66,6 @@
)

fuzzy_dedup_config = FuzzyDuplicatesConfig(
cache_dir=cache_dir,
id_field=dataset_id_field,
text_field=dataset_text_field,
seed=42,
10 changes: 4 additions & 6 deletions examples/semdedup_example.py
@@ -16,15 +16,13 @@
import os
import time

from nemo_curator.cache import get_cache_directory, initialize_cache_directory
from nemo_curator.datasets import DocumentDataset
from nemo_curator.log import create_logger
from nemo_curator.modules.config import SemDedupConfig
from nemo_curator.modules.semantic_dedup import SemDedup
from nemo_curator.utils.distributed_utils import get_client, read_data
from nemo_curator.utils.file_utils import (
expand_outdir_and_mkdir,
get_all_files_paths_under,
)
from nemo_curator.utils.file_utils import get_all_files_paths_under
from nemo_curator.utils.script_utils import ArgumentHelper


@@ -41,11 +39,11 @@ def main(args):
silence_hf_warnings()
client.run(silence_hf_warnings)

expand_outdir_and_mkdir(semdedup_config.cache_dir)
initialize_cache_directory(args.cache_dir)
logger = create_logger(
rank=0,
name="logger-end-to_end-semdup",
log_file=os.path.join(semdedup_config.cache_dir, "compute_embeddings.log"),
log_file=os.path.join(get_cache_directory(), "compute_embeddings.log"),
log_level=logging.INFO,
stdout=True,
)
34 changes: 34 additions & 0 deletions nemo_curator/cache.py
@@ -0,0 +1,34 @@
# Copyright (c) 2024, NVIDIA CORPORATION. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from nemo_curator.utils.file_utils import expand_outdir_and_mkdir

# Global variable to store the cache directory
_global_cache_dir = None


def initialize_cache_directory(cache_dir: str):
"""
Initialize and set the global cache directory.
"""
global _global_cache_dir
cache_dir = expand_outdir_and_mkdir(cache_dir)
_global_cache_dir = cache_dir


def get_cache_directory() -> str:
"""
Retrieve the global cache directory.
"""
return _global_cache_dir
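
Taken together, a minimal usage sketch of the new module (the path is a placeholder):

.. code-block:: python

    from nemo_curator.cache import get_cache_directory, initialize_cache_directory

    # Create the directory (via expand_outdir_and_mkdir) and register it globally.
    initialize_cache_directory("/path/to/dedup_cache")

    # Any later caller reads the same location instead of taking a cache_dir argument.
    cache_dir = get_cache_directory()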
16 changes: 4 additions & 12 deletions nemo_curator/modules/config.py
@@ -18,6 +18,8 @@

import yaml

from nemo_curator.cache import get_cache_directory


@dataclass
class BaseConfig:
@@ -46,8 +48,6 @@ class FuzzyDuplicatesConfig(BaseConfig):
text_field: Column in the Dataset denoting document content.
profile_dir: str, Default None
If specified directory to write dask profile
cache_dir: str, Default None
Location to store deduplication intermediates such as minhashes/buckets, etc.
false_positive_check: bool,
Whether to run a check to look for false positives within buckets.
Note: This is a computationally expensive step.
@@ -60,7 +60,6 @@
"""

# General config
cache_dir: str
profile_dir: Optional[str] = None
id_field: str = "id"
text_field: str = "text"
@@ -83,7 +82,7 @@

def __post_init__(self):
self.num_hashes = self.num_buckets * self.hashes_per_bucket
if self.cache_dir is None:
if get_cache_directory() is None:
raise ValueError(
"Finding fuzzy duplicates requires a cache directory accessible via all workers to store intermediates"
)
@@ -116,14 +115,10 @@ class SemDedupConfig(BaseConfig):
Configuration for Semantic Deduplication.

Attributes:
cache_dir (str): Directory to store cache.
profile_dir (Optional[str]): If specified directory to write dask profile. Default is None.
num_files (int): Number of files. Default is -1, meaning all files.
embeddings_save_loc (str): Location to save embeddings.
embedding_model_name_or_path (str): Model name or path for embeddings.
embedding_batch_size (int): Initial batch size for processing embeddings.
clustering_save_loc (str): Location to save clustering results.
n_clusters (int): Number of clusters.
seed (int): Seed for clustering.
max_iter (int): Maximum iterations for clustering.
@@ -135,17 +130,14 @@
eps_to_extract (float): Epsilon value to extract deduplicated data.
"""

cache_dir: str
profile_dir: Optional[str] = None
num_files: int = -1

# Embeddings
embeddings_save_loc: str = "embeddings"
embedding_model_name_or_path: str = "sentence-transformers/all-MiniLM-L6-v2"
embedding_batch_size: int = 128

# Clustering config
clustering_save_loc: str = "clustering_results"
n_clusters: int = 1000
seed: int = 1234
max_iter: int = 100
@@ -161,7 +153,7 @@
eps_to_extract: float = 0.01

def __post_init__(self):
if self.cache_dir is None:
if get_cache_directory() is None:
raise ValueError(
"Finding sem-dedup requires a cache directory accessible via all workers to store intermediates"
)
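
One consequence of validating against the global cache instead of a constructor argument, sketched below with illustrative field values: configs must now be built after ``initialize_cache_directory`` is called.

.. code-block:: python

    from nemo_curator import FuzzyDuplicatesConfig
    from nemo_curator.cache import initialize_cache_directory

    # Before initialization, get_cache_directory() returns None, so constructing
    # FuzzyDuplicatesConfig (or SemDedupConfig) raises the ValueError shown above.
    initialize_cache_directory("./fuzzy_cache")

    config = FuzzyDuplicatesConfig(
        id_field="id",
        text_field="text",
        false_positive_check=False,
    )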
9 changes: 4 additions & 5 deletions nemo_curator/modules/exact_dedup.py
@@ -18,7 +18,6 @@
import time
import warnings
from contextlib import nullcontext
from datetime import datetime
from hashlib import md5
from typing import Optional, Union

@@ -27,6 +26,7 @@
from dask import dataframe as dd

from nemo_curator._compat import DASK_P2P_ERROR
from nemo_curator.cache import get_cache_directory
from nemo_curator.datasets import DocumentDataset
from nemo_curator.log import create_logger
from nemo_curator.utils.distributed_utils import performance_report_if_with_ts_suffix
@@ -45,7 +45,6 @@ def __init__(
text_field: str = "text",
hash_method: str = "md5",
profile_dir: Optional[str] = None,
cache_dir: Optional[str] = None,
):
"""
Parameters
@@ -56,10 +55,10 @@
hash_method: The hashing algorithm used for identifying exact duplicates. Currently supports {"md5"}
profile_dir: str, Default None
If specified directory to write dask profile
cache_dir: str, Default None
If specified, will compute & write duplicate id's to cache directory.
"""

cache_dir = get_cache_directory()

if hash_method not in self.SUPPORTED_HASHES:
raise ValueError(
f"{hash_method} not in supported hash_methods. Choose a hash_method from {self.SUPPORTED_HASHES}"
@@ -69,7 +68,7 @@
self.text_field = text_field
if cache_dir is None and profile_dir is not None:
warnings.warn(
"cache_dir for intermediate outputs is required to generate profiles"
"cache_dir for intermediate outputs is required to generate profiles. Please use initialize_cache_directory for this."
)
self.cache_dir = cache_dir
self.profile_dir = profile_dir
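
For completeness, a sketch of the resulting ``ExactDuplicates`` call pattern; the dataset path and the ``read_json`` loading step are assumptions for illustration (see ``examples/exact_deduplication.py`` above for the full flow):

.. code-block:: python

    from nemo_curator.cache import initialize_cache_directory
    from nemo_curator.datasets import DocumentDataset
    from nemo_curator.modules import ExactDuplicates

    # Duplicate IDs are written under the globally registered cache directory
    # rather than a per-module cache_dir argument.
    initialize_cache_directory("./exact_dedup_cache")

    dataset = DocumentDataset.read_json("/path/to/dataset")  # placeholder path

    exact_dup = ExactDuplicates(
        id_field="id",
        text_field="text",
        hash_method="md5",
    )
    duplicates = exact_dup(dataset=dataset)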