diff --git a/cpp/src/partitioning/partitioning.cu b/cpp/src/partitioning/partitioning.cu index 13f46195392..b0174d3bd83 100644 --- a/cpp/src/partitioning/partitioning.cu +++ b/cpp/src/partitioning/partitioning.cu @@ -16,9 +16,10 @@ #include #include +#include #include #include -#include +#include #include #include #include @@ -618,8 +619,7 @@ std::pair, std::vector> hash_partition_table( row_output_locations, num_rows, num_partitions, scanned_block_partition_sizes_ptr); // Use the resulting scatter map to materialize the output - auto output = detail::scatter( - input, row_partition_numbers.begin(), row_partition_numbers.end(), input, stream, mr); + auto output = detail::scatter(input, row_partition_numbers, input, stream, mr); stream.synchronize(); // Async D2H copy must finish before returning host vec return std::pair(std::move(output), std::move(partition_offsets)); @@ -693,7 +693,7 @@ struct dispatch_map_type { // Unfortunately need to materialize the scatter map because // `detail::scatter` requires multiple passes through the iterator - rmm::device_uvector scatter_map(partition_map.size(), stream); + rmm::device_uvector scatter_map(partition_map.size(), stream); // For each `partition_map[i]`, atomically increment the corresponding // partition offset to determine `i`s location in the output @@ -706,8 +706,7 @@ struct dispatch_map_type { }); // Scatter the rows into their partitions - auto scattered = - cudf::detail::scatter(t, scatter_map.begin(), scatter_map.end(), t, stream, mr); + auto scattered = detail::scatter(t, scatter_map, t, stream, mr); return std::pair(std::move(scattered), std::move(partition_offsets)); } diff --git a/cpp/tests/partitioning/partition_test.cpp b/cpp/tests/partitioning/partition_test.cpp index d356a1c405f..8ea224eb9fc 100644 --- a/cpp/tests/partitioning/partition_test.cpp +++ b/cpp/tests/partitioning/partition_test.cpp @@ -20,6 +20,7 @@ #include #include +#include #include #include #include @@ -328,3 +329,19 @@ TEST_F(PartitionTestNotTyped, ListOfListOfListOfIntEmpty) CUDF_TEST_EXPECT_TABLES_EQUAL(table_to_partition, result.first->view()); EXPECT_EQ(3, result.second.size()); } + +TEST_F(PartitionTestNotTyped, NoIntegerOverflow) +{ + auto elements = cudf::detail::make_counting_transform_iterator(0, [](auto i) { return i % 2; }); + fixed_width_column_wrapper map(elements, elements + 129); + auto table_to_partition = cudf::table_view{{map}}; + + std::vector expected_offsets{0, 65, 129}; + + auto expected_elements = + cudf::detail::make_counting_transform_iterator(0, [](auto i) { return i / 65; }); + fixed_width_column_wrapper expected(expected_elements, expected_elements + 129); + auto expected_table = cudf::table_view{{expected}}; + + run_partition_test(table_to_partition, map, 2, expected_table, expected_offsets); +} diff --git a/python/cudf/cudf/_lib/partitioning.pyx b/python/cudf/cudf/_lib/partitioning.pyx index 083407954b3..4bf8b32ea7e 100644 --- a/python/cudf/cudf/_lib/partitioning.pyx +++ b/python/cudf/cudf/_lib/partitioning.pyx @@ -1,4 +1,4 @@ -# Copyright (c) 2020-2022, NVIDIA CORPORATION. +# Copyright (c) 2020-2023, NVIDIA CORPORATION. from cudf.core.buffer import acquire_spill_lock @@ -13,6 +13,8 @@ from cudf._lib.cpp.partitioning cimport partition as cpp_partition from cudf._lib.cpp.table.table cimport table from cudf._lib.cpp.table.table_view cimport table_view from cudf._lib.utils cimport columns_from_unique_ptr, table_view_from_columns + +from cudf._lib.reduce import minmax from cudf._lib.stream_compaction import distinct_count as cpp_distinct_count cimport cudf._lib.cpp.types as libcudf_types @@ -21,6 +23,29 @@ cimport cudf._lib.cpp.types as libcudf_types @acquire_spill_lock() def partition(list source_columns, Column partition_map, object num_partitions): + """Partition source columns given a partitioning map + + Parameters + ---------- + source_columns: list[Column] + Columns to partition + partition_map: Column + Column of integer values that map each row in the input to a + partition + num_partitions: Optional[int] + Number of output partitions (deduced from unique values in + partition_map if None) + + Returns + ------- + Pair of reordered columns and partition offsets + + Raises + ------ + ValueError + If the partition map has invalid entries (not all in [0, + num_partitions)). + """ if num_partitions is None: num_partitions = cpp_distinct_count(partition_map, ignore_nulls=True) @@ -30,6 +55,10 @@ def partition(list source_columns, Column partition_map, cdef column_view c_partition_map_view = partition_map.view() cdef pair[unique_ptr[table], vector[libcudf_types.size_type]] c_result + if partition_map.size > 0: + lo, hi = minmax(partition_map) + if lo < 0 or hi >= num_partitions: + raise ValueError("Partition map has invalid values") with nogil: c_result = move( cpp_partition( diff --git a/python/cudf/cudf/core/dataframe.py b/python/cudf/cudf/core/dataframe.py index 13f6843b1cf..af3ba801a82 100644 --- a/python/cudf/cudf/core/dataframe.py +++ b/python/cudf/cudf/core/dataframe.py @@ -2244,6 +2244,12 @@ def scatter_by_map( Returns ------- A list of cudf.DataFrame objects. + + Raises + ------ + ValueError + If the map_index has invalid entries (not all in [0, + num_partitions)). """ # map_index might be a column name or array, # make it a Column diff --git a/python/cudf/cudf/tests/test_sorting.py b/python/cudf/cudf/tests/test_sorting.py index cd0c3bc8806..b3db1310adb 100644 --- a/python/cudf/cudf/tests/test_sorting.py +++ b/python/cudf/cudf/tests/test_sorting.py @@ -384,3 +384,16 @@ def test_dataframe_sort_values_kind(nelem, dtype, kind): assert_eq(sorted_df.index.values, sorted_index) assert_eq(sorted_df["a"].values, aa[sorted_index]) assert_eq(sorted_df["b"].values, bb[sorted_index]) + + +@pytest.mark.parametrize("ids", [[-1, 0, 1, 0], [0, 2, 3, 0]]) +def test_dataframe_scatter_by_map_7513(ids): + df = DataFrame({"id": ids, "val": [0, 1, 2, 3]}) + with pytest.raises(ValueError): + df.scatter_by_map(df["id"]) + + +def test_dataframe_scatter_by_map_empty(): + df = DataFrame({"a": [], "b": []}) + scattered = df.scatter_by_map(df["a"]) + assert len(scattered) == 0