Refactor cython interface: copying.pyx (#10359)
Part of #10153 

Aside from the two harder cases, `boolean_mask_scatter` and `sample`, which were addressed in #10202 and #10262, this PR tackles the remaining refactors in `copying.pyx`. In combination with those two PRs, this should address all of the interface refactoring in `copying.pyx`.
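
For callers, the most visible change is that `scatter` now accepts and returns lists of columns instead of a single column. A rough sketch of the call-site change (illustrative names only; `values_col`, `scatter_map`, and `target_col` are assumed to be existing `Column` objects):

```python
# Before this PR: scalar-or-column source, single target column, single column out
out_col = libcudf.copying.scatter(values_col, scatter_map, target_col)

# After this PR: list of sources, list of target columns, list of columns out
(out_col,) = libcudf.copying.scatter([values_col], scatter_map, [target_col])
```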

Authors:
  - Michael Wang (https://github.com/isVoid)
  - Vyas Ramasubramani (https://github.com/vyasr)

Approvers:
  - Vyas Ramasubramani (https://github.com/vyasr)

URL: #10359
isVoid authored Mar 8, 2022
1 parent 600b872 commit b3dc9d6
Showing 10 changed files with 201 additions and 158 deletions.
193 changes: 90 additions & 103 deletions python/cudf/cudf/_lib/copying.pyx
@@ -38,6 +38,7 @@ from cudf._lib.cpp.table.table cimport table
from cudf._lib.cpp.table.table_view cimport table_view
from cudf._lib.cpp.types cimport size_type
from cudf._lib.utils cimport (
columns_from_table_view,
columns_from_unique_ptr,
data_from_table_view,
data_from_unique_ptr,
@@ -166,7 +167,7 @@ def copy_range(Column input_column,


def gather(
columns: list,
list columns,
Column gather_map,
bool nullify=False
):
@@ -190,60 +191,80 @@
return columns_from_unique_ptr(move(c_result))


def scatter(object source, Column scatter_map, Column target_column,
bool bounds_check=True):
"""
Scattering input into target as per the scatter map,
input can be a list of scalars or can be a table
"""

cdef column_view scatter_map_view = scatter_map.view()
cdef table_view target_table_view = table_view_from_columns(
(target_column,))
cdef bool c_bounds_check = bounds_check
cdef scatter_scalar(list source_device_slrs,
column_view scatter_map,
table_view target_table,
bool bounds_check):
cdef vector[reference_wrapper[constscalar]] c_source
cdef DeviceScalar d_slr
cdef unique_ptr[table] c_result

# Needed for the table branch
cdef table_view source_table_view
c_source.reserve(len(source_device_slrs))
for d_slr in source_device_slrs:
c_source.push_back(
reference_wrapper[constscalar](d_slr.get_raw_ptr()[0])
)

with nogil:
c_result = move(
cpp_copying.scatter(
c_source,
scatter_map,
target_table,
bounds_check
)
)

# Needed for the scalar branch
cdef vector[reference_wrapper[constscalar]] source_scalars
cdef DeviceScalar slr
return columns_from_unique_ptr(move(c_result))

if isinstance(source, Column):
source_table_view = table_view_from_columns((<Column> source,))

with nogil:
c_result = move(
cpp_copying.scatter(
source_table_view,
scatter_map_view,
target_table_view,
c_bounds_check
)
)
else:
slr = as_device_scalar(source, target_column.dtype)
source_scalars.push_back(reference_wrapper[constscalar](
slr.get_raw_ptr()[0]))
cdef scatter_column(list source_columns,
column_view scatter_map,
table_view target_table,
bool bounds_check):
cdef table_view c_source = table_view_from_columns(source_columns)
cdef unique_ptr[table] c_result

with nogil:
c_result = move(
cpp_copying.scatter(
source_scalars,
scatter_map_view,
target_table_view,
c_bounds_check
)
with nogil:
c_result = move(
cpp_copying.scatter(
c_source,
scatter_map,
target_table,
bounds_check
)
)
return columns_from_unique_ptr(move(c_result))

data, _ = data_from_unique_ptr(
move(c_result),
column_names=(None,),
index_names=None
)

return next(iter(data.values()))
def scatter(list sources, Column scatter_map, list target_columns,
bool bounds_check=True):
"""
Scattering `sources` into `target_columns` as per the scatter map.
`sources` can be a list of scalars or a list of columns. The number of
items in `sources` must equal the number of `target_columns` to scatter.
"""
# TODO: Only single-column scatter is used at the moment; we should explore
# multi-column scatter for frames for a performance increase.

if len(sources) != len(target_columns):
raise ValueError("Mismatched number of source and target columns.")

if len(sources) == 0:
return []

cdef column_view scatter_map_view = scatter_map.view()
cdef table_view target_table_view = table_view_from_columns(target_columns)

if isinstance(sources[0], Column):
return scatter_column(
sources, scatter_map_view, target_table_view, bounds_check
)
else:
source_scalars = [as_device_scalar(slr) for slr in sources]
return scatter_scalar(
source_scalars, scatter_map_view, target_table_view, bounds_check
)


def column_empty_like(Column input_column):
@@ -281,24 +302,14 @@ def column_allocate_like(Column input_column, size=None):
return Column.from_unique_ptr(move(c_result))


def table_empty_like(input_table, bool keep_index=True):

cdef table_view input_table_view = table_view_from_table(
input_table, not keep_index
)

def columns_empty_like(list input_columns):
cdef table_view input_table_view = table_view_from_columns(input_columns)
cdef unique_ptr[table] c_result

with nogil:
c_result = move(cpp_copying.empty_like(input_table_view))

return data_from_unique_ptr(
move(c_result),
column_names=input_table._column_names,
index_names=(
input_table._index._column_names if keep_index is True else None
)
)
return columns_from_unique_ptr(move(c_result))


def column_slice(Column input_column, object indices):
@@ -330,40 +341,30 @@ def column_slice(Column input_column, object indices):
return result


def table_slice(input_table, object indices, bool keep_index=True):

cdef table_view input_table_view = table_view_from_table(
input_table, not keep_index
)

cdef vector[size_type] c_indices
c_indices.reserve(len(indices))
def columns_slice(list input_columns, list indices):
"""
Given a list of input columns, return columns sliced by ``indices``.
Returns a list of lists of columns. The length of the result is
`len(indices) / 2`. The `i`th item of the result is a list of columns sliced
from ``input_columns`` with `slice(indices[i*2], indices[i*2 + 1])`.
"""
cdef table_view input_table_view = table_view_from_columns(input_columns)
cdef vector[size_type] c_indices = indices
cdef vector[table_view] c_result

cdef int index
for index in indices:
c_indices.push_back(index)

with nogil:
c_result = move(
cpp_copying.slice(
input_table_view,
c_indices)
)

num_of_result_cols = c_result.size()
return [
data_from_table_view(
c_result[i],
input_table,
column_names=input_table._column_names,
index_names=(
input_table._index._column_names if (
keep_index is True)
else None
)
) for i in range(num_of_result_cols)]
columns_from_table_view(
c_result[i], input_columns
) for i in range(c_result.size())
]


def column_split(Column input_column, object splits):
Expand Down Expand Up @@ -397,38 +398,24 @@ def column_split(Column input_column, object splits):
return result


def table_split(input_table, object splits, bool keep_index=True):

cdef table_view input_table_view = table_view_from_table(
input_table, not keep_index
)

cdef vector[size_type] c_splits
c_splits.reserve(len(splits))
def columns_split(list input_columns, object splits):

cdef table_view input_table_view = table_view_from_columns(input_columns)
cdef vector[size_type] c_splits = splits
cdef vector[table_view] c_result

cdef int split
for split in splits:
c_splits.push_back(split)

with nogil:
c_result = move(
cpp_copying.split(
input_table_view,
c_splits)
)

num_of_result_cols = c_result.size()
return [
data_from_table_view(
c_result[i],
input_table,
column_names=input_table._column_names,
index_names=input_table._index_names if (
keep_index is True)
else None
) for i in range(num_of_result_cols)]
columns_from_table_view(
c_result[i], input_columns
) for i in range(c_result.size())
]


def _copy_if_else_column_column(Column lhs, Column rhs, Column boolean_mask):
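For reference, the pairwise slicing behaviour documented on `columns_slice` above can be modelled in plain Python (an illustrative sketch only, using ordinary lists as stand-ins for columns):

```python
def columns_slice_reference(input_columns, indices):
    # Consecutive pairs in `indices` are (start, end) bounds; every input
    # column is sliced with each pair, mirroring the documented behaviour.
    return [
        [col[start:end] for col in input_columns]
        for start, end in zip(indices[::2], indices[1::2])
    ]

# Example with one "column": two slices covering rows [0, 2) and [2, 5)
# columns_slice_reference([[1, 2, 3, 4, 5]], [0, 2, 2, 5])
# -> [[[1, 2]], [[3, 4, 5]]]
```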
6 changes: 3 additions & 3 deletions python/cudf/cudf/_lib/stream_compaction.pyx
@@ -32,7 +32,7 @@ from cudf._lib.utils cimport (
)


def drop_nulls(columns: list, how="any", keys=None, thresh=None):
def drop_nulls(list columns, how="any", keys=None, thresh=None):
"""
Drops null rows from cols depending on key columns.
@@ -75,7 +75,7 @@ def drop_nulls(columns: list, how="any", keys=None, thresh=None):
return columns_from_unique_ptr(move(c_result))


def apply_boolean_mask(columns: list, Column boolean_mask):
def apply_boolean_mask(list columns, Column boolean_mask):
"""
Drops the rows which correspond to False in boolean_mask.
@@ -104,7 +104,7 @@ def apply_boolean_mask(columns: list, Column boolean_mask):
return columns_from_unique_ptr(move(c_result))


def drop_duplicates(columns: list,
def drop_duplicates(list columns,
object keys=None,
object keep='first',
bool nulls_are_equal=True):
3 changes: 2 additions & 1 deletion python/cudf/cudf/_lib/utils.pxd
@@ -1,4 +1,4 @@
# Copyright (c) 2020-2021, NVIDIA CORPORATION.
# Copyright (c) 2020-2022, NVIDIA CORPORATION.

from libcpp.memory cimport unique_ptr
from libcpp.string cimport string
@@ -17,3 +17,4 @@ cdef data_from_table_view(
cdef table_view table_view_from_columns(columns) except *
cdef table_view table_view_from_table(tbl, ignore_index=*) except*
cdef columns_from_unique_ptr(unique_ptr[table] c_tbl)
cdef columns_from_table_view(table_view tv, object owners)
18 changes: 18 additions & 0 deletions python/cudf/cudf/_lib/utils.pyx
@@ -311,6 +311,24 @@ cdef data_from_unique_ptr(
}
return data, index

cdef columns_from_table_view(
table_view tv,
object owners,
):
"""
Given a ``cudf::table_view``, constructs a list of columns from it,
along with referencing an ``owners`` Python object that owns the memory
lifetime. ``owners`` must be either None or a list of columns. If ``owners``
is a list of columns, the owner of the `i`th ``cudf::column_view`` in the
table view is ``owners[i]``. For more about memory ownership,
see ``Column.from_column_view``.
"""

return [
Column.from_column_view(
tv.column(i), owners[i] if isinstance(owners, list) else None
) for i in range(tv.num_columns())
]

cdef data_from_table_view(
table_view tv,
3 changes: 3 additions & 0 deletions python/cudf/cudf/core/_base_index.py
@@ -1522,6 +1522,9 @@ def _split_columns_by_levels(self, levels):
[],
)

def _split(self, splits):
raise NotImplementedError

def sample(
self,
n=None,
6 changes: 3 additions & 3 deletions python/cudf/cudf/core/column/column.py
@@ -593,9 +593,9 @@ def _scatter_by_column(
[value], [self], key
)[0]._with_type_metadata(self.dtype)
else:
return libcudf.copying.scatter(
value, key, self
)._with_type_metadata(self.dtype)
return libcudf.copying.scatter([value], key, [self])[
0
]._with_type_metadata(self.dtype)
except RuntimeError as e:
if "out of bounds" in str(e):
raise IndexError(
