From 639da4acc62a885c1aaee0700eae12143a8858cd Mon Sep 17 00:00:00 2001 From: Matthew Roeschke <10647082+mroeschke@users.noreply.github.com> Date: Fri, 22 Nov 2024 14:05:29 -0800 Subject: [PATCH] Remove cudf._lib.lists in favor of inlining pylibcudf --- python/cudf/cudf/_lib/CMakeLists.txt | 1 - python/cudf/cudf/_lib/lists.pyx | 150 --------------------- python/cudf/cudf/core/column/lists.py | 180 ++++++++++++++++++++----- python/cudf/cudf/core/indexed_frame.py | 19 ++- 4 files changed, 159 insertions(+), 191 deletions(-) delete mode 100644 python/cudf/cudf/_lib/lists.pyx diff --git a/python/cudf/cudf/_lib/CMakeLists.txt b/python/cudf/cudf/_lib/CMakeLists.txt index 8a521f19350..e3fa2fe2b2a 100644 --- a/python/cudf/cudf/_lib/CMakeLists.txt +++ b/python/cudf/cudf/_lib/CMakeLists.txt @@ -24,7 +24,6 @@ set(cython_sources interop.pyx join.pyx json.pyx - lists.pyx merge.pyx null_mask.pyx orc.pyx diff --git a/python/cudf/cudf/_lib/lists.pyx b/python/cudf/cudf/_lib/lists.pyx deleted file mode 100644 index 90a137dd546..00000000000 --- a/python/cudf/cudf/_lib/lists.pyx +++ /dev/null @@ -1,150 +0,0 @@ -# Copyright (c) 2021-2024, NVIDIA CORPORATION. - -from cudf.core.buffer import acquire_spill_lock - -from libcpp cimport bool - -from pylibcudf.libcudf.types cimport size_type - -from cudf._lib.column cimport Column -from cudf._lib.utils cimport columns_from_pylibcudf_table - -import pylibcudf as plc - - -@acquire_spill_lock() -def count_elements(Column col): - return Column.from_pylibcudf( - plc.lists.count_elements( - col.to_pylibcudf(mode="read")) - ) - - -@acquire_spill_lock() -def explode_outer(list source_columns, int explode_column_idx): - return columns_from_pylibcudf_table( - plc.lists.explode_outer( - plc.Table([c.to_pylibcudf(mode="read") for c in source_columns]), - explode_column_idx, - ) - ) - - -@acquire_spill_lock() -def distinct(Column col, bool nulls_equal, bool nans_all_equal): - return Column.from_pylibcudf( - plc.lists.distinct( - col.to_pylibcudf(mode="read"), - ( - plc.types.NullEquality.EQUAL - if nulls_equal - else plc.types.NullEquality.UNEQUAL - ), - ( - plc.types.NanEquality.ALL_EQUAL - if nans_all_equal - else plc.types.NanEquality.UNEQUAL - ), - ) - ) - - -@acquire_spill_lock() -def sort_lists(Column col, bool ascending, str na_position): - return Column.from_pylibcudf( - plc.lists.sort_lists( - col.to_pylibcudf(mode="read"), - plc.types.Order.ASCENDING if ascending else plc.types.Order.DESCENDING, - ( - plc.types.NullOrder.BEFORE - if na_position == "first" - else plc.types.NullOrder.AFTER - ), - False, - ) - ) - - -@acquire_spill_lock() -def extract_element_scalar(Column col, size_type index): - return Column.from_pylibcudf( - plc.lists.extract_list_element( - col.to_pylibcudf(mode="read"), - index, - ) - ) - - -@acquire_spill_lock() -def extract_element_column(Column col, Column index): - return Column.from_pylibcudf( - plc.lists.extract_list_element( - col.to_pylibcudf(mode="read"), - index.to_pylibcudf(mode="read"), - ) - ) - - -@acquire_spill_lock() -def contains_scalar(Column col, py_search_key): - return Column.from_pylibcudf( - plc.lists.contains( - col.to_pylibcudf(mode="read"), - py_search_key.device_value.c_value, - ) - ) - - -@acquire_spill_lock() -def index_of_scalar(Column col, object py_search_key): - return Column.from_pylibcudf( - plc.lists.index_of( - col.to_pylibcudf(mode="read"), - py_search_key.device_value.c_value, - plc.lists.DuplicateFindOption.FIND_FIRST, - ) - ) - - -@acquire_spill_lock() -def index_of_column(Column col, Column search_keys): - return Column.from_pylibcudf( - plc.lists.index_of( - col.to_pylibcudf(mode="read"), - search_keys.to_pylibcudf(mode="read"), - plc.lists.DuplicateFindOption.FIND_FIRST, - ) - ) - - -@acquire_spill_lock() -def concatenate_rows(list source_columns): - return Column.from_pylibcudf( - plc.lists.concatenate_rows( - plc.Table([ - c.to_pylibcudf(mode="read") for c in source_columns - ]) - ) - ) - - -@acquire_spill_lock() -def concatenate_list_elements(Column input_column, dropna=False): - return Column.from_pylibcudf( - plc.lists.concatenate_list_elements( - input_column.to_pylibcudf(mode="read"), - plc.lists.ConcatenateNullPolicy.IGNORE - if dropna - else plc.lists.ConcatenateNullPolicy.NULLIFY_OUTPUT_ROW, - ) - ) - - -@acquire_spill_lock() -def segmented_gather(Column source_column, Column gather_map): - return Column.from_pylibcudf( - plc.lists.segmented_gather( - source_column.to_pylibcudf(mode="read"), - gather_map.to_pylibcudf(mode="read"), - ) - ) diff --git a/python/cudf/cudf/core/column/lists.py b/python/cudf/cudf/core/column/lists.py index 6b25e568f00..9962663e811 100644 --- a/python/cudf/cudf/core/column/lists.py +++ b/python/cudf/cudf/core/column/lists.py @@ -2,31 +2,21 @@ from __future__ import annotations +import itertools from functools import cached_property -from typing import TYPE_CHECKING, cast +from typing import TYPE_CHECKING, Literal, cast -import numpy as np import pandas as pd import pyarrow as pa from typing_extensions import Self +import pylibcudf as plc + import cudf -from cudf._lib.lists import ( - concatenate_list_elements, - concatenate_rows, - contains_scalar, - count_elements, - distinct, - extract_element_column, - extract_element_scalar, - index_of_column, - index_of_scalar, - segmented_gather, - sort_lists, -) from cudf._lib.strings.convert.convert_lists import format_list_column from cudf._lib.types import size_type_dtype from cudf.api.types import _is_non_decimal_numeric_dtype, is_scalar +from cudf.core.buffer import acquire_spill_lock from cudf.core.column import ColumnBase, as_column, column from cudf.core.column.methods import ColumnMethods, ParentType from cudf.core.column.numerical import NumericalColumn @@ -139,7 +129,7 @@ def _binaryop(self, other: ColumnBinaryOperand, op: str) -> ColumnBase: return NotImplemented if isinstance(other.dtype, ListDtype): if op == "__add__": - return concatenate_rows([self, other]) + return self.concatenate_rows([other]) # type: ignore[list-item] else: raise NotImplementedError( "Lists concatenation for this operation is not yet" @@ -326,6 +316,129 @@ def to_pandas( else: return pd.Index(self.to_arrow().tolist(), dtype="object") + @acquire_spill_lock() + def count_elements(self) -> ColumnBase: + return type(self).from_pylibcudf( + plc.lists.count_elements(self.to_pylibcudf(mode="read")) + ) + + @acquire_spill_lock() + def distinct(self, nulls_equal: bool, nans_all_equal: bool) -> ColumnBase: + return type(self).from_pylibcudf( + plc.lists.distinct( + self.to_pylibcudf(mode="read"), + ( + plc.types.NullEquality.EQUAL + if nulls_equal + else plc.types.NullEquality.UNEQUAL + ), + ( + plc.types.NanEquality.ALL_EQUAL + if nans_all_equal + else plc.types.NanEquality.UNEQUAL + ), + ) + ) + + @acquire_spill_lock() + def sort_lists( + self, ascending: bool, na_position: Literal["first", "last"] + ) -> ColumnBase: + return type(self).from_pylibcudf( + plc.lists.sort_lists( + self.to_pylibcudf(mode="read"), + plc.types.Order.ASCENDING + if ascending + else plc.types.Order.DESCENDING, + ( + plc.types.NullOrder.BEFORE + if na_position == "first" + else plc.types.NullOrder.AFTER + ), + False, + ) + ) + + @acquire_spill_lock() + def extract_element_scalar(self, index: int) -> ColumnBase: + return type(self).from_pylibcudf( + plc.lists.extract_list_element( + self.to_pylibcudf(mode="read"), + index, + ) + ) + + @acquire_spill_lock() + def extract_element_column(self, index: ColumnBase) -> ColumnBase: + return type(self).from_pylibcudf( + plc.lists.extract_list_element( + self.to_pylibcudf(mode="read"), + index.to_pylibcudf(mode="read"), + ) + ) + + @acquire_spill_lock() + def contains_scalar(self, search_key: cudf.Scalar) -> ColumnBase: + return type(self).from_pylibcudf( + plc.lists.contains( + self.to_pylibcudf(mode="read"), + search_key.device_value.c_value, + ) + ) + + @acquire_spill_lock() + def index_of_scalar(self, search_key: cudf.Scalar) -> ColumnBase: + return type(self).from_pylibcudf( + plc.lists.index_of( + self.to_pylibcudf(mode="read"), + search_key.device_value.c_value, + plc.lists.DuplicateFindOption.FIND_FIRST, + ) + ) + + @acquire_spill_lock() + def index_of_column(self, search_keys: ColumnBase) -> ColumnBase: + return type(self).from_pylibcudf( + plc.lists.index_of( + self.to_pylibcudf(mode="read"), + search_keys.to_pylibcudf(mode="read"), + plc.lists.DuplicateFindOption.FIND_FIRST, + ) + ) + + @acquire_spill_lock() + def concatenate_rows(self, other_columns: list[ColumnBase]) -> ColumnBase: + return type(self).from_pylibcudf( + plc.lists.concatenate_rows( + plc.Table( + [ + col.to_pylibcudf(mode="read") + for col in itertools.chain([self], other_columns) + ] + ) + ) + ) + + @acquire_spill_lock() + def concatenate_list_elements(self, dropna: bool) -> ColumnBase: + return type(self).from_pylibcudf( + plc.lists.concatenate_list_elements( + self.to_pylibcudf(mode="read"), + plc.lists.ConcatenateNullPolicy.IGNORE + if dropna + else plc.lists.ConcatenateNullPolicy.NULLIFY_OUTPUT_ROW, + ) + ) + + @acquire_spill_lock() + def segmented_gather(self, gather_map: ColumnBase) -> ColumnBase: + return type(self).from_pylibcudf( + plc.lists.segmented_gather( + self.to_pylibcudf(mode="read"), + gather_map.to_pylibcudf(mode="read"), + ) + ) + class ListMethods(ColumnMethods): """ @@ -397,18 +510,16 @@ def get( 2 6 dtype: int64 """ - if is_scalar(index): - out = extract_element_scalar(self._column, cudf.Scalar(index)) + if isinstance(index, int): + out = self._column.extract_element_scalar(index) else: index = as_column(index) - out = extract_element_column(self._column, as_column(index)) + out = self._column.extract_element_column(index) if not (default is None or default is NA): # determine rows for which `index` is out-of-bounds - lengths = count_elements(self._column) - out_of_bounds_mask = (np.negative(index) > lengths) | ( - index >= lengths - ) + lengths = self._column.count_elements() + out_of_bounds_mask = ((-1 * index) > lengths) | (index >= lengths) # replace the value in those rows (should be NA) with `default` if out_of_bounds_mask.any(): @@ -445,7 +556,7 @@ def contains(self, search_key: ScalarLike) -> ParentType: dtype: bool """ return self._return_or_inplace( - contains_scalar(self._column, cudf.Scalar(search_key)) + self._column.contains_scalar(cudf.Scalar(search_key)) ) def index(self, search_key: ScalarLike | ColumnLike) -> ParentType: @@ -494,13 +605,10 @@ def index(self, search_key: ScalarLike | ColumnLike) -> ParentType: """ if is_scalar(search_key): - return self._return_or_inplace( - index_of_scalar(self._column, cudf.Scalar(search_key)) - ) + result = self._column.index_of_scalar(cudf.Scalar(search_key)) else: - return self._return_or_inplace( - index_of_column(self._column, as_column(search_key)) - ) + result = self._column.index_of_column(as_column(search_key)) + return self._return_or_inplace(result) @property def leaves(self) -> ParentType: @@ -550,7 +658,7 @@ def len(self) -> ParentType: 2 2 dtype: int32 """ - return self._return_or_inplace(count_elements(self._column)) + return self._return_or_inplace(self._column.count_elements()) def take(self, lists_indices: ColumnLike) -> ParentType: """ @@ -598,7 +706,7 @@ def take(self, lists_indices: ColumnLike) -> ParentType: ) return self._return_or_inplace( - segmented_gather(self._column, lists_indices_col) + self._column.segmented_gather(lists_indices_col) ) def unique(self) -> ParentType: @@ -631,7 +739,7 @@ def unique(self) -> ParentType: raise NotImplementedError("Nested lists unique is not supported.") return self._return_or_inplace( - distinct(self._column, nulls_equal=True, nans_all_equal=True) + self._column.distinct(nulls_equal=True, nans_all_equal=True) ) def sort_values( @@ -639,7 +747,7 @@ def sort_values( ascending: bool = True, inplace: bool = False, kind: str = "quicksort", - na_position: str = "last", + na_position: Literal["first", "last"] = "last", ignore_index: bool = False, ) -> ParentType: """ @@ -692,7 +800,7 @@ def sort_values( raise NotImplementedError("Nested lists sort is not supported.") return self._return_or_inplace( - sort_lists(self._column, ascending, na_position), + self._column.sort_lists(ascending, na_position), retain_index=not ignore_index, ) @@ -742,7 +850,7 @@ def concat(self, dropna=True) -> ParentType: dtype: list """ return self._return_or_inplace( - concatenate_list_elements(self._column, dropna=dropna) + self._column.concatenate_list_elements(dropna) ) def astype(self, dtype): diff --git a/python/cudf/cudf/core/indexed_frame.py b/python/cudf/cudf/core/indexed_frame.py index 9130779c3e9..4e839aaeb6a 100644 --- a/python/cudf/cudf/core/indexed_frame.py +++ b/python/cudf/cudf/core/indexed_frame.py @@ -3,6 +3,7 @@ from __future__ import annotations +import itertools import operator import textwrap import warnings @@ -5316,10 +5317,20 @@ def _explode(self, explode_column: Any, ignore_index: bool): else: idx_cols = () - exploded = libcudf.lists.explode_outer( - [*idx_cols, *self._columns], - column_index + len(idx_cols), - ) + with acquire_spill_lock(): + plc_table = plc.lists.explode_outer( + plc.Table( + [ + col.to_pylibcudf(mode="read") + for col in itertools.chain(idx_cols, self._columns) + ] + ), + column_index + len(idx_cols), + ) + exploded = [ + libcudf.column.Column.from_pylibcudf(col) + for col in plc_table.columns() + ] # We must copy inner datatype of the exploded list column to # maintain struct dtype key names element_type = cast(