Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Unify Copy-On-Write and Spilling #15436

Merged
merged 30 commits into from
Apr 20, 2024
Merged
Show file tree
Hide file tree
Changes from 18 commits
Commits
Show all changes
30 commits
Select commit Hold shift + click to select a range
f2bf171
clean up
madsbk Apr 2, 2024
276dd80
remove redundant methods
madsbk Apr 2, 2024
2413e99
impl. and use BufferOwner.__init__
madsbk Apr 2, 2024
9f4b0e1
SpillableBufferOwner: use __init__
madsbk Apr 2, 2024
834f6d5
remove SpillableBuffer.mark_exposed()
madsbk Apr 2, 2024
68524e4
remove SpillableBuffer.exposed
madsbk Apr 2, 2024
2e43f85
SpillableBuffer(ExposureTrackedBuffer)
madsbk Apr 2, 2024
659e832
mark cow tests as "spilling"
madsbk Apr 3, 2024
c5355a5
impl. SpillableBuffer.copy
madsbk Apr 3, 2024
3581579
option: allow cow and spilling at the same time
madsbk Apr 3, 2024
58f7e8e
test: skip zero-copy and no-copy-on-write test whe spilling is enabled
madsbk Apr 3, 2024
6018949
cleanup
madsbk Apr 3, 2024
f48c529
impl. set_spill_on_demand_globally
madsbk Apr 3, 2024
dcff299
impl. spill_on_demand_globally
madsbk Apr 3, 2024
4a8b43e
cleanup
madsbk Apr 3, 2024
7344f9a
impl. test_spilling_and_copy_on_write
madsbk Apr 3, 2024
1e9d061
don't copy spilled data
madsbk Apr 3, 2024
dc715e1
test_get_rmm_memory_resource_stack is safe again
madsbk Apr 3, 2024
3eaf5e3
Apply suggestions from code review
madsbk Apr 4, 2024
a4a3ff4
BufferOwner: forcing keyword-only
madsbk Apr 4, 2024
55027fc
Merge branch 'branch-24.06' of github.com:rapidsai/cudf into cow_and_…
madsbk Apr 4, 2024
504ae9a
typo
madsbk Apr 4, 2024
53106ee
Merge branch 'branch-24.06' of github.com:rapidsai/cudf into cow_and_…
madsbk Apr 11, 2024
ac6831a
doc
madsbk Apr 11, 2024
49625fa
rename _from_*_memory => from_*_memory
madsbk Apr 11, 2024
f4458b8
enable must of test_series_zero_copy_cow_off again
madsbk Apr 11, 2024
30cd8e6
doc
madsbk Apr 11, 2024
580dead
Update python/cudf/cudf/tests/test_spilling.py
madsbk Apr 11, 2024
6fe2d58
more tests
madsbk Apr 11, 2024
51b4b82
Merge branch 'branch-24.06' of github.com:rapidsai/cudf into cow_and_…
madsbk Apr 17, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
47 changes: 29 additions & 18 deletions python/cudf/cudf/core/buffer/buffer.py
Original file line number Diff line number Diff line change
Expand Up @@ -117,11 +117,31 @@ class BufferOwner(Serializable):
# The set of buffers that point to this owner.
_slices: weakref.WeakSet[Buffer]

def __init__(self):
raise ValueError(
f"do not create a {self.__class__} directly, please "
"use the factory function `cudf.core.buffer.as_buffer`"
)
def __init__(
self,
ptr: int,
size: int,
owner: object,
exposed: bool,
):
"""Create a new buffer owner.

Do not use this directly, instead use `_from_device_memory` or
`_from_host_memory`.
vyasr marked this conversation as resolved.
Show resolved Hide resolved

Raises
------
ValueError
If size is negative
"""
if size < 0:
raise ValueError("size cannot be negative")

self._ptr = ptr
self._size = size
self._owner = owner
self._exposed = exposed
self._slices = weakref.WeakSet()
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Notice, I have removed the direct-use-of-Buffer guard. I think the four mandatory arguments are enough to prevent accidental use.


