Skip to content

Commit

Permalink
Add partitioning APIs to pylibcudf (rapidsai#16781)
Browse files Browse the repository at this point in the history
Contributes to rapidsai#15162

Authors:
  - Matthew Roeschke (https://github.com/mroeschke)
  - Matthew Murray (https://github.com/Matt711)
  - Vyas Ramasubramani (https://github.com/vyasr)

Approvers:
  - Matthew Murray (https://github.com/Matt711)
  - Vyas Ramasubramani (https://github.com/vyasr)

URL: rapidsai#16781
  • Loading branch information
mroeschke authored Sep 26, 2024
1 parent 61af769 commit b00a718
Show file tree
Hide file tree
Showing 11 changed files with 229 additions and 54 deletions.
1 change: 1 addition & 0 deletions docs/cudf/source/user_guide/api_docs/pylibcudf/index.rst
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ This page provides API documentation for pylibcudf.
lists
merge
null_mask
partitioning
quantiles
reduce
replace
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
============
partitioning
============

.. automodule:: pylibcudf.partitioning
:members:
35 changes: 9 additions & 26 deletions python/cudf/cudf/_lib/hash.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,8 @@
from cudf.core.buffer import acquire_spill_lock

from libcpp.memory cimport unique_ptr
from libcpp.pair cimport pair
from libcpp.utility cimport move
from libcpp.vector cimport vector

cimport pylibcudf.libcudf.types as libcudf_types
from pylibcudf.libcudf.column.column cimport column
from pylibcudf.libcudf.hash cimport (
md5,
Expand All @@ -19,37 +16,23 @@ from pylibcudf.libcudf.hash cimport (
sha512,
xxhash_64,
)
from pylibcudf.libcudf.partitioning cimport (
hash_partition as cpp_hash_partition,
)
from pylibcudf.libcudf.table.table cimport table
from pylibcudf.libcudf.table.table_view cimport table_view

from cudf._lib.column cimport Column
from cudf._lib.utils cimport columns_from_unique_ptr, table_view_from_columns
from cudf._lib.utils cimport table_view_from_columns

import pylibcudf as plc


@acquire_spill_lock()
def hash_partition(list source_columns, object columns_to_hash,
def hash_partition(list source_columns, list columns_to_hash,
int num_partitions):
cdef vector[libcudf_types.size_type] c_columns_to_hash = columns_to_hash
cdef int c_num_partitions = num_partitions
cdef table_view c_source_view = table_view_from_columns(source_columns)

cdef pair[unique_ptr[table], vector[libcudf_types.size_type]] c_result
with nogil:
c_result = move(
cpp_hash_partition(
c_source_view,
c_columns_to_hash,
c_num_partitions
)
)

return (
columns_from_unique_ptr(move(c_result.first)),
list(c_result.second)
plc_table, offsets = plc.partitioning.hash_partition(
plc.Table([col.to_pylibcudf(mode="read") for col in source_columns]),
columns_to_hash,
num_partitions
)
return [Column.from_pylibcudf(col) for col in plc_table.columns()], offsets


@acquire_spill_lock()
Expand Down
35 changes: 7 additions & 28 deletions python/cudf/cudf/_lib/partitioning.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -2,24 +2,13 @@

from cudf.core.buffer import acquire_spill_lock

from libcpp.memory cimport unique_ptr
from libcpp.pair cimport pair
from libcpp.utility cimport move
from libcpp.vector cimport vector

from pylibcudf.libcudf.column.column_view cimport column_view
from pylibcudf.libcudf.partitioning cimport partition as cpp_partition
from pylibcudf.libcudf.table.table cimport table
from pylibcudf.libcudf.table.table_view cimport table_view

from cudf._lib.column cimport Column
from cudf._lib.utils cimport columns_from_unique_ptr, table_view_from_columns

import pylibcudf as plc

from cudf._lib.reduce import minmax
from cudf._lib.stream_compaction import distinct_count as cpp_distinct_count

cimport pylibcudf.libcudf.types as libcudf_types


@acquire_spill_lock()
def partition(list source_columns, Column partition_map,
Expand Down Expand Up @@ -50,25 +39,15 @@ def partition(list source_columns, Column partition_map,

if num_partitions is None:
num_partitions = cpp_distinct_count(partition_map, ignore_nulls=True)
cdef int c_num_partitions = num_partitions
cdef table_view c_source_view = table_view_from_columns(source_columns)

cdef column_view c_partition_map_view = partition_map.view()

cdef pair[unique_ptr[table], vector[libcudf_types.size_type]] c_result
if partition_map.size > 0:
lo, hi = minmax(partition_map)
if lo < 0 or hi >= num_partitions:
raise ValueError("Partition map has invalid values")
with nogil:
c_result = move(
cpp_partition(
c_source_view,
c_partition_map_view,
c_num_partitions
)
)

return (
columns_from_unique_ptr(move(c_result.first)), list(c_result.second)
plc_table, offsets = plc.partitioning.partition(
plc.Table([col.to_pylibcudf(mode="read") for col in source_columns]),
partition_map.to_pylibcudf(mode="read"),
num_partitions
)
return [Column.from_pylibcudf(col) for col in plc_table.columns()], offsets
1 change: 1 addition & 0 deletions python/pylibcudf/pylibcudf/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ set(cython_sources
lists.pyx
merge.pyx
null_mask.pyx
partitioning.pyx
quantiles.pyx
reduce.pyx
replace.pyx
Expand Down
2 changes: 2 additions & 0 deletions python/pylibcudf/pylibcudf/__init__.pxd
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ from . cimport (
lists,
merge,
null_mask,
partitioning,
quantiles,
reduce,
replace,
Expand Down Expand Up @@ -61,6 +62,7 @@ __all__ = [
"lists",
"merge",
"null_mask",
"partitioning",
"quantiles",
"reduce",
"replace",
Expand Down
2 changes: 2 additions & 0 deletions python/pylibcudf/pylibcudf/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
lists,
merge,
null_mask,
partitioning,
quantiles,
reduce,
replace,
Expand Down Expand Up @@ -75,6 +76,7 @@
"lists",
"merge",
"null_mask",
"partitioning",
"quantiles",
"reduce",
"replace",
Expand Down
7 changes: 7 additions & 0 deletions python/pylibcudf/pylibcudf/libcudf/partitioning.pxd
Original file line number Diff line number Diff line change
Expand Up @@ -25,3 +25,10 @@ cdef extern from "cudf/partitioning.hpp" namespace "cudf" nogil:
const column_view& partition_map,
int num_partitions
) except +

cdef pair[unique_ptr[table], vector[libcudf_types.size_type]] \
round_robin_partition "cudf::round_robin_partition" (
const table_view& input,
int num_partitions,
int start_partition
) except +
19 changes: 19 additions & 0 deletions python/pylibcudf/pylibcudf/partitioning.pxd
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
# Copyright (c) 2024, NVIDIA CORPORATION.

from .column cimport Column
from .table cimport Table


cpdef tuple[Table, list] hash_partition(
Table input,
list columns_to_hash,
int num_partitions
)

cpdef tuple[Table, list] partition(Table t, Column partition_map, int num_partitions)

cpdef tuple[Table, list] round_robin_partition(
Table input,
int num_partitions,
int start_partition=*
)
120 changes: 120 additions & 0 deletions python/pylibcudf/pylibcudf/partitioning.pyx
Original file line number Diff line number Diff line change
@@ -0,0 +1,120 @@
# Copyright (c) 2024, NVIDIA CORPORATION.

cimport pylibcudf.libcudf.types as libcudf_types
from libcpp.memory cimport unique_ptr
from libcpp.pair cimport pair
from libcpp.utility cimport move
from libcpp.vector cimport vector
from pylibcudf.libcudf cimport partitioning as cpp_partitioning
from pylibcudf.libcudf.table.table cimport table

from .column cimport Column
from .table cimport Table


cpdef tuple[Table, list] hash_partition(
Table input,
list columns_to_hash,
int num_partitions
):
"""
Partitions rows from the input table into multiple output tables.
For details, see :cpp:func:`hash_partition`.
Parameters
----------
input : Table
The table to partition
columns_to_hash : list[int]
Indices of input columns to hash
num_partitions : int
The number of partitions to use
Returns
-------
tuple[Table, list[int]]
An output table and a vector of row offsets to each partition
"""
cdef pair[unique_ptr[table], vector[libcudf_types.size_type]] c_result
cdef vector[libcudf_types.size_type] c_columns_to_hash = columns_to_hash
cdef int c_num_partitions = num_partitions

with nogil:
c_result = move(
cpp_partitioning.hash_partition(
input.view(), c_columns_to_hash, c_num_partitions
)
)

return Table.from_libcudf(move(c_result.first)), list(c_result.second)

cpdef tuple[Table, list] partition(Table t, Column partition_map, int num_partitions):
"""
Partitions rows of `t` according to the mapping specified by `partition_map`.
For details, see :cpp:func:`partition`.
Parameters
----------
t : Table
The table to partition
partition_map : Column
Non-nullable column of integer values that map each row
in `t` to it's partition.
num_partitions : int
The total number of partitions
Returns
-------
tuple[Table, list[int]]
An output table and a list of row offsets to each partition
"""
cdef pair[unique_ptr[table], vector[libcudf_types.size_type]] c_result
cdef int c_num_partitions = num_partitions

with nogil:
c_result = move(
cpp_partitioning.partition(t.view(), partition_map.view(), c_num_partitions)
)

return Table.from_libcudf(move(c_result.first)), list(c_result.second)


cpdef tuple[Table, list] round_robin_partition(
Table input,
int num_partitions,
int start_partition=0
):
"""
Round-robin partition.
For details, see :cpp:func:`round_robin_partition`.
Parameters
----------
input : Table
The input table to be round-robin partitioned
num_partitions : int
Number of partitions for the table
start_partition : int, default 0
Index of the 1st partition
Returns
-------
tuple[Table, list[int]]
The partitioned table and the partition offsets
for each partition within the table.
"""
cdef pair[unique_ptr[table], vector[libcudf_types.size_type]] c_result
cdef int c_num_partitions = num_partitions
cdef int c_start_partition = start_partition

with nogil:
c_result = move(
cpp_partitioning.round_robin_partition(
input.view(), c_num_partitions, c_start_partition
)
)

return Table.from_libcudf(move(c_result.first)), list(c_result.second)
55 changes: 55 additions & 0 deletions python/pylibcudf/pylibcudf/tests/test_partitioning.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
# Copyright (c) 2024, NVIDIA CORPORATION.

import pyarrow as pa
import pylibcudf as plc
import pytest
from utils import assert_table_eq


@pytest.fixture(scope="module")
def partitioning_data():
data = {"a": [1, 2, 3], "b": [1, 2, 5], "c": [1, 2, 10]}
pa_table = pa.table(data)
plc_table = plc.interop.from_arrow(pa_table)
return data, plc_table, pa_table


def test_partition(partitioning_data):
raw_data, plc_table, pa_table = partitioning_data
result, result_offsets = plc.partitioning.partition(
plc_table,
plc.interop.from_arrow(pa.array([0, 0, 0])),
1,
)
expected = pa.table(
list(raw_data.values()),
schema=pa.schema([pa.field("", pa.int64(), nullable=False)] * 3),
)
assert_table_eq(expected, result)
assert result_offsets == [0, 3]


def test_hash_partition(partitioning_data):
raw_data, plc_table, pa_table = partitioning_data
result, result_offsets = plc.partitioning.hash_partition(
plc_table, [0, 1], 1
)
expected = pa.table(
list(raw_data.values()),
schema=pa.schema([pa.field("", pa.int64(), nullable=False)] * 3),
)
assert_table_eq(expected, result)
assert result_offsets == [0]


def test_round_robin_partition(partitioning_data):
raw_data, plc_table, pa_table = partitioning_data
result, result_offsets = plc.partitioning.round_robin_partition(
plc_table, 1, 0
)
expected = pa.table(
list(raw_data.values()),
schema=pa.schema([pa.field("", pa.int64(), nullable=False)] * 3),
)
assert_table_eq(expected, result)
assert result_offsets == [0]

0 comments on commit b00a718

Please sign in to comment.