Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Out of Memory Sort Fails even with Spill over #57

Closed
VibhuJawa opened this issue May 28, 2019 · 38 comments
Closed

Out of Memory Sort Fails even with Spill over #57

VibhuJawa opened this issue May 28, 2019 · 38 comments

Comments

@VibhuJawa
Copy link
Member

VibhuJawa commented May 28, 2019

Out of Memory Sort still seems to be failing even with device spill PR merged. (#51) .

The memory still seems to linearly grow which causes RuntimeError: parallel_for failed: out of memory.

from dask.distributed import Client, wait
from dask_cuda import LocalCUDACluster
import cudf, dask_cudf

# Use dask-cuda to start one worker per GPU on a single-node system
# When you shutdown this notebook kernel, the Dask cluster also shuts down.
cluster = LocalCUDACluster(ip='0.0.0.0',n_workers=1, device_memory_limit='10000 MiB')
client = Client(cluster)
# # print client info
print(client)

# Code to simulate_data

def generate_file(output_file,rows=100):
    with open(output_file, 'wb') as f:
        f.write(b'A,B,C,D,E,F,G,H,I,J,K\n')
        f.write(b'22,697,56,0.0,0.0,0.0,0.0,0.0,0.0,0,0\n23,697,56,0.0,0.0,0.0,0.0,0.0,0.0,0,0\n'*(rows//2))
        f.close()

# generate the test file 
output_file='test.csv'
# Uncomment below
generate_file(output_file,rows=100_000_000)

# reading it using dask_cudf
df = dask_cudf.read_csv(output_file,chunksize='100 MiB')
print(df.head(10).to_pandas())


# reading it using dask_cudf
df = df.sort_values(['A','B','C'])

Error Trace :

--------------------------------------------------------------------------
RuntimeError                              Traceback (most recent call last)
<ipython-input-1-62d876400539> in <module>
     30 
     31 # reading it using dask_cudf
---> 32 df = df.sort_values(['A','B','C'])

~/anaconda3/envs/py_36_rapids/lib/python3.6/site-packages/dask_cudf/core.py in sort_values(self, by, ignore_index)
    440         """
    441         parts = self.to_delayed()
--> 442         sorted_parts = batcher_sortnet.sort_delayed_frame(parts, by)
    443         return from_delayed(sorted_parts, meta=self._meta).reset_index(
    444             force=not ignore_index

~/anaconda3/envs/py_36_rapids/lib/python3.6/site-packages/dask_cudf/batcher_sortnet.py in sort_delayed_frame(parts, by)
    133         list(map(delayed(lambda x: int(x is not None)), parts[:valid]))
    134     )
--> 135     valid = compute(valid_ct)[0]
    136     validparts = parts[:valid]
    137     return validparts

~/anaconda3/envs/py_36_rapids/lib/python3.6/site-packages/dask/base.py in compute(*args, **kwargs)
    396     keys = [x.__dask_keys__() for x in collections]
    397     postcomputes = [x.__dask_postcompute__() for x in collections]
--> 398     results = schedule(dsk, keys, **kwargs)
    399     return repack([f(r, *a) for r, (f, a) in zip(results, postcomputes)])
    400 

~/anaconda3/envs/py_36_rapids/lib/python3.6/site-packages/distributed/client.py in get(self, dsk, keys, restrictions, loose_restrictions, resources, sync, asynchronous, direct, retries, priority, fifo_timeout, actors, **kwargs)
   2566                     should_rejoin = False
   2567             try:
-> 2568                 results = self.gather(packed, asynchronous=asynchronous, direct=direct)
   2569             finally:
   2570                 for f in futures.values():

~/anaconda3/envs/py_36_rapids/lib/python3.6/site-packages/distributed/client.py in gather(self, futures, errors, maxsize, direct, asynchronous)
   1820                 direct=direct,
   1821                 local_worker=local_worker,
-> 1822                 asynchronous=asynchronous,
   1823             )
   1824 

~/anaconda3/envs/py_36_rapids/lib/python3.6/site-packages/distributed/client.py in sync(self, func, *args, **kwargs)
    751             return future
    752         else:
--> 753             return sync(self.loop, func, *args, **kwargs)
    754 
    755     def __repr__(self):

~/anaconda3/envs/py_36_rapids/lib/python3.6/site-packages/distributed/utils.py in sync(loop, func, *args, **kwargs)
    329             e.wait(10)
    330     if error[0]:
--> 331         six.reraise(*error[0])
    332     else:
    333         return result[0]

~/anaconda3/envs/py_36_rapids/lib/python3.6/site-packages/six.py in reraise(tp, value, tb)
    691             if value.__traceback__ is not tb:
    692                 raise value.with_traceback(tb)
--> 693             raise value
    694         finally:
    695             value = None

~/anaconda3/envs/py_36_rapids/lib/python3.6/site-packages/distributed/utils.py in f()
    314             if timeout is not None:
    315                 future = gen.with_timeout(timedelta(seconds=timeout), future)
--> 316             result[0] = yield future
    317         except Exception as exc:
    318             error[0] = sys.exc_info()

~/anaconda3/envs/py_36_rapids/lib/python3.6/site-packages/tornado/gen.py in run(self)
    727 
    728                     try:
--> 729                         value = future.result()
    730                     except Exception:
    731                         exc_info = sys.exc_info()

~/anaconda3/envs/py_36_rapids/lib/python3.6/site-packages/tornado/gen.py in run(self)
    734                     if exc_info is not None:
    735                         try:
--> 736                             yielded = self.gen.throw(*exc_info)  # type: ignore
    737                         finally:
    738                             # Break up a reference to itself

~/anaconda3/envs/py_36_rapids/lib/python3.6/site-packages/distributed/client.py in _gather(self, futures, errors, direct, local_worker)
   1651                             six.reraise(CancelledError, CancelledError(key), None)
   1652                         else:
-> 1653                             six.reraise(type(exception), exception, traceback)
   1654                     if errors == "skip":
   1655                         bad_keys.add(key)

~/anaconda3/envs/py_36_rapids/lib/python3.6/site-packages/six.py in reraise(tp, value, tb)
    690                 value = tp()
    691             if value.__traceback__ is not tb:
--> 692                 raise value.with_traceback(tb)
    693             raise value
    694         finally:

~/anaconda3/envs/py_36_rapids/lib/python3.6/site-packages/dask/compatibility.py in apply()
     91     def apply(func, args, kwargs=None):
     92         if kwargs:
---> 93             return func(*args, **kwargs)
     94         else:
     95             return func(*args)

~/anaconda3/envs/py_36_rapids/lib/python3.6/site-packages/dask_cudf/batcher_sortnet.py in _compare_frame()
     72     if a is not None and b is not None:
     73         joint = gd.concat([a, b])
---> 74         sorten = joint.sort_values(by=by)
     75         # Split the sorted frame using the *max_part_size*
     76         lhs, rhs = sorten[:max_part_size], sorten[max_part_size:]

~/anaconda3/envs/py_36_rapids/lib/python3.6/site-packages/cudf/dataframe/dataframe.py in sort_values()
   1279         return self._sort_by(self[by].argsort(
   1280             ascending=ascending,
-> 1281             na_position=na_position)
   1282         )
   1283 

~/anaconda3/envs/py_36_rapids/lib/python3.6/site-packages/cudf/dataframe/dataframe.py in _sort_by()
   1225         # Perform out = data[index] for all columns
   1226         for k in self.columns:
-> 1227             df[k] = self[k].take(sorted_indices.to_gpu_array())
   1228         df.index = self.index.take(sorted_indices.to_gpu_array())
   1229         return df

~/anaconda3/envs/py_36_rapids/lib/python3.6/site-packages/cudf/dataframe/series.py in take()
    324             return self[indices]
    325 
--> 326         col = cpp_copying.apply_gather_array(self.data.to_gpu_array(), indices)
    327 
    328         if self._column.mask:

cudf/bindings/copying.pyx in cudf.bindings.copying.apply_gather_array()

cudf/bindings/copying.pyx in cudf.bindings.copying.apply_gather_column()

cudf/bindings/copying.pyx in cudf.bindings.copying.apply_gather()

RuntimeError: parallel_for failed: out of memory
@VibhuJawa
Copy link
Member Author

@randerzander . FYI.

@mrocklin
Copy link
Contributor

@pentschev care to try the example above?

@pentschev
Copy link
Member

Sorry, I totally missed this before. I checked the example and I can reproduce. I believe we still have bugs on the memory spilling, which wasn't really tested before besides my synthetic test case. This is now on the top of my priority list, since now we seem to have more people who need this and we would like to have this working properly for 0.8.

@pentschev
Copy link
Member

What I found out that memory is really the issue, in the case described here, the GPU has 16GB of memory. Trying that example with device_memory_limit='10000 MiB' fails, and just before failing the real GPU memory utilization was at 16GB, despite the LRU being under 10GB. Reducing to device_memory_limit='5000 MiB' completes successfully, but takes 120 minutes. Raising back to device_memory_limit='10000 MiB' but reducing from chunksize='4096 MiB' to chunksize='1024 MiB' also finishes here, taking 71 minutes.

So what’s happening is that cuDF will allocate more memory that is proportional to the chunksize, which makes sense. As of now, I don’t know if there’s a better/safer way of keeping track of the entire device memory (including that managed by cuDF), so I don’t see another working solution for now other than having smaller chunks.

On side channels @VibhuJawa pointed out that the chunk sizes have a non-negligible impact on performance, so this is definitely something we want to improve in the future, but for the time being, using smaller chunk sizes is the only solution here.

@jrhemstad
Copy link

jrhemstad commented Aug 6, 2019

Could you try configuring RMM to use managed memory and see how that works?

See: https://github.com/rapidsai/rmm/blob/dfa8740883735e57bc9ebb95ed56a1321141a8b0/README.md#handling-rmm-options-in-python-code

You would use

from librmm_cffi import librmm_config as rmm_cfg
rmm_cfg.use_managed_memory = true

before import cudf

For a managed memory pool, you can do:

from librmm_cffi import librmm_config as rmm_cfg
rmm_cfg.use_managed_memory = true
rmm_cfg.use_pool_allocator = True

@pentschev
Copy link
Member

I've finally got a chance to test this and I can confirm enabling RMM's managed memory works for me. I simply made sure each worker enables managed memory, and the adapted code from #57 (comment) looks like this:

from dask.distributed import Client, wait
from dask_cuda import LocalCUDACluster
import cudf, dask_cudf

# Use dask-cuda to start one worker per GPU on a single-node system
# When you shutdown this notebook kernel, the Dask cluster also shuts down.
cluster = LocalCUDACluster(ip='0.0.0.0',n_workers=1, device_memory_limit='10000 MiB')
client = Client(cluster)
# # print client info
print(client)

# Code to simulate_data

def generate_file(output_file,rows=100):
    with open(output_file, 'wb') as f:
        f.write(b'A,B,C,D,E,F,G,H,I,J,K\n')
        f.write(b'22,697,56,0.0,0.0,0.0,0.0,0.0,0.0,0,0\n23,697,56,0.0,0.0,0.0,0.0,0.0,0.0,0,0\n'*(rows//2))
        f.close()

# generate the test file 
output_file='test.csv'
# Uncomment below
generate_file(output_file,rows=100_000_000)

def initialize_rmm():
    from librmm_cffi import librmm_config as rmm_cfg
    import cudf
    cudf.rmm.finalize()
    rmm_cfg.use_managed_memory = True
    return cudf.rmm.initialize()

client.run(initialize_rmm)

# reading it using dask_cudf
df = dask_cudf.read_csv(output_file,chunksize='100 MiB')
print(df.head(10).to_pandas())


# reading it using dask_cudf
df = df.sort_values(['A','B','C'])

Similarly, the code in #65 (comment) works too.

@VibhuJawa could you try it out as well?

@jrhemstad
Copy link

Note that with this PR from @shwina, it's even easier to set the allocator mode: rapidsai/cudf#2682

It should be as easy as just doing:

// non-pool, managed memory
set_allocator(allocator="managed")

// managed memory pool
set_allocator(allocator="managed", pool=True)

@pentschev
Copy link
Member

One additional comment, I did also some profiling on chunksize, setting it to four different values, 100 MiB, 200 MiB, 500 MiB and 1000 MiB.

First, without setting a device_memory_limit I get 3m45s, 2m50s, 1m36s and 1m23s for the values above, respectively. Then setting device_memory_limit=10000 MiB, there's little change, staying in the 4m20s-4m40s, in other words, Dask today is in general slower to move data to/from host, but will allow it to move also to disk.

Since the copying of data to host in Dask requires allocating NumPy arrays in host memory, this is one of the limiting factors (which when using managed memory on C++ libraries, such as cuDF, is essentially free since we don't necessarily have to pay the price of memory allocation). However, I worked on a fix for NumPy that was released only in 1.17.1 (numpy/numpy#14216 -- not yet available through conda, only pip) and it reduces execution time from 4m20s-4m40s down to 3m10s-4m10s.

@mrocklin these numbers may interest you as well.

@jakirkham
Copy link
Member

@jrhemstad, does managed here mean cudaMallocManaged?

@pentschev
Copy link
Member

@jrhemstad, does managed here mean cudaMallocManaged?

@jakirkham yes, the RMM documentation has details on that: https://github.com/rapidsai/rmm#cuda-managed-memory

@VibhuJawa
Copy link
Member Author

CC: @beckernick , see thread for spill-over discussion.

@datametrician
Copy link

@kkraus14 @mrocklin @quasiben how can we solve this quickly...

@quasiben
Copy link
Member

quasiben commented Jan 8, 2020

We just spent time chatting about resolving this issue with @beckernick . It was suggested to try CUDA Managed Memory. This solution has the draw back of not being supported with UCX message passing and thus the user would have to use TCP. Estimate for UCX support of CUDA Managed Memory will probably happen near the end of this year though this can potentially change with different resource allocation

@datametrician
Copy link

What else can we do in the short term?

@pentschev
Copy link
Member

I don't think we have any real alternatives to using managed memory to solve this issue, but @jrhemstad may have ideas on how we could emulate managed memory somehow on the RMM side without actual managed memory.

If we don't have any real alternatives as I believe to be the case, I think the best way to handle this is to focus on speeding up managed memory support in UCX.

@mrocklin
Copy link
Contributor

mrocklin commented Jan 9, 2020 via email

@datametrician
Copy link

SOL we can sort 1TB on 16 GPUs in ~70 seconds right now. I believe MapR holds the record on 1004x (4-core) nodes at ~50 seconds. Given a DGX-2 has <60 cores seems like SOL to SOL favors GPU.

@mrocklin
Copy link
Contributor

mrocklin commented Jan 18, 2020 via email

@datametrician
Copy link

That is from disk (csv) and writing back to disk.

@jakirkham
Copy link
Member

What if we copied managed memory over to non-managed memory before sending it?

@pentschev
Copy link
Member

What if we copied managed memory over to non-managed memory before sending it?

I would then argue that we will increase the memory footprint in such a case, and that's probably going to limit problem sizes even more. That said, my bias/preferred action would continue to be on working for managed memory support in UCX sooner.

@jakirkham
Copy link
Member

We could reuse the same buffer to copy into. Admittedly it will still increase the memory footprint, but by a fixed amount. It may also help with other issues (buffer registration).

Agree this is a workaround. However if we are trying to do something sooner than fixing UCX, this could be one path to doing that.

@pentschev
Copy link
Member

I see now that you're suggesting a fixed memory footprint, which didn't occur to me before. In that case, there's still a big issue: for larger buffers, we would need to do multiple copies, that would incur in various synchronization steps and decrease performance too. I'm very inclined to believe that the gain we would have by using managed memory in this case would be consumed by this behavior.

I'm not opposed to someone trying that out if one has the bandwidth to do it, but I still think that time would be better spent working directly with UCX folks to solve the issue at core.

@jakirkham
Copy link
Member

So I may be misunderstanding, but the motivation for using managed memory was to avoid a crash not improve performance. Is that correct? If so, I don't think we need to be concerned about degraded performance because it would still run (which is infinitely better 😉). In any event, it is very difficult to reason correctly about how much of a performance penalty one would take. Much easier to run it and benchmark it.

@pentschev
Copy link
Member

It's actually twofold: there may be an OOM crash, but managed memory has also a significant improvement in performance with TCP, which I can't explain but it's very useful.

Apart from that, there's another issue I forgot to mention -- which is arguably more important -- and is that we can't currently have any managed memory if we're using UCX, all those allocations will be captured by UCX. IOW, the application must not contain any managed memory, if it does the application crashes as we have no way to enforce UCX not to capture that memory.

@pentschev
Copy link
Member

@jakirkham @kkraus14 and I were discussing this offline, so to set expectations straight:

“To share device memory pointers and events across processes, an application must use the Inter Process Communication API, which is described in detail in the reference manual. The IPC API is only supported for 64-bit processes on Linux and for devices of compute capability 2.0 and higher. Note that the IPC API is not supported for cudaMallocManaged allocations.”

Reference: https://docs.nvidia.com/cuda/cuda-c-programming-guide/index.html#interprocess-communication

@jakirkham
Copy link
Member

It's actually twofold: there may be an OOM crash, but managed memory has also a significant improvement in performance with TCP, which I can't explain but it's very useful.

If that's the case, could we just only use TCP with managed memory? Or is there a downside here?

@pentschev
Copy link
Member

If that's the case, could we just only use TCP with managed memory? Or is there a downside here?

I'm not sure I understand your question. If we use TCP, then there's no NVLink, which means transfers are limited by TCP bandwidth.

@jakirkham
Copy link
Member

...managed memory has also a significant improvement in performance with TCP...

Maybe I'm not understanding. Is this useful or should we ignore it?

@pentschev
Copy link
Member

Maybe I'm not understanding. Is this useful or should we ignore it?

With TCP it's useful, but for the reasons we discussed above, we can't use managed memory with UCX, so in this last case it's not useful (even though my initial hopes were that it would work and we would see a performance improvement with UCX too).

@kkraus14
Copy link

managed memory has also a significant improvement in performance with TCP, which I can't explain but it's very useful.

This is likely that managed memory is optimizing the device <--> host memory transfer which can provide a 4x speedup over a naive non-pinned device <--> host memory transfer.

@pentschev
Copy link
Member

@kkraus14 by "optimizing" you mean that it internally uses pinned memory or are you suggesting there are even further optimizations?

@kkraus14
Copy link

Internally it uses pinned memory and then it tries to be smart about how it prefetches memory and overlaps the transfers with compute.

@jakirkham
Copy link
Member

Yeah we might look at doing this (spilling to pinned memory) ourselves. There has been some discussion about adding some sort of pinned memory support with RMM. Though there is likely some work needed on the dask-cuda side to spill to pinned memory. Not entirely sure what this will look like yet.

@beckernick
Copy link
Member

@jakirkham
Copy link
Member

If we do look into spilling to pinned memory with RMM, issue ( rapidsai/rmm#260 ) is tracking the relevant work needed/done.

@madsbk madsbk mentioned this issue Aug 6, 2020
6 tasks
@pentschev
Copy link
Member

With TPCx-BB efforts being largely successful, I'd say this has been fixed or improved substantially, is that correct @VibhuJawa ? Are we good closing this or should we keep it open?

@VibhuJawa
Copy link
Member Author

With TPCx-BB efforts being largely successful, I'd say this has been fixed or improved substantially, is that correct @VibhuJawa ? Are we good closing this or should we keep it open?

@pentschev , Yup, with all the TPCxbb work this has indeed improved a lot. This is good to close in my book too.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

9 participants