buffer of a multiple of 68 bytes Minhash bug #301

Open
mcleish7 opened this issue Nov 12, 2024 · 0 comments

Hi,

We tried to copy the example minhash deduplication script. Our edits parallelise the run over 512 tasks, but seemingly at random some jobs fail with an "iterative unpacking requires a buffer of a multiple of 68 bytes" error. Most jobs are successful, but about 100 of them trigger this error. Do you have any idea what could be causing this, please?

Thanks,
Sean

Example log:

2024-11-08 11:35:43.399 | INFO     | datatrove.utils.logging:add_task_logger:58 - Launching pipeline for rank=3
2024-11-08 11:35:43.400 | INFO     | datatrove.utils.logging:log_pipeline:90 - 
--- 🛠️ PIPELINE 🛠
📖 - READER: 📒 Parquet
🫂 - DEDUP: 🎯 MinHash stage 1
2024-11-08 11:38:13.675 | INFO     | datatrove.pipeline.readers.base:read_files_shard:191 - Reading input file ArxivInstruct/00003.parquet, 1/15
2024-11-08 11:41:23.516 | INFO     | datatrove.pipeline.readers.base:read_files_shard:191 - Reading input file P3/00360.parquet, 2/15
2024-11-08 11:42:20.997 | INFO     | datatrove.pipeline.readers.base:read_files_shard:191 - Reading input file P3/00872.parquet, 3/15
2024-11-08 11:43:15.013 | INFO     | datatrove.pipeline.readers.base:read_files_shard:191 - Reading input file SlimPajama-Mix/00360.parquet, 4/15
2024-11-08 12:05:37.416 | INFO     | datatrove.pipeline.readers.base:read_files_shard:191 - Reading input file SlimPajama-Mix/00872.parquet, 5/15
2024-11-08 12:27:50.805 | INFO     | datatrove.pipeline.readers.base:read_files_shard:191 - Reading input file SlimPajama-Mix/01384.parquet, 6/15
2024-11-08 12:50:08.829 | INFO     | datatrove.pipeline.readers.base:read_files_shard:191 - Reading input file dolma-pes2o/00083.parquet, 7/15
2024-11-08 12:54:08.327 | INFO     | datatrove.pipeline.readers.base:read_files_shard:191 - Reading input file matrix-books/00044.parquet, 8/15
2024-11-08 12:54:08.456 | WARNING  | datatrove.pipeline.readers.base:get_document_from_dict:83 - Found document without text, skipping. Is your `text_key` ("text") correct? Available keys: []
2024-11-08 13:06:08.373 | INFO     | datatrove.pipeline.readers.base:read_files_shard:191 - Reading input file matrix-books/00556.parquet, 9/15
2024-11-08 13:12:16.930 | INFO     | datatrove.pipeline.readers.base:read_files_shard:191 - Reading input file matrix-exams/00044.parquet, 10/15
2024-11-08 13:12:18.911 | INFO     | datatrove.pipeline.readers.base:read_files_shard:191 - Reading input file matrix-exams/00556.parquet, 11/15
2024-11-08 13:12:20.935 | INFO     | datatrove.pipeline.readers.base:read_files_shard:191 - Reading input file openweb-math/00033.parquet, 12/15
2024-11-08 13:30:52.932 | INFO     | datatrove.pipeline.readers.base:read_files_shard:191 - Reading input file prox-redpajama/00033.parquet, 13/15
2024-11-08 13:57:07.564 | INFO     | datatrove.pipeline.readers.base:read_files_shard:191 - Reading input file smollm-fineweb-edu/00059.parquet, 14/15
2024-11-08 14:15:22.729 | INFO     | datatrove.pipeline.readers.base:read_files_shard:191 - Reading input file smollm-fineweb-edu/00571.parquet, 15/15
2024-11-08 14:33:57.657 | INFO     | datatrove.pipeline.dedup.minhash:run:227 - Sorting buckets...
2024-11-08 14:33:52.434 | ERROR    | datatrove.executor.base:_run_for_rank:108 - iterative unpacking requires a buffer of a multiple of 68 bytes
Traceback (most recent call last):

  File "ENV_PATH/bin/launch_pickled_pipeline", line 8, in <module>
    sys.exit(main())
    │   │    └ <function main at 0x7fcf547a76a0>
    │   └ <built-in function exit>
    └ <module 'sys' (built-in)>
  File "ENV_PATH/lib/python3.11/site-packages/datatrove/tools/launch_pickled_pipeline.py", line 18, in main
    executor.run()
    │        └ <function SlurmPipelineExecutor.run at 0x7fcf53151260>
    └ <datatrove.executor.slurm.SlurmPipelineExecutor object at 0x7fcf53147c10>
  File "ENV_PATH/lib/python3.11/site-packages/datatrove/executor/slurm.py", line 180, in run
    self._run_for_rank(rank)
    │    │             └ 3
    │    └ <function PipelineExecutor._run_for_rank at 0x7fcf53150900>
    └ <datatrove.executor.slurm.SlurmPipelineExecutor object at 0x7fcf53147c10>
> File "ENV_PATH/lib/python3.11/site-packages/datatrove/executor/base.py", line 90, in _run_for_rank
    pipelined_data = pipeline_step(pipelined_data, rank, self.world_size)
                     │             │               │     │    └ <property object at 0x7fcf53137920>
                     │             │               │     └ <datatrove.executor.slurm.SlurmPipelineExecutor object at 0x7fcf53147c10>
                     │             │               └ 3
                     │             └ <generator object BaseDiskReader.run at 0x7fcf53111cf0>
                     └ 🫂 - DEDUP: 🎯 MinHash stage 1
  File "ENV_PATH/lib/python3.11/site-packages/datatrove/pipeline/base.py", line 119, in __call__
    return self.run(data, rank, world_size)
           │    │   │     │     └ 512
           │    │   │     └ 3
           │    │   └ <generator object BaseDiskReader.run at 0x7fcf53111cf0>
           │    └ <function MinhashDedupSignature.run at 0x7fcf51287ba0>
           └ 🫂 - DEDUP: 🎯 MinHash stage 1
  File "ENV_PATH/lib/python3.11/site-packages/datatrove/pipeline/dedup/minhash.py", line 230, in run
    sigs = sorted(
  File "ENV_PATH/lib/python3.11/site-packages/datatrove/pipeline/dedup/minhash.py", line 107, in read_sigs
    for data in read_tuples_from_file(f, line_format, lines_to_buffer=lines_to_buffer):
                │                     │  │                            └ -1
                │                     │  └ '8QI'
                │                     └ <fsspec.implementations.local.LocalFileOpener object at 0x7fcf21983d30>
                └ <function read_tuples_from_file at 0x7fcf51286de0>
  File "ENV_PATH/lib/python3.11/site-packages/datatrove/utils/binaryio.py", line 30, in read_tuples_from_file
    yield from reader.iter_unpack(chunk)
               │      │           └ b'\xfe\xcd \x97\x00\x00\x00\x00\x8b+[\xe4\x8c\x04\t\x00$\xbf\xd0\xef=?\x01\x00\x19\xd2\x1d\xbd\xbc\xc4\x05\x00\xef\xa5,z\'\xc...
               │      └ <method 'iter_unpack' of '_struct.Struct' objects>
               └ <_struct.Struct object at 0x7fcf53125e40>

struct.error: iterative unpacking requires a buffer of a multiple of 68 bytes
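
For context, the "8QI" line format in the traceback packs 8 unsigned 64-bit hashes plus a 32-bit document index, i.e. 68 bytes per signature record, so the error would suggest a truncated or partially written signature file. A minimal sketch (the folder path is illustrative, taken from the script below) to list signature files whose size is not a multiple of 68 bytes:

import struct
from pathlib import Path

# "8QI" matches the line_format in the traceback: 8 x uint64 hashes + 1 x uint32 doc index
RECORD_SIZE = struct.calcsize("<8QI")  # 68 bytes per signature record

# illustrative path -- the stage 1 signatures folder from the script below
signatures_dir = Path("mcleish/recurrent_data/test_cache/tmp/minhash/signatures")

for sig_file in sorted(p for p in signatures_dir.rglob("*") if p.is_file()):
    size = sig_file.stat().st_size
    if size % RECORD_SIZE != 0:
        print(f"{sig_file}: {size} bytes is not a multiple of {RECORD_SIZE}")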

Code:

ACCOUNT = "my_account_num"
BASE_DIR = "mcleish/recurrent_data"
DATASET_STAGING = f"language_datasets/staging_dataset"
CACHE_DIR = f"{BASE_DIR}/test_cache"
FINAL_LOCATION = f"{BASE_DIR}/processed_dataset"
DEDUP_STAGING = f"{BASE_DIR}/dedup_staging_dataset"
final_dataset_name = "dedup_v01"

TMP_STAGE_DIR = f"{CACHE_DIR}/tmp/minhash"
LOCAL_LOGS_FOLDER = f"{DEDUP_STAGING}/{final_dataset_name}/logs"
TOTAL_TASKS = 512


from datatrove.executor.slurm import SlurmPipelineExecutor
from datatrove.pipeline.dedup import MinhashDedupSignature
from datatrove.pipeline.dedup.minhash import (
    MinhashConfig,
    MinhashDedupBuckets,
    MinhashDedupCluster,
    MinhashDedupFilter,
)
from datatrove.pipeline.readers import ParquetReader
from datatrove.pipeline.writers.parquet import ParquetWriter
from datatrove.utils.hashing import HashConfig
from datatrove.utils.typeshelper import Languages

# from datatrove.pipeline.tokens import TokensCounter

# you can also change ngrams or the number of buckets and their size here
minhash_config = MinhashConfig(
    hash_config=HashConfig(precision=64),
    num_buckets=14,
    hashes_per_bucket=8,
)  # better precision -> fewer false positives (collisions)
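# note: with precision=64 and hashes_per_bucket=8, each signature record is
# 8 * 8 + 4 = 68 bytes (the "8QI" format in the traceback), matching the
# buffer multiple mentioned in the error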


# this is the original data that we want to deduplicate
INPUT_READER = ParquetReader(DATASET_STAGING)

# stage 1 computes minhash signatures for each task (each task gets a set of files)
stage1 = SlurmPipelineExecutor(
    job_name="mh1",
    pipeline=[
        INPUT_READER,
        MinhashDedupSignature(
            output_folder=f"{TMP_STAGE_DIR}/signatures",
            config=minhash_config,
            language=Languages.english,
        ),
    ],
    tasks=TOTAL_TASKS,
    time="24:00:00",
    partition="batch",
    qos="normal",
    mem_per_cpu_gb=4,
    cpus_per_task=1,
    sbatch_args={"account": ACCOUNT, "nodes": 1, "ntasks-per-node": 32},
    logging_dir=f"{LOCAL_LOGS_FOLDER}/signatures",
    slurm_logs_folder=f"{LOCAL_LOGS_FOLDER}/signatures/slurm_logs",
)

# stage 2 finds matches between signatures in each bucket
stage2 = SlurmPipelineExecutor(
    job_name="mh2",
    pipeline=[
        MinhashDedupBuckets(
            input_folder=f"{TMP_STAGE_DIR}/signatures",
            output_folder=f"{TMP_STAGE_DIR}/buckets",
            config=minhash_config,
        ),
    ],
    tasks=minhash_config.num_buckets,
    time="24:00:00",
    partition="batch",
    qos="normal",
    mem_per_cpu_gb=4,
    cpus_per_task=1,
    sbatch_args={"account": ACCOUNT, "nodes": 1, "ntasks-per-node": 32},
    logging_dir=f"{LOCAL_LOGS_FOLDER}/buckets",
    depends=stage1,
    slurm_logs_folder=f"{LOCAL_LOGS_FOLDER}/buckets/slurm_logs",
)

# stage 3 creates clusters of duplicates using the results from all buckets
stage3 = SlurmPipelineExecutor(
    job_name="mh3",
    pipeline=[
        MinhashDedupCluster(
            input_folder=f"{TMP_STAGE_DIR}/buckets",
            output_folder=f"{TMP_STAGE_DIR}/remove_ids",
            config=minhash_config,
        ),
    ],
    tasks=1,
    time="24:00:00",
    partition="batch",
    qos="normal",
    sbatch_args={"account": ACCOUNT, "nodes": 1, "ntasks-per-node": 1},
    logging_dir=f"{LOCAL_LOGS_FOLDER}/clusters",
    mem_per_cpu_gb=70,
    cpus_per_task=2,
    depends=stage2,
    slurm_logs_folder=f"{LOCAL_LOGS_FOLDER}/clusters/slurm_logs",
)

# stage 4 reads the original input data and removes all but 1 sample per duplicate cluster
# the data must match exactly stage 1, so number of tasks and the input source must be the same
stage4 = SlurmPipelineExecutor(
    job_name="mh4",
    pipeline=[
        INPUT_READER,
        # TokensCounter(),  # nice way to see how many tokens we had before and after deduplication -- no wifi on frontier :(
        MinhashDedupFilter(input_folder=f"{TMP_STAGE_DIR}/remove_ids"),
        ParquetWriter(
            output_folder=f"{DEDUP_STAGING}/{final_dataset_name}/deduplicated_output"
        ),
    ],
    tasks=TOTAL_TASKS,
    time="24:00:00",
    partition="batch",
    qos="normal",
    mem_per_cpu_gb=4,
    cpus_per_task=1,
    sbatch_args={"account": ACCOUNT, "nodes": 1, "ntasks-per-node": 32},
    logging_dir=f"{LOCAL_LOGS_FOLDER}/filter",
    depends=stage3,
    slurm_logs_folder=f"{LOCAL_LOGS_FOLDER}/filter/slurm_logs",
)

stage4.run()