@classmethod
def _from_device_memory(cls, data: Any, exposed: bool) -> Self:
Expand Down Expand Up @@ -151,21 +171,12 @@ def _from_device_memory(cls, data: Any, exposed: bool) -> Self:
If the resulting buffer has negative size
"""

# Bypass `__init__` and initialize attributes manually
ret = cls.__new__(cls)
ret._owner = data
ret._exposed = exposed
ret._slices = weakref.WeakSet()
if isinstance(data, rmm.DeviceBuffer): # Common case shortcut
ret._ptr = data.ptr
ret._size = data.size
ptr = data.ptr
size = data.size
else:
ret._ptr, ret._size = get_ptr_and_size(
data.__cuda_array_interface__
)
if ret.size < 0:
raise ValueError("size cannot be negative")
return ret
ptr, size = get_ptr_and_size(data.__cuda_array_interface__)
return cls(ptr, size, owner=data, exposed=exposed)

@classmethod
def _from_host_memory(cls, data: Any) -> Self:
Expand Down
16 changes: 5 additions & 11 deletions python/cudf/cudf/core/buffer/exposure_tracked_buffer.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,20 +23,14 @@ class ExposureTrackedBuffer(Buffer):
The size of the slice (in bytes)
"""

_owner: BufferOwner

def __init__(
self,
owner: BufferOwner,
offset: int = 0,
size: Optional[int] = None,
) -> None:
super().__init__(owner=owner, offset=offset, size=size)
self._owner._slices.add(self)

@property
def exposed(self) -> bool:
return self._owner.exposed
self.owner._slices.add(self)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

question: should we assert isinstance(owner, BufferOwner) here? Or is it not necessary because we implicitly require it via the _slices attribute?

Copy link
Member Author

@madsbk madsbk Apr 11, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I guess, we could add assert isinstance(owner, BufferOwner) in Buffer.__init__ but I am not sure if that is pythonic?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

An assertion would basically be there to protect against developer error. There's nothing unpythonic about it, that's definitely what the functionality is there for. The main question I would have is, how would we end up with something other than a BufferOwner here, and how bad would the failure mode be? Unless the assertion makes it markedly easier to debug the code it doesn't add a whole lot of value IMHO.

Copy link
Member Author

@madsbk madsbk Apr 17, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it is arguable a little unpythonic to prevent duck-typing by using isinstance to force a specify type (as opposed to a general ABC/protocol type).
Anyways, in this case as_buffer() is the only one creating ExposureTrackedBuffer and it explicitly sets the owner to SpillableBufferOwner | SpillableBuffer | BufferOwner so I don't think a type error here is likely.


def get_ptr(self, *, mode: Literal["read", "write"]) -> int:
if mode == "write" and cudf.get_option("copy_on_write"):
Expand Down Expand Up @@ -72,7 +66,7 @@ def copy(self, deep: bool = True) -> Self:
copy-on-write option (see above).
"""
if cudf.get_option("copy_on_write"):
return super().copy(deep=deep or self.exposed)
return super().copy(deep=deep or self.owner.exposed)
return super().copy(deep=deep)

@property
Expand All @@ -98,11 +92,11 @@ def make_single_owner_inplace(self) -> None:
Buffer representing the same device memory as `data`
"""

if len(self._owner._slices) > 1:
# If this is not the only slice pointing to `self._owner`, we
if len(self.owner._slices) > 1:
# If this is not the only slice pointing to `self.owner`, we
# point to a new deep copy of the owner.
madsbk marked this conversation as resolved.
Show resolved Hide resolved
t = self.copy(deep=True)
self._owner = t._owner
self._owner = t.owner
self._offset = t._offset
self._size = t._size
self._owner._slices.add(self)
101 changes: 75 additions & 26 deletions python/cudf/cudf/core/buffer/spill_manager.py
vyasr marked this conversation as resolved.
Show resolved Hide resolved
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import warnings
import weakref
from collections import defaultdict
from contextlib import contextmanager
from dataclasses import dataclass
from functools import partial
from typing import Dict, List, Optional, Tuple
Expand Down Expand Up @@ -201,10 +202,6 @@ class SpillManager:
This class implements tracking of all known spillable buffers, on-demand
spilling of said buffers, and (optionally) maintains a memory usage limit.

When `spill_on_demand=True`, the manager registers an RMM out-of-memory
error handler, which will spill spillable buffers in order to free up
memory.

When `device_memory_limit=<limit-in-bytes>`, the manager will try keep
the device memory usage below the specified limit by spilling of spillable
buffers continuously, which will introduce a modest overhead.
Expand All @@ -213,8 +210,6 @@ class SpillManager:

