Skip to content

Commit

Permalink
Porting spillabe buffer and manager from rapidsai#11553
Browse files Browse the repository at this point in the history
  • Loading branch information
madsbk committed Nov 9, 2022
1 parent 7535f31 commit f616499
Show file tree
Hide file tree
Showing 26 changed files with 1,602 additions and 44 deletions.
4 changes: 4 additions & 0 deletions ci/gpu/build.sh
Original file line number Diff line number Diff line change
Expand Up @@ -282,6 +282,10 @@ conda list
gpuci_logger "Python py.test for cuDF"
py.test -n 8 --cache-clear --basetemp="$WORKSPACE/cudf-cuda-tmp" --ignore="$WORKSPACE/python/cudf/cudf/benchmarks" --junitxml="$WORKSPACE/junit-cudf.xml" -v --cov-config="$WORKSPACE/python/cudf/.coveragerc" --cov=cudf --cov-report=xml:"$WORKSPACE/python/cudf/cudf-coverage.xml" --cov-report term --dist=loadscope tests

gpuci_logger "Python py.tests for cuDF with spilling (CUDF_SPILL_DEVICE_LIMIT=1)"
# Due to time concerns, we only run a limited set of tests
CUDF_SPILL=on CUDF_SPILL_DEVICE_LIMIT=1 py.test -n 8 --cache-clear --basetemp="$WORKSPACE/cudf-cuda-tmp" --ignore="$WORKSPACE/python/cudf/cudf/benchmarks" -v --cov-config="$WORKSPACE/python/cudf/.coveragerc" --cov-append --cov=cudf --cov-report=xml:"$WORKSPACE/python/cudf/cudf-coverage.xml" --cov-report term --dist=loadscope tests/test_binops.py tests/test_dataframe.py tests/test_buffer.py tests/test_onehot.py tests/test_reshape.py

cd "$WORKSPACE/python/dask_cudf"
gpuci_logger "Python py.test for dask-cudf"
py.test -n 8 --cache-clear --basetemp="$WORKSPACE/dask-cudf-cuda-tmp" --junitxml="$WORKSPACE/junit-dask-cudf.xml" -v --cov-config=.coveragerc --cov=dask_cudf --cov-report=xml:"$WORKSPACE/python/dask_cudf/dask-cudf-coverage.xml" --cov-report term dask_cudf
Expand Down
9 changes: 8 additions & 1 deletion docs/cudf/source/developer_guide/library_design.md
Original file line number Diff line number Diff line change
Expand Up @@ -203,7 +203,6 @@ For instance, all numerical types (floats and ints of different widths) are all

### Buffer


