[BUG] MemoryError: std::bad_alloc: - with workflow.fit on 1 parquet file from Criteo dataset #1181

Closed
leiterenato opened this issue Oct 12, 2021 · 21 comments
@leiterenato

Describe the bug
I am trying to run Workflow.fit on 1 parquet file from the Criteo dataset (day_1.parquet).
Here is the transformation:

  # Columns definition
  cont_names = ["I" + str(x) for x in range(1, 14)]
  cat_names = ["C" + str(x) for x in range(1, 27)]

  # Transformation pipeline
  num_buckets = 10000000
  categorify_op = Categorify(max_size=num_buckets)
  cat_features = cat_names >> categorify_op
  cont_features = cont_names >> FillMissing() >> Clip(min_value=0) >> Normalize()
  features = cat_features + cont_features + ['label']

This code works for nvtabular version 0.5.3, but when I upgraded to version 0.7.0 I started receiving the following error:
MemoryError: std::bad_alloc: CUDA error at: /opt/conda/include/rmm/mr/device/cuda_memory_resource.hpp:69: cudaErrorMemoryAllocation out of memory

I am using 1 x T4.

Steps/Code to reproduce bug

  1. Create a workflow with the following transformations:
  # Columns definition
  cont_names = ["I" + str(x) for x in range(1, 14)]
  cat_names = ["C" + str(x) for x in range(1, 27)]

  # Transformation pipeline
  num_buckets = 10000000
  categorify_op = Categorify(max_size=num_buckets)
  cat_features = cat_names >> categorify_op
  cont_features = cont_names >> FillMissing() >> Clip(min_value=0) >> Normalize()
  features = cat_features + cont_features + ['label']
  2. Create a cluster:
  device_size = device_mem_size()
  device_limit = int(device_limit_frac * device_size)
  device_pool_size = int(device_pool_frac * device_size)
  rmm_pool_size = (device_pool_size // 256) * 256

  cluster = LocalCUDACluster(
      device_memory_limit=device_limit,
      rmm_pool_size=rmm_pool_size
  )
  3. Run workflow.fit on 1 parquet file from the Criteo dataset (see the consolidated sketch below).
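
For reference, the three steps above combine into a single script along the following lines. This is a consolidation of the snippets in this issue rather than the exact reported code: the imports, the part_mem_fraction argument, and the GCS path are assumptions, and no Dask client is attached to the Workflow (which, as later comments show, turns out to matter).

  # Consolidated sketch of the reproduction steps above (assumed imports and paths)
  import nvtabular as nvt
  from nvtabular.ops import Categorify, FillMissing, Clip, Normalize
  from nvtabular.utils import device_mem_size  # assumed location of this helper
  from dask_cuda import LocalCUDACluster

  device_limit_frac, device_pool_frac, part_mem_frac = 0.8, 0.9, 0.125

  # Step 1: columns and transformation pipeline
  cont_names = ["I" + str(x) for x in range(1, 14)]
  cat_names = ["C" + str(x) for x in range(1, 27)]
  cat_features = cat_names >> Categorify(max_size=10_000_000)
  cont_features = cont_names >> FillMissing() >> Clip(min_value=0) >> Normalize()
  features = cat_features + cont_features + ["label"]

  # Step 2: size the RMM pool from the GPU memory and start a local CUDA cluster
  device_size = device_mem_size()
  device_limit = int(device_limit_frac * device_size)
  rmm_pool_size = (int(device_pool_frac * device_size) // 256) * 256
  cluster = LocalCUDACluster(device_memory_limit=device_limit, rmm_pool_size=rmm_pool_size)

  # Step 3: fit the workflow on one Criteo parquet file (placeholder GCS path)
  dataset = nvt.Dataset("gs://<bucket>/criteo/day_1.parquet", engine="parquet",
                        part_mem_fraction=part_mem_frac)
  workflow = nvt.Workflow(features)  # no client passed, as in the original report
  workflow.fit(dataset)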

Expected behavior
Workflow.fit should generate the statistics and run without any problem on a T4.

Environment details (please complete the following information):
Google Cloud Vertex AI
1 x T4
conda install -c nvidia -c rapidsai -c numba -c conda-forge pynvml dask-cuda nvtabular=0.7.0 cudatoolkit=11.0

Looking at the stack trace, it seems to be a problem in the read_parquet call from cudf.
I attached a screenshot with some logs.
[Screenshot attachment: Screen Shot 2021-10-12 at 10 06 28]

@leiterenato leiterenato added the bug Something isn't working label Oct 12, 2021
@quasiben

quasiben commented Oct 12, 2021

Would it be possible to get the full traceback ?

@leiterenato
Author

leiterenato commented Oct 12, 2021

Hi Benjamin,
Sure, I can send it to you tomorrow (I'm off work today).
Thank you very much!

@leiterenato
Author

@quasiben Here is the full traceback.

---------------------------------------------------------------------------
MemoryError                               Traceback (most recent call last)
/tmp/ipykernel_2150/402720306.py in <module>
----> 1 fit_dataset_op(datasets, workflow_path)

/tmp/ipykernel_2150/2157713268.py in fit_dataset_op(datasets, workflow_path, split_name, device_limit_frac, device_pool_frac, part_mem_frac)
     33 
     34     logging.info('Starting workflow fitting')
---> 35     etl.fit_and_save_workflow(criteo_workflow, dataset, full_workflow_path)
     36     logging.info('Finished generating statistics for dataset.')
     37     logging.info(f'Workflow saved to {full_workflow_path}')

~/workspace/merlin-on-vertex/src/preprocessing/etl/etl.py in fit_and_save_workflow(workflow, dataset, workflow_path)
    150     workflow_path: str
    151 ):
--> 152     workflow.fit(dataset)
    153     workflow.save(workflow_path)
    154 

/opt/conda/envs/nvt-prod/lib/python3.8/site-packages/nvtabular/workflow/workflow.py in fit(self, dataset)
    227                 results = [r.result() for r in self.client.compute(stats)]
    228             else:
--> 229                 results = dask.compute(stats, scheduler="synchronous")[0]
    230 
    231             for computed_stats, op in zip(results, ops):

/opt/conda/envs/nvt-prod/lib/python3.8/site-packages/dask/base.py in compute(*args, **kwargs)
    566         postcomputes.append(x.__dask_postcompute__())
    567 
--> 568     results = schedule(dsk, keys, **kwargs)
    569     return repack([f(r, *a) for r, (f, a) in zip(results, postcomputes)])
    570 

/opt/conda/envs/nvt-prod/lib/python3.8/site-packages/dask/local.py in get_sync(dsk, keys, **kwargs)
    558     """
    559     kwargs.pop("num_workers", None)  # if num_workers present, remove it
--> 560     return get_async(
    561         synchronous_executor.submit,
    562         synchronous_executor._max_workers,

/opt/conda/envs/nvt-prod/lib/python3.8/site-packages/dask/local.py in get_async(submit, num_workers, dsk, result, cache, get_id, rerun_exceptions_locally, pack_exception, raise_exception, callbacks, dumps, loads, chunksize, **kwargs)
    501             while state["waiting"] or state["ready"] or state["running"]:
    502                 fire_tasks(chunksize)
--> 503                 for key, res_info, failed in queue_get(queue).result():
    504                     if failed:
    505                         exc, tb = loads(res_info)

/opt/conda/envs/nvt-prod/lib/python3.8/concurrent/futures/_base.py in result(self, timeout)
    435                     raise CancelledError()
    436                 elif self._state == FINISHED:
--> 437                     return self.__get_result()
    438 
    439                 self._condition.wait(timeout)

/opt/conda/envs/nvt-prod/lib/python3.8/concurrent/futures/_base.py in __get_result(self)
    387         if self._exception:
    388             try:
--> 389                 raise self._exception
    390             finally:
    391                 # Break a reference cycle with the exception in self._exception

/opt/conda/envs/nvt-prod/lib/python3.8/site-packages/dask/local.py in submit(self, fn, *args, **kwargs)
    543         fut = Future()
    544         try:
--> 545             fut.set_result(fn(*args, **kwargs))
    546         except BaseException as e:
    547             fut.set_exception(e)

/opt/conda/envs/nvt-prod/lib/python3.8/site-packages/dask/local.py in batch_execute_tasks(it)
    235     Batch computing of multiple tasks with `execute_task`
    236     """
--> 237     return [execute_task(*a) for a in it]
    238 
    239 

/opt/conda/envs/nvt-prod/lib/python3.8/site-packages/dask/local.py in <listcomp>(.0)
    235     Batch computing of multiple tasks with `execute_task`
    236     """
--> 237     return [execute_task(*a) for a in it]
    238 
    239 

/opt/conda/envs/nvt-prod/lib/python3.8/site-packages/dask/local.py in execute_task(key, task_info, dumps, loads, get_id, pack_exception)
    226         failed = False
    227     except BaseException as e:
--> 228         result = pack_exception(e, dumps)
    229         failed = True
    230     return key, result, failed

/opt/conda/envs/nvt-prod/lib/python3.8/site-packages/dask/local.py in execute_task(key, task_info, dumps, loads, get_id, pack_exception)
    221     try:
    222         task, data = loads(task_info)
--> 223         result = _execute_task(task, data)
    224         id = get_id()
    225         result = dumps((result, id))

/opt/conda/envs/nvt-prod/lib/python3.8/site-packages/dask/core.py in _execute_task(arg, cache, dsk)
    119         # temporaries by their reference count and can execute certain
    120         # operations in-place.
--> 121         return func(*(_execute_task(a, cache) for a in args))
    122     elif not ishashable(arg):
    123         return arg

/opt/conda/envs/nvt-prod/lib/python3.8/site-packages/dask/optimization.py in __call__(self, *args)
    967         if not len(args) == len(self.inkeys):
    968             raise ValueError("Expected %d args, got %d" % (len(self.inkeys), len(args)))
--> 969         return core.get(self.dsk, self.outkey, dict(zip(self.inkeys, args)))
    970 
    971     def __reduce__(self):

/opt/conda/envs/nvt-prod/lib/python3.8/site-packages/dask/core.py in get(dsk, out, cache)
    149     for key in toposort(dsk):
    150         task = dsk[key]
--> 151         result = _execute_task(task, cache)
    152         cache[key] = result
    153     result = _execute_task(out, cache)

/opt/conda/envs/nvt-prod/lib/python3.8/site-packages/dask/core.py in _execute_task(arg, cache, dsk)
    119         # temporaries by their reference count and can execute certain
    120         # operations in-place.
--> 121         return func(*(_execute_task(a, cache) for a in args))
    122     elif not ishashable(arg):
    123         return arg

/opt/conda/envs/nvt-prod/lib/python3.8/site-packages/dask/dataframe/io/parquet/core.py in __call__(self, part)
     85             part = [part]
     86 
---> 87         return read_parquet_part(
     88             self.fs,
     89             self.engine,

/opt/conda/envs/nvt-prod/lib/python3.8/site-packages/dask/dataframe/io/parquet/core.py in read_parquet_part(fs, engine, meta, part, columns, index, kwargs)
    409             # Part kwargs expected
    410             func = engine.read_partition
--> 411             dfs = [
    412                 func(fs, rg, columns.copy(), index, **toolz.merge(kwargs, kw))
    413                 for (rg, kw) in part

/opt/conda/envs/nvt-prod/lib/python3.8/site-packages/dask/dataframe/io/parquet/core.py in <listcomp>(.0)
    410             func = engine.read_partition
    411             dfs = [
--> 412                 func(fs, rg, columns.copy(), index, **toolz.merge(kwargs, kw))
    413                 for (rg, kw) in part
    414             ]

/opt/conda/envs/nvt-prod/lib/python3.8/site-packages/nvtabular/io/parquet.py in read_partition(cls, fs, pieces, *args, **kwargs)
     96                 # This version of cudf does not include optimized
     97                 # fsspec usage for remote storage - use custom code path
---> 98                 return _optimized_read_partition_remote(fs, pieces, *args, **kwargs)
     99             # Newer versions of cudf are already optimized for s3/gcs
    100             return CudfEngine.read_partition(fs, pieces, *args, **kwargs)

/opt/conda/envs/nvt-prod/lib/python3.8/site-packages/nvtabular/io/fsspec_utils.py in _optimized_read_partition_remote(fs, pieces, columns, index, categories, partitions, **kwargs)
     68 
     69     # Call optimized read utility
---> 70     df = _optimized_read_remote(path, row_group, columns, fs, **kwargs)
     71 
     72     #

/opt/conda/envs/nvt-prod/lib/python3.8/site-packages/nvtabular/io/fsspec_utils.py in _optimized_read_remote(path, row_groups, columns, fs, **kwargs)
    124     # Call cudf.read_parquet on the dummy buffer
    125     strings_to_cats = kwargs.get("strings_to_categorical", False)
--> 126     df = cudf.read_parquet(
    127         dummy_buffer,
    128         engine="cudf",

/opt/conda/envs/nvt-prod/lib/python3.8/site-packages/cudf/io/parquet.py in read_parquet(filepath_or_buffer, engine, columns, filters, row_groups, skiprows, num_rows, strings_to_categorical, use_pandas_metadata, *args, **kwargs)
    241 
    242     if engine == "cudf":
--> 243         return libparquet.read_parquet(
    244             filepaths_or_buffers,
    245             columns=columns,

cudf/_lib/parquet.pyx in cudf._lib.parquet.read_parquet()

cudf/_lib/parquet.pyx in cudf._lib.parquet.read_parquet()

MemoryError: std::bad_alloc: CUDA error at: /opt/conda/envs/nvt-prod/include/rmm/mr/device/cuda_memory_resource.hpp:69: cudaErrorMemoryAllocation out of memory

I am now using a V100 instead of a T4. The GCP region I am working in is out of T4s right now, but the error is the same.
Also, if the worker machine (VM) doesn't have enough memory (RAM) to hold an entire file (~11GB), I start getting Dask memory leak errors.

@quasiben

What are the values for device_limit and device_limit_frac? And what are the GPU memory sizes of the V100s and T4s on GCP? Are they both 16GB?

@leiterenato
Author

leiterenato commented Oct 13, 2021

  device_limit_frac = 0.8,
  device_pool_frac = 0.9,
  part_mem_frac = 0.125

Yes, they both have 16GB of GPU memory.
With NVTabular v0.5.3, this same configuration was working.
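
For concreteness, those fractions imply roughly the following on a 16GB card (back-of-the-envelope numbers, assuming device_mem_size() reports the full 16 GiB):

  device_size = 16 * 1024**3                               # 16 GiB reported by device_mem_size()
  device_limit = int(0.8 * device_size)                    # ~12.8 GiB Dask device memory limit
  rmm_pool_size = (int(0.9 * device_size) // 256) * 256    # ~14.4 GiB RMM pool
  part_size = int(0.125 * device_size)                     # 2 GiB target partition size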

@quasiben

Sorry for all the questions here, still getting up to speed. Is this data stored on disk, or read from GCS/another remote store?

Rick recently made some changes for nvtabular in 0.7 (#1119) which should result in better performance. Would it be possible to test with device_limit_frac=0.75 and device_pool_frac=0.85 to see if things pass? I'm asking folks to see if any other changes resulted in higher memory usage in read_parquet.
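
Concretely, that means plugging the lower fractions into the sizing code already shown in this issue (a sketch; everything else unchanged):

  device_limit_frac = 0.75
  device_pool_frac = 0.85
  device_limit = int(device_limit_frac * device_mem_size())
  rmm_pool_size = (int(device_pool_frac * device_mem_size()) // 256) * 256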

@leiterenato
Author

No problem, I am happy to answer all the questions.
The data is located on GCS (remote source).

Sure, I will try that.
Also, I will try to execute the same code with the nvcr.io/nvidia/merlin/merlin-training:21.09 container.

@leiterenato
Author

leiterenato commented Oct 13, 2021

@quasiben I am creating my cluster like this:

  device_size = device_mem_size()
  device_limit = int(device_limit_frac * device_size)
  device_pool_size = int(device_pool_frac * device_size)
  rmm_pool_size = (device_pool_size // 256) * 256

  cluster = LocalCUDACluster(
      device_memory_limit=device_limit,
      rmm_pool_size=rmm_pool_size
  )

Is there a better way to create my cluster?
I am using 1 VM with up to 8 GPUs.

@rjzamora
Collaborator

Rick recently made some changes for nvtabular in 0.7 (#1119) which should result in better performance.

From the traceback, it looks like the new "optimized" code path is already being used. However, the read_parquet call may not be the root problem here (when some other part of the preprocessing pipeline starts using more memory, the IO call is still the most likely place for a memory error). I will try to reproduce as soon as I can, but it would be interesting to know if commenting out the _optimized_read_remote code path still produces the same error.
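
One hypothetical way to test that without editing the installed package is to monkeypatch the optimized reader so the fallback cudf path is taken. This is only a sketch based on the call chain in the traceback above; the module layout may differ across NVTabular/cudf versions, and with a distributed client the patch would also have to be applied on each worker:

  # Hypothetical diagnostic patch: bypass _optimized_read_remote and delegate to
  # dask_cudf's reader instead. Names are taken from the traceback above.
  from dask_cudf.io.parquet import CudfEngine
  import nvtabular.io.parquet as nvt_parquet

  def _fallback_read_partition(fs, pieces, *args, **kwargs):
      # Same signature as the optimized helper; go straight to the cudf engine
      return CudfEngine.read_partition(fs, pieces, *args, **kwargs)

  nvt_parquet._optimized_read_partition_remote = _fallback_read_partition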

@leiterenato
Author

Just a quick update, I increased the worker memory to 120GB and it worked.

  device_limit_frac = 0.8,
  device_pool_frac = 0.9,
  part_mem_frac = 0.125

Another detail: when it fails, the script crashes while creating the nvtabular.Dataset.

I think this problem might be related to the worker memory management, not to the GPU.

@leiterenato
Author

Sorry, my job just crashed again with 2 x T4s, 32 vCPUs and 120GB of RAM.

---------------------------------------------------------------------------
MemoryError                               Traceback (most recent call last)
/tmp/ipykernel_2322/4004743745.py in <module>
----> 1 analyze_dataset_op(datasets, workflow_path)

/tmp/ipykernel_2322/1001680271.py in analyze_dataset_op(datasets, workflow_path, split_name, device_limit_frac, device_pool_frac, part_mem_frac)
     31 
     32     logging.info('Starting workflow fitting')
---> 33     criteo_workflow = etl.analyze_dataset(criteo_workflow, dataset)
     34     logging.info('Finished generating statistics for dataset.')
     35 

/opt/conda/envs/nvt-conda-053/lib/python3.8/site-packages/preprocessing/etl.py in analyze_dataset(workflow, dataset)
    169 ) -> nvt.Workflow:
    170     '''Calculate statistics for a given workflow'''
--> 171     workflow.fit(dataset)
    172     return workflow
    173 

/opt/conda/envs/nvt-conda-053/lib/python3.8/site-packages/nvtabular-0.5.3-py3.8.egg/nvtabular/workflow.py in fit(self, dataset)
    145                 results = [r.result() for r in self.client.compute(stats)]
    146             else:
--> 147                 results = dask.compute(stats, scheduler="synchronous")[0]
    148 
    149             for computed_stats, op in zip(results, ops):

/opt/conda/envs/nvt-conda-053/lib/python3.8/site-packages/dask/base.py in compute(*args, **kwargs)
    564         postcomputes.append(x.__dask_postcompute__())
    565 
--> 566     results = schedule(dsk, keys, **kwargs)
    567     return repack([f(r, *a) for r, (f, a) in zip(results, postcomputes)])
    568 

/opt/conda/envs/nvt-conda-053/lib/python3.8/site-packages/dask/local.py in get_sync(dsk, keys, **kwargs)
    558     """
    559     kwargs.pop("num_workers", None)  # if num_workers present, remove it
--> 560     return get_async(
    561         synchronous_executor.submit,
    562         synchronous_executor._max_workers,

/opt/conda/envs/nvt-conda-053/lib/python3.8/site-packages/dask/local.py in get_async(submit, num_workers, dsk, result, cache, get_id, rerun_exceptions_locally, pack_exception, raise_exception, callbacks, dumps, loads, chunksize, **kwargs)
    501             while state["waiting"] or state["ready"] or state["running"]:
    502                 fire_tasks(chunksize)
--> 503                 for key, res_info, failed in queue_get(queue).result():
    504                     if failed:
    505                         exc, tb = loads(res_info)

/opt/conda/envs/nvt-conda-053/lib/python3.8/concurrent/futures/_base.py in result(self, timeout)
    435                     raise CancelledError()
    436                 elif self._state == FINISHED:
--> 437                     return self.__get_result()
    438 
    439                 self._condition.wait(timeout)

/opt/conda/envs/nvt-conda-053/lib/python3.8/concurrent/futures/_base.py in __get_result(self)
    387         if self._exception:
    388             try:
--> 389                 raise self._exception
    390             finally:
    391                 # Break a reference cycle with the exception in self._exception

/opt/conda/envs/nvt-conda-053/lib/python3.8/site-packages/dask/local.py in submit(self, fn, *args, **kwargs)
    543         fut = Future()
    544         try:
--> 545             fut.set_result(fn(*args, **kwargs))
    546         except BaseException as e:
    547             fut.set_exception(e)

/opt/conda/envs/nvt-conda-053/lib/python3.8/site-packages/dask/local.py in batch_execute_tasks(it)
    235     Batch computing of multiple tasks with `execute_task`
    236     """
--> 237     return [execute_task(*a) for a in it]
    238 
    239 

/opt/conda/envs/nvt-conda-053/lib/python3.8/site-packages/dask/local.py in <listcomp>(.0)
    235     Batch computing of multiple tasks with `execute_task`
    236     """
--> 237     return [execute_task(*a) for a in it]
    238 
    239 

/opt/conda/envs/nvt-conda-053/lib/python3.8/site-packages/dask/local.py in execute_task(key, task_info, dumps, loads, get_id, pack_exception)
    226         failed = False
    227     except BaseException as e:
--> 228         result = pack_exception(e, dumps)
    229         failed = True
    230     return key, result, failed

/opt/conda/envs/nvt-conda-053/lib/python3.8/site-packages/dask/local.py in execute_task(key, task_info, dumps, loads, get_id, pack_exception)
    221     try:
    222         task, data = loads(task_info)
--> 223         result = _execute_task(task, data)
    224         id = get_id()
    225         result = dumps((result, id))

/opt/conda/envs/nvt-conda-053/lib/python3.8/site-packages/dask/core.py in _execute_task(arg, cache, dsk)
    119         # temporaries by their reference count and can execute certain
    120         # operations in-place.
--> 121         return func(*(_execute_task(a, cache) for a in args))
    122     elif not ishashable(arg):
    123         return arg

/opt/conda/envs/nvt-conda-053/lib/python3.8/site-packages/dask/dataframe/io/parquet/core.py in read_parquet_part(fs, func, meta, part, columns, index, kwargs)
    379 
    380     if isinstance(part, list):
--> 381         dfs = [
    382             func(fs, rg, columns.copy(), index, **toolz.merge(kwargs, kw))
    383             for (rg, kw) in part
/opt/conda/envs/nvt-conda-053/lib/python3.8/site-packages/dask/dataframe/io/parquet/core.py in <listcomp>(.0)
    380     if isinstance(part, list):
    381         dfs = [
--> 382             func(fs, rg, columns.copy(), index, **toolz.merge(kwargs, kw))
    383             for (rg, kw) in part
    384         ]

/opt/conda/envs/nvt-conda-053/lib/python3.8/site-packages/dask_cudf/io/parquet.py in read_partition(fs, piece, columns, index, categories, partitions, **kwargs)
     69         else:
     70             with fs.open(path, mode="rb") as f:
---> 71                 df = cudf.read_parquet(
     72                     f,
     73                     engine="cudf",

/opt/conda/envs/nvt-conda-053/lib/python3.8/site-packages/cudf/io/parquet.py in read_parquet(filepath_or_buffer, engine, columns, filters, row_groups, skiprows, num_rows, strings_to_categorical, use_pandas_metadata, *args, **kwargs)
    248 
    249     if engine == "cudf":
--> 250         return libparquet.read_parquet(
    251             filepaths_or_buffers,
    252             columns=columns,

cudf/_lib/parquet.pyx in cudf._lib.parquet.read_parquet()

cudf/_lib/parquet.pyx in cudf._lib.parquet.read_parquet()

MemoryError: std::bad_alloc: CUDA error at: /opt/conda/envs/nvt-conda-053/include/rmm/mr/device/cuda_memory_resource.hpp:69: cudaErrorMemoryAllocation out of memory

@leiterenato
Author

@rjzamora @quasiben

I tried with 4 x T4 and only 1 criteo file.
Same error:

---------------------------------------------------------------------------
MemoryError                               Traceback (most recent call last)
/tmp/ipykernel_5336/4004743745.py in <module>
----> 1 analyze_dataset_op(datasets, workflow_path)

/tmp/ipykernel_5336/1001680271.py in analyze_dataset_op(datasets, workflow_path, split_name, device_limit_frac, device_pool_frac, part_mem_frac)
     31 
     32     logging.info('Starting workflow fitting')
---> 33     criteo_workflow = etl.analyze_dataset(criteo_workflow, dataset)
     34     logging.info('Finished generating statistics for dataset.')
     35 

/usr/local/lib/python3.8/dist-packages/preprocessing/etl.py in analyze_dataset(workflow, dataset)
    169 ) -> nvt.Workflow:
    170     '''Calculate statistics for a given workflow'''
--> 171     workflow.fit(dataset)
    172     return workflow
    173 

/nvtabular/nvtabular/workflow/workflow.py in fit(self, dataset)
    227                 results = [r.result() for r in self.client.compute(stats)]
    228             else:
--> 229                 results = dask.compute(stats, scheduler="synchronous")[0]
    230 
    231             for computed_stats, op in zip(results, ops):

~/.local/lib/python3.8/site-packages/dask-2021.7.1-py3.8.egg/dask/base.py in compute(*args, **kwargs)
    566         postcomputes.append(x.__dask_postcompute__())
    567 
--> 568     results = schedule(dsk, keys, **kwargs)
    569     return repack([f(r, *a) for r, (f, a) in zip(results, postcomputes)])
    570 

~/.local/lib/python3.8/site-packages/dask-2021.7.1-py3.8.egg/dask/local.py in get_sync(dsk, keys, **kwargs)
    558     """
    559     kwargs.pop("num_workers", None)  # if num_workers present, remove it
--> 560     return get_async(
    561         synchronous_executor.submit,
    562         synchronous_executor._max_workers,
~/.local/lib/python3.8/site-packages/dask-2021.7.1-py3.8.egg/dask/local.py in get_async(submit, num_workers, dsk, result, cache, get_id, rerun_exceptions_locally, pack_exception, raise_exception, callbacks, dumps, loads, chunksize, **kwargs)
    501             while state["waiting"] or state["ready"] or state["running"]:
    502                 fire_tasks(chunksize)
--> 503                 for key, res_info, failed in queue_get(queue).result():
    504                     if failed:
    505                         exc, tb = loads(res_info)

/usr/lib/python3.8/concurrent/futures/_base.py in result(self, timeout)
    435                     raise CancelledError()
    436                 elif self._state == FINISHED:
--> 437                     return self.__get_result()
    438 
    439                 self._condition.wait(timeout)

/usr/lib/python3.8/concurrent/futures/_base.py in __get_result(self)
    387         if self._exception:
    388             try:
--> 389                 raise self._exception
    390             finally:
    391                 # Break a reference cycle with the exception in self._exception

~/.local/lib/python3.8/site-packages/dask-2021.7.1-py3.8.egg/dask/local.py in submit(self, fn, *args, **kwargs)
    543         fut = Future()
    544         try:
--> 545             fut.set_result(fn(*args, **kwargs))
    546         except BaseException as e:
    547             fut.set_exception(e)

~/.local/lib/python3.8/site-packages/dask-2021.7.1-py3.8.egg/dask/local.py in batch_execute_tasks(it)
    235     Batch computing of multiple tasks with `execute_task`
    236     """
--> 237     return [execute_task(*a) for a in it]
    238 
    239 

~/.local/lib/python3.8/site-packages/dask-2021.7.1-py3.8.egg/dask/local.py in <listcomp>(.0)
    235     Batch computing of multiple tasks with `execute_task`
    236     """
--> 237     return [execute_task(*a) for a in it]
    238 
    239 

~/.local/lib/python3.8/site-packages/dask-2021.7.1-py3.8.egg/dask/local.py in execute_task(key, task_info, dumps, loads, get_id, pack_exception)
    226         failed = False
    227     except BaseException as e:
--> 228         result = pack_exception(e, dumps)
    229         failed = True
    230     return key, result, failed

~/.local/lib/python3.8/site-packages/dask-2021.7.1-py3.8.egg/dask/local.py in execute_task(key, task_info, dumps, loads, get_id, pack_exception)
    221     try:
    222         task, data = loads(task_info)
--> 223         result = _execute_task(task, data)
    224         id = get_id()
    225         result = dumps((result, id))

~/.local/lib/python3.8/site-packages/dask-2021.7.1-py3.8.egg/dask/core.py in _execute_task(arg, cache, dsk)
    119         # temporaries by their reference count and can execute certain
    120         # operations in-place.
--> 121         return func(*(_execute_task(a, cache) for a in args))
    122     elif not ishashable(arg):
    123         return arg

~/.local/lib/python3.8/site-packages/dask-2021.7.1-py3.8.egg/dask/optimization.py in __call__(self, *args)
    967         if not len(args) == len(self.inkeys):
    968             raise ValueError("Expected %d args, got %d" % (len(self.inkeys), len(args)))
--> 969         return core.get(self.dsk, self.outkey, dict(zip(self.inkeys, args)))
    970 
    971     def __reduce__(self):

~/.local/lib/python3.8/site-packages/dask-2021.7.1-py3.8.egg/dask/core.py in get(dsk, out, cache)
    149     for key in toposort(dsk):
    150         task = dsk[key]
--> 151         result = _execute_task(task, cache)
    152         cache[key] = result
    153     result = _execute_task(out, cache)

~/.local/lib/python3.8/site-packages/dask-2021.7.1-py3.8.egg/dask/core.py in _execute_task(arg, cache, dsk)
    119         # temporaries by their reference count and can execute certain
    120         # operations in-place.
--> 121         return func(*(_execute_task(a, cache) for a in args))
    122     elif not ishashable(arg):
    123         return arg

~/.local/lib/python3.8/site-packages/dask-2021.7.1-py3.8.egg/dask/dataframe/io/parquet/core.py in __call__(self, part)
     85             part = [part]
     86 
---> 87         return read_parquet_part(
     88             self.fs,
     89             self.engine,

~/.local/lib/python3.8/site-packages/dask-2021.7.1-py3.8.egg/dask/dataframe/io/parquet/core.py in read_parquet_part(fs, engine, meta, part, columns, index, kwargs)
    409             # Part kwargs expected
    410             func = engine.read_partition
--> 411             dfs = [
    412                 func(fs, rg, columns.copy(), index, **toolz.merge(kwargs, kw))
    413                 for (rg, kw) in part

~/.local/lib/python3.8/site-packages/dask-2021.7.1-py3.8.egg/dask/dataframe/io/parquet/core.py in <listcomp>(.0)
    410             func = engine.read_partition
    411             dfs = [
--> 412                 func(fs, rg, columns.copy(), index, **toolz.merge(kwargs, kw))
    413                 for (rg, kw) in part
    414             ]

/nvtabular/nvtabular/io/parquet.py in read_partition(cls, fs, pieces, *args, **kwargs)
     96                 # This version of cudf does not include optimized
     97                 # fsspec usage for remote storage - use custom code path
---> 98                 return _optimized_read_partition_remote(fs, pieces, *args, **kwargs)
     99             # Newer versions of cudf are already optimized for s3/gcs
    100             return CudfEngine.read_partition(fs, pieces, *args, **kwargs)

/nvtabular/nvtabular/io/fsspec_utils.py in _optimized_read_partition_remote(fs, pieces, columns, index, categories, partitions, **kwargs)
     68 
     69     # Call optimized read utility
---> 70     df = _optimized_read_remote(path, row_group, columns, fs, **kwargs)
     71 
     72     #

/nvtabular/nvtabular/io/fsspec_utils.py in _optimized_read_remote(path, row_groups, columns, fs, **kwargs)
    124     # Call cudf.read_parquet on the dummy buffer
    125     strings_to_cats = kwargs.get("strings_to_categorical", False)
--> 126     df = cudf.read_parquet(
    127         dummy_buffer,
    128         engine="cudf",

/usr/local/lib/python3.8/dist-packages/cudf/io/parquet.py in read_parquet(filepath_or_buffer, engine, columns, filters, row_groups, skiprows, num_rows, strings_to_categorical, use_pandas_metadata, *args, **kwargs)
    241 
    242     if engine == "cudf":
--> 243         return libparquet.read_parquet(
    244             filepaths_or_buffers,
    245             columns=columns,

cudf/_lib/parquet.pyx in cudf._lib.parquet.read_parquet()

cudf/_lib/parquet.pyx in cudf._lib.parquet.read_parquet()

MemoryError: std::bad_alloc: CUDA error at: /usr/include/rmm/mr/device/cuda_memory_resource.hpp:69: cudaErrorMemoryAllocation out of memory

@devavret

@leiterenato Can you run it through nsys? nsys profile -o <desired profile filename> <your command> and post the file <desired profile filename> here.

@leiterenato
Author

leiterenato commented Oct 14, 2021

@devavret here is the profile generated with nsys.
This time I executed it on a VM with 1 x A100 GPU, but I got the same error.
profile.out.qdrep.zip

I will reboot the VM with a T4 and generate the same report.

@leiterenato
Author

This is the execution with 1 x T4.

profile.out.qdrep.zip

@rjzamora
Collaborator

@leiterenato - Can you share the code you are using to create and fit your workflow, and to define your dataset? Note that I am able to run the following without error:

import nvtabular as nvt
from dask.distributed import Client
from dask_cuda import LocalCUDACluster

# device_limit, rmm_pool_size, and features defined as in the snippets above
cluster = LocalCUDACluster(
    n_workers=1,
    device_memory_limit=device_limit,
    rmm_pool_size=rmm_pool_size,
)
client = Client(cluster)

ds = nvt.Dataset("/my-path-to/criteo/day_0.parquet")
workflow = nvt.Workflow(features, client=client)
workflow.fit(ds)

However, when I do not create the client object and pass it to nvt.Workflow, I get the same error that you are reporting. Therefore, I just want to confirm that you are using the client.

@leiterenato
Author

leiterenato commented Oct 14, 2021

@rjzamora
Here is the code I am using.
https://github.com/jarokaz/merlin-on-vertex/blob/main/src/preprocessing/test.ipynb.
I was passing the client to the nvt.Dataset, not to the Workflow (I thought the Workflow would use the client from the Dataset).
It is working now. Thank you very much, and sorry for not paying attention to this detail.

@rjzamora
Collaborator

I was passing the client to the nvt.Dataset, not to the Workflow (I thought the Workflow would use the client from the Dataset).
It is working now. Thank you very much, and sorry for not paying attention to this detail.

No need to apologize! I actually think that NVTabular should be picking up the global client object automatically to avoid this problem. We started with the client= API requirement for a reason, but I suspect it is time to relax this requirement.

@leiterenato
Author

@rjzamora Thank you for the clarification.
In this case, should I pass the client to the nvt.Dataset as well?

@rjzamora
Collaborator

In this case, should I pass the client to the nvt.Dataset as well?

No - This is probably not necessary.

For now, NVTabular will not use the distributed Dask scheduler in fit/transform unless you define your Workflow with the client= argument. You do not need to pass the client to the Dataset API, unless you are planning to do operations on the Dataset (or write it to disk) before transforming it with the Workflow. Note that Workflow.transform will output a Dataset object with the same client object that was used to create the workflow. So the most important place to define the client is usually Workflow.

Overall, I think the client behavior is unnecessarily confusing, and I'd like to simplify it before the next release (shouldn't be hard).
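
In code, the pattern described above looks roughly like this (a minimal sketch; features is the column graph defined earlier in the thread and the path is a placeholder):

  from dask.distributed import Client
  from dask_cuda import LocalCUDACluster
  import nvtabular as nvt

  client = Client(LocalCUDACluster())

  # The Dataset does not need the client unless you operate on it directly
  dataset = nvt.Dataset("gs://<bucket>/criteo/day_1.parquet", engine="parquet")

  # The Workflow does need it for fit/transform to use the distributed scheduler
  workflow = nvt.Workflow(features, client=client)
  workflow.fit(dataset)

  # The transformed output is a Dataset that carries the same client
  transformed = workflow.transform(dataset)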

@leiterenato
Author

Perfect! 100% clear.
I will close this case.

Thanks again.