Parameters
----------
spill_on_demand : bool
Enable spill on demand.
device_memory_limit: int, optional
If not None, this is the device memory limit in bytes that triggers
device to host spilling. The global manager sets this to the value
Expand All @@ -230,30 +225,15 @@ class SpillManager:
def __init__(
self,
*,
spill_on_demand: bool = False,
device_memory_limit: Optional[int] = None,
statistic_level: int = 0,
) -> None:
self._lock = threading.Lock()
self._buffers = weakref.WeakValueDictionary()
self._id_counter = 0
self._spill_on_demand = spill_on_demand
self._device_memory_limit = device_memory_limit
self.statistics = SpillStatistics(statistic_level)

if self._spill_on_demand:
# Set the RMM out-of-memory handle if not already set
mr = rmm.mr.get_current_device_resource()
if all(
not isinstance(m, rmm.mr.FailureCallbackResourceAdaptor)
for m in get_rmm_memory_resource_stack(mr)
):
rmm.mr.set_current_device_resource(
rmm.mr.FailureCallbackResourceAdaptor(
mr, self._out_of_memory_handle
)
)

def _out_of_memory_handle(self, nbytes: int, *, retry_once=True) -> bool:
"""Try to handle an out-of-memory error by spilling

Expand Down Expand Up @@ -408,8 +388,7 @@ def __repr__(self) -> str:
dev_limit = format_bytes(self._device_memory_limit)

return (
f"<SpillManager spill_on_demand={self._spill_on_demand} "
f"device_memory_limit={dev_limit} | "
f"<SpillManager device_memory_limit={dev_limit} | "
f"{format_bytes(spilled)} spilled | "
f"{format_bytes(unspilled)} ({unspillable_ratio:.0%}) "
f"unspilled (unspillable)>"
Expand Down Expand Up @@ -442,12 +421,82 @@ def get_global_manager() -> Optional[SpillManager]:
"""Get the global manager or None if spilling is disabled"""
global _global_manager_uninitialized
if _global_manager_uninitialized:
manager = None
if get_option("spill"):
manager = SpillManager(
spill_on_demand=get_option("spill_on_demand"),
device_memory_limit=get_option("spill_device_limit"),
statistic_level=get_option("spill_stats"),
)
set_global_manager(manager)
set_global_manager(manager)
if get_option("spill_on_demand"):
set_spill_on_demand_globally()
else:
set_global_manager(None)
return _global_manager


def set_spill_on_demand_globally() -> None:
"""Enable spill on demand in the current global spill manager.

Warning, this modify the current RMM memory resource. A memory resource
to handle out-of-memory errors are pushed on the RMM memory resource stack.
madsbk marked this conversation as resolved.
Show resolved Hide resolved

Raises
------
ValueError
If no global spill manager exist (spilling is disable).
madsbk marked this conversation as resolved.
Show resolved Hide resolved
ValueError
If a failure callback resource is already in the resource stack.
"""

manager = get_global_manager()
if manager is None:
raise ValueError(
"Cannot enable spill on demand with no global spill manager"
)
mr = rmm.mr.get_current_device_resource()
if any(
isinstance(m, rmm.mr.FailureCallbackResourceAdaptor)
for m in get_rmm_memory_resource_stack(mr)
):
raise ValueError(
"Spill on demand (or another failure callback resource) "
"is already registered"
)
rmm.mr.set_current_device_resource(
rmm.mr.FailureCallbackResourceAdaptor(
mr, manager._out_of_memory_handle
)
)


@contextmanager
def spill_on_demand_globally():
"""Context to enable spill on demand temporary.

Warning, this modify the current RMM memory resource. A memory resource
to handle out-of-memory errors are pushed on the RMM memory resource stack
when entering the context and popped again when exiting.
madsbk marked this conversation as resolved.
Show resolved Hide resolved

