Skip to content

Commit

Permalink
Merge pull request #3 from shwina/c-o-w-2
Browse files Browse the repository at this point in the history
C o w 2
  • Loading branch information
galipremsagar authored Dec 13, 2022
2 parents b55f039 + 2e8fc97 commit b415fa1
Show file tree
Hide file tree
Showing 7 changed files with 186 additions and 370 deletions.
6 changes: 3 additions & 3 deletions python/cudf/cudf/_lib/column.pyi
Original file line number Diff line number Diff line change
Expand Up @@ -68,9 +68,9 @@ class Column:
@property
def children(self) -> Tuple[ColumnBase, ...]: ...
def set_base_children(self, value: Tuple[ColumnBase, ...]) -> None: ...
def _detach_refs(self, zero_copied=False) -> None: ...
def _has_a_weakref(self) -> bool: ...
def _is_cai_zero_copied(self) -> bool: ...
def _unlink_shared_buffers(self, zero_copied=False) -> None: ...
def _is_shared_buffers(self) -> bool: ...
def _buffers_zero_copied(self) -> bool: ...
def _mimic_inplace(
self, other_col: ColumnBase, inplace=False
) -> Optional[ColumnBase]: ...
Expand Down
59 changes: 24 additions & 35 deletions python/cudf/cudf/_lib/column.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ import cudf._lib as libcudf
from cudf.api.types import is_categorical_dtype
from cudf.core.buffer import (
Buffer,
RefCountableBuffer,
CopyOnWriteBuffer,
SpillableBuffer,
SpillLock,
acquire_spill_lock,
Expand Down Expand Up @@ -74,7 +74,6 @@ cdef class Column:
self.set_base_children(children)
self.set_base_data(data)
self.set_base_mask(mask)
self._zero_copied = False

@property
def base_size(self):
Expand Down Expand Up @@ -217,7 +216,7 @@ cdef class Column:
# be expensive.
hasattr(value, "__cuda_array_interface__")
):
if isinstance(value, RefCountableBuffer):
if isinstance(value, CopyOnWriteBuffer):
value = SimpleNamespace(
__cuda_array_interface__=(
value._cuda_array_interface_readonly
Expand Down Expand Up @@ -314,49 +313,39 @@ cdef class Column:
self._children = None
self._base_children = value

def _has_a_weakref(self) -> bool:
def _is_shared_buffers(self) -> bool:
"""
Determines if the column has a weak reference.
Determines if any of the buffers underneath the column
have been shared else-where.
"""

return (
(
isinstance(self.base_data, RefCountableBuffer) and
self.base_data._has_a_weakref()
)
or
(
isinstance(self.base_mask, RefCountableBuffer) and
self.base_mask._has_a_weakref()
)
is_data_shared = (
isinstance(self.base_data, CopyOnWriteBuffer) and
self.base_data._is_shared()
)
is_mask_shared = (
isinstance(self.base_mask, CopyOnWriteBuffer) and
self.base_mask._is_shared()
)
return is_mask_shared or is_data_shared

def _is_cai_zero_copied(self) -> bool:
"""
Determines if the column is zero copied.
"""
return (
self._zero_copied or
(
self.base_data is not None and
isinstance(self.base_data, RefCountableBuffer) and
self.base_data._is_cai_zero_copied()
) or
(
self.base_mask is not None and
isinstance(self.base_mask, RefCountableBuffer) and
self.base_mask._is_cai_zero_copied()
)
def _buffers_zero_copied(self):
data_zero_copied = (
isinstance(self.base_data, CopyOnWriteBuffer) and
self.base_data._zero_copied
)
mask_zero_copied = (
isinstance(self.base_mask, CopyOnWriteBuffer) and
self.base_mask._zero_copied
)
return data_zero_copied or mask_zero_copied

def _detach_refs(self, zero_copied=False):
def _unlink_shared_buffers(self, zero_copied=False):
"""
Detaches a column from its current Buffers by making
a true deep-copy.
"""
if not self._is_cai_zero_copied() and self._has_a_weakref():
if not self._buffers_zero_copied() and self._is_shared_buffers():
new_col = self.force_deep_copy()

self._offset = new_col.offset
self._size = new_col.size
self._dtype = new_col._dtype
Expand Down
2 changes: 1 addition & 1 deletion python/cudf/cudf/core/buffer/__init__.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
# Copyright (c) 2022, NVIDIA CORPORATION.

from cudf.core.buffer.buffer import Buffer, cuda_array_interface_wrapper
from cudf.core.buffer.cow_buffer import CopyOnWriteBuffer
from cudf.core.buffer.spillable_buffer import SpillableBuffer, SpillLock
from cudf.core.buffer.utils import (
acquire_spill_lock,
as_buffer,
get_spill_lock,
)
from cudf.core.buffer.weakrefable_buffer import RefCountableBuffer
151 changes: 151 additions & 0 deletions python/cudf/cudf/core/buffer/cow_buffer.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,151 @@
# Copyright (c) 2022, NVIDIA CORPORATION.

from __future__ import annotations

import copy
from collections import defaultdict
from typing import Any, DefaultDict, Tuple, Type, TypeVar
from weakref import WeakSet

import rmm

from cudf.core.buffer.buffer import Buffer, cuda_array_interface_wrapper

T = TypeVar("T", bound="CopyOnWriteBuffer")


class CopyOnWriteBuffer(Buffer):
"""A Buffer represents device memory.
Use the factory function `as_buffer` to create a Buffer instance.
"""

# This dict keeps track of all instances that have the same `ptr`
# and `size` attributes. Each key of the dict is a `(ptr, size)`
# tuple and the corresponding value is a set of weak references to
# instances with that `ptr` and `size`.
_instances: DefaultDict[Tuple, WeakSet] = defaultdict(WeakSet)

# TODO: This is synonymous to SpillableBuffer._exposed attribute
# and has to be merged.
_zero_copied: bool

def _finalize_init(self):
# the last step in initializing a `CopyOnWriteBuffer`
# is to track it in `_instances`:
key = (self.ptr, self.size)
self.__class__._instances[key].add(self)
self._zero_copied = False

@classmethod
def _from_device_memory(cls: Type[T], data: Any) -> T:
"""Create a Buffer from an object exposing `__cuda_array_interface__`.
No data is being copied.
Parameters
----------
data : device-buffer-like
An object implementing the CUDA Array Interface.
Returns
-------
Buffer
Buffer representing the same device memory as `data`
"""

# Bypass `__init__` and initialize attributes manually
ret = super()._from_device_memory(data)
ret._finalize_init()
return ret

@classmethod
def _from_host_memory(cls: Type[T], data: Any) -> T:
ret = super()._from_host_memory(data)
ret._finalize_init()
return ret

def _is_shared(self):
"""
Return `True` if `self`'s memory is shared with other columns.
"""
return len(self.__class__._instances[(self.ptr, self.size)]) > 1

def copy(self, deep: bool = True):
"""
Return a copy of Buffer.
Parameters
----------
deep : bool, default True
If True, returns a deep-copy of the underlying Buffer data.
If False, returns a shallow-copy of the Buffer pointing to
the same underlying data.
Returns
-------
Buffer
"""
if not deep:
copied_buf = CopyOnWriteBuffer.__new__(CopyOnWriteBuffer)
copied_buf._ptr = self._ptr
copied_buf._size = self._size
copied_buf._owner = self._owner
copied_buf._finalize_init()
return copied_buf
else:
owner_copy: rmm.DeviceBuffer = copy.copy(self._owner)
return self._from_device_memory(
cuda_array_interface_wrapper(
ptr=owner_copy.ptr,
size=owner_copy.size,
owner=owner_copy,
)
)

@property
def _cuda_array_interface_readonly(self) -> dict:
"""
Internal Implementation for the CUDA Array Interface without
triggering a deepcopy.
"""
return {
"data": (self.ptr, True),
"shape": (self.size,),
"strides": None,
"typestr": "|u1",
"version": 0,
}

@property
def __cuda_array_interface__(self) -> dict:
# Detach if there are any weak-references.

# Mark the Buffer as ``zero_copied=True``,
# which will prevent any copy-on-write
# mechanism post this operation.
# This is done because we don't have any
# control over knowing if a third-party library
# has modified the data this Buffer is
# pointing to.
self._unlink_shared_buffers(zero_copied=True)

result = self._cuda_array_interface_readonly
result["data"] = (self.ptr, False)
return result

def _unlink_shared_buffers(self, zero_copied=False):
"""
Unlinks a Buffer if it is shared with other buffers(i.e., weak references exist) by making
a true deep-copy.
"""
if not self._zero_copied and self._shallow_copied():
# make a deep copy of existing DeviceBuffer
# and replace pointer to it.
current_buf = rmm.DeviceBuffer(ptr=self.ptr, size=self.size)
new_buf = current_buf.copy()
self._ptr = new_buf.ptr
self._size = new_buf.size
self._owner = new_buf
self._finalize_init()
self._zero_copied = zero_copied
8 changes: 4 additions & 4 deletions python/cudf/cudf/core/buffer/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,9 @@
from typing import Any, Dict, Optional, Tuple, Union

from cudf.core.buffer.buffer import Buffer, cuda_array_interface_wrapper
from cudf.core.buffer.cow_buffer import CopyOnWriteBuffer
from cudf.core.buffer.spill_manager import get_global_manager
from cudf.core.buffer.spillable_buffer import SpillableBuffer, SpillLock
from cudf.core.buffer.weakrefable_buffer import RefCountableBuffer
from cudf.options import get_option


Expand Down Expand Up @@ -74,11 +74,11 @@ def as_buffer(
)

if get_option("copy_on_write"):
if isinstance(data, (Buffer, RefCountableBuffer)) or hasattr(
if isinstance(data, (Buffer, CopyOnWriteBuffer)) or hasattr(
data, "__cuda_array_interface__"
):
return RefCountableBuffer._from_device_memory(data)
return RefCountableBuffer._from_host_memory(data)
return CopyOnWriteBuffer._from_device_memory(data)
return CopyOnWriteBuffer._from_host_memory(data)
if get_global_manager() is not None:
if hasattr(data, "__cuda_array_interface__"):
return SpillableBuffer._from_device_memory(data, exposed=exposed)
Expand Down
Loading

0 comments on commit b415fa1

Please sign in to comment.