Skip to content

Commit

Permalink
Introduce SpillLock
Browse files Browse the repository at this point in the history
  • Loading branch information
madsbk committed Aug 22, 2022
1 parent a1a7017 commit 12753f9
Show file tree
Hide file tree
Showing 10 changed files with 74 additions and 45 deletions.
18 changes: 11 additions & 7 deletions python/cudf/cudf/_lib/binaryop.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ from libcpp.utility cimport move

from cudf._lib.binaryop cimport underlying_type_t_binary_operator
from cudf._lib.column cimport Column
from cudf._lib.spillable_buffer cimport SpillLock

from cudf._lib.replace import replace_nulls
from cudf._lib.scalar import as_device_scalar
Expand Down Expand Up @@ -102,9 +103,9 @@ class BinaryOperation(IntEnum):

cdef binaryop_v_v(Column lhs, Column rhs,
binary_operator c_op, data_type c_dtype):
cdef column_view c_lhs = lhs.view()
cdef column_view c_rhs = rhs.view()

cdef SpillLock slock = SpillLock()
cdef column_view c_lhs = lhs.view(slock)
cdef column_view c_rhs = rhs.view(slock)
cdef unique_ptr[column] c_result

with nogil:
Expand All @@ -122,7 +123,8 @@ cdef binaryop_v_v(Column lhs, Column rhs,

cdef binaryop_v_s(Column lhs, DeviceScalar rhs,
binary_operator c_op, data_type c_dtype):
cdef column_view c_lhs = lhs.view()
cdef SpillLock slock = SpillLock()
cdef column_view c_lhs = lhs.view(slock)
cdef const scalar* c_rhs = rhs.get_raw_ptr()

cdef unique_ptr[column] c_result
Expand All @@ -142,7 +144,8 @@ cdef binaryop_v_s(Column lhs, DeviceScalar rhs,
cdef binaryop_s_v(DeviceScalar lhs, Column rhs,
binary_operator c_op, data_type c_dtype):
cdef const scalar* c_lhs = lhs.get_raw_ptr()
cdef column_view c_rhs = rhs.view()
cdef SpillLock slock = SpillLock()
cdef column_view c_rhs = rhs.view(slock)

cdef unique_ptr[column] c_result

Expand Down Expand Up @@ -213,8 +216,9 @@ def binaryop_udf(Column lhs, Column rhs, udf_ptx, dtype):
has to be specified in `dtype`, a numpy data type.
Currently ONLY int32, int64, float32 and float64 are supported.
"""
cdef column_view c_lhs = lhs.view()
cdef column_view c_rhs = rhs.view()
cdef SpillLock slock = SpillLock()
cdef column_view c_lhs = lhs.view(slock)
cdef column_view c_rhs = rhs.view(slock)

cdef type_id tid = (
<type_id> (
Expand Down
7 changes: 5 additions & 2 deletions python/cudf/cudf/_lib/column.pxd
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ from rmm._lib.device_buffer cimport device_buffer
from cudf._lib.cpp.column.column cimport column
from cudf._lib.cpp.column.column_view cimport column_view, mutable_column_view
from cudf._lib.cpp.types cimport size_type
from cudf._lib.spillable_buffer cimport SpillLock


cdef class Column:
Expand All @@ -23,8 +24,10 @@ cdef class Column:
cdef object _mask
cdef object _null_count

cdef column_view _view(self, size_type null_count) except *
cdef column_view view(self) except *
cdef column_view _view(
self, size_type null_count, SpillLock spill_lock
) except *
cdef column_view view(self, SpillLock spill_lock=*) except *
cdef mutable_column_view mutable_view(self) except *

@staticmethod
Expand Down
23 changes: 13 additions & 10 deletions python/cudf/cudf/_lib/column.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ from rmm._lib.device_buffer cimport DeviceBuffer
from cudf._lib.cpp.strings.convert.convert_integers cimport (
from_integers as cpp_from_integers,
)
from cudf._lib.spillable_buffer cimport OwnersVecT, SpillableBuffer
from cudf._lib.spillable_buffer cimport SpillableBuffer

from cudf._lib.types import (
LIBCUDF_TO_SUPPORTED_NUMPY_TYPES,
Expand Down Expand Up @@ -342,7 +342,8 @@ cdef class Column:
return other_col

cdef libcudf_types.size_type compute_null_count(self) except? 0:
return self._view(libcudf_types.UNKNOWN_NULL_COUNT).null_count()
cdef SpillLock slock = SpillLock()
return self._view(libcudf_types.UNKNOWN_NULL_COUNT, slock).null_count()

cdef mutable_column_view mutable_view(self) except *:
if is_categorical_dtype(self.dtype):
Expand Down Expand Up @@ -389,14 +390,18 @@ cdef class Column:
offset,
children)

cdef column_view view(self) except *:
cdef column_view view(self, SpillLock spill_lock=None) except *:
null_count = self.null_count
if null_count is None:
null_count = libcudf_types.UNKNOWN_NULL_COUNT
cdef libcudf_types.size_type c_null_count = null_count
return self._view(c_null_count)
return self._view(c_null_count, spill_lock)

cdef column_view _view(self, libcudf_types.size_type null_count) except *:
cdef column_view _view(
self,
libcudf_types.size_type null_count,
SpillLock spill_lock
) except *:
if is_categorical_dtype(self.dtype):
col = self.base_children[0]
data_dtype = col.dtype
Expand All @@ -407,12 +412,11 @@ cdef class Column:
cdef libcudf_types.data_type dtype = dtype_to_data_type(data_dtype)
cdef libcudf_types.size_type offset = self.offset
cdef vector[column_view] children
cdef shared_ptr[OwnersVecT] owners = make_shared[OwnersVecT]()
cdef void* data
if col.base_data is None:
data = NULL
elif isinstance(col.base_data, SpillableBuffer):
data = (<SpillableBuffer>col.base_data).ptr_raw(owners)
data = (<SpillableBuffer>col.base_data).ptr_raw(spill_lock)
else:
data = <void*><uintptr_t>(col.base_data.ptr)

Expand All @@ -436,8 +440,7 @@ cdef class Column:
mask,
c_null_count,
offset,
children,
static_pointer_cast[void, OwnersVecT](owners)
children
)

@staticmethod
Expand Down Expand Up @@ -552,7 +555,7 @@ cdef class Column:
)
if isinstance(data_owner, SpillableBuffer):
# To prevent data_owner getting spilled, we attach an
# ExposeToken to data. This will make sure that data_owner
# SpillLock to data. This will make sure that data_owner
# is unspillable as long as data is alive.
_, token = data_owner.ptr_restricted()
data.token = token
Expand Down
12 changes: 7 additions & 5 deletions python/cudf/cudf/_lib/copying.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ from cudf._lib.column cimport Column
from cudf._lib.scalar import as_device_scalar

from cudf._lib.scalar cimport DeviceScalar
from cudf._lib.utils cimport table_view_from_columns, table_view_from_table

from cudf._lib.reduce import minmax
from cudf.core.abc import Serializable
Expand All @@ -38,12 +37,14 @@ from cudf._lib.cpp.scalar.scalar cimport scalar
from cudf._lib.cpp.table.table cimport table
from cudf._lib.cpp.table.table_view cimport table_view
from cudf._lib.cpp.types cimport size_type
from cudf._lib.spillable_buffer cimport SpillLock
from cudf._lib.utils cimport (
columns_from_table_view,
columns_from_unique_ptr,
data_from_table_view,
data_from_unique_ptr,
table_view_from_columns,
table_view_from_table,
)

# workaround for https://github.com/cython/cython/issues/3885
Expand Down Expand Up @@ -350,15 +351,16 @@ def columns_slice(list input_columns, list indices):
`len(indices) / 2`. The `i`th item in return is a list of columns sliced
from ``input_columns`` with `slice(indices[i*2], indices[i*2 + 1])`.
"""
cdef table_view input_table_view = table_view_from_columns(input_columns)
cdef SpillLock slock = SpillLock()
cdef table_view input_table_view = table_view_from_columns(
input_columns, slock
)
cdef vector[size_type] c_indices = indices
cdef vector[table_view] c_result

with nogil:
c_result = move(
cpp_copying.slice(
input_table_view,
c_indices)
cpp_copying.slice(input_table_view, c_indices)
)

return [
Expand Down
10 changes: 8 additions & 2 deletions python/cudf/cudf/_lib/groupby.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import cudf

from cudf._lib.column cimport Column
from cudf._lib.scalar cimport DeviceScalar
from cudf._lib.spillable_buffer cimport SpillLock
from cudf._lib.utils cimport (
columns_from_unique_ptr,
data_from_unique_ptr,
Expand Down Expand Up @@ -106,7 +107,10 @@ cdef class GroupBy:
else:
c_null_handling = libcudf_types.null_policy.INCLUDE

cdef table_view keys_view = table_view_from_columns(keys)
self._spill_lock = SpillLock()
cdef table_view keys_view = table_view_from_columns(
keys, self._spill_lock
)

with nogil:
self.c_obj.reset(
Expand All @@ -121,7 +125,9 @@ cdef class GroupBy:
self.dropna = dropna

def groups(self, list values):
cdef table_view values_view = table_view_from_columns(values)
cdef table_view values_view = table_view_from_columns(
values, self._spill_lock
)

with nogil:
c_groups = move(self.c_obj.get()[0].get_groups(values_view))
Expand Down
9 changes: 7 additions & 2 deletions python/cudf/cudf/_lib/spillable_buffer.pxd
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,12 @@ from libc.stdint cimport uintptr_t
from libcpp.memory cimport shared_ptr
from libcpp.vector cimport vector

ctypedef vector[shared_ptr[void]] OwnersVecT # Type aliasing

cdef class SpillLock:
cdef vector[shared_ptr[void]] _expose_counters

cdef add(self, shared_ptr[void] expose_counter)


cdef class SpillableBuffer:
cdef object __weakref__
Expand All @@ -20,4 +25,4 @@ cdef class SpillableBuffer:
cdef object _owner
cdef object _manager

cdef void* ptr_raw(self, shared_ptr[OwnersVecT] owners) except *
cdef void* ptr_raw(self, SpillLock spill_lock) except *
4 changes: 2 additions & 2 deletions python/cudf/cudf/_lib/spillable_buffer.pyi
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ from typing import Any, Union
from cudf.core.buffer import DeviceBufferLike
from cudf.core.spill_manager import SpillManager

class ExposeToken: ...
class SpillLock: ...

class SpillableBuffer(DeviceBufferLike):
def __init__(
Expand All @@ -29,4 +29,4 @@ class SpillableBuffer(DeviceBufferLike):
def expose_counter(self) -> int: ...
def move_inplace(self, target: str) -> None: ...
def is_overlapping(self, ptr: int, size: int): ...
def ptr_restricted(self) -> Union[int, ExposeToken]: ...
def ptr_restricted(self) -> Union[int, SpillLock]: ...
24 changes: 13 additions & 11 deletions python/cudf/cudf/_lib/spillable_buffer.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -30,12 +30,11 @@ cdef shared_ptr[void] create_expose_counter():
)


cdef class ExposeToken:
"""Create an expose token
cdef class SpillLock:
"""Disable spilling temporarily for specific buffers"""

This is just a reference to an expose counter wrapped in Python.
"""
cdef shared_ptr[void] _reference
cdef add(self, shared_ptr[void] expose_counter):
self._expose_counters.push_back(expose_counter)


# TODO: this is not supported by PyTorch
Expand Down Expand Up @@ -235,7 +234,7 @@ cdef class SpillableBuffer:
self._last_accessed = time.monotonic()
return self._ptr

cdef void* ptr_raw(self, shared_ptr[OwnersVecT] owners) except *:
cdef void* ptr_raw(self, SpillLock spill_lock) except *:
# Get base buffer
cdef SpillableBuffer base
cdef size_t offset
Expand All @@ -246,13 +245,16 @@ cdef class SpillableBuffer:
base = self._view_desc["base"]
offset = self._view_desc["offset"]

if spill_lock is None:
return <void*><uintptr_t> base.ptr

with base._lock:
base.move_inplace(target="gpu")
base._last_accessed = time.monotonic()
dereference(owners).push_back(base._expose_counter)
spill_lock.add(base._expose_counter)
return <void*><uintptr_t> (base._ptr+offset)

def ptr_restricted(self) -> Union[int, ExposeToken]:
def ptr_restricted(self) -> Union[int, SpillLock]:
# Get base buffer
cdef SpillableBuffer base
cdef size_t offset
Expand All @@ -266,9 +268,9 @@ cdef class SpillableBuffer:
with base._lock:
base.move_inplace(target="gpu")
base._last_accessed = time.monotonic()
token = ExposeToken()
token._reference = base._expose_counter
return base._ptr+offset, token
spill_lock = SpillLock()
spill_lock.add(base._expose_counter)
return base._ptr+offset, spill_lock

@property
def owner(self) -> Any:
Expand Down
5 changes: 4 additions & 1 deletion python/cudf/cudf/_lib/utils.pxd
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ from libcpp.vector cimport vector

from cudf._lib.cpp.column.column cimport column_view
from cudf._lib.cpp.table.table cimport table, table_view
from cudf._lib.spillable_buffer cimport SpillLock


cdef vector[column_view] make_column_views(object columns) except*
Expand All @@ -14,7 +15,9 @@ cdef data_from_unique_ptr(
unique_ptr[table] c_tbl, column_names, index_names=*)
cdef data_from_table_view(
table_view tv, object owner, object column_names, object index_names=*)
cdef table_view table_view_from_columns(columns) except *
cdef table_view table_view_from_columns(
columns, SpillLock spill_lock=*
) except *
cdef table_view table_view_from_table(tbl, ignore_index=*) except*
cdef columns_from_unique_ptr(unique_ptr[table] c_tbl)
cdef columns_from_table_view(table_view tv, object owners)
7 changes: 4 additions & 3 deletions python/cudf/cudf/_lib/utils.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -36,13 +36,14 @@ PARQUET_META_TYPE_MAP = {
for cudf_dtype, pandas_dtype in np_dtypes_to_pandas_dtypes.items()
}

cdef table_view table_view_from_columns(columns) except*:
cdef table_view table_view_from_columns(
columns, SpillLock spill_lock=None
) except *:
"""Create a cudf::table_view from an iterable of Columns."""
cdef vector[column_view] column_views

cdef Column col
for col in columns:
column_views.push_back(col.view())
column_views.push_back(col.view(spill_lock=spill_lock))

return table_view(column_views)

Expand Down

0 comments on commit 12753f9

Please sign in to comment.