From d078cff8fc8b3e38c59ca74ab975f6a1ecc49cfb Mon Sep 17 00:00:00 2001 From: "Mads R. B. Kristensen" Date: Mon, 3 Jul 2023 21:15:36 +0200 Subject: [PATCH] Exposure Tracked Buffer (first step towards unifying copy-on-write and spilling) (#13307) The first step towards unifying copy-on-write and spillable buffers. This PR re-implement copy-on-write by introducing a `ExposureTrackedBuffer` and `BufferSlice`. The idea is that when `copy-on-write` (and in a follow-up PR later, when `spill`) is enabled, we use `BufferSlice` throughout cudf. `BufferSlice` is a _view_ of a `ExposureTrackedBuffer` that implements copy-on-write semantics by tracking the number of `BufferSlice` that points to the same `ExposureTrackedBuffer`. Authors: - Mads R. B. Kristensen (https://github.com/madsbk) Approvers: - Vyas Ramasubramani (https://github.com/vyasr) - Lawrence Mitchell (https://github.com/wence-) URL: https://github.com/rapidsai/cudf/pull/13307 --- .../source/developer_guide/library_design.md | 26 +- python/cudf/cudf/_lib/column.pyx | 29 +- python/cudf/cudf/core/buffer/__init__.py | 2 +- python/cudf/cudf/core/buffer/buffer.py | 47 +-- python/cudf/cudf/core/buffer/cow_buffer.py | 168 ---------- .../core/buffer/exposure_tracked_buffer.py | 311 ++++++++++++++++++ .../cudf/cudf/core/buffer/spillable_buffer.py | 12 +- python/cudf/cudf/core/buffer/utils.py | 17 +- python/cudf/cudf/core/column/column.py | 33 +- python/cudf/cudf/tests/test_copying.py | 5 +- 10 files changed, 378 insertions(+), 272 deletions(-) delete mode 100644 python/cudf/cudf/core/buffer/cow_buffer.py create mode 100644 python/cudf/cudf/core/buffer/exposure_tracked_buffer.py diff --git a/docs/cudf/source/developer_guide/library_design.md b/docs/cudf/source/developer_guide/library_design.md index 16b84476549..016c2c1d281 100644 --- a/docs/cudf/source/developer_guide/library_design.md +++ b/docs/cudf/source/developer_guide/library_design.md @@ -325,30 +325,26 @@ This section describes the internal implementation details of the copy-on-write It is recommended that developers familiarize themselves with [the user-facing documentation](copy-on-write-user-doc) of this functionality before reading through the internals below. -The core copy-on-write implementation relies on the `CopyOnWriteBuffer` class. -When the cudf option `"copy_on_write"` is `True`, `as_buffer` will always return a `CopyOnWriteBuffer`. -This subclass of `cudf.Buffer` contains all the mechanisms to enable copy-on-write behavior. -The class stores [weak references](https://docs.python.org/3/library/weakref.html) to every existing `CopyOnWriteBuffer` in `CopyOnWriteBuffer._instances`, a mapping from `ptr` keys to `WeakSet`s containing references to `CopyOnWriteBuffer` objects. -This means that all `CopyOnWriteBuffer`s that point to the same device memory are contained in the same `WeakSet` (corresponding to the same `ptr` key) in `CopyOnWriteBuffer._instances`. -This data structure is then used to determine whether or not to make a copy when a write operation is performed on a `Column` (see below). -If multiple buffers point to the same underlying memory, then a copy must be made whenever a modification is attempted. +The core copy-on-write implementation relies on the factory function `as_exposure_tracked_buffer` and the two classes `ExposureTrackedBuffer` and `BufferSlice`. + +An `ExposureTrackedBuffer` is a subclass of the regular `Buffer` that tracks internal and external references to its underlying memory. Internal references are tracked by maintaining [weak references](https://docs.python.org/3/library/weakref.html) to every `BufferSlice` of the underlying memory. External references are tracked through "exposure" status of the underlying memory. A buffer is considered exposed if the device pointer (integer or void*) has been handed out to a library outside of cudf. In this case, we have no way of knowing if the data are being modified by a third party. + +`BufferSlice` is a subclass of `ExposureTrackedBuffer` that represents a _slice_ of the memory underlying a exposure tracked buffer. + +When the cudf option `"copy_on_write"` is `True`, `as_buffer` calls `as_exposure_tracked_buffer`, which always returns a `BufferSlice`. It is then the slices that determine whether or not to make a copy when a write operation is performed on a `Column` (see below). If multiple slices point to the same underlying memory, then a copy must be made whenever a modification is attempted. ### Eager copies when exposing to third-party libraries -If a `Column`/`CopyOnWriteBuffer` is exposed to a third-party library via `__cuda_array_interface__`, we are no longer able to track whether or not modification of the buffer has occurred. Hence whenever +If a `Column`/`BufferSlice` is exposed to a third-party library via `__cuda_array_interface__`, we are no longer able to track whether or not modification of the buffer has occurred. Hence whenever someone accesses data through the `__cuda_array_interface__`, we eagerly trigger the copy by calling -`_unlink_shared_buffers` which ensures a true copy of underlying device data is made and -unlinks the buffer from any shared "weak" references. Any future copy requests must also trigger a true physical copy (since we cannot track the lifetime of the third-party object). To handle this we also mark the `Column`/`CopyOnWriteBuffer` as -`obj._zero_copied=True` thus indicating that any future shallow-copy requests will trigger a true physical copy -rather than a copy-on-write shallow copy with weak references. +`.make_single_owner_inplace` which ensures a true copy of underlying data is made and that the slice is the sole owner. Any future copy requests must also trigger a true physical copy (since we cannot track the lifetime of the third-party object). To handle this we also mark the `Column`/`BufferSlice` as exposed thus indicating that any future shallow-copy requests will trigger a true physical copy rather than a copy-on-write shallow copy. ### Obtaining a read-only object A read-only object can be quite useful for operations that will not -mutate the data. This can be achieved by calling `._get_cuda_array_interface(readonly=True)`, and creating a `SimpleNameSpace` object around it. -This will not trigger a deep copy even if the `CopyOnWriteBuffer` -has weak references. This API should only be used when the lifetime of the proxy object is restricted to cudf's internal code execution. Handing this out to external libraries or user-facing APIs will lead to untracked references and undefined copy-on-write behavior. We currently use this API for device to host +mutate the data. This can be achieved by calling `.get_ptr(mode="read")`, and using `cuda_array_interface_wrapper` to wrap a `__cuda_array_interface__` object around it. +This will not trigger a deep copy even if multiple `BufferSlice` points to the same `ExposureTrackedBuffer`. This API should only be used when the lifetime of the proxy object is restricted to cudf's internal code execution. Handing this out to external libraries or user-facing APIs will lead to untracked references and undefined copy-on-write behavior. We currently use this API for device to host copies like in `ColumnBase.data_array_view(mode="read")` which is used for `Column.values_host`. diff --git a/python/cudf/cudf/_lib/column.pyx b/python/cudf/cudf/_lib/column.pyx index b4e39da93f3..f28c16b18dc 100644 --- a/python/cudf/cudf/_lib/column.pyx +++ b/python/cudf/cudf/_lib/column.pyx @@ -11,7 +11,7 @@ import cudf._lib as libcudf from cudf.api.types import is_categorical_dtype, is_datetime64tz_dtype from cudf.core.buffer import ( Buffer, - CopyOnWriteBuffer, + ExposureTrackedBuffer, SpillableBuffer, acquire_spill_lock, as_buffer, @@ -339,8 +339,8 @@ cdef class Column: if col.base_data is None: data = NULL else: - data = (col.base_data.get_ptr( - mode="write") + data = ( + col.base_data.get_ptr(mode="write") ) cdef Column child_column @@ -534,13 +534,16 @@ cdef class Column: rmm.DeviceBuffer(ptr=data_ptr, size=(size+offset) * dtype_itemsize) ) - elif column_owner and isinstance(data_owner, CopyOnWriteBuffer): - # TODO: In future, see if we can just pass on the - # CopyOnWriteBuffer reference to another column - # and still create a weak reference. - # With the current design that's not possible. - # https://github.com/rapidsai/cudf/issues/12734 - data = data_owner.copy(deep=False) + elif ( + column_owner and + isinstance(data_owner, ExposureTrackedBuffer) + ): + data = as_buffer( + data=data_ptr, + size=base_nbytes, + owner=data_owner, + exposed=False, + ) elif ( # This is an optimization of the most common case where # from_column_view creates a "view" that is identical to @@ -564,9 +567,9 @@ cdef class Column: owner=data_owner, exposed=True, ) - if isinstance(data_owner, CopyOnWriteBuffer): - data_owner.get_ptr(mode="write") - # accessing the pointer marks it exposed. + if isinstance(data_owner, ExposureTrackedBuffer): + # accessing the pointer marks it exposed permanently. + data_owner.mark_exposed() elif isinstance(data_owner, SpillableBuffer): if data_owner.is_spilled: raise ValueError( diff --git a/python/cudf/cudf/core/buffer/__init__.py b/python/cudf/cudf/core/buffer/__init__.py index 0d433509497..d8883bd97e5 100644 --- a/python/cudf/cudf/core/buffer/__init__.py +++ b/python/cudf/cudf/core/buffer/__init__.py @@ -1,7 +1,7 @@ # Copyright (c) 2022-2023, NVIDIA CORPORATION. from cudf.core.buffer.buffer import Buffer, cuda_array_interface_wrapper -from cudf.core.buffer.cow_buffer import CopyOnWriteBuffer +from cudf.core.buffer.exposure_tracked_buffer import ExposureTrackedBuffer from cudf.core.buffer.spillable_buffer import SpillableBuffer, SpillLock from cudf.core.buffer.utils import ( acquire_spill_lock, diff --git a/python/cudf/cudf/core/buffer/buffer.py b/python/cudf/cudf/core/buffer/buffer.py index 97f3b16bec8..59d20a2784d 100644 --- a/python/cudf/cudf/core/buffer/buffer.py +++ b/python/cudf/cudf/core/buffer/buffer.py @@ -5,7 +5,7 @@ import math import pickle from types import SimpleNamespace -from typing import Any, Dict, Mapping, Optional, Sequence, Tuple +from typing import Any, Dict, Literal, Mapping, Optional, Sequence, Tuple import numpy from typing_extensions import Self @@ -168,7 +168,7 @@ def _from_host_memory(cls, data: Any) -> Self: # Create from device memory return cls._from_device_memory(buf) - def _getitem(self, offset: int, size: int) -> Buffer: + def _getitem(self, offset: int, size: int) -> Self: """ Sub-classes can overwrite this to implement __getitem__ without having to handle non-slice inputs. @@ -181,7 +181,7 @@ def _getitem(self, offset: int, size: int) -> Buffer: ) ) - def __getitem__(self, key: slice) -> Buffer: + def __getitem__(self, key: slice) -> Self: """Create a new slice of the buffer.""" if not isinstance(key, slice): raise TypeError( @@ -193,7 +193,7 @@ def __getitem__(self, key: slice) -> Buffer: raise ValueError("slice must be C-contiguous") return self._getitem(offset=start, size=stop - start) - def copy(self, deep: bool = True): + def copy(self, deep: bool = True) -> Self: """ Return a copy of Buffer. @@ -233,35 +233,15 @@ def owner(self) -> Any: @property def __cuda_array_interface__(self) -> Mapping: """Implementation of the CUDA Array Interface.""" - return self._get_cuda_array_interface(readonly=False) - - def _get_cuda_array_interface(self, readonly=False): - """Helper function to create a CUDA Array Interface. - - Parameters - ---------- - readonly : bool, default False - If True, returns a CUDA Array Interface with - readonly flag set to True. - If False, returns a CUDA Array Interface with - readonly flag set to False. - - Returns - ------- - dict - """ return { - "data": ( - self.get_ptr(mode="read" if readonly else "write"), - readonly, - ), + "data": (self.get_ptr(mode="write"), False), "shape": (self.size,), "strides": None, "typestr": "|u1", "version": 0, } - def get_ptr(self, *, mode) -> int: + def get_ptr(self, *, mode: Literal["read", "write"]) -> int: """Device pointer to the start of the buffer. Parameters @@ -274,19 +254,26 @@ def get_ptr(self, *, mode) -> int: Failure to fulfill this contract will cause incorrect behavior. + Returns + ------- + int + The device pointer as an integer See Also -------- SpillableBuffer.get_ptr - CopyOnWriteBuffer.get_ptr + ExposureTrackedBuffer.get_ptr """ return self._ptr - def memoryview(self) -> memoryview: + def memoryview( + self, *, offset: int = 0, size: Optional[int] = None + ) -> memoryview: """Read-only access to the buffer through host memory.""" - host_buf = host_memory_allocation(self.size) + size = self._size if size is None else size + host_buf = host_memory_allocation(size) rmm._lib.device_buffer.copy_ptr_to_host( - self.get_ptr(mode="read"), host_buf + self.get_ptr(mode="read") + offset, host_buf ) return memoryview(host_buf).toreadonly() diff --git a/python/cudf/cudf/core/buffer/cow_buffer.py b/python/cudf/cudf/core/buffer/cow_buffer.py deleted file mode 100644 index 6243916b91b..00000000000 --- a/python/cudf/cudf/core/buffer/cow_buffer.py +++ /dev/null @@ -1,168 +0,0 @@ -# Copyright (c) 2022-2023, NVIDIA CORPORATION. - -from __future__ import annotations - -import weakref -from collections import defaultdict -from typing import Any, DefaultDict, Tuple -from weakref import WeakSet - -from typing_extensions import Self - -import rmm - -from cudf.core.buffer.buffer import Buffer - - -def _keys_cleanup(ptr): - weak_set_values = CopyOnWriteBuffer._instances[ptr] - if ( - len(weak_set_values) == 1 - and next(iter(weak_set_values.data))() is None - ): - # When the last remaining reference is being cleaned up we will still - # have a dead reference in `weak_set_values`. If that is the case, then - # we can safely clean up the key - del CopyOnWriteBuffer._instances[ptr] - - -class CopyOnWriteBuffer(Buffer): - """A copy-on-write buffer that implements Buffer. - - This buffer enables making copies of data only when there - is a write operation being performed. - - See more here :ref:`copy-on-write-dev-doc`. - - Use the factory function `as_buffer` to create a CopyOnWriteBuffer - instance. - """ - - _instances: DefaultDict[Tuple, WeakSet] = defaultdict(WeakSet) - """This dict keeps track of all instances that have the same `ptr` - and `size` attributes. Each key of the dict is a `(ptr, size)` - tuple and the corresponding value is a set of weak references to - instances with that `ptr` and `size`.""" - - # TODO: This is synonymous to SpillableBuffer._exposed attribute - # and has to be merged. - _zero_copied: bool - - def _finalize_init(self): - self.__class__._instances[self._ptr].add(self) - self._instances = self.__class__._instances[self._ptr] - self._zero_copied = False - weakref.finalize(self, _keys_cleanup, self._ptr) - - @classmethod - def _from_device_memory(cls, data: Any, *, exposed: bool = False) -> Self: - """Create a Buffer from an object exposing `__cuda_array_interface__`. - - No data is being copied. - - Parameters - ---------- - data : device-buffer-like - An object implementing the CUDA Array Interface. - exposed : bool, optional - Mark the buffer as zero copied. - - Returns - ------- - Buffer - Buffer representing the same device memory as `data` - """ - - # Bypass `__init__` and initialize attributes manually - ret = super()._from_device_memory(data) - ret._finalize_init() - ret._zero_copied = exposed - return ret - - @classmethod - def _from_host_memory(cls, data: Any) -> Self: - ret = super()._from_host_memory(data) - ret._finalize_init() - return ret - - @property - def _is_shared(self): - """ - Return `True` if `self`'s memory is shared with other columns. - """ - return len(self._instances) > 1 - - def get_ptr(self, mode: str = "write") -> int: - """Device pointer to the start of the buffer. - - Parameters - ---------- - mode : str, default 'write' - Supported values are {"read", "write"} - If "write", when weak-references exist, they - are unlinked and the data pointed to may be modified - by the caller. If "read", the data pointed to - must not be modified by the caller. - Failure to fulfill this contract will cause - incorrect behavior. - - See Also - -------- - Buffer.get_ptr - SpillableBuffer.get_ptr - """ - if mode == "write": - self._unlink_shared_buffers() - elif mode != "read": - raise ValueError(f"Incorrect mode passed : {mode}") - return self._ptr - - def copy(self, deep: bool = True): - if deep or self._zero_copied: - return super().copy(deep=True) - else: - cls = type(self) - copied_buf = cls.__new__(cls) - copied_buf._ptr = self._ptr - copied_buf._size = self._size - copied_buf._owner = self._owner - copied_buf._finalize_init() - return copied_buf - - @property - def __cuda_array_interface__(self) -> dict: - # Unlink if there are any weak references. - # Mark the Buffer as ``zero_copied=True``, - # which will prevent any copy-on-write - # mechanism post this operation. - # This is done because we don't have any - # control over knowing if a third-party library - # has modified the data this Buffer is - # pointing to. - self._unlink_shared_buffers() - self._zero_copied = True - return self._get_cuda_array_interface(readonly=False) - - def _get_cuda_array_interface(self, readonly=False): - return { - "data": (self._ptr, readonly), - "shape": (self.size,), - "strides": None, - "typestr": "|u1", - "version": 0, - } - - def _unlink_shared_buffers(self): - """ - Unlinks a Buffer if it is shared with other buffers by - making a true deep-copy. - """ - if not self._zero_copied and self._is_shared: - # make a deep copy of existing DeviceBuffer - # and replace pointer to it. - current_buf = rmm.DeviceBuffer(ptr=self._ptr, size=self._size) - new_buf = current_buf.copy() - self._ptr = new_buf.ptr - self._size = new_buf.size - self._owner = new_buf - self._finalize_init() diff --git a/python/cudf/cudf/core/buffer/exposure_tracked_buffer.py b/python/cudf/cudf/core/buffer/exposure_tracked_buffer.py new file mode 100644 index 00000000000..f2ac6301944 --- /dev/null +++ b/python/cudf/cudf/core/buffer/exposure_tracked_buffer.py @@ -0,0 +1,311 @@ +# Copyright (c) 2020-2023, NVIDIA CORPORATION. + +from __future__ import annotations + +import weakref +from typing import ( + Any, + Container, + Literal, + Mapping, + Optional, + Type, + TypeVar, + cast, +) + +from typing_extensions import Self + +import cudf +from cudf.core.buffer.buffer import Buffer, get_ptr_and_size +from cudf.utils.string import format_bytes + +T = TypeVar("T", bound="ExposureTrackedBuffer") + + +def get_owner(data, klass: Type[T]) -> Optional[T]: + """Get the owner of `data`, if any exist + + Search through the stack of data owners in order to find an + owner of type `klass` (not subclasses). + + Parameters + ---------- + data + The data object + + Return + ------ + klass or None + The owner of `data` if `klass` or None. + """ + + if type(data) is klass: + return data + if hasattr(data, "owner"): + return get_owner(data.owner, klass) + return None + + +def as_exposure_tracked_buffer( + data, exposed: bool, subclass: Optional[Type[T]] = None +) -> BufferSlice: + """Factory function to wrap `data` in a slice of an exposure tracked buffer + + If `subclass` is None, a new ExposureTrackedBuffer that points to the + memory of `data` is created and a BufferSlice that points to all of the + new ExposureTrackedBuffer is returned. + + If `subclass` is not None, a new `subclass` is created instead. Still, + a BufferSlice that points to all of the new `subclass` is returned + + It is illegal for an exposure tracked buffer to own another exposure + tracked buffer. When representing the same memory, we should have a single + exposure tracked buffer and multiple buffer slices. + + Developer Notes + --------------- + This function always returns slices thus all buffers in cudf will use + `BufferSlice` when copy-on-write is enabled. The slices implement + copy-on-write by trigging deep copies when write access is detected + and multiple slices points to the same exposure tracked buffer. + + Parameters + ---------- + data : buffer-like or array-like + A buffer-like or array-like object that represents C-contiguous memory. + exposed + Mark the buffer as permanently exposed. + subclass + If not None, a subclass of ExposureTrackedBuffer to wrap `data`. + + Return + ------ + BufferSlice + A buffer slice that points to a ExposureTrackedBuffer (or `subclass`), + which in turn wraps `data`. + """ + + if not hasattr(data, "__cuda_array_interface__"): + if exposed: + raise ValueError("cannot created exposed host memory") + return cast( + BufferSlice, ExposureTrackedBuffer._from_host_memory(data)[:] + ) + + owner = get_owner(data, subclass or ExposureTrackedBuffer) + if owner is None: + return cast( + BufferSlice, + ExposureTrackedBuffer._from_device_memory(data, exposed=exposed)[ + : + ], + ) + + # At this point, we know that `data` is owned by a exposure tracked buffer + ptr, size = get_ptr_and_size(data.__cuda_array_interface__) + if size > 0 and owner._ptr == 0: + raise ValueError("Cannot create a non-empty slice of a null buffer") + return BufferSlice(base=owner, offset=ptr - owner._ptr, size=size) + + +class ExposureTrackedBuffer(Buffer): + """A Buffer that tracks its "expose" status. + + In order to implement copy-on-write and spillable buffers, we need the + ability to detect external access to the underlying memory. We say that + the buffer has been exposed if the device pointer (integer or void*) has + been accessed outside of ExposureTrackedBuffer. In this case, we have no + control over knowing if the data is being modified by a third-party. + + Attributes + ---------- + _exposed + The current exposure status of the buffer. Notice, once the exposure + status becomes True, it should never change back. + _slices + The set of BufferSlice instances that point to this buffer. + """ + + _exposed: bool + _slices: weakref.WeakSet[BufferSlice] + + @property + def exposed(self) -> bool: + return self._exposed + + def mark_exposed(self) -> None: + """Mark the buffer as "exposed" permanently""" + self._exposed = True + + @classmethod + def _from_device_memory(cls, data: Any, *, exposed: bool = False) -> Self: + """Create an exposure tracked buffer from device memory. + + No data is being copied. + + Parameters + ---------- + data : device-buffer-like + An object implementing the CUDA Array Interface. + exposed : bool, optional + Mark the buffer as permanently exposed. + + Returns + ------- + ExposureTrackedBuffer + Buffer representing the same device memory as `data` + """ + ret = super()._from_device_memory(data) + ret._exposed = exposed + ret._slices = weakref.WeakSet() + return ret + + def _getitem(self, offset: int, size: int) -> BufferSlice: + return BufferSlice(base=self, offset=offset, size=size) + + @property + def __cuda_array_interface__(self) -> Mapping: + self.mark_exposed() + return super().__cuda_array_interface__ + + def __repr__(self) -> str: + return ( + f"" + ) + + +class BufferSlice(ExposureTrackedBuffer): + """A slice (aka. a view) of a exposure tracked buffer. + + Parameters + ---------- + base + The exposure tracked buffer this slice refers to. + offset + The offset relative to the start memory of base (in bytes). + size + The size of the slice (in bytes) + passthrough_attributes + Name of attributes that are passed through to the base as-is. + """ + + def __init__( + self, + base: ExposureTrackedBuffer, + offset: int, + size: int, + *, + passthrough_attributes: Container[str] = ("exposed",), + ) -> None: + if size < 0: + raise ValueError("size cannot be negative") + if offset < 0: + raise ValueError("offset cannot be negative") + if offset + size > base.size: + raise ValueError( + "offset+size cannot be greater than the size of base" + ) + self._base = base + self._offset = offset + self._size = size + self._owner = base + self._passthrough_attributes = passthrough_attributes + base._slices.add(self) + + def __getattr__(self, name): + if name in self._passthrough_attributes: + return getattr(self._base, name) + raise AttributeError( + f"{self.__class__.__name__} object has no attribute {name}" + ) + + def _getitem(self, offset: int, size: int) -> BufferSlice: + return BufferSlice( + base=self._base, offset=offset + self._offset, size=size + ) + + def get_ptr(self, *, mode: Literal["read", "write"]) -> int: + if mode == "write" and cudf.get_option("copy_on_write"): + self.make_single_owner_inplace() + return self._base.get_ptr(mode=mode) + self._offset + + def memoryview( + self, *, offset: int = 0, size: Optional[int] = None + ) -> memoryview: + return self._base.memoryview(offset=self._offset + offset, size=size) + + def copy(self, deep: bool = True) -> Self: + """Return a copy of Buffer. + + What actually happens when `deep == False` depends on the + "copy_on_write" option. When copy-on-write is enabled, a shallow copy + becomes a deep copy if the buffer has been exposed. This is because we + have no control over knowing if the data is being modified when the + buffer has been exposed to third-party. + + Parameters + ---------- + deep : bool, default True + The semantics when copy-on-write is disabled: + - If deep=True, returns a deep copy of the underlying data. + - If deep=False, returns a shallow copy of the Buffer pointing + to the same underlying data. + The semantics when copy-on-write is enabled: + - From the users perspective, always a deep copy of the + underlying data. However, the data isn't actually copied + until someone writers to the returned buffer. + + Returns + ------- + BufferSlice + A slice pointing to either a new or the existing base buffer + depending on the expose status of the base buffer and the + copy-on-write option (see above). + """ + if cudf.get_option("copy_on_write"): + base_copy = self._base.copy(deep=deep or self.exposed) + else: + base_copy = self._base.copy(deep=deep) + return cast(Self, base_copy[self._offset : self._offset + self._size]) + + @property + def __cuda_array_interface__(self) -> Mapping: + if cudf.get_option("copy_on_write"): + self.make_single_owner_inplace() + return super().__cuda_array_interface__ + + def make_single_owner_inplace(self) -> None: + """Make sure this slice is the only one pointing to the base. + + This is used by copy-on-write to trigger a deep copy when write + access is detected. + + Parameters + ---------- + data : device-buffer-like + An object implementing the CUDA Array Interface. + + Returns + ------- + Buffer + Buffer representing the same device memory as `data` + """ + + if len(self._base._slices) > 1: + # If this is not the only slice pointing to `self._base`, we + # point to a new deep copy of the base. + t = self.copy(deep=True) + self._base = t._base + self._offset = t._offset + self._size = t._size + self._owner = t._base + self._base._slices.add(self) + + def __repr__(self) -> str: + return ( + f"" + ) diff --git a/python/cudf/cudf/core/buffer/spillable_buffer.py b/python/cudf/cudf/core/buffer/spillable_buffer.py index c1bc49c7a9e..84fb2044c62 100644 --- a/python/cudf/cudf/core/buffer/spillable_buffer.py +++ b/python/cudf/cudf/core/buffer/spillable_buffer.py @@ -7,7 +7,7 @@ import time import weakref from threading import RLock -from typing import TYPE_CHECKING, Any, Dict, List, Optional, Tuple +from typing import TYPE_CHECKING, Any, Dict, List, Literal, Optional, Tuple import numpy from typing_extensions import Self @@ -265,7 +265,7 @@ def _from_host_memory(cls, data: Any) -> Self: def is_spilled(self) -> bool: return self._ptr_desc["type"] != "gpu" - def copy(self, deep: bool = True): + def copy(self, deep: bool = True) -> Self: spill_lock = SpillLock() self.spill_lock(spill_lock=spill_lock) return super().copy(deep=deep) @@ -351,7 +351,7 @@ def spill_lock(self, spill_lock: SpillLock) -> None: self.spill(target="gpu") self._spill_locks.add(spill_lock) - def get_ptr(self, *, mode) -> int: + def get_ptr(self, *, mode: Literal["read", "write"]) -> int: """Get a device pointer to the memory of the buffer. If this is called within an `acquire_spill_lock` context, @@ -451,7 +451,7 @@ def memoryview( ) return ret - def _getitem(self, offset: int, size: int) -> Buffer: + def _getitem(self, offset: int, size: int) -> SpillableBufferSlice: return SpillableBufferSlice(base=self, offset=offset, size=size) def serialize(self) -> Tuple[dict, list]: @@ -541,14 +541,14 @@ def __init__(self, base: SpillableBuffer, offset: int, size: int) -> None: self._owner = base self.lock = base.lock - def get_ptr(self, *, mode) -> int: + def get_ptr(self, *, mode: Literal["read", "write"]) -> int: """ A passthrough method to `SpillableBuffer.get_ptr` with factoring in the `offset`. """ return self._base.get_ptr(mode=mode) + self._offset - def _getitem(self, offset: int, size: int) -> Buffer: + def _getitem(self, offset: int, size: int) -> SpillableBufferSlice: return SpillableBufferSlice( base=self._base, offset=offset + self._offset, size=size ) diff --git a/python/cudf/cudf/core/buffer/utils.py b/python/cudf/cudf/core/buffer/utils.py index 85e4762641e..373be99ec96 100644 --- a/python/cudf/cudf/core/buffer/utils.py +++ b/python/cudf/cudf/core/buffer/utils.py @@ -7,7 +7,7 @@ from typing import Any, Dict, Optional, Tuple, Union from cudf.core.buffer.buffer import Buffer, cuda_array_interface_wrapper -from cudf.core.buffer.cow_buffer import CopyOnWriteBuffer +from cudf.core.buffer.exposure_tracked_buffer import as_exposure_tracked_buffer from cudf.core.buffer.spill_manager import get_global_manager from cudf.core.buffer.spillable_buffer import SpillLock, as_spillable_buffer from cudf.options import get_option @@ -45,9 +45,9 @@ def as_buffer( Python object to which the lifetime of the memory allocation is tied. A reference to this object is kept in the returned Buffer. exposed : bool, optional - Mark the buffer as permanently exposed (unspillable). This is ignored - unless spilling is enabled and the data represents device memory, see - SpillableBuffer. + Mark the buffer as permanently exposed. This is used by + ExposureTrackedBuffer to determine when a deep copy is required and + by SpillableBuffer to mark the buffer unspillable. Return ------ @@ -74,16 +74,9 @@ def as_buffer( ) if get_option("copy_on_write"): - if isinstance(data, Buffer) or hasattr( - data, "__cuda_array_interface__" - ): - return CopyOnWriteBuffer._from_device_memory(data, exposed=exposed) - if exposed: - raise ValueError("cannot created exposed host memory") - return CopyOnWriteBuffer._from_host_memory(data) + return as_exposure_tracked_buffer(data, exposed=exposed) if get_global_manager() is not None: return as_spillable_buffer(data, exposed=exposed) - if hasattr(data, "__cuda_array_interface__"): return Buffer._from_device_memory(data) return Buffer._from_host_memory(data) diff --git a/python/cudf/cudf/core/column/column.py b/python/cudf/cudf/core/column/column.py index 15a71266cc1..2417aea25c2 100644 --- a/python/cudf/cudf/core/column/column.py +++ b/python/cudf/cudf/core/column/column.py @@ -140,8 +140,9 @@ def data_array_view( """ if self.data is not None: if mode == "read": - obj = _proxy_cai_obj( - self.data._get_cuda_array_interface(readonly=True), + obj = cuda_array_interface_wrapper( + ptr=self.data.get_ptr(mode="read"), + size=self.data.size, owner=self.data, ) elif mode == "write": @@ -176,8 +177,9 @@ def mask_array_view( """ if self.mask is not None: if mode == "read": - obj = _proxy_cai_obj( - self.mask._get_cuda_array_interface(readonly=True), + obj = cuda_array_interface_wrapper( + ptr=self.mask.get_ptr(mode="read"), + size=self.mask.size, owner=self.mask, ) elif mode == "write": @@ -1963,9 +1965,7 @@ def as_column( ): arbitrary = cupy.ascontiguousarray(arbitrary) - data = as_buffer(arbitrary) - if cudf.get_option("copy_on_write"): - data._zero_copied = True + data = as_buffer(arbitrary, exposed=cudf.get_option("copy_on_write")) col = build_column(data, dtype=current_dtype, mask=mask) if dtype is not None: @@ -2636,22 +2636,3 @@ def concat_columns(objs: "MutableSequence[ColumnBase]") -> ColumnBase: # Filter out inputs that have 0 length, then concatenate. return libcudf.concat.concat_columns([o for o in objs if len(o)]) - - -def _proxy_cai_obj(cai, owner): - """ - Returns a proxy CAI SimpleNameSpace wrapped - with the provided `cai` as `__cuda_array_interface__` - and owner as `owner` to keep the object alive. - This is an internal utility for `data_array_view` - and `mask_array_view` where an object with - read-only CAI is required. - """ - return cuda_array_interface_wrapper( - ptr=cai["data"][0], - size=cai["shape"][0], - owner=owner, - readonly=cai["data"][1], - typestr=cai["typestr"], - version=cai["version"], - ) diff --git a/python/cudf/cudf/tests/test_copying.py b/python/cudf/cudf/tests/test_copying.py index 1e9f42d704b..085774e9dbc 100644 --- a/python/cudf/cudf/tests/test_copying.py +++ b/python/cudf/cudf/tests/test_copying.py @@ -113,8 +113,11 @@ def test_series_setitem_partial_slice_cow_on(): assert_eq(new_copy, cudf.Series([1, 2, 300, 300, 5])) new_slice = actual[2:] + # TODO: when COW and spilling has been unified, find a clean way to + # test this without accessing the internal attributes _base and _ptr assert ( - new_slice._column.base_data._ptr == actual._column.base_data._ptr + new_slice._column.base_data._base._ptr + == actual._column.base_data._base._ptr ) new_slice[0:2] = 10 assert_eq(new_slice, cudf.Series([10, 10, 5], index=[2, 3, 4]))