diff --git a/python/cudf/cudf/_lib/column.pyi b/python/cudf/cudf/_lib/column.pyi index ce78f69be0b..1ab50af6bf3 100644 --- a/python/cudf/cudf/_lib/column.pyi +++ b/python/cudf/cudf/_lib/column.pyi @@ -68,9 +68,9 @@ class Column: @property def children(self) -> Tuple[ColumnBase, ...]: ... def set_base_children(self, value: Tuple[ColumnBase, ...]) -> None: ... - def _detach_refs(self, zero_copied=False) -> None: ... - def _has_a_weakref(self) -> bool: ... - def _is_cai_zero_copied(self) -> bool: ... + def _unlink_shared_buffers(self, zero_copied=False) -> None: ... + def _is_shared_buffers(self) -> bool: ... + def _buffers_zero_copied(self) -> bool: ... def _mimic_inplace( self, other_col: ColumnBase, inplace=False ) -> Optional[ColumnBase]: ... diff --git a/python/cudf/cudf/_lib/column.pyx b/python/cudf/cudf/_lib/column.pyx index 4420c2bef7a..082ff94f9c2 100644 --- a/python/cudf/cudf/_lib/column.pyx +++ b/python/cudf/cudf/_lib/column.pyx @@ -12,7 +12,7 @@ import cudf._lib as libcudf from cudf.api.types import is_categorical_dtype from cudf.core.buffer import ( Buffer, - RefCountableBuffer, + CopyOnWriteBuffer, SpillableBuffer, SpillLock, acquire_spill_lock, @@ -74,7 +74,6 @@ cdef class Column: self.set_base_children(children) self.set_base_data(data) self.set_base_mask(mask) - self._zero_copied = False @property def base_size(self): @@ -217,7 +216,7 @@ cdef class Column: # be expensive. hasattr(value, "__cuda_array_interface__") ): - if isinstance(value, RefCountableBuffer): + if isinstance(value, CopyOnWriteBuffer): value = SimpleNamespace( __cuda_array_interface__=( value._cuda_array_interface_readonly
@@ -314,49 +313,43 @@ cdef class Column:
         self._children = None
         self._base_children = value
 
-    def _has_a_weakref(self) -> bool:
+    def _is_shared_buffers(self) -> bool:
         """
-        Determines if the column has a weak reference.
+        Determines if any of the buffers underneath the column
+        have been shared elsewhere.
         """
-
-        return (
-            (
-                isinstance(self.base_data, RefCountableBuffer) and
-                self.base_data._has_a_weakref()
-            )
-            or
-            (
-                isinstance(self.base_mask, RefCountableBuffer) and
-                self.base_mask._has_a_weakref()
-            )
+        is_data_shared = (
+            isinstance(self.base_data, CopyOnWriteBuffer) and
+            self.base_data._is_shared()
         )
+        is_mask_shared = (
+            isinstance(self.base_mask, CopyOnWriteBuffer) and
+            self.base_mask._is_shared()
+        )
+        return is_mask_shared or is_data_shared
 
-    def _is_cai_zero_copied(self) -> bool:
-        """
-        Determines if the column is zero copied.
-        """
-        return (
-            self._zero_copied or
-            (
-                self.base_data is not None and
-                isinstance(self.base_data, RefCountableBuffer) and
-                self.base_data._is_cai_zero_copied()
-            ) or
-            (
-                self.base_mask is not None and
-                isinstance(self.base_mask, RefCountableBuffer) and
-                self.base_mask._is_cai_zero_copied()
-            )
+    def _buffers_zero_copied(self) -> bool:
+        """
+        Determines if any of the buffers underneath the column
+        have been zero-copied.
+        """
+        data_zero_copied = (
+            isinstance(self.base_data, CopyOnWriteBuffer) and
+            self.base_data._zero_copied
         )
+        mask_zero_copied = (
+            isinstance(self.base_mask, CopyOnWriteBuffer) and
+            self.base_mask._zero_copied
+        )
+        return data_zero_copied or mask_zero_copied
 
-    def _detach_refs(self, zero_copied=False):
+    def _unlink_shared_buffers(self, zero_copied=False):
         """
         Detaches a column from its current Buffers by making
         a true deep-copy.
         """
-        if not self._is_cai_zero_copied() and self._has_a_weakref():
+        if not self._buffers_zero_copied() and self._is_shared_buffers():
             new_col = self.force_deep_copy()
-
             self._offset = new_col.offset
             self._size = new_col.size
             self._dtype = new_col._dtype
diff --git a/python/cudf/cudf/core/buffer/__init__.py b/python/cudf/cudf/core/buffer/__init__.py index bfe1b7a4468..f92d414d797 100644 --- a/python/cudf/cudf/core/buffer/__init__.py +++ b/python/cudf/cudf/core/buffer/__init__.py @@ -1,10 +1,10 @@ # Copyright (c) 2022, NVIDIA CORPORATION.
from cudf.core.buffer.buffer import Buffer, cuda_array_interface_wrapper +from cudf.core.buffer.cow_buffer import CopyOnWriteBuffer from cudf.core.buffer.spillable_buffer import SpillableBuffer, SpillLock from cudf.core.buffer.utils import ( acquire_spill_lock, as_buffer, get_spill_lock, ) -from cudf.core.buffer.weakrefable_buffer import RefCountableBuffer
diff --git a/python/cudf/cudf/core/buffer/cow_buffer.py b/python/cudf/cudf/core/buffer/cow_buffer.py
new file mode 100644
index 00000000000..7377c6f199e
--- /dev/null
+++ b/python/cudf/cudf/core/buffer/cow_buffer.py
@@ -0,0 +1,174 @@
+# Copyright (c) 2022, NVIDIA CORPORATION.
+
+from __future__ import annotations
+
+import copy
+from collections import defaultdict
+from typing import Any, DefaultDict, Tuple, Type, TypeVar
+from weakref import WeakSet
+
+import rmm
+
+from cudf.core.buffer.buffer import Buffer, cuda_array_interface_wrapper
+
+T = TypeVar("T", bound="CopyOnWriteBuffer")
+
+
+class CopyOnWriteBuffer(Buffer):
+    """A Buffer that tracks which instances view the same device memory.
+
+    Instances with the same ``ptr`` and ``size`` are registered together
+    in ``_instances``. Requesting ``__cuda_array_interface__`` first
+    moves an instance whose memory is shared (and that has not been
+    zero-copied) onto its own deep copy of the data.
+
+    Use the factory function `as_buffer` to create a Buffer instance.
+    """
+
+    # This dict keeps track of all instances that have the same `ptr`
+    # and `size` attributes. Each key of the dict is a `(ptr, size)`
+    # tuple and the corresponding value is a set of weak references to
+    # instances with that `ptr` and `size`.
+    _instances: DefaultDict[Tuple, WeakSet] = defaultdict(WeakSet)
+
+    # TODO: This is synonymous to SpillableBuffer._exposed attribute
+    # and has to be merged.
+    _zero_copied: bool
+
+    def _finalize_init(self):
+        """Track `self` in `_instances` and clear `_zero_copied`."""
+        # the last step in initializing a `CopyOnWriteBuffer`
+        # is to track it in `_instances`:
+        key = (self.ptr, self.size)
+        self.__class__._instances[key].add(self)
+        self._zero_copied = False
+
+    @classmethod
+    def _from_device_memory(cls: Type[T], data: Any) -> T:
+        """Create a Buffer from an object exposing `__cuda_array_interface__`.
+
+        No data is being copied.
+
+        Parameters
+        ----------
+        data : device-buffer-like
+            An object implementing the CUDA Array Interface.
+
+        Returns
+        -------
+        Buffer
+            Buffer representing the same device memory as `data`
+        """
+
+        # Let the base class build the Buffer, then start tracking it
+        ret = super()._from_device_memory(data)
+        ret._finalize_init()
+        return ret
+
+    @classmethod
+    def _from_host_memory(cls: Type[T], data: Any) -> T:
+        """Create a tracked Buffer via the base-class host-memory path."""
+        ret = super()._from_host_memory(data)
+        ret._finalize_init()
+        return ret
+
+    def _is_shared(self):
+        """
+        Return `True` if `self`'s memory is shared with other Buffers.
+        """
+        return len(self.__class__._instances[(self.ptr, self.size)]) > 1
+
+    def copy(self, deep: bool = True):
+        """
+        Return a copy of Buffer.
+
+        Parameters
+        ----------
+        deep : bool, default True
+            If True, returns a deep-copy of the underlying Buffer data.
+            If False, returns a shallow-copy of the Buffer pointing to
+            the same underlying data, unless this Buffer has been
+            zero-copied, in which case a deep-copy is returned.
+
+        Returns
+        -------
+        Buffer
+        """
+        if deep or self._zero_copied:
+            # A zero-copied Buffer no longer takes part in
+            # copy-on-write (see `__cuda_array_interface__`), so it
+            # must not be shared with a new shallow copy.
+            owner_copy: rmm.DeviceBuffer = copy.copy(self._owner)
+            return self._from_device_memory(
+                cuda_array_interface_wrapper(
+                    ptr=owner_copy.ptr,
+                    size=owner_copy.size,
+                    owner=owner_copy,
+                )
+            )
+
+        copied_buf = CopyOnWriteBuffer.__new__(CopyOnWriteBuffer)
+        copied_buf._ptr = self._ptr
+        copied_buf._size = self._size
+        copied_buf._owner = self._owner
+        copied_buf._finalize_init()
+        return copied_buf
+
+    @property
+    def _cuda_array_interface_readonly(self) -> dict:
+        """
+        Internal Implementation for the CUDA Array Interface without
+        triggering a deepcopy.
+        """
+        return {
+            "data": (self.ptr, True),
+            "shape": (self.size,),
+            "strides": None,
+            "typestr": "|u1",
+            "version": 0,
+        }
+
+    @property
+    def __cuda_array_interface__(self) -> dict:
+        # Unlink from any Buffers sharing this memory before handing
+        # out a writable pointer.
+
+        # Mark the Buffer as ``zero_copied=True``,
+        # which will prevent any copy-on-write
+        # mechanism post this operation.
+        # This is done because we don't have any
+        # control over knowing if a third-party library
+        # has modified the data this Buffer is
+        # pointing to.
+        self._unlink_shared_buffers(zero_copied=True)
+
+        result = self._cuda_array_interface_readonly
+        result["data"] = (self.ptr, False)
+        return result
+
+    def _unlink_shared_buffers(self, zero_copied=False):
+        """
+        Unlink a Buffer that is shared with other Buffers by moving
+        it onto a true deep-copy of the data.
+
+        Parameters
+        ----------
+        zero_copied : bool, default False
+            Value assigned to ``_zero_copied`` before returning.
+        """
+        if not self._zero_copied and self._is_shared():
+            shared_key = (self.ptr, self.size)
+            # Constructing a DeviceBuffer from `ptr`/`size` allocates
+            # new device memory and copies the bytes into it.
+            new_buf = rmm.DeviceBuffer(ptr=self.ptr, size=self.size)
+            # Stop tracking `self` under the memory it is leaving,
+            # otherwise the remaining Buffers keep reporting that
+            # they are shared.
+            self.__class__._instances[shared_key].discard(self)
+            self._ptr = new_buf.ptr
+            self._size = new_buf.size
+            self._owner = new_buf
+            self._finalize_init()
+        # NOTE(review): this overwrites an already-True flag when
+        # called with `zero_copied=False` -- confirm that is intended.
+        self._zero_copied = zero_copied
diff --git a/python/cudf/cudf/core/buffer/utils.py b/python/cudf/cudf/core/buffer/utils.py index 8dc1d6db194..c5ff4cbcd88 100644 --- a/python/cudf/cudf/core/buffer/utils.py +++ b/python/cudf/cudf/core/buffer/utils.py @@ -7,9 +7,9 @@ from typing import Any, Dict, Optional, Tuple, Union from cudf.core.buffer.buffer import Buffer, cuda_array_interface_wrapper +from cudf.core.buffer.cow_buffer import CopyOnWriteBuffer from cudf.core.buffer.spill_manager import get_global_manager from cudf.core.buffer.spillable_buffer import SpillableBuffer, SpillLock -from cudf.core.buffer.weakrefable_buffer import RefCountableBuffer from cudf.options import get_option @@ -74,11 +74,11 @@ def as_buffer( ) if get_option("copy_on_write"): - if isinstance(data, (Buffer, RefCountableBuffer)) or hasattr( + if isinstance(data, (Buffer, CopyOnWriteBuffer)) or hasattr( data, "__cuda_array_interface__" ): - return RefCountableBuffer._from_device_memory(data) - return RefCountableBuffer._from_host_memory(data) + return CopyOnWriteBuffer._from_device_memory(data) + return CopyOnWriteBuffer._from_host_memory(data) if get_global_manager() is not None: if hasattr(data,
"__cuda_array_interface__"): return SpillableBuffer._from_device_memory(data, exposed=exposed) diff --git a/python/cudf/cudf/core/buffer/weakrefable_buffer.py b/python/cudf/cudf/core/buffer/weakrefable_buffer.py deleted file mode 100644 index 4eefd3f991a..00000000000 --- a/python/cudf/cudf/core/buffer/weakrefable_buffer.py +++ /dev/null @@ -1,324 +0,0 @@ -# Copyright (c) 2022, NVIDIA CORPORATION. - -from __future__ import annotations - -import copy -import weakref -from typing import Any, Dict, Tuple, Type, TypeVar - -import rmm - -import cudf -from cudf.core.buffer.buffer import Buffer, cuda_array_interface_wrapper - -T = TypeVar("T", bound="RefCountableBuffer") - - -class CachedInstanceMeta(type): - """ - Metaclass for BufferWeakref, which will ensure creation - of singleton instance. - """ - - __instances: Dict[Tuple, BufferWeakref] = {} - - def __call__(cls, ptr, size): - cache_key = (ptr, size) - try: - # try retrieving an instance from the cache: - return cls.__instances[cache_key] - except KeyError: - # if an instance couldn't be found in the cache, - # construct it and add to cache: - obj = super().__call__(ptr, size) - try: - cls.__instances[cache_key] = obj - except TypeError: - # couldn't hash the arguments, don't cache: - return obj - return obj - except TypeError: - # couldn't hash the arguments, don't cache: - return super().__call__(ptr, size) - - def _clear_instance_cache(cls): - cls.__instances.clear() - - -class BufferWeakref(metaclass=CachedInstanceMeta): - """ - A proxy class to be used by ``Buffer`` for generating weakreferences. - """ - - __slots__ = ("ptr", "size", "__weakref__") - - def __init__(self, ptr, size) -> None: - self.ptr = ptr - self.size = size - - -def custom_weakref_callback(ref): - """ - A callback for ``weakref.ref`` API to generate unique - weakref instances that can be counted correctly. 
- - Example below shows why this is necessary: - - In [1]: import cudf - In [2]: import weakref - - Let's create an object ``x`` that we are going to weakref: - - In [3]: x = cudf.core.buffer.BufferWeakref(1, 2) - - Now generate three weak-references of it: - - In [4]: a = weakref.ref(x) - In [5]: b = weakref.ref(x) - In [6]: c = weakref.ref(x) - - ``weakref.ref`` actually returns the same singleton object: - - In [7]: a - Out[7]: - In [8]: b - Out[8]: - In [9]: c - Out[9]: - - In [10]: a is b - Out[10]: True - In [11]: b is c - Out[11]: True - - This will be problematic as we cannot determine what is the count - of weak-references: - - In [12]: weakref.getweakrefcount(x) - Out[12]: 1 - - Notice, though we want ``weakref.getweakrefcount`` to return ``3``, it - returns ``1``. So we need to work-around this by using an empty/no-op - callback: - - In [13]: def custom_weakref_callback(ref): - ...: pass - ...: - - - In [14]: d = weakref.ref(x, custom_weakref_callback) - In [15]: e = weakref.ref(x, custom_weakref_callback) - In [16]: f = weakref.ref(x, custom_weakref_callback) - - Now there is an each unique weak-reference created: - - In [17]: d - Out[17]: - In [18]: e - Out[18]: - In [19]: f - Out[19]: - - Now calling ``weakref.getweakrefcount`` will result in ``4``, which is correct: - - In [20]: weakref.getweakrefcount(x) - Out[20]: 4 - - In [21]: d is not e - Out[21]: True - - In [22]: d is not f - Out[22]: True - - In [23]: e is not f - Out[23]: True - """ # noqa: E501 - pass - - -class RefCountableBuffer(Buffer): - """A Buffer represents device memory. - - Use the factory function `as_buffer` to create a Buffer instance. - """ - - _weak_ref: object - _proxy_ref: None | BufferWeakref - # TODO: This is synonymous to SpillableBuffer._exposed attribute - # and has to be merged. - _zero_copied: bool - - @classmethod - def _from_device_memory(cls: Type[T], data: Any) -> T: - """Create a Buffer from an object exposing `__cuda_array_interface__`. 
- - No data is being copied. - - Parameters - ---------- - data : device-buffer-like - An object implementing the CUDA Array Interface. - - Returns - ------- - Buffer - Buffer representing the same device memory as `data` - """ - - # Bypass `__init__` and initialize attributes manually - ret = super()._from_device_memory(data) - ret._weak_ref = None - ret._proxy_ref = None - ret._zero_copied = False - ret._update_ref() - return ret - - def _is_cai_zero_copied(self): - """ - Returns a flag, that indicates if the Buffer has been zero-copied. - """ - return self._zero_copied - - def _update_ref(self): - """ - Generate the new proxy reference. - """ - # TODO: See if this can be merged into spill-lock - # once spilling and copy on write are merged. - self._proxy_ref = BufferWeakref(self._ptr, self._size) - - def get_ref(self): - """ - Returns the proxy reference. - """ - if self._proxy_ref is None: - self._update_ref() - return self._proxy_ref - - def _has_a_weakref(self): - """ - Checks if the Buffer has a weak-reference. - """ - weakref_count = weakref.getweakrefcount(self.get_ref()) - - if weakref_count == 1: - # When the weakref_count is 1, it could be a possibility - # that a copied Buffer got destroyed and hence this - # method should return False in that case as there is only - # one Buffer pointing to the device memory. - return ( - weakref.getweakrefs(self.get_ref())[0]() is not self.get_ref() - ) - else: - return weakref_count > 0 - - def _get_weakref(self): - """ - Returns a weak-reference for the Buffer. - """ - return weakref.ref(self.get_ref(), custom_weakref_callback) - - def copy(self, deep: bool = True): - """ - Return a copy of Buffer. - - Parameters - ---------- - deep : bool, default True - If True, returns a deep-copy of the underlying Buffer data. - If False, returns a shallow-copy of the Buffer pointing to - the same underlying data. 
- - Returns - ------- - Buffer - """ - if not deep: - if ( - cudf.get_option("copy_on_write") - and not self._is_cai_zero_copied() - ): - copied_buf = RefCountableBuffer.__new__(RefCountableBuffer) - copied_buf._ptr = self._ptr - copied_buf._size = self._size - copied_buf._owner = self._owner - copied_buf._proxy_ref = None - copied_buf._weak_ref = None - copied_buf._zero_copied = False - - if self._has_a_weakref(): - # If `self` has weak-references - # we will then have to keep that - # weak-reference alive, hence - # pass it onto `copied_buf` - copied_buf._weak_ref = self._weak_ref - else: - # If `self` has no weak-references, - # we will have to generate a new weak-reference - # and assign it to `copied_buf` - copied_buf._weak_ref = self._get_weakref() - - self._weak_ref = copied_buf._get_weakref() - - return copied_buf - else: - shallow_copy = RefCountableBuffer.__new__(RefCountableBuffer) - shallow_copy._ptr = self._ptr - shallow_copy._size = self._size - shallow_copy._owner = self._owner - return shallow_copy - else: - owner_copy: rmm.DeviceBuffer = copy.copy(self._owner) - return self._from_device_memory( - cuda_array_interface_wrapper( - ptr=owner_copy.ptr, - size=owner_copy.size, - owner=owner_copy, - ) - ) - - @property - def _cuda_array_interface_readonly(self) -> dict: - """ - Internal Implementation for the CUDA Array Interface without - triggering a deepcopy. - """ - return { - "data": (self.ptr, True), - "shape": (self.size,), - "strides": None, - "typestr": "|u1", - "version": 0, - } - - @property - def __cuda_array_interface__(self) -> dict: - # Detach if there are any weak-references. - - # Mark the Buffer as ``zero_copied=True``, - # which will prevent any copy-on-write - # mechanism post this operation. - # This is done because we don't have any - # control over knowing if a third-party library - # has modified the data this Buffer is - # pointing to. 
- self._detach_refs(zero_copied=True) - - result = self._cuda_array_interface_readonly - result["data"] = (self.ptr, False) - return result - - def _detach_refs(self, zero_copied=False): - """ - Detaches a Buffer from it's weak-references by making - a true deep-copy. - """ - if not self._zero_copied and self._has_a_weakref(): - # make a deep copy of existing DeviceBuffer - # and replace pointer to it. - current_buf = rmm.DeviceBuffer(ptr=self.ptr, size=self.size) - new_buf = current_buf.copy() - self._ptr = new_buf.ptr - self._size = new_buf.size - self._owner = new_buf - self._zero_copied = zero_copied diff --git a/python/cudf/cudf/core/column/column.py b/python/cudf/cudf/core/column/column.py index 8a07bfc5e49..ccf902ee286 100644 --- a/python/cudf/cudf/core/column/column.py +++ b/python/cudf/cudf/core/column/column.py @@ -64,7 +64,7 @@ ) from cudf.core._compat import PANDAS_GE_150 from cudf.core.abc import Serializable -from cudf.core.buffer import Buffer, RefCountableBuffer, as_buffer +from cudf.core.buffer import Buffer, CopyOnWriteBuffer, as_buffer from cudf.core.dtypes import ( CategoricalDtype, IntervalDtype, @@ -420,7 +420,7 @@ def copy(self: T, deep: bool = True) -> T: return self.force_deep_copy() else: if cudf.get_option("copy_on_write"): - if self._is_cai_zero_copied(): + if self._buffers_zero_copied(): return self.force_deep_copy() copied_col = cast( @@ -1910,7 +1910,7 @@ def as_column( elif ( fastpath and cudf.get_option("copy_on_write") - and isinstance(col.base_data, RefCountableBuffer) + and isinstance(col.base_data, CopyOnWriteBuffer) ): col.base_data._zero_copied = True