Refactor cython interface: copying.pyx #10359

Merged
193 changes: 90 additions & 103 deletions python/cudf/cudf/_lib/copying.pyx
@@ -38,6 +38,7 @@ from cudf._lib.cpp.table.table cimport table
from cudf._lib.cpp.table.table_view cimport table_view
from cudf._lib.cpp.types cimport size_type
from cudf._lib.utils cimport (
columns_from_table_view,
columns_from_unique_ptr,
data_from_table_view,
data_from_unique_ptr,
@@ -166,7 +167,7 @@ def copy_range(Column input_column,


def gather(
columns: list,
list columns,
Column gather_map,
bool nullify=False
):
@@ -190,60 +191,80 @@
return columns_from_unique_ptr(move(c_result))


def scatter(object source, Column scatter_map, Column target_column,
bool bounds_check=True):
"""
Scattering input into target as per the scatter map,
input can be a list of scalars or can be a table
"""

cdef column_view scatter_map_view = scatter_map.view()
cdef table_view target_table_view = table_view_from_columns(
(target_column,))
cdef bool c_bounds_check = bounds_check
cdef scatter_scalar(list source_device_slrs,
Contributor:
Something to think about: is there a better way for us to handle scalar vs column functions? We have this division in a lot of our Cython layers, can we think of a way to reduce it? One option would be to define generic scalar/column functions (not sure exactly what that would look like) and then passing specialized functions as arguments to handle different features. This is a very speculative idea, so no need to have an answer in this PR, but I do dislike duplicating this scalar/column division over and over.
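One possible shape of that idea, sketched in plain Python rather than Cython (the names below — _prepare_column_source, _prepare_scalar_source, _scatter_generic — are hypothetical and not part of this PR; the real code would still call cpp_copying.scatter under nogil):

def _prepare_column_source(source_columns):
    # Column branch: the sources become one table-like payload.
    return ("table", list(source_columns))


def _prepare_scalar_source(source_scalars):
    # Scalar branch: the sources become a sequence of scalar references.
    return ("scalars", list(source_scalars))


def _scatter_generic(sources, scatter_map, target_columns, prepare_source,
                     bounds_check=True):
    # The shared plumbing (building views, the nogil call, unpacking the
    # result table) would live here exactly once.
    kind, payload = prepare_source(sources)
    # ... cpp_copying.scatter(payload, scatter_map, target, bounds_check) ...
    return kind, payload, scatter_map, target_columns, bounds_check


def scatter(sources, scatter_map, target_columns, bounds_check=True):
    # The public entry point only picks which specialized handler to forward;
    # in the real code the test would be isinstance(sources[0], Column).
    prepare = (_prepare_column_source if hasattr(sources[0], "dtype")
               else _prepare_scalar_source)
    return _scatter_generic(sources, scatter_map, target_columns, prepare,
                            bounds_check)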

column_view scatter_map,
table_view target_table,
bool bounds_check):
cdef vector[reference_wrapper[constscalar]] c_source
cdef DeviceScalar d_slr
cdef unique_ptr[table] c_result

# Needed for the table branch
cdef table_view source_table_view
c_source.reserve(len(source_device_slrs))
for d_slr in source_device_slrs:
c_source.push_back(
reference_wrapper[constscalar](d_slr.get_raw_ptr()[0])
)

with nogil:
c_result = move(
cpp_copying.scatter(
c_source,
scatter_map,
target_table,
bounds_check
)
)

# Needed for the scalar branch
cdef vector[reference_wrapper[constscalar]] source_scalars
cdef DeviceScalar slr
return columns_from_unique_ptr(move(c_result))

if isinstance(source, Column):
source_table_view = table_view_from_columns((<Column> source,))

with nogil:
c_result = move(
cpp_copying.scatter(
source_table_view,
scatter_map_view,
target_table_view,
c_bounds_check
)
)
else:
slr = as_device_scalar(source, target_column.dtype)
source_scalars.push_back(reference_wrapper[constscalar](
slr.get_raw_ptr()[0]))
cdef scatter_column(list source_columns,
column_view scatter_map,
table_view target_table,
bool bounds_check):
cdef table_view c_source = table_view_from_columns(source_columns)
cdef unique_ptr[table] c_result

with nogil:
c_result = move(
cpp_copying.scatter(
source_scalars,
scatter_map_view,
target_table_view,
c_bounds_check
)
with nogil:
c_result = move(
cpp_copying.scatter(
c_source,
scatter_map,
target_table,
bounds_check
)
)
return columns_from_unique_ptr(move(c_result))

data, _ = data_from_unique_ptr(
move(c_result),
column_names=(None,),
index_names=None
)

return next(iter(data.values()))
def scatter(list sources, Column scatter_map, list target_columns,
bool bounds_check=True):
"""
Scattering source into target as per the scatter map.
`source` can be a list of scalars, or a list of columns. The number of
items in `sources` must equal the number of `target_columns` to scatter.
"""
# TODO: Only single column scatter is used, we should explore multi-column
# scatter for frames for performance increase.

if len(sources) != len(target_columns):
raise ValueError("Mismatched number of source and target columns.")

if len(sources) == 0:
return []

cdef column_view scatter_map_view = scatter_map.view()
cdef table_view target_table_view = table_view_from_columns(target_columns)

if isinstance(sources[0], Column):
return scatter_column(
sources, scatter_map_view, target_table_view, bounds_check
)
else:
source_scalars = [as_device_scalar(slr) for slr in sources]
return scatter_scalar(
source_scalars, scatter_map_view, target_table_view, bounds_check
)
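
To make the new contract concrete, a small pure-Python model of what a single-column scatter computes (plain lists stand in for device columns; purely illustrative, not how libcudf executes it):

def model_scatter(source, scatter_map, target):
    # target[scatter_map[i]] = source[i] for each position i in the map; a
    # scalar source writes the same value at every mapped row.
    out = list(target)
    for i, row in enumerate(scatter_map):
        out[row] = source[i] if isinstance(source, list) else source
    return out


print(model_scatter([10, 20], [0, 2], [1, 2, 3, 4]))  # [10, 2, 20, 4]
print(model_scatter(99, [1, 3], [1, 2, 3, 4]))        # [1, 99, 3, 99]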


def column_empty_like(Column input_column):
@@ -281,24 +302,14 @@ def column_allocate_like(Column input_column, size=None):
return Column.from_unique_ptr(move(c_result))


def table_empty_like(input_table, bool keep_index=True):

cdef table_view input_table_view = table_view_from_table(
input_table, not keep_index
)

def columns_empty_like(list input_columns):
cdef table_view input_table_view = table_view_from_columns(input_columns)
cdef unique_ptr[table] c_result

with nogil:
c_result = move(cpp_copying.empty_like(input_table_view))

return data_from_unique_ptr(
move(c_result),
column_names=input_table._column_names,
index_names=(
input_table._index._column_names if keep_index is True else None
)
)
return columns_from_unique_ptr(move(c_result))


def column_slice(Column input_column, object indices):
@@ -330,40 +341,30 @@ def column_slice(Column input_column, object indices):
return result


def table_slice(input_table, object indices, bool keep_index=True):

cdef table_view input_table_view = table_view_from_table(
input_table, not keep_index
)

cdef vector[size_type] c_indices
c_indices.reserve(len(indices))
def columns_slice(list input_columns, list indices):
"""
Given a list of input columns, return columns sliced by ``indices``.

Returns a list of lists of columns. The returned list has
`len(indices) / 2` items. The `i`th item is the list of columns sliced
from ``input_columns`` with `slice(indices[i*2], indices[i*2 + 1])`.
"""
cdef table_view input_table_view = table_view_from_columns(input_columns)
cdef vector[size_type] c_indices = indices
cdef vector[table_view] c_result

cdef int index
for index in indices:
c_indices.push_back(index)

with nogil:
c_result = move(
cpp_copying.slice(
input_table_view,
c_indices)
)

num_of_result_cols = c_result.size()
return [
data_from_table_view(
c_result[i],
input_table,
column_names=input_table._column_names,
index_names=(
input_table._index._column_names if (
keep_index is True)
else None
)
) for i in range(num_of_result_cols)]
columns_from_table_view(
c_result[i], input_columns
) for i in range(c_result.size())
]
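
A hedged, pure-Python illustration of the index-pairing contract described in the docstring above (plain lists stand in for columns; the real function slices device memory via cpp_copying.slice):

def model_columns_slice(input_columns, indices):
    # Consecutive (start, stop) pairs in `indices` each produce one slice of
    # every input column, so the result has len(indices) // 2 entries.
    return [
        [col[indices[2 * i]:indices[2 * i + 1]] for col in input_columns]
        for i in range(len(indices) // 2)
    ]


cols = [[10, 11, 12, 13, 14], ["a", "b", "c", "d", "e"]]
print(model_columns_slice(cols, [0, 2, 2, 5]))
# [[[10, 11], ['a', 'b']], [[12, 13, 14], ['c', 'd', 'e']]]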


def column_split(Column input_column, object splits):
@@ -397,38 +398,24 @@ def column_split(Column input_column, object splits):
return result


def table_split(input_table, object splits, bool keep_index=True):

cdef table_view input_table_view = table_view_from_table(
input_table, not keep_index
)

cdef vector[size_type] c_splits
c_splits.reserve(len(splits))
def columns_split(list input_columns, object splits):

cdef table_view input_table_view = table_view_from_columns(input_columns)
cdef vector[size_type] c_splits = splits
cdef vector[table_view] c_result

cdef int split
for split in splits:
c_splits.push_back(split)

with nogil:
c_result = move(
cpp_copying.split(
input_table_view,
c_splits)
)

num_of_result_cols = c_result.size()
return [
data_from_table_view(
c_result[i],
input_table,
column_names=input_table._column_names,
index_names=input_table._index_names if (
keep_index is True)
else None
) for i in range(num_of_result_cols)]
columns_from_table_view(
c_result[i], input_columns
) for i in range(c_result.size())
]


def _copy_if_else_column_column(Column lhs, Column rhs, Column boolean_mask):
6 changes: 3 additions & 3 deletions python/cudf/cudf/_lib/stream_compaction.pyx
@@ -32,7 +32,7 @@ from cudf._lib.utils cimport (
)


def drop_nulls(columns: list, how="any", keys=None, thresh=None):
def drop_nulls(list columns, how="any", keys=None, thresh=None):
"""
Drops null rows from cols depending on key columns.

@@ -75,7 +75,7 @@ def drop_nulls(columns: list, how="any", keys=None, thresh=None):
return columns_from_unique_ptr(move(c_result))
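
A hedged pure-Python model of the row-dropping rule (lists stand in for columns, None for nulls; `keys` selects which columns are inspected, and the `thresh` argument is omitted here for brevity):

def model_drop_nulls(columns, how="any", keys=None):
    # "any" drops a row if any key value is null; "all" drops it only if
    # every key value is null.
    keys = keys if keys is not None else range(len(columns))
    n_rows = len(columns[0])
    drop = [
        (any if how == "any" else all)(columns[k][r] is None for k in keys)
        for r in range(n_rows)
    ]
    return [[col[r] for r in range(n_rows) if not drop[r]] for col in columns]


cols = [[1, None, 3], ["x", "y", None]]
print(model_drop_nulls(cols, how="any"))  # [[1], ['x']]
print(model_drop_nulls(cols, how="all"))  # [[1, None, 3], ['x', 'y', None]]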


def apply_boolean_mask(columns: list, Column boolean_mask):
def apply_boolean_mask(list columns, Column boolean_mask):
"""
Drops the rows which correspond to False in boolean_mask.

@@ -104,7 +104,7 @@ def apply_boolean_mask(columns: list, Column boolean_mask):
return columns_from_unique_ptr(move(c_result))
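
Similarly, a minimal pure-Python model of the boolean-mask filter (illustrative only; the real function runs on device columns):

def model_apply_boolean_mask(columns, boolean_mask):
    # Keep only the rows whose mask entry is True, in every column.
    return [[v for v, keep in zip(col, boolean_mask) if keep] for col in columns]


print(model_apply_boolean_mask([[1, 2, 3], ["a", "b", "c"]], [True, False, True]))
# [[1, 3], ['a', 'c']]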


def drop_duplicates(columns: list,
def drop_duplicates(list columns,
object keys=None,
object keep='first',
bool nulls_are_equal=True):
3 changes: 2 additions & 1 deletion python/cudf/cudf/_lib/utils.pxd
@@ -1,4 +1,4 @@
# Copyright (c) 2020-2021, NVIDIA CORPORATION.
# Copyright (c) 2020-2022, NVIDIA CORPORATION.

from libcpp.memory cimport unique_ptr
from libcpp.string cimport string
@@ -17,3 +17,4 @@ cdef data_from_table_view(
cdef table_view table_view_from_columns(columns) except *
cdef table_view table_view_from_table(tbl, ignore_index=*) except*
cdef columns_from_unique_ptr(unique_ptr[table] c_tbl)
cdef columns_from_table_view(table_view tv, object owners)
18 changes: 18 additions & 0 deletions python/cudf/cudf/_lib/utils.pyx
@@ -311,6 +311,24 @@ cdef data_from_unique_ptr(
}
return data, index

cdef columns_from_table_view(
table_view tv,
object owners,
):
"""
Given a ``cudf::table_view``, constructs a list of columns from it,
each referencing an ``owners`` Python object that owns the memory
lifetime. ``owners`` must be either None or a list of columns. If
``owners`` is a list of columns, the owner of the `i`th
``cudf::column_view`` in the table view is ``owners[i]``. For more
about memory ownership, see ``Column.from_column_view``.
"""

return [
Column.from_column_view(
tv.column(i), owners[i] if isinstance(owners, list) else None
) for i in range(tv.num_columns())
]
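
For illustration, a pure-Python analogue of the ownership rule described above (the class and function names are illustrative; in cudf, holding the owner keeps the device buffer behind each zero-copy column view alive):

class BufferOwner:
    # Stands in for a cudf Column that owns device memory.
    def __init__(self, data):
        self.data = data


class ColumnView:
    # Stands in for a zero-copy Column built via Column.from_column_view.
    def __init__(self, data, owner=None):
        self.data = data
        self.owner = owner  # holding the owner keeps its memory alive


def views_from_table(table_columns, owners):
    # Mirrors columns_from_table_view(tv, owners): `owners` is either None or
    # a list of columns, and owners[i] owns the memory behind column i.
    return [
        ColumnView(col, owners[i] if isinstance(owners, list) else None)
        for i, col in enumerate(table_columns)
    ]


owner_cols = [BufferOwner([1, 2, 3]), BufferOwner([4, 5, 6])]
views = views_from_table([c.data for c in owner_cols], owner_cols)
assert views[0].owner is owner_cols[0]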

cdef data_from_table_view(
table_view tv,
3 changes: 3 additions & 0 deletions python/cudf/cudf/core/_base_index.py
@@ -1522,6 +1522,9 @@ def _split_columns_by_levels(self, levels):
[],
)

def _split(self, splits):
raise NotImplementedError

def sample(
self,
n=None,
6 changes: 3 additions & 3 deletions python/cudf/cudf/core/column/column.py
@@ -593,9 +593,9 @@ def _scatter_by_column(
[value], [self], key
)[0]._with_type_metadata(self.dtype)
else:
return libcudf.copying.scatter(
value, key, self
)._with_type_metadata(self.dtype)
return libcudf.copying.scatter([value], key, [self])[
0
]._with_type_metadata(self.dtype)
except RuntimeError as e:
if "out of bounds" in str(e):
raise IndexError(