diff --git a/python/cudf/cudf/_lib/CMakeLists.txt b/python/cudf/cudf/_lib/CMakeLists.txt index 2958c286d20..baba9d964b3 100644 --- a/python/cudf/cudf/_lib/CMakeLists.txt +++ b/python/cudf/cudf/_lib/CMakeLists.txt @@ -29,7 +29,6 @@ set(cython_sources null_mask.pyx orc.pyx parquet.pyx - partitioning.pyx reduce.pyx replace.pyx reshape.pyx diff --git a/python/cudf/cudf/_lib/__init__.py b/python/cudf/cudf/_lib/__init__.py index 19dc4488560..ec77e02d97d 100644 --- a/python/cudf/cudf/_lib/__init__.py +++ b/python/cudf/cudf/_lib/__init__.py @@ -16,7 +16,6 @@ nvtext, orc, parquet, - partitioning, reduce, replace, reshape, diff --git a/python/cudf/cudf/_lib/partitioning.pyx b/python/cudf/cudf/_lib/partitioning.pyx deleted file mode 100644 index 13997da8403..00000000000 --- a/python/cudf/cudf/_lib/partitioning.pyx +++ /dev/null @@ -1,53 +0,0 @@ -# Copyright (c) 2020-2024, NVIDIA CORPORATION. - -from cudf.core.buffer import acquire_spill_lock - -from cudf._lib.column cimport Column - -import pylibcudf as plc - -from cudf._lib.reduce import minmax -from cudf._lib.stream_compaction import distinct_count as cpp_distinct_count - - -@acquire_spill_lock() -def partition(list source_columns, Column partition_map, - object num_partitions): - """Partition source columns given a partitioning map - - Parameters - ---------- - source_columns: list[Column] - Columns to partition - partition_map: Column - Column of integer values that map each row in the input to a - partition - num_partitions: Optional[int] - Number of output partitions (deduced from unique values in - partition_map if None) - - Returns - ------- - Pair of reordered columns and partition offsets - - Raises - ------ - ValueError - If the partition map has invalid entries (not all in [0, - num_partitions)). - """ - - if num_partitions is None: - num_partitions = cpp_distinct_count(partition_map, ignore_nulls=True) - - if partition_map.size > 0: - lo, hi = minmax(partition_map) - if lo < 0 or hi >= num_partitions: - raise ValueError("Partition map has invalid values") - - plc_table, offsets = plc.partitioning.partition( - plc.Table([col.to_pylibcudf(mode="read") for col in source_columns]), - partition_map.to_pylibcudf(mode="read"), - num_partitions - ) - return [Column.from_pylibcudf(col) for col in plc_table.columns()], offsets diff --git a/python/cudf/cudf/core/dataframe.py b/python/cudf/cudf/core/dataframe.py index bd78d5dd9f1..ee5d4804271 100644 --- a/python/cudf/cudf/core/dataframe.py +++ b/python/cudf/cudf/core/dataframe.py @@ -2487,11 +2487,46 @@ def scatter_by_map( f"ERROR: map_size must be >= {count} (got {map_size})." ) - partitioned_columns, output_offsets = libcudf.partitioning.partition( - [*(self.index._columns if keep_index else ()), *self._columns], - map_index, - map_size, + source_columns = ( + itertools.chain(self.index._columns, self._columns) + if keep_index + else self._columns ) + + with acquire_spill_lock(): + if map_size is None: + map_size = plc.stream_compaction.distinct_count( + map_index.to_pylibcudf(mode="read"), + plc.types.NullPolicy.EXCLUDE, + plc.types.NanPolicy.NAN_IS_VALID, + ) + + if map_index.size > 0: + plc_lo, plc_hi = plc.reduce.minmax( + map_index.to_pylibcudf(mode="read") + ) + # TODO: Use pylibcudf Scalar once APIs are more developed + lo = libcudf.column.Column.from_pylibcudf( + plc.Column.from_scalar(plc_lo, 1) + ).element_indexing(0) + hi = libcudf.column.Column.from_pylibcudf( + plc.Column.from_scalar(plc_hi, 1) + ).element_indexing(0) + if lo < 0 or hi >= map_size: + raise ValueError("Partition map has invalid values") + + plc_table, output_offsets = plc.partitioning.partition( + plc.Table( + [col.to_pylibcudf(mode="read") for col in source_columns] + ), + map_index.to_pylibcudf(mode="read"), + map_size, + ) + partitioned_columns = [ + libcudf.column.Column.from_pylibcudf(col) + for col in plc_table.columns() + ] + partitioned = self._from_columns_like_self( partitioned_columns, column_names=self._column_names,