Raises
------
ValueError
If no global spill manager exist (spilling is disable).
madsbk marked this conversation as resolved.
Show resolved Hide resolved
ValueError
If a failure callback resource is already in the resource stack.
ValueError
If the RMM memory source stack was changed while in the context.
"""
set_spill_on_demand_globally()
# Save the new memory resource stack for later cleanup
mr_stack = get_rmm_memory_resource_stack(
rmm.mr.get_current_device_resource()
)
try:
yield
finally:
mr = rmm.mr.get_current_device_resource()
if mr_stack != get_rmm_memory_resource_stack(mr):
raise ValueError(
"RMM memory source stack was changed while in the context"
)
rmm.mr.set_current_device_resource(mr_stack[1])
47 changes: 20 additions & 27 deletions python/cudf/cudf/core/buffer/spillable_buffer.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
cuda_array_interface_wrapper,
host_memory_allocation,
)
from cudf.core.buffer.exposure_tracked_buffer import ExposureTrackedBuffer
from cudf.utils.nvtx_annotation import _get_color_for_nvtx, annotate
from cudf.utils.string import format_bytes

Expand Down Expand Up @@ -170,11 +171,7 @@ def _from_host_memory(cls, data: Any) -> Self:
data = data.cast("B") # Make sure itemsize==1

# Create an already spilled buffer
ret = cls.__new__(cls)
ret._owner = None
ret._ptr = 0
ret._size = data.nbytes
ret._exposed = False
ret = cls(ptr=0, size=data.nbytes, owner=None, exposed=False)
madsbk marked this conversation as resolved.
Show resolved Hide resolved
ret._finalize_init(ptr_desc={"type": "cpu", "memoryview": data})
return ret

Expand Down Expand Up @@ -372,21 +369,8 @@ def __str__(self) -> str:
)


class SpillableBuffer(Buffer):
"""A slice of a spillable buffer

This buffer applies the slicing and then delegates all
operations to its owning buffer.

Parameters
----------
owner : SpillableBufferOwner
The owner of the view
offset : int
Memory offset into the owning buffer
size : int
Size of the view (in bytes)
"""
class SpillableBuffer(ExposureTrackedBuffer):
galipremsagar marked this conversation as resolved.
Show resolved Hide resolved
"""A slice of a spillable buffer"""

_owner: SpillableBufferOwner

Expand All @@ -397,10 +381,6 @@ def spill(self, target: str = "cpu") -> None:
def is_spilled(self) -> bool:
return self._owner.is_spilled

@property
def exposed(self) -> bool:
return self._owner.exposed

@property
def spillable(self) -> bool:
return self._owner.spillable
Expand All @@ -412,9 +392,6 @@ def memory_info(self) -> Tuple[int, int, str]:
(ptr, _, device_type) = self._owner.memory_info()
return (ptr + self._offset, self.nbytes, device_type)

def mark_exposed(self) -> None:
self._owner.mark_exposed()

def serialize(self) -> Tuple[dict, list]:
"""Serialize the Buffer

Expand Down Expand Up @@ -461,6 +438,22 @@ def serialize(self) -> Tuple[dict, list]:
]
return header, frames

def copy(self, deep: bool = True) -> Self:
from cudf.core.buffer.utils import acquire_spill_lock
wence- marked this conversation as resolved.
Show resolved Hide resolved

if not deep:
return super().copy(deep=False)

if self.is_spilled:
# In this case, we make the new copy point to the same spilled
# data in host memory. We can do this since spilled data is never
# modified.
owner = self._owner._from_host_memory(self.memoryview())
return self.__class__(owner=owner, offset=0, size=owner.size)
wence- marked this conversation as resolved.
Show resolved Hide resolved
else:
with acquire_spill_lock():
return super().copy(deep=deep)
madsbk marked this conversation as resolved.
Show resolved Hide resolved

@property
def __cuda_array_interface__(self) -> dict:
return {
Expand Down
15 changes: 1 addition & 14 deletions python/cudf/cudf/options.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
# Copyright (c) 2022-2023, NVIDIA CORPORATION.
# Copyright (c) 2022-2024, NVIDIA CORPORATION.

import os
import textwrap
Expand Down Expand Up @@ -152,26 +152,13 @@ def _validator(val):


def _cow_validator(val):
if get_option("spill") and val:
raise ValueError(
"Copy-on-write is not supported when spilling is enabled. "
"Please set `spill` to `False`"
)
if val not in {False, True}:
raise ValueError(
f"{val} is not a valid option. Must be one of {{False, True}}."
)


def _spill_validator(val):
try:
if get_option("copy_on_write") and val:
raise ValueError(
"Spilling is not supported when copy-on-write is enabled. "
"Please set `copy_on_write` to `False`"
)
except KeyError:
pass
if val not in {False, True}:
raise ValueError(
f"{val} is not a valid option. Must be one of {{False, True}}."
Expand Down
Loading
Loading