`Column`s are in turn composed of one or more `Buffer`s.
A `Buffer` represents a single, contiguous, device memory allocation owned by another object.
A `Buffer` constructed from a preexisting device memory allocation (such as a CuPy array) will view that memory.
Expand All @@ -212,6 +211,14 @@ Conversely, when constructed from a host object,
The data is then copied from the host object into the newly allocated device memory.
You can read more about [device memory allocation with RMM here](https://github.com/rapidsai/rmm).


### Spilling to host memory

Setting the environment variable `CUDF_SPILL=on` enables automatic spilling (and "unspilling") of buffers from
device to host to enable out-of-memory computation, i.e., computing on objects that occupy more memory than is
available on the GPU.


## The Cython layer

The lowest level of cuDF is its interaction with `libcudf` via Cython.
Expand Down
3 changes: 3 additions & 0 deletions python/cudf/cudf/_lib/binaryop.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ from cudf._lib.cpp.types cimport data_type, type_id
from cudf._lib.types cimport dtype_to_data_type, underlying_type_t_type_id

from cudf.api.types import is_scalar, is_string_dtype
from cudf.core.buffer import with_spill_lock

cimport cudf._lib.cpp.binaryop as cpp_binaryop
from cudf._lib.cpp.binaryop cimport binary_operator
Expand Down Expand Up @@ -156,6 +157,7 @@ cdef binaryop_s_v(DeviceScalar lhs, Column rhs,
return Column.from_unique_ptr(move(c_result))


@with_spill_lock()
def binaryop(lhs, rhs, op, dtype):
"""
Dispatches a binary op call to the appropriate libcudf function:
Expand Down Expand Up @@ -203,6 +205,7 @@ def binaryop(lhs, rhs, op, dtype):
return result


@with_spill_lock()
def binaryop_udf(Column lhs, Column rhs, udf_ptx, dtype):
"""
Apply a user-defined binary operator (a UDF) defined in `udf_ptx` on
Expand Down
6 changes: 4 additions & 2 deletions python/cudf/cudf/_lib/column.pxd
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
# Copyright (c) 2020, NVIDIA CORPORATION.
# Copyright (c) 2020-2022, NVIDIA CORPORATION.

from libcpp cimport bool
from libcpp.memory cimport unique_ptr
Expand Down Expand Up @@ -28,7 +28,9 @@ cdef class Column:
cdef mutable_column_view mutable_view(self) except *

@staticmethod
cdef Column from_unique_ptr(unique_ptr[column] c_col)
cdef Column from_unique_ptr(
unique_ptr[column] c_col, bint data_ptr_exposed=*
)

@staticmethod
cdef Column from_column_view(column_view, object)
Expand Down
104 changes: 85 additions & 19 deletions python/cudf/cudf/_lib/column.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,14 @@ import rmm
import cudf
import cudf._lib as libcudf
from cudf.api.types import is_categorical_dtype
from cudf.core.buffer import Buffer, as_buffer
from cudf.core.buffer import (
Buffer,
SpillableBuffer,
SpillLock,
as_buffer,
get_spill_lock,
with_spill_lock,
)

from cpython.buffer cimport PyObject_CheckBuffer
from libc.stdint cimport uintptr_t
Expand Down Expand Up @@ -95,7 +102,11 @@ cdef class Column:
if self._data is None:
start = self.offset * self.dtype.itemsize
end = start + self.size * self.dtype.itemsize
self._data = self.base_data[start:end]
if start == 0 and end == self.base_data.size:
# `data` spans all of `base_data`
self._data = self.base_data
else:
self._data = self.base_data[start:end]
return self._data

@property
Expand Down Expand Up @@ -249,7 +260,8 @@ cdef class Column:
@property
def null_count(self):
if self._null_count is None:
self._null_count = self.compute_null_count()
with with_spill_lock():
self._null_count = self.compute_null_count()
return self._null_count

@property
Expand Down Expand Up @@ -381,7 +393,14 @@ cdef class Column:
cdef vector[column_view] children
cdef void* data

data = <void*><uintptr_t>(col.base_data_ptr)
if col.base_data is None:
data = NULL
elif isinstance(col.base_data, SpillableBuffer):
data = <void*><uintptr_t>(col.base_data).get_ptr(
spill_lock=get_spill_lock()
)
else:
data = <void*><uintptr_t>(col.base_data.ptr)

cdef Column child_column
if col.base_children:
Expand All @@ -406,7 +425,16 @@ cdef class Column:
children)

@staticmethod
cdef Column from_unique_ptr(unique_ptr[column] c_col):
cdef Column from_unique_ptr(
unique_ptr[column] c_col, bint data_ptr_exposed=False
):
"""Create a Column from a column
Typically, this is called on the result of a libcudf operation.
If the data of the libcudf result has been exposed, set
`data_ptr_exposed=True` to expose the memory of the returned Column
as well.
"""
cdef column_view view = c_col.get()[0].view()
cdef libcudf_types.type_id tid = view.type().id()
cdef libcudf_types.data_type c_dtype
Expand All @@ -431,28 +459,38 @@ cdef class Column:
# After call to release(), c_col is unusable
cdef column_contents contents = move(c_col.get()[0].release())

data = DeviceBuffer.c_from_unique_ptr(move(contents.data))
data = as_buffer(data)
data = as_buffer(
DeviceBuffer.c_from_unique_ptr(move(contents.data)),
exposed=data_ptr_exposed
)

if null_count > 0:
mask = DeviceBuffer.c_from_unique_ptr(move(contents.null_mask))
mask = as_buffer(mask)
mask = as_buffer(
DeviceBuffer.c_from_unique_ptr(move(contents.null_mask)),
exposed=data_ptr_exposed
)
else:
mask = None

cdef vector[unique_ptr[column]] c_children = move(contents.children)
children = ()
children = []
if c_children.size() != 0:
children = tuple(Column.from_unique_ptr(move(c_children[i]))
for i in range(c_children.size()))
# Because of a bug in Cython, we cannot set the optional
# `data_ptr_exposed` argument within a comprehension.
for i in range(c_children.size()):
child = Column.from_unique_ptr(
move(c_children[i]),
data_ptr_exposed=data_ptr_exposed
)
children.append(child)

return cudf.core.column.build_column(
data,
dtype=dtype,
mask=mask,
size=size,
null_count=null_count,
children=children
children=tuple(children)
)

@staticmethod
Expand All @@ -474,6 +512,7 @@ cdef class Column:
size = cv.size()
offset = cv.offset()
dtype = dtype_from_column_view(cv)
dtype_itemsize = dtype.itemsize if hasattr(dtype, "itemsize") else 1

data_ptr = <uintptr_t>(cv.head[void]())
data = None
Expand All @@ -484,19 +523,45 @@ cdef class Column:
data_owner = owner.base_data
mask_owner = mask_owner.base_mask
base_size = owner.base_size

base_nbytes = base_size * dtype_itemsize
if data_ptr:
if data_owner is None:
data = as_buffer(
rmm.DeviceBuffer(ptr=data_ptr,
size=(size+offset) * dtype.itemsize)
size=(size+offset) * dtype_itemsize)
)
elif (
# This is an optimization to avoid creating a new
# SpillableBuffer that represent the same memory
# as the owner.
column_owner and
isinstance(data_owner, SpillableBuffer) and
# We have to make sure that `data_owner` is already spill
# locked and that its pointer is the same as `data_ptr`
# _without_ exposing the buffer permanently.
not data_owner.spillable and
data_owner.get_ptr(spill_lock=SpillLock()) == data_ptr and
data_owner.size == base_nbytes
):
data = data_owner
else:
# At this point we don't know the relationship between data_ptr
# and data_owner thus we mark both of them exposed.
# TODO: try to discover their relationship and create a
# SpillableBufferSlice instead.
data = as_buffer(
data=data_ptr,
size=(base_size) * dtype.itemsize,
owner=data_owner
data_ptr,
size=base_nbytes,
owner=data_owner,
exposed=True,
)
if isinstance(data_owner, SpillableBuffer):
if data_owner.is_spilled:
raise ValueError(
f"{data_owner} is spilled, which invalidates "
f"the exposed data_ptr ({hex(data_ptr)})"
)
data_owner.ptr # accessing the pointer marks it exposed.
else:
data = as_buffer(
rmm.DeviceBuffer(ptr=data_ptr, size=0)
Expand Down Expand Up @@ -538,7 +603,8 @@ cdef class Column:
mask = as_buffer(
data=mask_ptr,
size=bitmask_allocation_size_bytes(base_size),
owner=mask_owner
owner=mask_owner,
exposed=True
)

if cv.has_nulls():
Expand Down
Loading

0 comments on commit f616499

Please sign in to comment.