
[BUG] repartition failing on multiple-workers #2321

Closed
VibhuJawa opened this issue Jul 17, 2019 · 19 comments

Labels: bug (Something isn't working), dask (Dask issue), Python (Affects Python cuDF API)

Comments

@VibhuJawa (Member) commented Jul 17, 2019

Describe the bug
I am running into issues when I try to repartition with multiple workers (>= 4), even though I have enough memory; I am at just 43% capacity.

This works fine if I have just 2 workers.

Steps/Code to reproduce bug
Helper Functions

import cudf
import dask
import dask_cudf
from dask_cuda import LocalCUDACluster
from dask.distributed import Client, wait

import numpy as np
import pandas as pd
import io


n_cols = 40
# n_rows per partition
n_rows = 4_850_000
n_parts_per_worker = 8
# approx. 39 million rows per worker (8 partitions x 4.85M rows)

dtypes = dict(zip([str(i) for i in range(0, n_cols)], [np.int32] * n_cols))
df = pd.read_csv(io.StringIO(""), names=list(dtypes.keys()), dtype=dtypes)
meta_df = cudf.from_pandas(df)


#works with 2 workers but fails at 4
n_workers = 4
# Create Cluster
cluster = LocalCUDACluster(n_workers=n_workers)
client = Client(cluster)


## DataFrame Helper Function
def create_df(n_rows, n_cols):
    df = cudf.DataFrame()
    for col_id in range(0, n_cols):
        df[str(col_id)] = np.ones(shape=n_rows, dtype=np.int32)
    return df

Create dataframe

# Create dataframe from delayed partitions
parts = [dask.delayed(create_df)(n_rows, n_cols=n_cols) for _ in range(n_workers * n_parts_per_worker)]
df = dask_cudf.from_delayed(parts, meta=meta_df)
df = df.persist()
wait(df)
print("len of df = {:,}".format(len(df)))
! nvidia-smi

nvidia-smi output

Wed Jul 17 11:56:55 2019       
+-----------------------------------------------------------------------------+
| NVIDIA-SMI 410.48                 Driver Version: 410.48                    |
|-------------------------------+----------------------+----------------------+
| GPU  Name        Persistence-M| Bus-Id        Disp.A | Volatile Uncorr. ECC |
| Fan  Temp  Perf  Pwr:Usage/Cap|         Memory-Usage | GPU-Util  Compute M. |
|===============================+======================+======================|
|   0  Tesla P100-SXM2...  On   | 00000000:06:00.0 Off |                    0 |
| N/A   35C    P0    48W / 300W |   7104MiB / 16280MiB |      0%      Default |
+-------------------------------+----------------------+----------------------+
|   1  Tesla P100-SXM2...  On   | 00000000:07:00.0 Off |                    0 |
| N/A   35C    P0    49W / 300W |   6757MiB / 16280MiB |      0%      Default |
+-------------------------------+----------------------+----------------------+
|   2  Tesla P100-SXM2...  On   | 00000000:0A:00.0 Off |                    0 |
| N/A   32C    P0    46W / 300W |   6757MiB / 16280MiB |      0%      Default |
+-------------------------------+----------------------+----------------------+
|   3  Tesla P100-SXM2...  On   | 00000000:0B:00.0 Off |                    0 |
| N/A   32C    P0    44W / 300W |   6757MiB / 16280MiB |      0%      Default |
+-------------------------------+----------------------+----------------------+
|   4  Tesla P100-SXM2...  On   | 00000000:85:00.0 Off |                    0 |
| N/A   32C    P0    33W / 300W |     10MiB / 16280MiB |      0%      Default |
+-------------------------------+----------------------+----------------------+
|   5  Tesla P100-SXM2...  On   | 00000000:86:00.0 Off |                    0 |
| N/A   31C    P0    34W / 300W |     10MiB / 16280MiB |      0%      Default |
+-------------------------------+----------------------+----------------------+
|   6  Tesla P100-SXM2...  On   | 00000000:89:00.0 Off |                    0 |
| N/A   31C    P0    35W / 300W |     10MiB / 16280MiB |      0%      Default |
+-------------------------------+----------------------+----------------------+
|   7  Tesla P100-SXM2...  On   | 00000000:8A:00.0 Off |                    0 |
| N/A   32C    P0    32W / 300W |     10MiB / 16280MiB |      0%      Default |
+-------------------------------+----------------------+----------------------+
                                                                               
+-----------------------------------------------------------------------------+
| Processes:                                                       GPU Memory |
|  GPU       PID   Type   Process name                             Usage      |
|=============================================================================|
|    0     59424      C   ...naconda3/envs/rapids_nightly/bin/python   347MiB |
|    0     59468      C   ...naconda3/envs/rapids_nightly/bin/python  6747MiB |
|    1     59472      C   ...naconda3/envs/rapids_nightly/bin/python  6747MiB |
|    2     59466      C   ...naconda3/envs/rapids_nightly/bin/python  6747MiB |
|    3     59470      C   ...naconda3/envs/rapids_nightly/bin/python  6747MiB |
+-----------------------------------------------------------------------------+

Repartition df

## repartition Df
## Run into OOM issues at repartition
df = df.repartition(npartitions=n_workers)
df = df.persist()
wait(df)
print("len of df = {:,}".format(len(df)))

Error Trace

---------------------------------------------------------------------------
RuntimeError                              Traceback (most recent call last)
<ipython-input-3-219dd226ab30> in <module>
      4 df = df.persist()
      5 wait(df)
----> 6 print("len of df = {:,}".format(len(df)))

~/anaconda3/envs/rapids_nightly/lib/python3.7/site-packages/dask/dataframe/core.py in __len__(self)
    510     def __len__(self):
    511         return self.reduction(
--> 512             len, np.sum, token="len", meta=int, split_every=False
    513         ).compute()
    514 

~/anaconda3/envs/rapids_nightly/lib/python3.7/site-packages/dask/base.py in compute(self, **kwargs)
    173         dask.base.compute
    174         """
--> 175         (result,) = compute(self, traverse=False, **kwargs)
    176         return result
    177 

~/anaconda3/envs/rapids_nightly/lib/python3.7/site-packages/dask/base.py in compute(*args, **kwargs)
    444     keys = [x.__dask_keys__() for x in collections]
    445     postcomputes = [x.__dask_postcompute__() for x in collections]
--> 446     results = schedule(dsk, keys, **kwargs)
    447     return repack([f(r, *a) for r, (f, a) in zip(results, postcomputes)])
    448 

~/anaconda3/envs/rapids_nightly/lib/python3.7/site-packages/distributed/client.py in get(self, dsk, keys, restrictions, loose_restrictions, resources, sync, asynchronous, direct, retries, priority, fifo_timeout, actors, **kwargs)
   2525                     should_rejoin = False
   2526             try:
-> 2527                 results = self.gather(packed, asynchronous=asynchronous, direct=direct)
   2528             finally:
   2529                 for f in futures.values():

~/anaconda3/envs/rapids_nightly/lib/python3.7/site-packages/distributed/client.py in gather(self, futures, errors, direct, asynchronous)
   1821                 direct=direct,
   1822                 local_worker=local_worker,
-> 1823                 asynchronous=asynchronous,
   1824             )
   1825 

~/anaconda3/envs/rapids_nightly/lib/python3.7/site-packages/distributed/client.py in sync(self, func, asynchronous, callback_timeout, *args, **kwargs)
    761         else:
    762             return sync(
--> 763                 self.loop, func, *args, callback_timeout=callback_timeout, **kwargs
    764             )
    765 

~/anaconda3/envs/rapids_nightly/lib/python3.7/site-packages/distributed/utils.py in sync(loop, func, callback_timeout, *args, **kwargs)
    330             e.wait(10)
    331     if error[0]:
--> 332         six.reraise(*error[0])
    333     else:
    334         return result[0]

~/anaconda3/envs/rapids_nightly/lib/python3.7/site-packages/six.py in reraise(tp, value, tb)
    691             if value.__traceback__ is not tb:
    692                 raise value.with_traceback(tb)
--> 693             raise value
    694         finally:
    695             value = None

~/anaconda3/envs/rapids_nightly/lib/python3.7/site-packages/distributed/utils.py in f()
    315             if callback_timeout is not None:
    316                 future = gen.with_timeout(timedelta(seconds=callback_timeout), future)
--> 317             result[0] = yield future
    318         except Exception as exc:
    319             error[0] = sys.exc_info()

~/anaconda3/envs/rapids_nightly/lib/python3.7/site-packages/tornado/gen.py in run(self)
    733 
    734                     try:
--> 735                         value = future.result()
    736                     except Exception:
    737                         exc_info = sys.exc_info()

~/anaconda3/envs/rapids_nightly/lib/python3.7/site-packages/tornado/gen.py in run(self)
    740                     if exc_info is not None:
    741                         try:
--> 742                             yielded = self.gen.throw(*exc_info)  # type: ignore
    743                         finally:
    744                             # Break up a reference to itself

~/anaconda3/envs/rapids_nightly/lib/python3.7/site-packages/distributed/client.py in _gather(self, futures, errors, direct, local_worker)
   1678                             exc = CancelledError(key)
   1679                         else:
-> 1680                             six.reraise(type(exception), exception, traceback)
   1681                         raise exc
   1682                     if errors == "skip":

~/anaconda3/envs/rapids_nightly/lib/python3.7/site-packages/six.py in reraise(tp, value, tb)
    690                 value = tp()
    691             if value.__traceback__ is not tb:
--> 692                 raise value.with_traceback(tb)
    693             raise value
    694         finally:

~/anaconda3/envs/rapids_nightly/lib/python3.7/site-packages/dask/dataframe/methods.py in concat()
    341         func = concat_dispatch.dispatch(type(dfs[0]))
    342         return func(
--> 343             dfs, axis=axis, join=join, uniform=uniform, filter_warning=filter_warning
    344         )
    345 

~/anaconda3/envs/rapids_nightly/lib/python3.7/site-packages/dask_cudf/backends.py in concat_cudf()
     31     assert axis == 0
     32     assert join == "outer"
---> 33     return cudf.concat(dfs)

~/anaconda3/envs/rapids_nightly/lib/python3.7/site-packages/cudf/multi.py in concat()
     52 
     53     if typ is DataFrame:
---> 54         return DataFrame._concat(objs, axis=axis, ignore_index=ignore_index)
     55     elif typ is Series:
     56         return Series._concat(objs, axis=axis)

~/anaconda3/envs/rapids_nightly/lib/python3.7/site-packages/cudf/dataframe/dataframe.py in _concat()
   1444         data = [
   1445             (c, Series._concat([o[c] for o in objs], index=index))
-> 1446             for c in objs[0].columns
   1447         ]
   1448         out = cls(data)

~/anaconda3/envs/rapids_nightly/lib/python3.7/site-packages/cudf/dataframe/dataframe.py in <listcomp>()
   1444         data = [
   1445             (c, Series._concat([o[c] for o in objs], index=index))
-> 1446             for c in objs[0].columns
   1447         ]
   1448         out = cls(data)

~/anaconda3/envs/rapids_nightly/lib/python3.7/site-packages/cudf/dataframe/series.py in _concat()
   1035         else:
   1036             name = None
-> 1037         col = Column._concat([o._column for o in objs])
   1038         return cls(data=col, index=index, name=name)
   1039 

~/anaconda3/envs/rapids_nightly/lib/python3.7/site-packages/cudf/dataframe/column.py in _concat()
    110         # Performance the actual concatenation
    111         if newsize > 0:
--> 112             col = _column_concat(objs, col)
    113 
    114         return col

cudf/bindings/concat.pyx in cudf.bindings.concat._column_concat()

RuntimeError: CUDA error encountered at: /conda/envs/gdf/conda-bld/libcudf_1563314405241/work/cpp/src/column/legacy/column.cpp:101: 11 cudaErrorInvalidValue invalid argument

Environment overview (please complete the following information)

  • Method of cuDF install: [conda]
cudf                      0.9.0a                py37_1312    rapidsai-nightly
dask-cudf                 0.9.0a1                  py37_0    rapidsai-nightly/label/cuda10.0
libcudf                   0.9.0a            cuda10.0_1312    rapidsai-nightly
dask-cuda                 0.9.0a0+17.g3057f94.dirty          pypi_0    pypi

Edit: Added nvidia-smi output.

Edit 2: Removed OOM from the heading, as it seems to be unrelated. Sorry for the confusion.

@VibhuJawa VibhuJawa added Needs Triage Need team to review and classify bug Something isn't working labels Jul 17, 2019
@kkraus14 kkraus14 added Python Affects Python cuDF API. dask Dask issue and removed Needs Triage Need team to review and classify labels Jul 17, 2019
@pentschev (Member)

Is this issue deterministic? I just ran the example above on a DGX with 16GB GPUs and I can't reproduce that. nvidia-smi also doesn't report memory growing past 7399MB for me.

@pentschev (Member)

For completeness, I tested this on the nightly build.

@VibhuJawa (Member, Author) commented Jul 18, 2019

@pentschev, can you provide the exact versions you are on? I am on nightly too (just updated everything).

This seems to be non-deterministic for me.

Also, this might be unrelated to OOM, because memory seems to peak at 12,000 MiB for me, but I still cannot repartition successfully.

I have updated the heading to reflect that.

%%bash
conda list | grep 'cudf'
conda list | grep 'dask'
cudf                      0.9.0a                py37_1412    rapidsai-nightly
dask-cudf                 0.9.0a1                  py37_0    rapidsai-nightly/label/cuda10.0
libcudf                   0.9.0a            cuda10.0_1412    rapidsai-nightly
dask                      2.1.0                      py_0  
dask-core                 2.1.0                      py_0  
dask-cuda                 0.9.0a0+17.g3057f94.dirty          pypi_0    pypi
dask-cudf                 0.9.0a1                  py37_0    rapidsai-nightly/label/cuda10.0 

@VibhuJawa VibhuJawa changed the title [BUG] OOM issues with repartition [BUG] repartition failing on multiple-workers Jul 18, 2019
@pentschev (Member)

@VibhuJawa here's a list of what I'm using:

cudf                      0.9.0a                py37_1435    rapidsai-nightly
dask                      2.1.0                      py_0    conda-forge
dask-core                 2.1.0                      py_0    conda-forge
dask-cuda                 0.9.0a0+17.g3057f94.dirty          pypi_0    pypi
dask-cudf                 0.9.0a1                  py37_0    rapidsai-nightly
libcudf                   0.9.0a             cuda9.2_1435    rapidsai-nightly

I still couldn't reproduce this after some 3-4 attempts just now.

@pentschev (Member)

The only major difference I see is that you're using CUDA 10; I'm not sure if that is of much concern. Also, I just updated everything, so maybe your cudf build is from yesterday.

@pentschev (Member)

Alright, I was able to reproduce this, and it was my mistake that I couldn't before (I was only running the first two blocks of code, and missed the third one).

This is directly related to the issue in rapidsai/dask-cuda#57. What happens here is that the worker crashes due to the amount of data. Setting device_memory_limit may help by spilling data to disk, but it will make things slower.
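
For reference, a minimal sketch of the device_memory_limit approach, assuming a LocalCUDACluster like the one in the reproducer above (the limit value here is an illustrative guess, not a recommendation):

from dask_cuda import LocalCUDACluster
from dask.distributed import Client

# device_memory_limit sets the per-worker threshold at which Dask starts
# moving device objects out of GPU memory; pick a value below the card size.
cluster = LocalCUDACluster(
    n_workers=4,
    device_memory_limit="10GB",  # assumed value for a 16 GB card; tune per workload
)
client = Client(cluster)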

I discussed offline with @VibhuJawa, and for such pipelines we have two options:

  1. Paying the price of working with smaller chunks; or
  2. Paying the price of spilling data to host more often.

Both will incur overhead, and this may be very dependent on the algorithm in question, but I believe option 1 will tend to perform better.

I have been working on benchmarking alternatives for spilling memory in rapidsai/dask-cuda#92, but the outlook isn't great. Besides the cost of copying the memory to host, there's also a cost to serializing that memory. For an idea of the current status, spilling to host currently has a bandwidth of about 550 MB/s, in contrast to the real 6 GB/s bandwidth we can achieve when using unpinned memory. I expect to be able to speed up serialization, but the actual spilling bandwidth will certainly be < 6 GB/s.

@VibhuJawa (Member, Author)

@pentschev,

Apologies for the late follow-up.

Just to be clear, could you make spillover work with repartition on smaller chunks on a 16 GB card?

I changed the number of rows to just 37_890 and partitions per worker to 1024 (still about 38 million rows per worker), and set device_memory_limit='10000 MiB', but I am still getting the same error.

## n_rows per partition
n_rows = 4_850_000 // 128  # 37,890
n_parts_per_worker = 8 * 128

@mrocklin (Collaborator) commented Aug 6, 2019

Why do you want only one partition per worker? This seems possibly inefficient. For example, what happens if a few of the partitions end up on the same worker? Dask makes no guarantees about behavior here.

@mrocklin (Collaborator) commented Aug 6, 2019

Say I have a 16 GB card, with 7 GB of memory per worker spread among lots of partitions. Then I move the data around so that I can repartition it. Let's say that in a pessimistic case most pieces of data have to move, so now I have 14 GB of memory occupied per worker. Then, just for a moment, I need to copy 7 GB of that data into a single 7 GB dataframe before I can release the 7 GB of smaller dataframes. Now I have, briefly, 21 GB of data on my 16 GB card.

Perhaps this explains the problem?
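
A tiny sketch of that arithmetic, using the illustrative numbers above:

# Worst-case transient device memory during the repartition described above.
resident_gb = 7    # data already persisted on the worker
received_gb = 7    # pessimistic case: most partitions move and arrive as extra copies
concat_out_gb = 7  # the single large output partition materialized by the concat

# The inputs cannot be freed until the concatenated output exists.
peak_gb = resident_gb + received_gb + concat_out_gb
print(peak_gb, "GB transient peak on a 16 GB card")  # 21 GB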

@mrocklin (Collaborator) commented Aug 6, 2019

Dask's scheduling heuristics aren't good at playing very close to the line like this. I recommend keeping many small partitions. Things work more smoothly. If you need giant chunks for some reason, then Dask may not be a good fit. This approach is more common in MPI workloads.

@ayushdg (Member) commented Aug 6, 2019

Is there a general estimate of how close we can be to the limit? Usually in host-based workflows there is a sweet spot for the number of partitions (not too many, not too few) that gives the best performance.
My experience running different workflows with dask_cudf has been that performance seems to keep improving as we increase the partition size, up to as large as possible without going OOM. So while optimizing for performance, I often keep pushing for giant chunks until I hit memory errors.

@mrocklin (Collaborator) commented Aug 6, 2019

Our general recommendations are here: https://docs.dask.org/en/latest/best-practices.html#avoid-very-large-partitions . Note that in your case I'm assuming that you're running with one thread rather than 10, as in that example.

You should have space for a few times more chunks to be in memory than you have concurrent tasks.

My understanding from @kkraus14 was that performance benefits should drop off a bit after you reach a large enough size to saturate all of the thread blocks. The estimate I was given was that this was likely to occur in the few hundred MB range. I don't know what's optimal, though; it sounds like you have evidence against this.
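
For context, a rough back-of-the-envelope sizing of the partitions in the reproducer above (40 int32 columns x 4,850,000 rows each); this is a sketch rather than a measurement:

import numpy as np

n_cols, n_rows = 40, 4_850_000
bytes_per_partition = n_cols * n_rows * np.dtype(np.int32).itemsize
print(bytes_per_partition / 1e6, "MB per partition")   # ~776 MB
print(8 * bytes_per_partition / 1e9, "GB per worker")  # ~6.2 GB, consistent with the nvidia-smi output above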

@mrocklin (Collaborator) commented Aug 6, 2019

In your case, I'd recommend chunks in the gigabyte range, unless there is some very pressing need to go larger.
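
A hedged sketch of what that could look like for the reproducer above (the 1 GB target and the arithmetic are illustrative assumptions, not a tested recipe):

# Choose npartitions so each output partition lands near ~1 GB,
# instead of collapsing to one giant partition per worker.
target_bytes = 1_000_000_000
bytes_per_input_partition = 40 * 4_850_000 * 4     # ~776 MB of int32 data, see the sizing above
total_bytes = bytes_per_input_partition * 32       # 4 workers x 8 partitions each
npartitions = max(1, total_bytes // target_bytes)  # ~24 partitions
df = df.repartition(npartitions=npartitions)
df = df.persist()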

@VibhuJawa (Member, Author)

> Why do you want only one partition per worker? This seems possibly inefficient. For example, what happens if a few of the partitions end up on the same worker? Dask makes no guarantees about behavior here.

I wanted to repartition because I was sending the dataframe to Dask-XGBoost for training, which concatenates the dataframes on each worker before training, and I wanted to side-step that by doing this repartition.

See: https://github.com/rapidsai/dask-xgboost/blob/dask-cudf/dask_xgboost/core.py#L67-L69

@mt-jones, can you please confirm whether we can prevent the memory shoot-up we were seeing by doing this, or is it completely unrelated?

> Say I have a 16 GB card, with 7 GB of memory per worker spread among lots of partitions. Then I move the data around so that I can repartition it. Let's say that in a pessimistic case most pieces of data have to move, so now I have 14 GB of memory occupied per worker. Then, just for a moment, I need to copy 7 GB of that data into a single 7 GB dataframe before I can release the 7 GB of smaller dataframes. Now I have, briefly, 21 GB of data on my 16 GB card.
>
> Perhaps this explains the problem?

Yup, that explains it, but can we not side-step that problem with the config changes below?

dask.config.set({'distributed.scheduler.work-stealing': False})
dask.config.set({'distributed.scheduler.bandwidth': 1})

On the performance of large chunks vs. small chunks, my experience is similar to @ayushdg's.

I ran some performance tests on a workflow yesterday, which I can share through side channels, that show the performance drop with decreasing chunk size.

@mrocklin (Collaborator) commented Aug 6, 2019

I do not immediately see how those changes would avoid the situation I'm talking about above.

The bit that does this in XGBoost intentionally looks at the data that is already on the workers and concatenates that. That should remove the extra 7GB from moving data around. This is unrelated to the config settings you mention.

@mrocklin (Collaborator) commented Aug 6, 2019

> I ran some performance tests on a workflow yesterday, which I can share through side channels, that show the performance drop with decreasing chunk size.

FWIW I think that it'd be great to show such a benchmark in a public place like a GitHub issue if you're able to make something that is shareable.

@VibhuJawa (Member, Author)

> FWIW I think that it'd be great to show such a benchmark in a public place like a GitHub issue if you're able to make something that is shareable.

Okay, let me try to run the same operations on some dummy data and make something shareable.

@mrocklin (Collaborator) commented Aug 6, 2019

Also, I suspect that the conversation will be able to engage more people if it doesn't include Dask. It would be interesting to see how cudf operations react as data sizes get larger. If you can show performance degradation with small chunks using just cudf, then you'll have a lot of people who can engage on the problem. If Dask is involved then it's probably just a couple of us (most of whom are saturated).
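
A minimal sketch of such a Dask-free benchmark, reusing the column-building pattern from the reproducer above (the chunk counts and timing approach are illustrative assumptions):

import time
import numpy as np
import cudf

def make_chunk(n_rows, n_cols=40):
    # Same construction as create_df in the reproducer: int32 columns of ones.
    df = cudf.DataFrame()
    for col_id in range(n_cols):
        df[str(col_id)] = np.ones(shape=n_rows, dtype=np.int32)
    return df

total_rows = 4_850_000
for n_chunks in (1, 8, 64, 512):
    chunks = [make_chunk(total_rows // n_chunks) for _ in range(n_chunks)]
    start = time.perf_counter()
    cudf.concat(chunks)  # time concatenating the same total data split into smaller pieces
    print(n_chunks, "chunks:", round(time.perf_counter() - start, 3), "s")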

@kkraus14 (Collaborator)

This is resolved as of 0.14.
