From f2bf171fdf1e20ac9bde63ca1506aef7e4e5fb9d Mon Sep 17 00:00:00 2001 From: "Mads R. B. Kristensen" Date: Tue, 2 Apr 2024 14:18:29 +0200 Subject: [PATCH 01/27] clean up --- python/cudf/cudf/tests/test_spilling.py | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/python/cudf/cudf/tests/test_spilling.py b/python/cudf/cudf/tests/test_spilling.py index f18cb32a091..396552a780e 100644 --- a/python/cudf/cudf/tests/test_spilling.py +++ b/python/cudf/cudf/tests/test_spilling.py @@ -530,8 +530,10 @@ def test_serialize_cuda_dataframe(manager: SpillManager): @pytest.mark.skip( - reason="This test is not safe because other tests may have enabled" - "spilling and already modified rmm's global state" + reason=( + "This test is not safe because other tests may have enabled " + "spilling and already modified rmm's global state" + ) ) def test_get_rmm_memory_resource_stack(): mr1 = rmm.mr.get_current_device_resource() From 276dd80b2773a3dd263e8dded2eb24ba7e1bda4e Mon Sep 17 00:00:00 2001 From: "Mads R. B. Kristensen" Date: Tue, 2 Apr 2024 14:28:57 +0200 Subject: [PATCH 02/27] remove redundant methods --- .../cudf/core/buffer/exposure_tracked_buffer.py | 16 +++++----------- 1 file changed, 5 insertions(+), 11 deletions(-) diff --git a/python/cudf/cudf/core/buffer/exposure_tracked_buffer.py b/python/cudf/cudf/core/buffer/exposure_tracked_buffer.py index 4c08016adbb..55edcac4b10 100644 --- a/python/cudf/cudf/core/buffer/exposure_tracked_buffer.py +++ b/python/cudf/cudf/core/buffer/exposure_tracked_buffer.py @@ -23,8 +23,6 @@ class ExposureTrackedBuffer(Buffer): The size of the slice (in bytes) """ - _owner: BufferOwner - def __init__( self, owner: BufferOwner, @@ -32,11 +30,7 @@ def __init__( size: Optional[int] = None, ) -> None: super().__init__(owner=owner, offset=offset, size=size) - self._owner._slices.add(self) - - @property - def exposed(self) -> bool: - return self._owner.exposed + self.owner._slices.add(self) def get_ptr(self, *, mode: Literal["read", "write"]) -> int: if mode == "write" and cudf.get_option("copy_on_write"): @@ -72,7 +66,7 @@ def copy(self, deep: bool = True) -> Self: copy-on-write option (see above). """ if cudf.get_option("copy_on_write"): - return super().copy(deep=deep or self.exposed) + return super().copy(deep=deep or self.owner.exposed) return super().copy(deep=deep) @property @@ -98,11 +92,11 @@ def make_single_owner_inplace(self) -> None: Buffer representing the same device memory as `data` """ - if len(self._owner._slices) > 1: - # If this is not the only slice pointing to `self._owner`, we + if len(self.owner._slices) > 1: + # If this is not the only slice pointing to `self.owner`, we # point to a new deep copy of the owner. t = self.copy(deep=True) - self._owner = t._owner + self._owner = t.owner self._offset = t._offset self._size = t._size self._owner._slices.add(self) From 2413e99ef455efbb2b27cce02cb84270596c038a Mon Sep 17 00:00:00 2001 From: "Mads R. B. Kristensen" Date: Tue, 2 Apr 2024 15:31:46 +0200 Subject: [PATCH 03/27] impl. and use BufferOwner.__init__ --- python/cudf/cudf/core/buffer/buffer.py | 34 +++++++++++++------------- 1 file changed, 17 insertions(+), 17 deletions(-) diff --git a/python/cudf/cudf/core/buffer/buffer.py b/python/cudf/cudf/core/buffer/buffer.py index 8d278c9c065..e9082712c7b 100644 --- a/python/cudf/cudf/core/buffer/buffer.py +++ b/python/cudf/cudf/core/buffer/buffer.py @@ -117,11 +117,18 @@ class BufferOwner(Serializable): # The set of buffers that point to this owner. _slices: weakref.WeakSet[Buffer] - def __init__(self): - raise ValueError( - f"do not create a {self.__class__} directly, please " - "use the factory function `cudf.core.buffer.as_buffer`" - ) + def __init__( + self, + ptr: int, + size: int, + owner: object, + exposed: bool, + ): + self._ptr = ptr + self._size = size + self._owner = owner + self._exposed = exposed + self._slices = weakref.WeakSet() @classmethod def _from_device_memory(cls, data: Any, exposed: bool) -> Self: @@ -151,21 +158,14 @@ def _from_device_memory(cls, data: Any, exposed: bool) -> Self: If the resulting buffer has negative size """ - # Bypass `__init__` and initialize attributes manually - ret = cls.__new__(cls) - ret._owner = data - ret._exposed = exposed - ret._slices = weakref.WeakSet() if isinstance(data, rmm.DeviceBuffer): # Common case shortcut - ret._ptr = data.ptr - ret._size = data.size + ptr = data.ptr + size = data.size else: - ret._ptr, ret._size = get_ptr_and_size( - data.__cuda_array_interface__ - ) - if ret.size < 0: + ptr, size = get_ptr_and_size(data.__cuda_array_interface__) + if size < 0: raise ValueError("size cannot be negative") - return ret + return cls(ptr, size, owner=data, exposed=exposed) @classmethod def _from_host_memory(cls, data: Any) -> Self: From 9f4b0e127afa867a84c97b667596e66d739d056f Mon Sep 17 00:00:00 2001 From: "Mads R. B. Kristensen" Date: Tue, 2 Apr 2024 15:38:52 +0200 Subject: [PATCH 04/27] SpillableBufferOwner: use __init__ --- python/cudf/cudf/core/buffer/buffer.py | 15 +++++++++++++-- python/cudf/cudf/core/buffer/spillable_buffer.py | 6 +----- 2 files changed, 14 insertions(+), 7 deletions(-) diff --git a/python/cudf/cudf/core/buffer/buffer.py b/python/cudf/cudf/core/buffer/buffer.py index e9082712c7b..cab9573a3d2 100644 --- a/python/cudf/cudf/core/buffer/buffer.py +++ b/python/cudf/cudf/core/buffer/buffer.py @@ -124,6 +124,19 @@ def __init__( owner: object, exposed: bool, ): + """Create a new buffer owner. + + Do not use this directly, instead use `_from_device_memory` or + `_from_host_memory`. + + Raises + ------ + ValueError + If size is negative + """ + if size < 0: + raise ValueError("size cannot be negative") + self._ptr = ptr self._size = size self._owner = owner @@ -163,8 +176,6 @@ def _from_device_memory(cls, data: Any, exposed: bool) -> Self: size = data.size else: ptr, size = get_ptr_and_size(data.__cuda_array_interface__) - if size < 0: - raise ValueError("size cannot be negative") return cls(ptr, size, owner=data, exposed=exposed) @classmethod diff --git a/python/cudf/cudf/core/buffer/spillable_buffer.py b/python/cudf/cudf/core/buffer/spillable_buffer.py index b25af13679c..f124d53ea7d 100644 --- a/python/cudf/cudf/core/buffer/spillable_buffer.py +++ b/python/cudf/cudf/core/buffer/spillable_buffer.py @@ -170,11 +170,7 @@ def _from_host_memory(cls, data: Any) -> Self: data = data.cast("B") # Make sure itemsize==1 # Create an already spilled buffer - ret = cls.__new__(cls) - ret._owner = None - ret._ptr = 0 - ret._size = data.nbytes - ret._exposed = False + ret = cls(ptr=0, size=data.nbytes, owner=None, exposed=False) ret._finalize_init(ptr_desc={"type": "cpu", "memoryview": data}) return ret From 834f6d54383b0d4c836cba8c2ad553bc6c7260d4 Mon Sep 17 00:00:00 2001 From: "Mads R. B. Kristensen" Date: Tue, 2 Apr 2024 15:52:13 +0200 Subject: [PATCH 05/27] remove SpillableBuffer.mark_exposed() --- python/cudf/cudf/core/buffer/spillable_buffer.py | 3 --- python/cudf/cudf/tests/test_spilling.py | 10 +++++----- 2 files changed, 5 insertions(+), 8 deletions(-) diff --git a/python/cudf/cudf/core/buffer/spillable_buffer.py b/python/cudf/cudf/core/buffer/spillable_buffer.py index f124d53ea7d..cd616580a9d 100644 --- a/python/cudf/cudf/core/buffer/spillable_buffer.py +++ b/python/cudf/cudf/core/buffer/spillable_buffer.py @@ -408,9 +408,6 @@ def memory_info(self) -> Tuple[int, int, str]: (ptr, _, device_type) = self._owner.memory_info() return (ptr + self._offset, self.nbytes, device_type) - def mark_exposed(self) -> None: - self._owner.mark_exposed() - def serialize(self) -> Tuple[dict, list]: """Serialize the Buffer diff --git a/python/cudf/cudf/tests/test_spilling.py b/python/cudf/cudf/tests/test_spilling.py index 396552a780e..73b86d9ff44 100644 --- a/python/cudf/cudf/tests/test_spilling.py +++ b/python/cudf/cudf/tests/test_spilling.py @@ -120,7 +120,7 @@ def test_spillable_buffer(manager: SpillManager): buf = as_buffer(data=rmm.DeviceBuffer(size=10), exposed=False) assert isinstance(buf, SpillableBuffer) assert buf.spillable - buf.mark_exposed() + buf.owner.mark_exposed() assert buf.exposed assert not buf.spillable buf = as_buffer(data=rmm.DeviceBuffer(size=10), exposed=False) @@ -210,7 +210,7 @@ def test_spilling_buffer(manager: SpillManager): buf = as_buffer(rmm.DeviceBuffer(size=10), exposed=False) buf.spill(target="cpu") assert buf.is_spilled - buf.mark_exposed() # Expose pointer and trigger unspill + buf.owner.mark_exposed() # Expose pointer and trigger unspill assert not buf.is_spilled with pytest.raises(ValueError, match="unspillable buffer"): buf.spill(target="cpu") @@ -653,7 +653,7 @@ def test_statistics_expose(manager: SpillManager): ] # Expose the first buffer - buffers[0].mark_exposed() + buffers[0].owner.mark_exposed() assert len(manager.statistics.exposes) == 1 stat = list(manager.statistics.exposes.values())[0] assert stat.count == 1 @@ -662,7 +662,7 @@ def test_statistics_expose(manager: SpillManager): # Expose all 10 buffers for i in range(10): - buffers[i].mark_exposed() + buffers[i].owner.mark_exposed() # The rest of the ptr accesses should accumulate to a single stat # because they resolve to the same traceback. @@ -682,7 +682,7 @@ def test_statistics_expose(manager: SpillManager): # Expose the new buffers and check that they are counted as spilled for i in range(10): - buffers[i].mark_exposed() + buffers[i].owner.mark_exposed() assert len(manager.statistics.exposes) == 3 stat = list(manager.statistics.exposes.values())[2] assert stat.count == 10 From 68524e458f95a18aa09d28e12e808f33f76f464a Mon Sep 17 00:00:00 2001 From: "Mads R. B. Kristensen" Date: Tue, 2 Apr 2024 15:57:49 +0200 Subject: [PATCH 06/27] remove SpillableBuffer.exposed --- python/cudf/cudf/core/buffer/spillable_buffer.py | 4 ---- python/cudf/cudf/tests/test_spilling.py | 13 ++++++------- 2 files changed, 6 insertions(+), 11 deletions(-) diff --git a/python/cudf/cudf/core/buffer/spillable_buffer.py b/python/cudf/cudf/core/buffer/spillable_buffer.py index cd616580a9d..eb1bdbb40fd 100644 --- a/python/cudf/cudf/core/buffer/spillable_buffer.py +++ b/python/cudf/cudf/core/buffer/spillable_buffer.py @@ -393,10 +393,6 @@ def spill(self, target: str = "cpu") -> None: def is_spilled(self) -> bool: return self._owner.is_spilled - @property - def exposed(self) -> bool: - return self._owner.exposed - @property def spillable(self) -> bool: return self._owner.spillable diff --git a/python/cudf/cudf/tests/test_spilling.py b/python/cudf/cudf/tests/test_spilling.py index 73b86d9ff44..d5d3992a902 100644 --- a/python/cudf/cudf/tests/test_spilling.py +++ b/python/cudf/cudf/tests/test_spilling.py @@ -121,17 +121,17 @@ def test_spillable_buffer(manager: SpillManager): assert isinstance(buf, SpillableBuffer) assert buf.spillable buf.owner.mark_exposed() - assert buf.exposed + assert buf.owner.exposed assert not buf.spillable buf = as_buffer(data=rmm.DeviceBuffer(size=10), exposed=False) # Notice, accessing `__cuda_array_interface__` itself doesn't # expose the pointer, only accessing the "data" field exposes # the pointer. iface = buf.__cuda_array_interface__ - assert not buf.exposed + assert not buf.owner.exposed assert buf.spillable iface["data"][0] # Expose pointer - assert buf.exposed + assert buf.owner.exposed assert not buf.spillable @@ -141,7 +141,6 @@ def test_spillable_buffer(manager: SpillManager): "get_ptr", "memoryview", "is_spilled", - "exposed", "spillable", "spill_lock", "spill", @@ -562,9 +561,9 @@ def test_df_transpose(manager: SpillManager): df1 = cudf.DataFrame({"a": [1, 2]}) df2 = df1.transpose() # For now, all buffers are marked as exposed - assert df1._data._data["a"].data.exposed - assert df2._data._data[0].data.exposed - assert df2._data._data[1].data.exposed + assert df1._data._data["a"].data.owner.exposed + assert df2._data._data[0].data.owner.exposed + assert df2._data._data[1].data.owner.exposed def test_as_buffer_of_spillable_buffer(manager: SpillManager): From 2e43f85235d4d662012255237ca8800dfa55b748 Mon Sep 17 00:00:00 2001 From: "Mads R. B. Kristensen" Date: Tue, 2 Apr 2024 16:10:24 +0200 Subject: [PATCH 07/27] SpillableBuffer(ExposureTrackedBuffer) --- python/cudf/cudf/core/buffer/spillable_buffer.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/python/cudf/cudf/core/buffer/spillable_buffer.py b/python/cudf/cudf/core/buffer/spillable_buffer.py index eb1bdbb40fd..e038f91a108 100644 --- a/python/cudf/cudf/core/buffer/spillable_buffer.py +++ b/python/cudf/cudf/core/buffer/spillable_buffer.py @@ -20,6 +20,7 @@ cuda_array_interface_wrapper, host_memory_allocation, ) +from cudf.core.buffer.exposure_tracked_buffer import ExposureTrackedBuffer from cudf.utils.nvtx_annotation import _get_color_for_nvtx, annotate from cudf.utils.string import format_bytes @@ -368,7 +369,7 @@ def __str__(self) -> str: ) -class SpillableBuffer(Buffer): +class SpillableBuffer(ExposureTrackedBuffer): """A slice of a spillable buffer This buffer applies the slicing and then delegates all From 659e83283b0b747711a5778320e3d4253eb49315 Mon Sep 17 00:00:00 2001 From: "Mads R. B. Kristensen" Date: Wed, 3 Apr 2024 08:29:27 +0200 Subject: [PATCH 08/27] mark cow tests as "spilling" --- python/cudf/cudf/tests/test_copying.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/python/cudf/cudf/tests/test_copying.py b/python/cudf/cudf/tests/test_copying.py index e737a73e86b..c0964c84723 100644 --- a/python/cudf/cudf/tests/test_copying.py +++ b/python/cudf/cudf/tests/test_copying.py @@ -9,6 +9,8 @@ from cudf import Series from cudf.testing._utils import NUMERIC_TYPES, OTHER_TYPES, assert_eq +pytestmark = pytest.mark.spilling + @pytest.mark.parametrize("dtype", NUMERIC_TYPES + OTHER_TYPES) def test_repeat(dtype): From c5355a57cedcfa30705e0256020303d4bfb77534 Mon Sep 17 00:00:00 2001 From: "Mads R. B. Kristensen" Date: Wed, 3 Apr 2024 08:51:02 +0200 Subject: [PATCH 09/27] impl. SpillableBuffer.copy --- python/cudf/cudf/core/buffer/spillable_buffer.py | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/python/cudf/cudf/core/buffer/spillable_buffer.py b/python/cudf/cudf/core/buffer/spillable_buffer.py index e038f91a108..2f766269b4f 100644 --- a/python/cudf/cudf/core/buffer/spillable_buffer.py +++ b/python/cudf/cudf/core/buffer/spillable_buffer.py @@ -451,6 +451,15 @@ def serialize(self) -> Tuple[dict, list]: ] return header, frames + def copy(self, deep: bool = True) -> Self: + if deep and self.is_spilled: + # In this case, we copy the host data directly instead of moving + # to device memory first + owner = self._owner._from_host_memory(bytearray(self.memoryview())) + return self.__class__(owner=owner, offset=0, size=owner.size) + else: + return super().copy(deep=deep) + @property def __cuda_array_interface__(self) -> dict: return { From 358157951da94bb69449b5b6d908a850a8a7c2bf Mon Sep 17 00:00:00 2001 From: "Mads R. B. Kristensen" Date: Wed, 3 Apr 2024 08:51:31 +0200 Subject: [PATCH 10/27] option: allow cow and spilling at the same time --- python/cudf/cudf/options.py | 15 +-------------- 1 file changed, 1 insertion(+), 14 deletions(-) diff --git a/python/cudf/cudf/options.py b/python/cudf/cudf/options.py index 7a0db49bd20..efa8eabd8b8 100644 --- a/python/cudf/cudf/options.py +++ b/python/cudf/cudf/options.py @@ -1,4 +1,4 @@ -# Copyright (c) 2022-2023, NVIDIA CORPORATION. +# Copyright (c) 2022-2024, NVIDIA CORPORATION. import os import textwrap @@ -152,11 +152,6 @@ def _validator(val): def _cow_validator(val): - if get_option("spill") and val: - raise ValueError( - "Copy-on-write is not supported when spilling is enabled. " - "Please set `spill` to `False`" - ) if val not in {False, True}: raise ValueError( f"{val} is not a valid option. Must be one of {{False, True}}." @@ -164,14 +159,6 @@ def _cow_validator(val): def _spill_validator(val): - try: - if get_option("copy_on_write") and val: - raise ValueError( - "Spilling is not supported when copy-on-write is enabled. " - "Please set `copy_on_write` to `False`" - ) - except KeyError: - pass if val not in {False, True}: raise ValueError( f"{val} is not a valid option. Must be one of {{False, True}}." From 58f7e8eac25053b262e281975b335ee6166d8bff Mon Sep 17 00:00:00 2001 From: "Mads R. B. Kristensen" Date: Wed, 3 Apr 2024 08:56:10 +0200 Subject: [PATCH 11/27] test: skip zero-copy and no-copy-on-write test whe spilling is enabled --- python/cudf/cudf/tests/test_copying.py | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/python/cudf/cudf/tests/test_copying.py b/python/cudf/cudf/tests/test_copying.py index c0964c84723..b14c336f163 100644 --- a/python/cudf/cudf/tests/test_copying.py +++ b/python/cudf/cudf/tests/test_copying.py @@ -7,6 +7,7 @@ import cudf from cudf import Series +from cudf.core.buffer.spill_manager import get_global_manager from cudf.testing._utils import NUMERIC_TYPES, OTHER_TYPES, assert_eq pytestmark = pytest.mark.spilling @@ -304,6 +305,14 @@ def test_series_zero_copy_cow_on(): def test_series_zero_copy_cow_off(): + if get_global_manager() is not None: + pytest.skip( + ( + "cannot test zero-copy and no-copy-on-write when " + "spilling is enabled globally, set `CUDF_SPILL=off`" + ), + allow_module_level=True, + ) with cudf.option_context("copy_on_write", False): s = cudf.Series([1, 2, 3, 4, 5]) s1 = s.copy(deep=False) From 60189493adcfb4666849e217e194fb3326e6d710 Mon Sep 17 00:00:00 2001 From: "Mads R. B. Kristensen" Date: Wed, 3 Apr 2024 08:58:59 +0200 Subject: [PATCH 12/27] cleanup --- python/cudf/cudf/tests/test_copying.py | 7 ++----- 1 file changed, 2 insertions(+), 5 deletions(-) diff --git a/python/cudf/cudf/tests/test_copying.py b/python/cudf/cudf/tests/test_copying.py index b14c336f163..e357bc942b3 100644 --- a/python/cudf/cudf/tests/test_copying.py +++ b/python/cudf/cudf/tests/test_copying.py @@ -307,11 +307,8 @@ def test_series_zero_copy_cow_on(): def test_series_zero_copy_cow_off(): if get_global_manager() is not None: pytest.skip( - ( - "cannot test zero-copy and no-copy-on-write when " - "spilling is enabled globally, set `CUDF_SPILL=off`" - ), - allow_module_level=True, + "cannot test zero-copy and no-copy-on-write when " + "spilling is enabled globally, set `CUDF_SPILL=off`" ) with cudf.option_context("copy_on_write", False): s = cudf.Series([1, 2, 3, 4, 5]) From f48c529901e69f26aa7afce9c6fc44153e5161cb Mon Sep 17 00:00:00 2001 From: "Mads R. B. Kristensen" Date: Wed, 3 Apr 2024 11:10:11 +0200 Subject: [PATCH 13/27] impl. set_spill_on_demand_globally --- python/cudf/cudf/core/buffer/spill_manager.py | 67 ++++++++++++------- python/cudf/cudf/tests/test_spilling.py | 12 ++-- 2 files changed, 49 insertions(+), 30 deletions(-) diff --git a/python/cudf/cudf/core/buffer/spill_manager.py b/python/cudf/cudf/core/buffer/spill_manager.py index 3e654e01401..e4144873e41 100644 --- a/python/cudf/cudf/core/buffer/spill_manager.py +++ b/python/cudf/cudf/core/buffer/spill_manager.py @@ -201,10 +201,6 @@ class SpillManager: This class implements tracking of all known spillable buffers, on-demand spilling of said buffers, and (optionally) maintains a memory usage limit. - When `spill_on_demand=True`, the manager registers an RMM out-of-memory - error handler, which will spill spillable buffers in order to free up - memory. - When `device_memory_limit=`, the manager will try keep the device memory usage below the specified limit by spilling of spillable buffers continuously, which will introduce a modest overhead. @@ -213,8 +209,6 @@ class SpillManager: Parameters ---------- - spill_on_demand : bool - Enable spill on demand. device_memory_limit: int, optional If not None, this is the device memory limit in bytes that triggers device to host spilling. The global manager sets this to the value @@ -230,30 +224,15 @@ class SpillManager: def __init__( self, *, - spill_on_demand: bool = False, device_memory_limit: Optional[int] = None, statistic_level: int = 0, ) -> None: self._lock = threading.Lock() self._buffers = weakref.WeakValueDictionary() self._id_counter = 0 - self._spill_on_demand = spill_on_demand self._device_memory_limit = device_memory_limit self.statistics = SpillStatistics(statistic_level) - if self._spill_on_demand: - # Set the RMM out-of-memory handle if not already set - mr = rmm.mr.get_current_device_resource() - if all( - not isinstance(m, rmm.mr.FailureCallbackResourceAdaptor) - for m in get_rmm_memory_resource_stack(mr) - ): - rmm.mr.set_current_device_resource( - rmm.mr.FailureCallbackResourceAdaptor( - mr, self._out_of_memory_handle - ) - ) - def _out_of_memory_handle(self, nbytes: int, *, retry_once=True) -> bool: """Try to handle an out-of-memory error by spilling @@ -408,8 +387,7 @@ def __repr__(self) -> str: dev_limit = format_bytes(self._device_memory_limit) return ( - f"" @@ -442,12 +420,49 @@ def get_global_manager() -> Optional[SpillManager]: """Get the global manager or None if spilling is disabled""" global _global_manager_uninitialized if _global_manager_uninitialized: - manager = None if get_option("spill"): manager = SpillManager( - spill_on_demand=get_option("spill_on_demand"), device_memory_limit=get_option("spill_device_limit"), statistic_level=get_option("spill_stats"), ) - set_global_manager(manager) + set_global_manager(manager) + if get_option("spill_on_demand"): + set_spill_on_demand_globally() + else: + set_global_manager(None) return _global_manager + + +def set_spill_on_demand_globally() -> None: + """Enable spill on demand in the current global spill manager. + + Warning, this modify the current RMM memory resource. A memory resource + to handle out-of-memory errors are pushed on the RMM memory resource stack. + + Raises + ------ + ValueError + If no global spill manager exist (spilling is disable). + ValueError + If a failure callback resource is already in the resource stack. + """ + + manager = get_global_manager() + if manager is None: + raise ValueError( + "Cannot enable spill on demand with no global spill manager" + ) + mr = rmm.mr.get_current_device_resource() + if any( + isinstance(m, rmm.mr.FailureCallbackResourceAdaptor) + for m in get_rmm_memory_resource_stack(mr) + ): + raise ValueError( + "Spill on demand (or another failure callback resource) " + "is already registered" + ) + rmm.mr.set_current_device_resource( + rmm.mr.FailureCallbackResourceAdaptor( + mr, manager._out_of_memory_handle + ) + ) diff --git a/python/cudf/cudf/tests/test_spilling.py b/python/cudf/cudf/tests/test_spilling.py index d5d3992a902..d1ad282fd62 100644 --- a/python/cudf/cudf/tests/test_spilling.py +++ b/python/cudf/cudf/tests/test_spilling.py @@ -236,7 +236,7 @@ def _get_manager_in_env(monkeypatch, var_vals): def test_environment_variables_spill_off(monkeypatch): with _get_manager_in_env( monkeypatch, - [("CUDF_SPILL", "off"), ("CUDF_SPILL_ON_DEMAND", "off")], + [("CUDF_SPILL", "off")], ) as manager: assert manager is None @@ -244,10 +244,9 @@ def test_environment_variables_spill_off(monkeypatch): def test_environment_variables_spill_on(monkeypatch): with _get_manager_in_env( monkeypatch, - [("CUDF_SPILL", "on")], + [("CUDF_SPILL", "on"), ("CUDF_SPILL_ON_DEMAND", "off")], ) as manager: assert isinstance(manager, SpillManager) - assert manager._spill_on_demand is True assert manager._device_memory_limit is None assert manager.statistics.level == 0 @@ -255,7 +254,11 @@ def test_environment_variables_spill_on(monkeypatch): def test_environment_variables_device_limit(monkeypatch): with _get_manager_in_env( monkeypatch, - [("CUDF_SPILL", "on"), ("CUDF_SPILL_DEVICE_LIMIT", "1000")], + [ + ("CUDF_SPILL", "on"), + ("CUDF_SPILL_ON_DEMAND", "off"), + ("CUDF_SPILL_DEVICE_LIMIT", "1000"), + ], ) as manager: assert isinstance(manager, SpillManager) assert manager._device_memory_limit == 1000 @@ -268,6 +271,7 @@ def test_environment_variables_spill_stats(monkeypatch, level): monkeypatch, [ ("CUDF_SPILL", "on"), + ("CUDF_SPILL_ON_DEMAND", "off"), ("CUDF_SPILL_DEVICE_LIMIT", "1000"), ("CUDF_SPILL_STATS", f"{level}"), ], From dcff2996e6c09825f46d290114d19cb8e857d22b Mon Sep 17 00:00:00 2001 From: "Mads R. B. Kristensen" Date: Wed, 3 Apr 2024 12:52:07 +0200 Subject: [PATCH 14/27] impl. spill_on_demand_globally --- python/cudf/cudf/core/buffer/spill_manager.py | 34 ++++++++++++++++++ python/cudf/cudf/tests/test_spilling.py | 35 +++++++++++++++++++ 2 files changed, 69 insertions(+) diff --git a/python/cudf/cudf/core/buffer/spill_manager.py b/python/cudf/cudf/core/buffer/spill_manager.py index e4144873e41..2d0de22acaa 100644 --- a/python/cudf/cudf/core/buffer/spill_manager.py +++ b/python/cudf/cudf/core/buffer/spill_manager.py @@ -10,6 +10,7 @@ import warnings import weakref from collections import defaultdict +from contextlib import contextmanager from dataclasses import dataclass from functools import partial from typing import Dict, List, Optional, Tuple @@ -466,3 +467,36 @@ def set_spill_on_demand_globally() -> None: mr, manager._out_of_memory_handle ) ) + + +@contextmanager +def spill_on_demand_globally(): + """Context to enable spill on demand temporary. + + Warning, this modify the current RMM memory resource. A memory resource + to handle out-of-memory errors are pushed on the RMM memory resource stack + when entering the context and popped again when exiting. + + Raises + ------ + ValueError + If no global spill manager exist (spilling is disable). + ValueError + If a failure callback resource is already in the resource stack. + ValueError + If the RMM memory source stack was changed while in the context. + """ + set_spill_on_demand_globally() + # Save the new memory resource stack for later cleanup + mr_stack = get_rmm_memory_resource_stack( + rmm.mr.get_current_device_resource() + ) + try: + yield + finally: + mr = rmm.mr.get_current_device_resource() + if mr_stack != get_rmm_memory_resource_stack(mr): + raise ValueError( + "RMM memory source stack was changed while in the context" + ) + rmm.mr.set_current_device_resource(mr_stack[1]) diff --git a/python/cudf/cudf/tests/test_spilling.py b/python/cudf/cudf/tests/test_spilling.py index d1ad282fd62..74d3a61f5c1 100644 --- a/python/cudf/cudf/tests/test_spilling.py +++ b/python/cudf/cudf/tests/test_spilling.py @@ -32,6 +32,7 @@ get_global_manager, get_rmm_memory_resource_stack, set_global_manager, + spill_on_demand_globally, ) from cudf.core.buffer.spillable_buffer import ( SpillableBuffer, @@ -47,6 +48,22 @@ ) +@contextlib.contextmanager +def set_rmm_memory_pool(nbytes: int): + mr = rmm.mr.get_current_device_resource() + rmm.mr.set_current_device_resource( + rmm.mr.PoolMemoryResource( + rmm.mr.get_current_device_resource(), + initial_pool_size=nbytes, + maximum_pool_size=nbytes, + ) + ) + try: + yield + finally: + rmm.mr.set_current_device_resource(mr) + + def single_column_df(target="gpu") -> cudf.DataFrame: """Create a standard single column dataframe used for testing @@ -691,3 +708,21 @@ def test_statistics_expose(manager: SpillManager): assert stat.count == 10 assert stat.total_nbytes == buffers[0].nbytes * 10 assert stat.spilled_nbytes == buffers[0].nbytes * 10 + + +def test_spill_on_demand(manager: SpillManager): + with set_rmm_memory_pool(1024): + a = as_buffer(data=rmm.DeviceBuffer(size=1024)) + assert isinstance(a, SpillableBuffer) + assert not a.is_spilled + + with pytest.raises(MemoryError, match="Maximum pool size exceeded"): + as_buffer(data=rmm.DeviceBuffer(size=1024)) + + with spill_on_demand_globally(): + b = as_buffer(data=rmm.DeviceBuffer(size=1024)) + assert a.is_spilled + assert not b.is_spilled + + with pytest.raises(MemoryError, match="Maximum pool size exceeded"): + as_buffer(data=rmm.DeviceBuffer(size=1024)) From 4a8b43e10d30a68d0d5e06ce413506dcbf62b0eb Mon Sep 17 00:00:00 2001 From: "Mads R. B. Kristensen" Date: Wed, 3 Apr 2024 13:07:28 +0200 Subject: [PATCH 15/27] cleanup --- python/cudf/cudf/core/buffer/spillable_buffer.py | 15 +-------------- 1 file changed, 1 insertion(+), 14 deletions(-) diff --git a/python/cudf/cudf/core/buffer/spillable_buffer.py b/python/cudf/cudf/core/buffer/spillable_buffer.py index 2f766269b4f..7f33508579f 100644 --- a/python/cudf/cudf/core/buffer/spillable_buffer.py +++ b/python/cudf/cudf/core/buffer/spillable_buffer.py @@ -370,20 +370,7 @@ def __str__(self) -> str: class SpillableBuffer(ExposureTrackedBuffer): - """A slice of a spillable buffer - - This buffer applies the slicing and then delegates all - operations to its owning buffer. - - Parameters - ---------- - owner : SpillableBufferOwner - The owner of the view - offset : int - Memory offset into the owning buffer - size : int - Size of the view (in bytes) - """ + """A slice of a spillable buffer""" _owner: SpillableBufferOwner From 7344f9a9899f76429cf984fda826c91013806b66 Mon Sep 17 00:00:00 2001 From: "Mads R. B. Kristensen" Date: Wed, 3 Apr 2024 14:11:30 +0200 Subject: [PATCH 16/27] impl. test_spilling_and_copy_on_write --- .../cudf/cudf/core/buffer/spillable_buffer.py | 10 ++++-- python/cudf/cudf/tests/test_spilling.py | 32 +++++++++++++++++++ 2 files changed, 40 insertions(+), 2 deletions(-) diff --git a/python/cudf/cudf/core/buffer/spillable_buffer.py b/python/cudf/cudf/core/buffer/spillable_buffer.py index 7f33508579f..92133f776bb 100644 --- a/python/cudf/cudf/core/buffer/spillable_buffer.py +++ b/python/cudf/cudf/core/buffer/spillable_buffer.py @@ -439,13 +439,19 @@ def serialize(self) -> Tuple[dict, list]: return header, frames def copy(self, deep: bool = True) -> Self: - if deep and self.is_spilled: + from cudf.core.buffer.utils import acquire_spill_lock + + if not deep: + return super().copy(deep=False) + + if self.is_spilled: # In this case, we copy the host data directly instead of moving # to device memory first owner = self._owner._from_host_memory(bytearray(self.memoryview())) return self.__class__(owner=owner, offset=0, size=owner.size) else: - return super().copy(deep=deep) + with acquire_spill_lock(): + return super().copy(deep=deep) @property def __cuda_array_interface__(self) -> dict: diff --git a/python/cudf/cudf/tests/test_spilling.py b/python/cudf/cudf/tests/test_spilling.py index 74d3a61f5c1..b882d2ec6d1 100644 --- a/python/cudf/cudf/tests/test_spilling.py +++ b/python/cudf/cudf/tests/test_spilling.py @@ -726,3 +726,35 @@ def test_spill_on_demand(manager: SpillManager): with pytest.raises(MemoryError, match="Maximum pool size exceeded"): as_buffer(data=rmm.DeviceBuffer(size=1024)) + + +def test_spilling_and_copy_on_write(manager: SpillManager): + with cudf.option_context("copy_on_write", True): + a: SpillableBuffer = as_buffer(data=rmm.DeviceBuffer(size=10)) + + b = a.copy(deep=False) + assert a.owner == b.owner + a.spill(target="cpu") + assert a.is_spilled + assert b.is_spilled + + # Write access trigger copy of `a` into `b` but since `a` is spilled + # the copy is done in host memory and `a` remains spilled. + b.get_ptr(mode="write") + assert a.is_spilled + assert not b.is_spilled + + # Deep copy of the spilled buffer `a` + b = a.copy(deep=True) + assert a.owner != b.owner + assert a.is_spilled + assert b.is_spilled + a.spill(target="gpu") + assert not a.is_spilled + assert b.is_spilled + + # Deep copy of the unspilled buffer `a` + b = a.copy(deep=True) + assert a.spillable + assert not a.is_spilled + assert not b.is_spilled From 1e9d061a1abed44d8ff5c15671553306d071b845 Mon Sep 17 00:00:00 2001 From: "Mads R. B. Kristensen" Date: Wed, 3 Apr 2024 14:34:57 +0200 Subject: [PATCH 17/27] don't copy spilled data --- python/cudf/cudf/core/buffer/spillable_buffer.py | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/python/cudf/cudf/core/buffer/spillable_buffer.py b/python/cudf/cudf/core/buffer/spillable_buffer.py index 92133f776bb..2bc5361555c 100644 --- a/python/cudf/cudf/core/buffer/spillable_buffer.py +++ b/python/cudf/cudf/core/buffer/spillable_buffer.py @@ -445,9 +445,10 @@ def copy(self, deep: bool = True) -> Self: return super().copy(deep=False) if self.is_spilled: - # In this case, we copy the host data directly instead of moving - # to device memory first - owner = self._owner._from_host_memory(bytearray(self.memoryview())) + # In this case, we make the new copy point to the same spilled + # data in host memory. We can do this since spilled data is never + # modified. + owner = self._owner._from_host_memory(self.memoryview()) return self.__class__(owner=owner, offset=0, size=owner.size) else: with acquire_spill_lock(): From dc715e1ff47d047610d76627106eb03d829b79b5 Mon Sep 17 00:00:00 2001 From: "Mads R. B. Kristensen" Date: Wed, 3 Apr 2024 14:35:16 +0200 Subject: [PATCH 18/27] test_get_rmm_memory_resource_stack is safe again --- python/cudf/cudf/tests/test_spilling.py | 8 +------- 1 file changed, 1 insertion(+), 7 deletions(-) diff --git a/python/cudf/cudf/tests/test_spilling.py b/python/cudf/cudf/tests/test_spilling.py index b882d2ec6d1..e6125dc4af8 100644 --- a/python/cudf/cudf/tests/test_spilling.py +++ b/python/cudf/cudf/tests/test_spilling.py @@ -549,14 +549,8 @@ def test_serialize_cuda_dataframe(manager: SpillManager): assert_eq(df1, df2) -@pytest.mark.skip( - reason=( - "This test is not safe because other tests may have enabled " - "spilling and already modified rmm's global state" - ) -) def test_get_rmm_memory_resource_stack(): - mr1 = rmm.mr.get_current_device_resource() + mr1 = rmm.mr.CudaMemoryResource() assert all( not isinstance(m, rmm.mr.FailureCallbackResourceAdaptor) for m in get_rmm_memory_resource_stack(mr1) From 3eaf5e39edecdd21d50d9fa71a8e026e1163a0a0 Mon Sep 17 00:00:00 2001 From: "Mads R. B. Kristensen" Date: Thu, 4 Apr 2024 08:30:56 +0200 Subject: [PATCH 19/27] Apply suggestions from code review Co-authored-by: Bradley Dice --- python/cudf/cudf/core/buffer/spill_manager.py | 12 ++++++------ python/cudf/cudf/core/buffer/spillable_buffer.py | 6 +++--- 2 files changed, 9 insertions(+), 9 deletions(-) diff --git a/python/cudf/cudf/core/buffer/spill_manager.py b/python/cudf/cudf/core/buffer/spill_manager.py index 2d0de22acaa..5373e8692b5 100644 --- a/python/cudf/cudf/core/buffer/spill_manager.py +++ b/python/cudf/cudf/core/buffer/spill_manager.py @@ -437,13 +437,13 @@ def get_global_manager() -> Optional[SpillManager]: def set_spill_on_demand_globally() -> None: """Enable spill on demand in the current global spill manager. - Warning, this modify the current RMM memory resource. A memory resource - to handle out-of-memory errors are pushed on the RMM memory resource stack. + Warning: this modifies the current RMM memory resource. A memory resource + to handle out-of-memory errors is pushed onto the RMM memory resource stack. Raises ------ ValueError - If no global spill manager exist (spilling is disable). + If no global spill manager exists (spilling is disabled). ValueError If a failure callback resource is already in the resource stack. """ @@ -473,14 +473,14 @@ def set_spill_on_demand_globally() -> None: def spill_on_demand_globally(): """Context to enable spill on demand temporary. - Warning, this modify the current RMM memory resource. A memory resource - to handle out-of-memory errors are pushed on the RMM memory resource stack + Warning: this modifies the current RMM memory resource. A memory resource + to handle out-of-memory errors is pushed onto the RMM memory resource stack when entering the context and popped again when exiting. Raises ------ ValueError - If no global spill manager exist (spilling is disable). + If no global spill manager exists (spilling is disabled). ValueError If a failure callback resource is already in the resource stack. ValueError diff --git a/python/cudf/cudf/core/buffer/spillable_buffer.py b/python/cudf/cudf/core/buffer/spillable_buffer.py index 2bc5361555c..3263f164997 100644 --- a/python/cudf/cudf/core/buffer/spillable_buffer.py +++ b/python/cudf/cudf/core/buffer/spillable_buffer.py @@ -450,9 +450,9 @@ def copy(self, deep: bool = True) -> Self: # modified. owner = self._owner._from_host_memory(self.memoryview()) return self.__class__(owner=owner, offset=0, size=owner.size) - else: - with acquire_spill_lock(): - return super().copy(deep=deep) + + with acquire_spill_lock(): + return super().copy(deep=deep) @property def __cuda_array_interface__(self) -> dict: From a4a3ff4955139e79134a71546d7cbb828c388350 Mon Sep 17 00:00:00 2001 From: "Mads R. B. Kristensen" Date: Thu, 4 Apr 2024 08:36:27 +0200 Subject: [PATCH 20/27] BufferOwner: forcing keyword-only --- python/cudf/cudf/core/buffer/buffer.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/python/cudf/cudf/core/buffer/buffer.py b/python/cudf/cudf/core/buffer/buffer.py index cab9573a3d2..3f162e5f7f8 100644 --- a/python/cudf/cudf/core/buffer/buffer.py +++ b/python/cudf/cudf/core/buffer/buffer.py @@ -119,6 +119,7 @@ class BufferOwner(Serializable): def __init__( self, + *, ptr: int, size: int, owner: object, @@ -176,7 +177,7 @@ def _from_device_memory(cls, data: Any, exposed: bool) -> Self: size = data.size else: ptr, size = get_ptr_and_size(data.__cuda_array_interface__) - return cls(ptr, size, owner=data, exposed=exposed) + return cls(ptr=ptr, size=size, owner=data, exposed=exposed) @classmethod def _from_host_memory(cls, data: Any) -> Self: From 504ae9aded96a818c8ddc5264dbd6d0130a9365c Mon Sep 17 00:00:00 2001 From: "Mads R. B. Kristensen" Date: Thu, 4 Apr 2024 09:44:26 +0200 Subject: [PATCH 21/27] typo --- python/cudf/cudf/core/buffer/spill_manager.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/python/cudf/cudf/core/buffer/spill_manager.py b/python/cudf/cudf/core/buffer/spill_manager.py index 5373e8692b5..cd81149bdb8 100644 --- a/python/cudf/cudf/core/buffer/spill_manager.py +++ b/python/cudf/cudf/core/buffer/spill_manager.py @@ -471,7 +471,7 @@ def set_spill_on_demand_globally() -> None: @contextmanager def spill_on_demand_globally(): - """Context to enable spill on demand temporary. + """Context to enable spill on demand temporarily. Warning: this modifies the current RMM memory resource. A memory resource to handle out-of-memory errors is pushed onto the RMM memory resource stack From ac6831a2f4f2ce1652598de66480d04a21cc7a59 Mon Sep 17 00:00:00 2001 From: "Mads R. B. Kristensen" Date: Thu, 11 Apr 2024 10:58:58 +0200 Subject: [PATCH 22/27] doc --- python/cudf/cudf/core/buffer/buffer.py | 27 ++++++++++++++++---------- 1 file changed, 17 insertions(+), 10 deletions(-) diff --git a/python/cudf/cudf/core/buffer/buffer.py b/python/cudf/cudf/core/buffer/buffer.py index 6ab7d3ef5ba..28d3fff888f 100644 --- a/python/cudf/cudf/core/buffer/buffer.py +++ b/python/cudf/cudf/core/buffer/buffer.py @@ -108,6 +108,23 @@ class BufferOwner(Serializable): Use `_from_device_memory` and `_from_host_memory` to create a new instance from either device or host memory respectively. + + Parameters + ---------- + ptr + An integer representing a pointer to memory. + size + The size of the memory in nbytes + owner + Python object to which the lifetime of the memory allocation is tied. + This buffer will keep a reference to `owner`. + exposed + Pointer to the underlying memory + + Raises + ------ + ValueError + If size is negative """ _ptr: int @@ -125,16 +142,6 @@ def __init__( owner: object, exposed: bool, ): - """Create a new buffer owner. - - Do not use this directly, instead use `_from_device_memory` or - `_from_host_memory`. - - Raises - ------ - ValueError - If size is negative - """ if size < 0: raise ValueError("size cannot be negative") From 49625fae50eb132bda578a08ad8c7bfe6caf489b Mon Sep 17 00:00:00 2001 From: "Mads R. B. Kristensen" Date: Thu, 11 Apr 2024 11:00:28 +0200 Subject: [PATCH 23/27] rename _from_*_memory => from_*_memory --- python/cudf/cudf/core/buffer/buffer.py | 14 +++++++------- python/cudf/cudf/core/buffer/spillable_buffer.py | 14 +++++++------- python/cudf/cudf/core/buffer/utils.py | 4 ++-- 3 files changed, 16 insertions(+), 16 deletions(-) diff --git a/python/cudf/cudf/core/buffer/buffer.py b/python/cudf/cudf/core/buffer/buffer.py index 28d3fff888f..b2aba4f978b 100644 --- a/python/cudf/cudf/core/buffer/buffer.py +++ b/python/cudf/cudf/core/buffer/buffer.py @@ -106,7 +106,7 @@ class BufferOwner(Serializable): been accessed outside of BufferOwner. In this case, we have no control over knowing if the data is being modified by a third party. - Use `_from_device_memory` and `_from_host_memory` to create + Use `from_device_memory` and `from_host_memory` to create a new instance from either device or host memory respectively. Parameters @@ -152,7 +152,7 @@ def __init__( self._slices = weakref.WeakSet() @classmethod - def _from_device_memory(cls, data: Any, exposed: bool) -> Self: + def from_device_memory(cls, data: Any, exposed: bool) -> Self: """Create from an object providing a `__cuda_array_interface__`. No data is being copied. @@ -187,7 +187,7 @@ def _from_device_memory(cls, data: Any, exposed: bool) -> Self: return cls(ptr=ptr, size=size, owner=data, exposed=exposed) @classmethod - def _from_host_memory(cls, data: Any) -> Self: + def from_host_memory(cls, data: Any) -> Self: """Create an owner from a buffer or array like object Data must implement `__array_interface__`, the buffer protocol, and/or @@ -215,7 +215,7 @@ def _from_host_memory(cls, data: Any) -> Self: # Copy to device memory buf = rmm.DeviceBuffer(ptr=ptr, size=size) # Create from device memory - return cls._from_device_memory(buf, exposed=False) + return cls.from_device_memory(buf, exposed=False) @property def size(self) -> int: @@ -394,7 +394,7 @@ def copy(self, deep: bool = True) -> Self: ) # Otherwise, we create a new copy of the memory - owner = self._owner._from_device_memory( + owner = self._owner.from_device_memory( rmm.DeviceBuffer( ptr=self._owner.get_ptr(mode="read") + self._offset, size=self.size, @@ -458,9 +458,9 @@ def deserialize(cls, header: dict, frames: list) -> Self: owner_type: BufferOwner = pickle.loads(header["owner-type-serialized"]) if hasattr(frame, "__cuda_array_interface__"): - owner = owner_type._from_device_memory(frame, exposed=False) + owner = owner_type.from_device_memory(frame, exposed=False) else: - owner = owner_type._from_host_memory(frame) + owner = owner_type.from_host_memory(frame) return cls( owner=owner, offset=0, diff --git a/python/cudf/cudf/core/buffer/spillable_buffer.py b/python/cudf/cudf/core/buffer/spillable_buffer.py index e80d3f912e4..a1af3ba8c9d 100644 --- a/python/cudf/cudf/core/buffer/spillable_buffer.py +++ b/python/cudf/cudf/core/buffer/spillable_buffer.py @@ -94,8 +94,8 @@ class SpillableBufferOwner(BufferOwner): def _finalize_init(self, ptr_desc: Dict[str, Any]) -> None: """Finish initialization of the spillable buffer - This implements the common initialization that `_from_device_memory` - and `_from_host_memory` are missing. + This implements the common initialization that `from_device_memory` + and `from_host_memory` are missing. Parameters ---------- @@ -120,7 +120,7 @@ def _finalize_init(self, ptr_desc: Dict[str, Any]) -> None: self._manager.add(self) @classmethod - def _from_device_memory(cls, data: Any, exposed: bool) -> Self: + def from_device_memory(cls, data: Any, exposed: bool) -> Self: """Create a spillabe buffer from device memory. No data is being copied. @@ -137,12 +137,12 @@ def _from_device_memory(cls, data: Any, exposed: bool) -> Self: SpillableBufferOwner Buffer representing the same device memory as `data` """ - ret = super()._from_device_memory(data, exposed=exposed) + ret = super().from_device_memory(data, exposed=exposed) ret._finalize_init(ptr_desc={"type": "gpu"}) return ret @classmethod - def _from_host_memory(cls, data: Any) -> Self: + def from_host_memory(cls, data: Any) -> Self: """Create a spillabe buffer from host memory. Data must implement `__array_interface__`, the buffer protocol, and/or @@ -426,7 +426,7 @@ def serialize(self) -> Tuple[dict, list]: ptr, size, _ = self.memory_info() frames = [ Buffer( - owner=BufferOwner._from_device_memory( + owner=BufferOwner.from_device_memory( cuda_array_interface_wrapper( ptr=ptr, size=size, @@ -448,7 +448,7 @@ def copy(self, deep: bool = True) -> Self: # In this case, we make the new copy point to the same spilled # data in host memory. We can do this since spilled data is never # modified. - owner = self._owner._from_host_memory(self.memoryview()) + owner = self._owner.from_host_memory(self.memoryview()) return self.__class__(owner=owner, offset=0, size=owner.size) with acquire_spill_lock(): diff --git a/python/cudf/cudf/core/buffer/utils.py b/python/cudf/cudf/core/buffer/utils.py index c2ec7effd13..3346d05ed4a 100644 --- a/python/cudf/cudf/core/buffer/utils.py +++ b/python/cudf/cudf/core/buffer/utils.py @@ -133,13 +133,13 @@ def as_buffer( if not hasattr(data, "__cuda_array_interface__"): if exposed: raise ValueError("cannot created exposed host memory") - return buffer_class(owner=owner_class._from_host_memory(data)) + return buffer_class(owner=owner_class.from_host_memory(data)) # Check if `data` is owned by a known class owner = get_buffer_owner(data) if owner is None: # `data` is new device memory return buffer_class( - owner=owner_class._from_device_memory(data, exposed=exposed) + owner=owner_class.from_device_memory(data, exposed=exposed) ) # At this point, we know that `data` is owned by a known class, which From f4458b81f71e99fc701938dac703f97d984aca79 Mon Sep 17 00:00:00 2001 From: "Mads R. B. Kristensen" Date: Thu, 11 Apr 2024 11:13:09 +0200 Subject: [PATCH 24/27] enable must of test_series_zero_copy_cow_off again --- python/cudf/cudf/tests/test_copying.py | 15 ++++++++------- 1 file changed, 8 insertions(+), 7 deletions(-) diff --git a/python/cudf/cudf/tests/test_copying.py b/python/cudf/cudf/tests/test_copying.py index e357bc942b3..0bc9ffa8004 100644 --- a/python/cudf/cudf/tests/test_copying.py +++ b/python/cudf/cudf/tests/test_copying.py @@ -305,11 +305,8 @@ def test_series_zero_copy_cow_on(): def test_series_zero_copy_cow_off(): - if get_global_manager() is not None: - pytest.skip( - "cannot test zero-copy and no-copy-on-write when " - "spilling is enabled globally, set `CUDF_SPILL=off`" - ) + is_spill_enabled = get_global_manager() is not None + with cudf.option_context("copy_on_write", False): s = cudf.Series([1, 2, 3, 4, 5]) s1 = s.copy(deep=False) @@ -342,8 +339,12 @@ def test_series_zero_copy_cow_off(): assert_eq(s, cudf.Series([20, 10, 10, 4, 5])) assert_eq(s1, cudf.Series([20, 10, 10, 4, 5])) assert_eq(cp_array, cp.array([20, 10, 10, 4, 5])) - assert_eq(s2, cudf.Series([20, 10, 10, 4, 5])) - assert_eq(s3, cudf.Series([20, 10, 10, 4, 5])) + if not is_spill_enabled: + # Since spilling might make a copy of the data, we cannot + # expect the two series to be a zero-copy of the cupy array + # when spilling is enabled globally. + assert_eq(s2, cudf.Series([20, 10, 10, 4, 5])) + assert_eq(s3, cudf.Series([20, 10, 10, 4, 5])) s4 = cudf.Series([10, 20, 30, 40, 50]) s5 = cudf.Series(s4) From 30cd8e622bf7d6acf20e24fdc63ea905e0c0568b Mon Sep 17 00:00:00 2001 From: "Mads R. B. Kristensen" Date: Thu, 11 Apr 2024 15:26:13 +0200 Subject: [PATCH 25/27] doc --- python/cudf/cudf/core/buffer/exposure_tracked_buffer.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/python/cudf/cudf/core/buffer/exposure_tracked_buffer.py b/python/cudf/cudf/core/buffer/exposure_tracked_buffer.py index 55edcac4b10..15f00fc670d 100644 --- a/python/cudf/cudf/core/buffer/exposure_tracked_buffer.py +++ b/python/cudf/cudf/core/buffer/exposure_tracked_buffer.py @@ -94,7 +94,7 @@ def make_single_owner_inplace(self) -> None: if len(self.owner._slices) > 1: # If this is not the only slice pointing to `self.owner`, we - # point to a new deep copy of the owner. + # point to a new copy of our slice of `self.owner`. t = self.copy(deep=True) self._owner = t.owner self._offset = t._offset From 580deadf049aa153b891d45c74a27864fb6b95ee Mon Sep 17 00:00:00 2001 From: "Mads R. B. Kristensen" Date: Thu, 11 Apr 2024 15:28:02 +0200 Subject: [PATCH 26/27] Update python/cudf/cudf/tests/test_spilling.py Co-authored-by: Lawrence Mitchell --- python/cudf/cudf/tests/test_spilling.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/python/cudf/cudf/tests/test_spilling.py b/python/cudf/cudf/tests/test_spilling.py index e6125dc4af8..d9d19ee2a21 100644 --- a/python/cudf/cudf/tests/test_spilling.py +++ b/python/cudf/cudf/tests/test_spilling.py @@ -53,7 +53,7 @@ def set_rmm_memory_pool(nbytes: int): mr = rmm.mr.get_current_device_resource() rmm.mr.set_current_device_resource( rmm.mr.PoolMemoryResource( - rmm.mr.get_current_device_resource(), + mr, initial_pool_size=nbytes, maximum_pool_size=nbytes, ) From 6fe2d58fb195a129cc74502155a253ff00898ed1 Mon Sep 17 00:00:00 2001 From: "Mads R. B. Kristensen" Date: Thu, 11 Apr 2024 16:49:27 +0200 Subject: [PATCH 27/27] more tests --- python/cudf/cudf/tests/test_spilling.py | 34 ++++++++++++++++++++++++- 1 file changed, 33 insertions(+), 1 deletion(-) diff --git a/python/cudf/cudf/tests/test_spilling.py b/python/cudf/cudf/tests/test_spilling.py index d9d19ee2a21..913a958b4c2 100644 --- a/python/cudf/cudf/tests/test_spilling.py +++ b/python/cudf/cudf/tests/test_spilling.py @@ -734,7 +734,8 @@ def test_spilling_and_copy_on_write(manager: SpillManager): # Write access trigger copy of `a` into `b` but since `a` is spilled # the copy is done in host memory and `a` remains spilled. - b.get_ptr(mode="write") + with acquire_spill_lock(): + b.get_ptr(mode="write") assert a.is_spilled assert not b.is_spilled @@ -752,3 +753,34 @@ def test_spilling_and_copy_on_write(manager: SpillManager): assert a.spillable assert not a.is_spilled assert not b.is_spilled + + b = a.copy(deep=False) + assert a.owner == b.owner + # Write access trigger copy of `a` into `b` in device memory + with acquire_spill_lock(): + b.get_ptr(mode="write") + assert a.owner != b.owner + assert not a.is_spilled + assert not b.is_spilled + # And `a` and `b` is now seperated with there one spilling status + a.spill(target="cpu") + assert a.is_spilled + assert not b.is_spilled + b.spill(target="cpu") + assert a.is_spilled + assert b.is_spilled + + # Read access with a spill lock unspill `a` and allows copy-on-write + with acquire_spill_lock(): + a.get_ptr(mode="read") + b = a.copy(deep=False) + assert a.owner == b.owner + assert not a.is_spilled + + # Read access without a spill lock exposes `a` and forces a deep copy + a.get_ptr(mode="read") + b = a.copy(deep=False) + assert a.owner != b.owner + assert not a.is_spilled + assert a.owner.exposed + assert not b.owner.exposed